RocketMQ Consumer拉取消息流程
2022-01-19 15:25:08 13 举报
RocketMQ Consumer拉取消息流程
作者其他创作
大纲/内容
创建拉取结果的回调函数
不等待拉取结果,直接返回,重新去阻塞队列拉取消息任务
进一步处理PullResult
从pullRequest获取processQueue
根据消费组名获取消费者内部实现类MQConsumerInner
分批封装成多个消费任务
从pullRequestQueue阻塞队列获取拉取消息任务
将消费任务提交给消费线程池消费
判断限流
拉取消息响应结果RemotingCommand
run方法
将消息提交到consumeMessageService中的线程池供消费者消费
有拉取到消息
将pullRequest重新放到队列中去拉取下一批消息
强制转换成DefaultMQPushConsumerImpl
获取批量消费数量(默认1条)
false
设置拉取时间
假设拉取成功
将这一批消息封装成一个消费任务
处理响应,将响应封装为PullResult
使用消息拉取组件PullAPIWrapper拉取消息
true
更新pullRequest中下次的拉取偏移量
拉取数量<=批量消费数量
消费逻辑
如果拉取到的消息列表为空,则将pullRequest重新放回队列中,使可以进行下一次的拉取任务,然后返回
回调函数
直接返回,拉取和消费是分开的
PullMessageService(线程)
DefaultMQPushConsumerImpl
PullAPIWrapper
MQClientAPIImpl
PullCallback
ConsumeMessageService(ConsumeMessageConcurrentlyService)
构建拉取消息请求数据PullMessageRequestHeader
根据brokerName和brokerId得到broker地址
委托给上面的对象拉取消息
将消息列表存入processQueue
remotingClient.invokeAsync异步拉取
调用MQClientAPIImpl异步拉取
调用DefaultMQPushConsumerImpl中创建的回调函数
没找到则先从NameServer拉取路由信息,再重新获取broker地址
0 条评论
下一页