rocketmq consumer 拉取消息流程
2020-04-30 15:32:22 2 举报
Rocketmq 消费者拉起消息逻辑流程
作者其他创作
大纲/内容
FOUND
广播模式
构建请求结果的处理对象PullCallback
通过
失败的延迟提交
MQClientInstance#doRebalance()遍历Client 包含的 consumerTable
空
获取topic下所有队列,并更新缓存的消费队列
判断msgFoundList 消息列表
PullMessageService(消息拉取服务)
重新提交拉取消息请求
broker拉取异常
处理业务结果状态
更新本地消费位置
通知broker消费情况
如果processQueue还生效,准备更新本地消费位置
这是干啥的钩子?
和旧缓存对比,新增的构建pullRequest对象
根据consumerGroup获取拉取模式对象,拉取消息
DefaultMQPushConsumerImpl(pull模式实现类)
唤醒负载服务wakeup rebalanceService
执行消费者消息监听器代码
如果缓存有变化,通知broker
向pullRequestQueue添加消息,满了,阻塞
更新topic对应的消息队列缓存updateProcessQueueTableInRebalance
new DefaultMQPushConsumerImpl().start()初始化
消费者模式
消息添加到processQueue
其它状态?NO_NEW_MSGNO_MATCHED_MSGOFFSET_ILLEGAL
submitConsumeRequest构造消费请求,每个请求是一个线程,提交到线程池
集群模式从内存读取消费进度,设置提交进度标记
1.每个20s自己执行2.consumer启动时,唤醒wakeup3.增加减少consumer实例时
触发前置钩子
重新提交
1.创建客mQClientFactory 户端实例2.RebalancePushImpl 消息队列负载组件3.AllocateMessageQueueAveragely 负载策略,默认4.pullAPIWrapper 消息通信组件5.offsetStore 偏移量持久化组件6.consumeMessageService 消费消息组件等
获取消息队列处理对象ProcessQueue
获取消息消费者业务监听器
服务启动时 dispatchPullReques
1.consumerGroup 消费组名2.namesrvAddr 路由服务器地址3.consumeFromWhere 消费位置4 topic 主题5.subExpression 订阅表达式6.MessageListenerConcurrently 消息回调处理
验证如消息总大小、消息总数量、消息跨度、topic订阅数据是否存在等
PullCallback回调执行路基
发生异常,重新提交延迟处理请求
不空
PullResult(拉取消息返回结果)
处理结果,如tag过滤、执行钩子、添加最大最小队列位置
设置订阅表达式、系统标识
统计信息
获取消费的位置
消息不为空
从pullRequestQueue 取消息,消费,没有阻塞
pullAPIWrapper.pullKernelImpl(..)向broker发送拉取消息请求
new DefaultMQPushConsumer 创建
获取消费者的topic订阅缓存遍历给每一个topic负载
获取topic下所有队列信息
集群模式
RebalanceService(消息队列负载服务)
触发后置钩子
判断状态
获取负载均衡策略AllocateMessageQueueStrategy
RebalanceService#run()线程执行负载
默认为平均策略
ConsumeMessageService(消费消息服务)
Rocketmq Consumer 端拉取消息流程图
0 条评论
回复 删除
下一页