rocketmq-go-client整理
2023-06-29 16:51:59 0 举报
pushConsumer实现整理
作者其他创作
大纲/内容
remoteClient
结束
closeOnce
cachedMsgCount
ProcessSendResponse()
否
处理成功/提交
消息处理结果
MassgeQueue
实例化pushConsumer
mqChanged
offset的span流控次数font color=\"#323232\
tcp链接和操作等信息
启动协程for循环一直执行pc.consumeMessageOrderly pc.consumeMessageConcurrently
处理成功
是
takeMessages
pq.removeMessage(msgs.)
topic
检查数据有效性
拉到消息
不合法offset
subscriptionDataTable
cType
subString
Start
RegisterConsumer()
topicSubscribeInfoTable
执行业务回调
触发改queue的写入还会触发queue的Drop
pc.consumeInner(msgs)
初始化客户端
是否done/Drop则退出
Topic
真正处理消费消息的函数
cachedMsgSize
更新offset
time.sleep
-
group最大255,无特殊字符是否和系统定义组重复是否订阅topic参数设置是否合适(最大值和最小值检查),设置默认值
pushConsumer
offsetId
是否到最大重试次数
为了控制协程数目pc.crCh[mq.Topic] <- struct{}{}
model
map[topic]SubscriptionData
broker健康检测进行一次reblance
从msgCache获取消息
缓存消息数目是否大于PullThresholdForQueue
Shutdown
是否发送成功
把消息存储到msgCache
codeSet
submitToConsume
获取订阅信息:subscriptionDataTable
初始化配置
len(msg)>0
生产的信息
更新域名的ip(每2min执行一次)
启动协程通过prCh消费queue拉消息请求
是否广播
map[topic]massgeQueue
pq.commit()
classFilterMode
提交的offset
消息结构
版本
是否需要sleep进行sleep
storage
push/pull
msgCache
ReconsumeTimes
消息大小
消费锁,有序消息使用
processQueueTable
是否有变更
设置重试topic消费协程数
makeMessageToCosumeAgain
ReqConsumerSendMsgBack
消息拦截器
打印日志
chan []*primitive.MessageExt -- 长度32
暂停终止
pc.pullMessage(processQueue)
MaxSpan是否大于ConsumeConcurrentlyMaxSpan
consumerGroup
初始化异步任务对排序模式加全局锁
缓存消息内存大小
获取本地消息pq.takeMessages(batchSize)
pq.updateOffset()
PullMessage()
topic/retryTopicconsumeFunc业务回调函数
判断是否大于最大重试次数
defaultConsumer
processQueue
消费组名称
SubscriptionData
consumerMap
启动协程并发执行批量消费option.ConsumeMessageBatchMaxSize
MsgId
初始化失败重试topic并初始化消费客户端
删除consumingMsgOrderlyTreeMap中消息
queueId
rmqClient
crCh、subscriptionDataTable、subscribedTopic、consumeFunc
把消息发送给broker到延迟队列
commit
client
消息模式:广播,竞争
回滚
重新使消息被消费到
massgeQueue
UpdateTopicRouteInfo()
初始化消息拦截器:interceptor
producerMap
consumeLock
上面是否成立
Start()
存消息[treemap.Map]
事务的offset
option
更新本地queue的offet到broker(每5s执行一次)
prCh
清理该map
消息写入retryTopic的queue里
从broker拉消息client.PullMessage
expressionType
*defaultConsumer
更新路由信息订阅、queue数量
做一些tag、sql等过滤配置
消息ID
消费组维度
更新路由信息[topic-queue和broker](每30s执行一次)
竞争模式
rmq链接客户端
消费失败重试次数
getMessage
启动消费客户端(rmqClient) Start()
InvokeAsync()
消息写入死信toipc的queue里
缓存消息内存是否大于PullThresholdSizeForQueue
启动消费对象
是否消费完
CommitLogOffset
rocketmq内部逻辑
消费组启生命周期过程
pg.makeMessageToCosumeAgain()
QueueOffset
Shutdown()
订阅的topic
没拉到/全过滤
tag的集合
Message
tagsSet
过滤信息
removeMessage
PreparedTransactionOffset
执行消息tag过滤,hook过滤
把该consumer注册到:font color=\"#323232\
subVersion
queueFlowControlTimes
心跳检测,是否有变更
是否有序
chan pullRequest
域名解析、初始化:client
TransactionId
consumeOrderly
stat
crCh
consumeMessageConcurrentlyfont color=\"#323232\
创建retryTopic
是否消费成功
有序消息[treemap.Map]
consumeFunc
把消息推到processQueue.msgCh
Body
统计信息
cleanExpiredMsg
ClientOptions:font color=\"#323232\
NewPushConsumer
pq.加锁
回滚/提交
putMessage
pullFromWhichNodeTable
设置订阅topic和回调业务函数
MessageExt
brokerName
interceptor
写入该mapconsumingMsgOrderlyTreeMap
RebalanceIfNotPaused()
msgCh
dc.stat = NewStatsManager()统计信息初始化
queueLock
InvokeSync()
pq.getMessages()获取消息
consumingMsgOrderlyTreeMap
是否自动提交
Subscribe
存储客户端queue的offset信息
msgCh中获取消息
map[massgeQueue]processQueue
StoreSize
消费组信息map[group]pushConsumer
processQueue->putMessage
是否存储retryTopic
queueMaxSpanFlowControlTimes
subscribedTopic
执行
广播还是竞争
创建死信队列topic
OffsetMsgId
是否需要设置为重试topic
...
消息重新写入msgCache
当queue有变更时,会把消息写到这个chan,启动协程去一直消费这个queue,直到下次该queue被删除
Massge
执行rebalance(每20s执行一次)
开始无限for循环
启动异步任务清理超时消息
初始化默认消费配置:option
缓存消息数
有序消费才会调用
配置项
消费queue的信息
消费组的queue维度
启动协程从chan里获取消息
广播
设置拉取消息函数:submitToConsume
打印warning
是否存在死信队列
1、重试topic是自动创建的2、死信队列也是自动创建的,只是我们没有消费
订阅的业务回调函数,底层是map[topic]func
RegisterProducer()
queue的offset
表达式类型:tag、sql
done
0 条评论
回复 删除
下一页