RocketMq框架源码
2022-12-03 00:12:39 0 举报
rocketMq框架源码流程图
作者其他创作
大纲/内容
CommitLogDispatcherBuildIndex
run()
检测环境变量
创建线程 run方法
switch (communicationMode)
创建 处理 consumer 的pull 请求线程池
serverHandler
this.filterServerManager.start()
创建消息存储组件
this.tryToFindTopicPublishInfo(msg.getTopic())
对不同的请求 使用不同的 处理器进行处理
RouteInfoManager 路由表
启动客户端 就是netty连接
while (!this.isStopped())
进行处理请求
初始化 acl
this.mQClientAPIImpl.start()
启动netty 客户端
添加请求
设置topic 和 tag
else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently)
创建 brokerController 对象
设置监听端口 9876
namesrv的 handler
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType())
this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor
Netty的 handler
broker netty 客户端
创建心跳线程池
寻找队列 负载均衡实现
this.thread.start()
可信客户端工厂
consumerQueue分发器 用于在 consumerQueue记录偏移量
创建好 具体的RemotingCommand 获通过netty发送
this.pullRequestHoldService.start()
new NettyServerConfig()
this.registerProcessor()
this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor
this.reputMessageService.start()
异步写入 commitlog文件 后面有一个延迟队列处理方式
this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName())
this.brokerController.getMessageStore().asyncPutMessage(msgInner)
tpInfo.getMessageQueueList().get(pos)
通过netty 去 namesrv获取路由信息
加载 就是 kvConfig.json 文件
this.adminBrokerExecutor = Executors.newFixedThreadPool
this.doReput()
创建服务端线程池
执行发送方法
异步刷盘
this.namesrvController.getRouteInfoManager().registerBroker
tpInfo.getSendWhichQueue().incrementAndGet()
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor
this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor
this.topicConfigManager.load()
创建 producer对象
实现 handler 方法 处理业务channelRead0
大部分都是集群模式,只有存在远端裁可以保证消息只往一个消费者组的一个消费者发送
进行初始化
处理请求 这里是 DefaultRequestProcessor
initialRpcHooks()
new NamesrvConfig()
创建定时任务延迟5秒 每隔10描扫一次 Broker 移除不活跃的Broker
执行处理
注册
this.scheduledExecutorService.scheduleAtFixedRate
consumerqueue文件会由线程去扫描文件是否变化去写入 consumerqueue
实现刷盘
延迟消息实现 如果延迟时间 > 0
每隔1毫秒检查一次 commitlog 偏移量 是否由变化
创建线程
会通过netty 发送到 服务端 handler 处触发读请求
注册很多处理器 内容太多源码上看
启动存储组件 主要用于 commitlog的写入是事件 分发给 comsumerQueue 喝 indexFile
创建同步刷盘请求
启动控制器
没有环境变量直接退出
线程任务
以每隔10ms的时间调用操作系统刷盘
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable())
创建 任务 把run方法传入
this.brokerOuterAPI.start()
NamesrvController.this.kvConfigManager.printAllPeriodically()
重新加载配置 防止丢失
发送消息到 broker
获取
Broker入口
CommitLogDispatcherBuildConsumeQueue
switch (this.defaultMQPushConsumer.getMessageModel())
根据索引返回具体的 queue
注册 broker
客户端工厂
start()
new NettyClientConfig()
执行处理 reuqest
根据不通类型设置不通消费消息模式 顺序消费和并发消费
创建 Namesrv对象 传入两个配置对象
this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor
启动netty 整个核心交互业务
this.remotingServer.start()
初始化事务
启动 netty服务 vip
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup()); this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel()); this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()); this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
广播消息 本地存储 offset偏移量
这里会初始化 commitlog 对象 用于写入文件
this.runnable.run()
PipeLine
index 分发器构建 维护index索引
客户端 发送消息入口
同步刷盘
实现机制,把消息存到 修改的topic中去,后端有线程根据时间在进行处理转发到 需要发送的 topic
initialAcl()
创建 Netty服务端配置
加载磁盘文件
会通过netty 发送到 服务端 handler 处触发读请求
new CommitLog(this)
执行心跳注册 30 秒执行一次
consumer在构造方法中 就声明了负载均衡策略 AllocateMessageQueueAveragely
this.consumerManageExecutor = Executors.newFixedThreadPool
broker 是服务端 也是客户端 服务端连接java程序 客户端连接 namesrv注册
创建 broker配置类
设置监听方法 等待消费消息
注册的处理器
创建 Netty 客户端配置
创建回复消息线程池
如果有变化会进行分发
if (this.getMessageListenerInner() instanceof MessageListenerOrderly)
构造方法中会创建 RouteInfoManager路由表
由start方法启动
this.consumerOffsetManager.load()
consumer.subscribe(\"orderTopic\
创建 Netty 配置类
设置负载均衡策略
this.clientManageExecutor = new ThreadPoolExecuto
启动的这个线程任务 调用run方法
启动commitlog start线程
this.fileWatchService.start()
this.defaultMQProducerImpl.start()
获取 namesrv地址
创建 Netty 服务端对象
同步
MessageModel 来区分是集群模式还是广播模式
for (CommitLogDispatcher dispatcher : this.dispatcherList) {
if (msg.getDelayTimeLevel() > 0)
new GroupCommitService()
生产者获取 topic 通过topic 可以获取 具体的消息队列 messageQueue
维护客户端工厂 缓存好实例和topic关系 也就是生产者组
创建 Namesrv配置类
执行run方法
创建 Netty远程服务端
this.fastRemotingServer.start()
initialTransaction()
处理请求
CommitLog.this.mappedFileQueue.flush(0)
NamesrvStartup
设置好 并发消息还是顺序消息后 运行start 这里只画并发消息
start(createBrokerController(args))
Netty
this.commitLog.start()
写入commitlog
DefaultMessageStore.this.doDispatch(dispatchRequest)
再次加载属性 防止丢失
this.processorTable.get(cmd.getCode())
启动netty
客户端请求数据
this.kvConfigManager.load()
this.messageStore.start()
this.commitLog.asyncPutMessage(msg)
mQClientFactory.start()
加载磁盘配置 conf目录下的 json文件
获取路由信息
执行传入的 run方法
创建客户端线程池
执行异步处理request SendMessageProcessor 这个类
获取对应的处理器这里是 DefaultRequestProcessor
启动 netty 服务
this.messageStore.load();
this.clientHousekeepingService.start()
构建了 fast监听 算是vip通道 端口是 监听端口 - 2
this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor
start(controller)
controller.getConfiguration().registerConfig(properties)
createNamesrvController(args)
this.doCommit()
传入
Thread.sleep(1)循环
实现非常简单 每次进行+1操作 取模对垒数量
调用启动方法
集群模式在broker远端存储 offset 偏移量
controller.initialize()
new DefaultMQPushConsumer(“consumerGroup)
注册心跳
通过连接namesrv netty服务端获取 路由信息
broker 注册 processor
while循环 每隔10ms进行处理一次刷盘
if (null == namesrvConfig.getRocketmqHome())
nettyServerConfig.setListenPort(9876)
创建commitServeice线程 用于刷盘
producer.send(zeroMsg)
this.defaultMQPushConsumerImpl.start()
pull 长轮询组件
修改 topic 为 SCHEDULE_TOPIC_XXXX和 queueId
while (!this.isStopped())
找到相应的 MessageQueue在根据 queue找到broker 往broker发送消息
msg.setTopic(topic); msg.setQueueId(queueId)
启动 broker
执行线程的start方法
new DefaultMQProducer(\"producerGroup\")
createBrokerController
回调 方法 执行 callback
启动客户端 netty
NameSrv 入口
触发netty进行注册路由信息
controller.start()
this.consumerFilterManager.load()
如果不是延迟消息以零拷贝的方式写入文件存储消息如果commitlog文件不够存储消息 会根据偏移量创建新的文件
这是两个方法 先创建BrokerController
new BrokerConfig()
创建发送消息线程池
客户端 消费消息入口
(AsyncNettyRequestProcessor)pair.getObject1()
(AsyncNettyRequestProcessor)pair.getObject1();
ServerUtil.buildCommandlineOptions(new Options())
获取 broker地址
入口
创建查询消息线程池
调用 send方法 发送消息
brokerConfig.getNamesrvAddr()
这些线程池有一系列任务 统计任务持久化任务等等
逻辑大致相同 创建了 netty 客户端 加入handler
通过netty 客户端进行发送到服务端
service.putRequest(request)
获取MessageStroe 进行异步写入数据
BrokerStartup
创建 BrokerController
循环进行分发
后面代码是 如果以前有过发送失败 就会尽量绕过失败的 broker
启动
整个过程是创建controller
this.consumeMessageService.start()
绝大数是并发消息
处理 channel 注册路由
检测命令行参数 -c 不是重点
选择具体的 处理器
System.exit(-2)
this.subscriptionGroupManager.load()
callback(RemotingCommand response)
调用 ReputMessageService run方法ReputMessageService 是继承了ServiceThread 所以执行具体类的 run方法
根据不通的请求 找到对应的 RemotingCommand 进行发送
dispatcher.dispatch(req)
顺序消息
扫描任务
0 条评论
下一页