RocketMq消息发送流程
2021-08-25 10:26:14 42 举报
rocketMq 同步消息,异步消息,broker端处理流程图
作者其他创作
大纲/内容
处理存储结果
processResponseCommand
CommitLogputMessage
MappedFileappendMessage
doCommit
rsp
this.mappedByteBuffer.force();
1、broker是否有写权限2、topic是否是默认topic3、获取或创建topicConfig4、判断queueId是否超过限制
异步刷盘根据是否开启transientStorePoolEnable机制,刷盘会有点区别。如果transientStorePoolEnable为true,mq会单独申请一个与目标物理文件(CommitLog)相同大小的堆外内存,该堆外内存将使用内存锁定,确保不会被置换到虚拟内存中去,消息首先追加到堆外内存中 然后提交到与物理文件对应的内存映射内存中,再flush到磁盘。 如果transientStorePoolEnable为false,消息直接追加到与物理文件直接映射的内存中,然后刷写到磁盘=============================================在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入(超过10s,或达到4页数据,才会被刷)
invokeAsyncImpl
sendMessage
1、线程中塞入请求,run方法中就能感知到,并处理刷盘请求service.putRequest(request)2、waitForFlush,等待刷盘结果
broker响应之后逻辑
刷盘
DefaultAppendMessageCallbackdoAppend
NettyRemotingAbstractprocessMessageReceived
DefaultMQProducerImplupdateFaultItem
GroupCommitServicerun
DefaultMessageStoreputMessage
DefaultMQProducersend
sendKernelImpl
异步线程池,new Runnable来执行的
1、END_OF_FILE:创建新MappedFile文件,再走之前的appendMessage2、发送消息的一些统计
ChannelwriteAndFlush
返回存储结果
FlushRealTimeServicewakeup
msgCheck
appendMessagesInner
sendDefaultImpl
handleRetryAndDLQ
MappedFileQueueflush
如果从catch异常,递归
其实就是countDownLatch.await在死等响应,netty客户端发送完之后,等待服务端的响应结果
返回成功
异步消息发送的重试,是靠递归来实现的
CommitLoghandleDiskFlush
MQClientAPIImplprocessSendResponse
FlushRealTimeServicerun
NettyRemotingClientinvokeSync
MQClientAPIImplsendMessageAsync
异步
MQClientAPIImplonExceptionImpl
isTransientStorePoolEnable
NettyRemotingClientinvokeAsync
同步
consumer消费失败后,发回的重试消息处理逻辑(含发到死信队列)
Y
client接收broker端响应
DefaultMQProducerImplsend
processRequestCommand
N
等待netty回调
ResponseFutureexecuteInvokeCallback
await和countDown
响应client
borker响应
SendMessageProcessorprocessRequest
做了一堆基础校验不通过,则直接返回对应错误
网络等待
NettyRemotingClient$NettyClientHandlerchannelRead0
putResponse就是把返回的结果赋值一下,同时执行countDown,唤醒发送时候wait的线程
MQClientAPIImplsendMessage
NettyRemotingServer$NettyServerHandlerchannelRead0
sendMessageAsync
ResponseFutureputResponse
同步消息
handleHA
executeInvokeCallback
刷盘机制
invokeSyncImpl
通过netty进行网络发送
通过校验
异常返回
1、获取brokerAddr(IP+PORT)2、给普通message添加uniqId3、对大于4k的普通消息进行压缩4、设置一些标志位(sysFlag,事务消息)5、hook检查6、构建发送消息请求头,requestHeader
client发送请求
是否同步
异步消息
sendMessageSync
wakeup后,run方法就不会一直轮询等待了即又走回到上面的那种异步方式了
InvokeCallbackoperationComplete
CommitRealTimeServicerun
ResponseFuturewaitResponse
10s一次
MappedFileQueuecommit
收藏
收藏
0 条评论
下一页