消息消费流程
2021-08-25 10:34:41 3 举报
并发消费,顺序消费,broker端处理拉取消息流程等
作者其他创作
大纲/内容
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) && !this.processQueue.isLocked()) { log.warn(\
构建请求头PullMessageRequestHeader
invokeAsyncImpl
找到消息:1:设置下一次拉取的消息偏移量参数2:统计拉取RT DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT3:如果经过过滤之后的消息为空,则立马再拉取一次 DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest)4:将拉取的消息放入到processQueue processQueue.putMessage(pullResult.getMsgFoundList())5:开启线程异步消费ProcessQueue中的数据 DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest6:进行下一次数据拉取
this.mQClientFactory.getMQClientAPIImpl()pullMessage
在执行消费消息之前,还有如下操作:1:listener = ConsumeMessageConcurrentlyService.this.messageListener2:context = new ConsumeConcurrentlyContext(messageQueue);3:ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs); 如果topic是%retry%开始的,需要还原成原始topic4:listener.consumeMessage 业务系统自己消费逻辑
while(true)
LocalFileOffsetStore
END
pullMessageService.start
1、font color=\"#d32f2f\
把pullRequest放入ManyPullRequest
processQueue.isLocked()
并发消费场景
processQueue.getMaxSpan > consumeConcurrentlyMaxSpan
此处put请求到queue中了,前面pullMessageService.start阻塞在那的线程,才可以继续往下走了
如果支持挂起,则将response设为null,netty服务端将不会立即向netty客户端写入响应——channel.writeAndFlush
OffsetStoreupdateOffset
defaultMQPushConsumerchangeInstanceNameToPID
从本地记录的offset.json文件中获取进度
ProcessQueueremoveMessage
每隔15分钟清理超过15分钟都没消费的消息
lockMQPeriodically
this.brokerController.getConsumerOffsetManager().queryOffset
线程run
获取消费锁,然后执行业务系统自身的逻辑
isTransferMsgByHeaptrue(默认):从堆外内存copy回jvm内存一次,然后将byteBuffer的字节数组塞到body种false:直接构建FileRegion对象,通过channel直接发送出去,少一次copy了
startScheduledTask
load
ConsumeMessageOrderlyService.this.processConsumeResult
defaultMQProducer.getDefaultMQProducerImpl().start
mQClientFactoryrebalanceImmediately
1、判断拉取回来的数据是否超过1000条都没有消费了2、判断拉取回来的数据是否超过100MB都没有消费了如果是促发流控,过50ms再来拉数据
CONSUME_SUCCESS
updateProcessQueueTableInRebalance
cachedMessageCount>pullThresholdForQueuecachedMessageSizeInMiB>pullThresholdSizeForQueue
registerMessageListener
NettyClientHandlerprocessMessageReceived
成功就略过失败的话,把消息逐条发送给broker
BROADCASTING||processQueue.isLocked
consumeMessageService
一样了
defaultMQPushConsumerImpl.subscribe
过3秒再来拉取消息executePullRequestLater
顺序
mqClientFactory.doRebalance
commitOffsetValue
广播模式或者有未过期的锁
notifyMessageArriving
广播
RequestHoldServicesuspendPullRequest
NettyRemotingClientinvokeAsync
死循环消费
this.submitConsumeRequestLater
new DefaultMQPushConsumerImpl
RECONSUME_LATER
MQClientManagermQClientFactory=getAndCreateMQClientInstance
this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset
processQueue没加锁
构建PullCallback
run
支持挂起的情况下
RemoteBrokerOffsetStorefetchConsumeOffsetFromBroker
ConsumeMessageOrderlyService
this.rebalanceImpl.computePullFromWherepullRequest.setNextOffset(offset)
判断消费模式
Remote模式
挂起请求
FOUND
pullAPIWrapperprocessPullResult
并发
ConsumerManageProcessorqueryConsumerOffset
broker查询满足条件的数据
1、4.1.0以上才可以使用SQL92过滤2、如果是slave,则清除commit offset标记
mQClientAPIImpl.start
PULL_OFFSET_MOVED
DefaultMQPushConsumerImpldoRebalance
processQueue加锁
MessageListenerOrderly
获取borker地址信息
rebalanceImpl.setConsumerGrouprebalanceImpl.setMessageModelrebalanceImpl.setAllocateMessageQueueStrategyrebalanceImpl.setmQClientFactory
PullMessageProcessorexecuteRequestWhenWakeup
response赋值
defaultMQPushConsumerImplcheckConfig
processQueuetakeMessags
rebalanceImpl.getSubscriptionInner().get
集群模式,如果没锁了或者锁过期了延迟10ms再消费
RemoteBrokerOffsetStorereadOffset
mQClientFactoryregisterConsumer
CLUSTERING
广播消费的时候,进度是存本地的,这个地方就是更新下本地内存中记录的消息进度offsetTable.putIfAbsent每5s会持久化到本地文件中
messageListener.consumeMessage
实例化offsetStore
PullCallbackonSuccess
继续
mQClientFactorystart
到broker端拉取数据
DefaultMQPushConsumer::new
成功
response=null
1、确保当前状态是runnibg,否则默认3秒后再来拉取2、判断当前consumer是否被暂停拉取数据了,如果是1s后再来拉取
有新消息了,并且匹配
defaultMQPushConsumerImplcopySubscription
RebalanceServicerun
很重要:处理返回的ACK
subscribe(\"topic\
构建PullRequest
OFFSET_ILLEGAL
check
synchronized (objLock)
RebalancePushImpldispatchPullRequest
记录成功和失败消息的数量this.getConsumerStatsManager().incConsumeOKTPSthis.getConsumerStatsManager().incConsumeFailedTPS
start
顺序消费场景
InvokeCallbackoperationComplete
构建MessageFilter
pullRequest.setNextOffset(pullResult.getNextBeginOffset())DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);1:设置下次拉取的进度2:offsetTable中修改进度3:重新拉取一次
SUSPEND_CURRENT_QUEUE_A_MOMENT
mQClientFactorysendHeartbeatToAllBrokerWithLock
拉取结果
channel.writeAndFlush
发生给broker
RemoteBrokerOffsetStore
updateOffset
能获取到锁,就是在这里加的
ackIndex置为-1,为后面发回broker准备记录失败的消息数量this.getConsumerStatsManager().incConsumeFailedTPS
成功就略过失败的话打印下每条消息的错误记录
ResponseFutureexecuteInvokeCallback
ConsumeMessageConcurrentlyService
processRequest
RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
如果消费模式为集群模式默认每隔20s执行一次锁定分配给自己的消息消费队列
this.pullAPIWrapper.pullKernelImpl
checkHoldRequest
incGroupGetNumsincGroupGetSizeincBrokerGetNums
构建返回的结果对象
this.pullRequestQueue.take()pullMessage
this.defaultMQPushConsumerImplexecutePullRequestImmediately(pullRequest)
集群模式下,把内存中的消费进度提交给broker
PullAPIWrapperfindBrokerAddressInSubscribe
统计信息修改
SUCCESS
mQClientFactorycheckClientInBroker
默认最多65535个异步请求同时拉取消息
PullMessageProcessorprocessRequest
dispatchPullRequest
BROADCASTING
Local模式
rebalanceByTopic
pullAPIWrappernew PullAPIWrapper
NO_NEW_MSGNO_MATCHED_MSG
PULL_NOT_FOUND
cleanExpireMsg
ConsumeMessageConcurrentlyServicesubmitConsumeRequest
重试消息的起点
1:构建response、responseHeader2:获取requetHeader3:判断当前broker是否可读4:根据requestHeader中的consumerGroup来获取其订阅信息SubscriptionGroupConfig,如果group不存在,默认回自动创建一个5:根据请求参数,获取一些请求flag: a: hasSuspendFlag ---消息是否支持被挂起 b: hasCommitOffsetFlag --- 是否从内存中读取消费进度 c: hasSubscriptionFlag --- 消息过滤机制的支持,天然就是true6: topic和queue做一些基本校验、TagType的一些校验等等
DefaultMQPushConsumerImplpullMessage
有一把写锁,来保证取消息的顺序把要消费的消息从msgTreeMap移动到consumingMsgOrderlyTreeMap
LocalFileOffsetStorereadOffset
这个方法有两处被调用:1、其中一处是PullRequestHoldService中的定时任务,每隔 一段时间就会去查啊看是否有新消息到来2、doReput;也就是broker收到消息,异步构建ConsumeQueue、IndexFile的过程
先获取borker的ip和端口,再去服务端拉数据
this.semaphoreAsync.tryAcquire
ConsumeMessageOrderlyServicesubmitConsumeRequest
ConsumeMessageConcurrentlyService.this.processConsumeResult
this.brokerController.getMessageStore().getMinOffsetInQueue
rebalanceService.start
并发消费的时候,如果maxSpan>2000,则触发流控,50s后再来看看maxSpan并不是消息的数量,而是processQueue内部treeMap中首尾消息的偏移量差值;这个偏移量,是在consumeQueue文件的逻辑offset
1、incConsumeFailedTPS2、checkReconsumeTimes,检查重试次数,如果小于16次,可以重新消费:makeMessageToCosumeAgainsubmitConsumeRequestLater3:否则进死信队列了(会sendMessageBack)
套娃了,又到了broker处理拉取消息的流程了这次brokerAllowSuspend=false了,首次默认是true的
集群
结果为-1
DefaultMessageStoregetMessage
MQClientAPIImplprocessPullResponse
获取当前messageQueue的锁
tryLockLaterAndReconsume
broker端只是根据tagCode来过滤数据了,可能有hash冲突consumer端进行二次过滤,这次是名称equals比较
把消费成功的消息从processQueue中删除
makeSureStateOKisPause
失败
listener.consumeMessage
获取返回的对象的字节数组response.setBody(r)
defaultMQPushConsumerImplstart
this.rebalanceService.wakeup()
RemoteBrokerOffsetStorefindBrokerAddressInAdmin
设置nextBeginOffset、minOffset、maxOffset如果master太忙,设置从salve拉取信息
PullMessageServicethis.pullRequestQueue.put(pullRequest)
PullResultExt
RebalanceImpldoRebalance
font color=\"#1976d2\
如果sendMessageBack失败,则将失败的消息存入新list,延迟5S再消费
MessageListenerConcurrently
updateTopicSubscribeInfoWhenSubscriptionChanged
收藏
收藏
0 条评论
下一页