rocketmq 发送消息
2024-04-05 15:48:22 8 举报
rocketmq 发送消息简单流程
作者其他创作
大纲/内容
TimerMessageStore#doEnqueue
TopicQueueLock#lock用了32个锁根据mq进行hash
是
ServiceThread#waitForRunning执行一次等待 30 ms
TransactionalMessageService#prepareMessage
state
TimerMessageStore#convert转换为正常消息,topic等信息
commit
java.nio.channels.FileChannel#forcejava.nio.MappedByteBuffer#force真正刷盘
处理 RequestHeaderV
MessageStore#putMessage写消息
TimerMessageStore.TimerEnqueuePutService#fetchTimerRequests一次最多取10条
rollback
CommitLog#asyncPutMessage写入消息
是否是事务消息
DefaultMappedFile#flush
TransactionalMessageCheckService
FlushRealTimeService#run根据 flushIntervalCommitLog(默认500ms) 配置的时间间隔进行定时刷盘
TimerMessageStore.TimerEnqueueGetService#run间隔100ms
一大堆 if else 校验
否
返回结果
TimerMessageStore#dequeue
是否同步刷盘
TimerWheel#getSlot获取延迟时间所在的 slot
EndTransactionProcessor#processRequest
TimerWheel#putSlot修改指定槽的偏移量
MQClientInstance#updateTopicRouteInfoFromNameServer组装 RemotingCommand 发送请求获取队列信息
.MessageStore#putMessage写入消息
AbstractTransactionalMessageCheckListener#sendCheckMessage发送消息到client
ndTransactionProcessor#endMessageTransaction组装消息,主要是修改topic。从事务队列转到真实的队列
MappedFileQueue#getLastMappedFile获取要写入的映射文件
TimerWheel 是一个时间轮,一个有 7*24*3600 个槽默认精度 1s,也就是1s一个槽
font color=\"#323232\
CommitLog#handleDiskFlushAndHA刷盘和高可用处理
.UtilAll#crc32(byte[])计算crc
DefaultMQProducerImpl#sendDefaultImpl
TransactionalMessageCheckService#run
TimerMessageStore#enqueue
.MessageStore#asyncPutMessage写入消息
TransactionalMessageServiceImpl#check
TimerRequest构建一个对象
延时消息流程
TimerMessageStore.TimerDequeueGetService#run间隔100ms
dequeuePutQueue.put
MessageStore#putMessage写入消息
如果是设置了org.apache.rocketmq.client.sendSmartMsg或者是批量数据则使用 v2 的头
v2的头相对于v1就是把一些字段名称给变成了 a-z 。压缩了传输数据量
整个 rocket 大部分的异步都是通过 CompletableFuture.completedFuture来包装成异步.但是在写入消息的时候是把异步包装成同步(等待执行完成)
MQClientAPIImpl#sendMessage
是否需要高可用或者主从同步
TimerLog#append保存时间轮队列的消息偏移量以及延迟信息
PutMessageHook#executeBeforePutMessage写入数据前的一个钩子
executeSendMessageHookAfter和上面相同的钩子
需要等到满足要求的slave返回ack
HookUtils#handleScheduleMessage一个钩子在保存消息的时候把延时消息的topic改成默认的
ServiceThread#onWaitEnd
校验当前可用接节点是否满足要求否则抛出 IN_SYNC_REPLICAS_NOT_ENOUGH
DefaultMQProducerImpl#checkTransactionState
如果是 TRANSACTION_NOT_TYPE 状态不处理,下次继续check
这些信息会定时的从nameServer获取
DefaultMQProducer#send各种send方法
TransactionListener#checkLocalTransaction
TimerDequeuePutMessageService#run间隔 10ms 一次
2 个队列4 个定时任务一直倒腾
getBrokerNameFromMessageQueue找到 brokerName
TransactionalMessageServiceImpl#deletePrepareMessage删除消息
findBrokerAddressInPublish找到发送地址
ClientRemotingProcessor#checkTransactionState
正常保存消息到 rmq_sys_wheel_timer
MappedFile#appendMessage
HookUtils#transformTimerMessage把真实的信息保存到属性中
PutMessageLock#unlockTopicQueueLock#unlock解锁
sendMessage
ServiceThread#startbroker 启动的时候调用
TopicQueueMappingManager#rewriteRequestForStaticTopic判断当前broker是不是leader
TimerMessageStore#doPut最多重试 3 次
ransactionalMessageCheckService#onWaitEnd
预计执行时间是否到了
HookUtils#handleScheduleMessage延迟任务校验
EndTransactionProcessor#sendFinalMessage
TopicQueueMappingManager#buildTopicQueueMappingContext获取topic的详细信息
事务消息处理
设置msg的各种标记
processTransactionState
MQClientAPIImpl#endTransactionOneway发送消息到broker
MessageStore#getConsumeQueue从 rmq_sys_wheel_timer 获取消息
FlushManager#handleDiskFlush刷盘
记录执行情况
GroupCommitRequest#wakeupCustomer刷盘后把同步的请求都标记完成
AbstractTransactionalMessageCheckListener#resolveHalfMsg
TimerWheel#getSlot获取当前时间对应的 slot
TimerMessageStore.TimerEnqueuePutService#putMessageToTimerWheel
通过前面设置的发送类型调用 remotingClient 的接口
enqueuePutQueue.offer入队
MessageClientIDSetter#setUniqID设置消息的唯一标识
needHandleHA
SendMessageProcessor#processRequest所有发送的消息都会在 broker 里这个处理器处理
.HookUtils#checkInnerBatch校验批量数据
DefaultMQProducerImpl#sendKernelImpl
needRoll判断是否在当前时间轮中,如果超出则标记到下一轮
TimerMessageStore.TimerEnqueuePutService#run间隔100ms
0 条评论
下一页