Producer端核心流程(rocketmq)
2022-03-29 09:49:35 0 举报
RocketMQ
作者其他创作
大纲/内容
启动netty客户端的
mQClientAPIImpl.getTopicRouteInfoFromNameServer
executeSendMessageHookBefore
如果没有选出一个Broker,则选择一个延迟较低的Broker
SendMessageRequestHeader
sendMessageSync
tryToFindTopicPublishInfo(msg.getTopic())
根据返回结果对比新旧topic信息是否发生改变
start()
每隔5s向broker上报一次msg消费Offset
persistAllConsumerOffset()
fetchNameServerAddr() 获取nameserv地址的,每隔2分钟执行一次
mQClientFactory.start()
同步发送消息默认失败重试3次
发送前钩子函数回调
响应表responseTable
tpInfo.getMessageQueueList().get(pos)
发送延迟容错开关,默认为关闭,如果开关打开了,会触发发送延迟容错机制来选择发送Queue
getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey()
this.startScheduledTask() 启动一堆定时任务
根据计算出的下标获取队列
消息检查
cleanOfflineBroker() 清除下线的borkersendHeartbeatToAllBrokerWithLock() 发送心跳到所有的broker
selectOneMessageQueue
new DefaultMQProducer(producerGroup)创建生产者,可以指定组名默认DEFAULT_PRODUCER
doBeforeRpcHooks
通过取模获取Queue的位置下标 Pos
不是批量消息的话,设置唯一的消息ID
remotingClient.invokeSync
处理响应结果
defaultAsyncSenderExecutor初始化发送异步消息的线程池
更新一次后还是本地缓存中找不到则说明没有这个topic如果开启动了自动创建topic则创建一个默认的topic(TBW102)
int index = tpInfo.getSendWhichQueue().getAndIncrement()
topicPublishInfoTable.get(topic)
updateTopicRouteInfoFromNameServer(topic)
topicRouteTable.get(topic)
先查缓存
发送后钩子函数回调
第二步
mQClientFactory.registerProducer
如果设置了钩子函数执行发送消息前的钩子
MQClientAPIImpl.sendMessage
bossGroup 1个线程workGruoup个线程
随机选择一个Broker
如果 Pos对应的 Broker的延迟时间是可以接受的,并且是第一次发送,或者和上次发送的Broker相同,则将Queue返回
异步发送消息API会把消息交给线程池去执行
拿到最终发送的队列
从threadloacl中获取一个自增序号 index
遍历 Topic集合 进行更新路由信息
30s一次
使用Netty同步调用
updateTopicRouteInfoFromNameServer()从nameserv获取topic信息的任务30s拉取一次
1.先创建一个MQClientInstance实例,并放入map缓存就是它完成了和broker的通信
最终启动生产者
sendKernelImpl
processSendResponse
NettyRemotingClien.start
根据MessageQueue里面的brokerName拿到Master地址(brokerId=0)
启动了一个定时任务,1s执行一次首次延时30s执行
更新客户端本地topic路由信息
如果上面两次都没有选出broker
checkMessage
tryToCompressMessage
scanResponseTable()
超时异常就关闭channle
this.pullMessageService.start();
第一步
1.多台namesrv时,使用一个原子计数器实现轮询算法获取一个地址发送获取topic消息给namesrv端2.原子计数器值+1 % namserv的个数3.如果之前以前和namesrvj建立过TCP长连接,并且这个连接对应的channel没有关闭,则之前使用之前的通道发现消息
Netty发送请求,同步调用超时默认3s
获取topic路由信息
this.bootstrap.group(this.eventLoopGroupWorker)
将consumerTable和producerTable这个两个Map所有的topic取出来,合并成成一个集合
默认topic创建
注册Produce
就是将消费者组名和DefaultMQProducerImpl做一个映射放入map中同一个组名使用相同客户端实例
从brokerAddrTable中获取
缓存中没有从nameserv获取然后更新本地缓存
不能发空消息,消息长度也不能太长,默认是不超过4m
启动重平衡任务
默认的消息生产者rpcHook表示钩子,发送消息前后会回调这个钩子方法
选择一个队列
doAfterRpcHooks
超过4k就进行消息压缩
invokeSyncImpl
case SYNC同步发送
同步调用 获取响应
初始一个5W大小的链表队列asyncSenderThreadPoolQueue
this.remotingClient.invokeSync
producer.send(msg)
this.rebalanceService.start()
扫秒响应表,是把超时的响应进行回调,或者是返回
通过获取路由信息维护的一个递增的值每次加+1 和队列数量取模
mQClientAPIImpl.start()
this.mQClientFactory= new MQClientInstance
发送出去
启动拉取消息
先从channelTables获取发送地址对应的channle,如果不可以或没有则建立连接
比如说topic,MessageQueue 队列Id, 出生日期,flag等
封装发送消息头
0 条评论
回复 删除
下一页