RocketMQ源码研究
2019-07-08 09:50:06 11 举报
AI智能生成
RocketMQ源码学习,包括生产者发送消息等
作者其他创作
大纲/内容
Consumer消费消息流程
1. 配置检查
校验组(和Producer组校验规则一致)
消息模型MessageModel不能为空
ConsumeFromWhere不能为空
消费时间戳不能为空
队列分配策略AllocateMessageQueueStrategy不能为空
消息监听MessageListener
最大/小线程数不能大于1000或小于1
2. 拷贝订阅规则
3. 若消费模式为MessageModel.CLUSTERING,则改变实例名
4. 创建客户端实例MQClientInstance,与producer端类似
5. 设置rebalanceImpl、pullAPIWrapper对象
6. 创建OffsetStore对象并加载OffsetStore.load()
7. 创建ConsumeMessageService对象并启动ConsumeMessageService.start()
8. 将Consumer实存入consumerTable
9. MQClientInstance.start(),和Producer端启动一致
10. 更新主题订阅信息
11. 检查broker中的消费端
12. 发送心跳
13. 负载均衡
Broker启动流程
2. 初始化BrokerController
1. topicConfigManager.load()
2. consumerOffsetManager.load()
3. subscriptionGroupManager.load()
4. consumerFilterManager.load()
7. 创建线程:sendMessageExecutor、pullMessageExecutor、queryMessageExecutor、adminBrokerExecutor、clientManageExecutor、consumerManageExecutor
8. 注册处理器BrokerController.registerProcessor()
1. 创建SendMessageProcessor并向remotingServer、fastRemotingServer注册处理器
2. 向remotingServer注册读取消息处理器pullMessageProcessor
3. 创建NettyRequestProcessor对象并向remotingServer、fastRemotingServer注册处理器(与上述步骤类似)
4. 创建ClientManageProcessor对象并向remotingServer、fastRemotingServer注册处理器(与上述步骤类似)
5. 创建ConsumerManageProcessor对象并向remotingServer、fastRemotingServer注册处理器(与上述步骤类似)
6. 创建EndTransactionProcessor对象并向remotingServer、fastRemotingServer注册处理器(与上述步骤类似)
7. 创建AdminBrokerProcessor对象并向remotingServer、fastRemotingServer注册处理器(与上述步骤有点区别)
9. 创建定时任务:
10. BrokerController.start()
RocketMQ
Producer发送消息流程
1. producerGroup检查
2. 创建MQClientInstance对象并注册
创建ClientRemotingProcessor对象
创建MQClientAPIImpl对象并更新namesrv地址
创建NettyRemotingClient对象
注册消息处理器
创建MQAdminImpl对象
创建PullMessageService对象
创建RebalanceService对象
创建DefaultMQProducer对象
创建DefaultMQProducerImpl对象
创建ConsumerStatsManager对象
3. 启动Netty
Netty参数配置
定时任务:NettyRemotingClient.this.scanResponseTable()
4. 启动定时任务
1. MQClientInstance.this.updateTopicRouteInfoFromNameServer()
2. MQClientInstance.this.cleanOfflineBroker()
3. MQClientInstance.this.sendHeartbeatToAllBrokerWithLock()
4. MQClientInstance.this.persistAllConsumerOffset()
5. MQClientInstance.this.adjustThreadPool()
6. PullMessageService.start()
7. RebalanceService.start(),RebalanceService和PullMessageService都是Runnable的子类
8. this.defaultMQProducer.getDefaultMQProducerImpl().start(false)
9. MQClientInstance.sendHeartbeatToAllBrokerWithLock()
10. DefaultMQProducerImpl.send(...)
1. DefaultMQProducerImpl.makeSureStateOK()检查服务是否正在运行
2. Validators.checkMessage(...)
3. 创建TopicPublishInfo对象:DefaultMQProducerImpl.tryToFindTopicPublishInfo(topic)
4. 判断是同步方式还是异步方式,如果是同步方式,则发送3次,异步方式只发送一次
5. MQFaultStrategy.selectOneMessageQueue(...)
6. DefaultMQProducerImpl.sendKernelImpl(...)
1. MQClientInstance.findBrokerAddressInPublish(brokerName)
2. 如果配置了VIP通道:将上步获取到的broker地址转成VIP通道(端口号减2):MixAll.brokerVIPChannel(..)
3. 如果不是批量发送消息
1. 设置消息全局ID:MessageClientIDSetter.setUniqID(msg)
2. 压缩消息(如果消息长度大于4096):DefaultMQProducerImpl.tryToCompressMessage(msg)
4. DefaultMQProducerImpl.hasCheckForbiddenHook()
5. DefaultMQProducerImpl.hasSendMessageHook()
6. 创建SendMessageRequestHeader对象
7. 消息发送
1. 创建远程请求:RemotingCommand.createRequestCommand
2. 发送消息:MQClientAPIImpl.sendMessage
sync:NettyRemotingClient.invokeSync,处理结果
async:NettyRemotingClient.invokeAsync
oneway:NettyRemotingClient.invokeOneway
Namesrv启动流程
1. NamesrvConfig、NettyConfig、日志配置等
2. 根据namesrvConfig、nettyConfig创建NamesrvController对象并初始化
3. 初始化NamesrvController
1. 加载kvConfig.json配置文件
2. 创建NettyRemotingServer对象
3. 注册请求处理器NettyRequestProcessor
4. 启动两个定时任务
4. NamesrvController.start()
1. 创建DefaultEventExecutorGroup对象,负责处理请求
2. 配置ServerBootstrap
5. 若channelEventListener不为空,启动NettyEventExecutor.start()
0 条评论
下一页