消费者启动流程图
2024-12-31 13:50:17 1 举报
消费者启动流程图
作者其他创作
大纲/内容
设置消费者组名称
判断是顺序消息消费还是并发消息消费
否
生产者端设置TLS加密通信
关闭消费者实例服务
发送心跳,判断服务状态
设置消息偏移量
订阅Topic
拉取消息
构建主题订阅信息 SubscriptionData 并加入RebalanceImpl的订阅消息中
进行负载均衡
更新主题路由信息
检查消费者配置是否合法
设置AsyncTraceDispatcher的相关属性,包括消费者组、消息分批大小、主题名称等
结束
更新消费者实例服务状态为刚创建
是
设置消费者实例服务状态为启动失败
创建MQClient实例
消息跟踪调度器是否已初始化
开始
实例化消费者组名称
是否开启消息跟踪配置
启动消息跟踪调度器
消费进度存储
是否注册成功
负载均衡
以org.apache.rocketmq.example.quickstart.Consumer代码为例
注册消费者实例
健康
启动消费者实例
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
指定NameServer地址
启动消息消费的具体实现逻辑
org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#start
判断消费者实例服务状态
刚创建
a:校验GroupNameb:校验消费模式:集群/广播c:校验ConsumeFromWhered:校验开始消费的指定时间e:校验AllocateMessageQueueStrategyf: 校验订阅关系g:校验是否注册消息监听h:校验消费线程数l: 校验单次拉取的最大消息数i: 校验单次消费的最大消息数
注册回调实现类来处理从broker拉取回来的消息
加载消费进度
检查在Broker上的状态
注册一个消息消费钩子ConsumeMessageTraceHookImpl,用于在消息消费时触发跟踪逻辑
MQClientInstance启动
消息消费服务启动
运行中、已关闭、启动失败
0 条评论
下一页