消费者生产者分析
2021-01-15 21:45:35 0 举报
rocketMQ
作者其他创作
大纲/内容
RemotingCommand
consumeRequestCacheBlockingQueue<ConsumeRequest>in DefaultLitePullConsumerImpl
返回
ConsumeRequest
dispatchPullRequest(pullRequestList)
MessageExt
拉取任务的执行
onSuccess
异步调用直接返回
HAService
RebalanceImpl
Message
ExecutorService
ResponseFutrue#wait()
Broker端处理发送消息组件
long (8)
put(pullRequest)
pullMessage
PullCallback
schedule
messageQueueChanged
operationComplete
DefaultMessageStore
NettyRemotingClient
doRebalance
消费者外/内部客户端拉取消息以及返回消息
scheduledExecutorService
MessageQueue
writeAndFlush
executeInvokeCallback
ResponseFuture
body
更新
网络发送出去Netty的异步发送
更新到
负载均衡服务线程消息队列客户端实例中的负载均衡机制
RebalanceLitePullImpl
MessageQueueListenerImpl
监控
PullTaskImpl
HAConnection
run
MQClientAPIImpl
放入
DefaultLitePullConsumer
DefaultLitePullConsumerImpl
scheduledExecutorServicein MQClientInstance
pullKernelImpl
GroupTransferService
AcceptSocketService
send(msg)
get(opaque)从缓存表中获取
List<MessageExt>
MQClientInstance
NettyRemotingClient
InvokeCallback
放进
内部隐藏
SocketChannel轮询OP_WRITE
updateOffset
send
run()
poll
checkMsgsetTopic
RemoteBrokerOffsetStore
bodySize
PullResult
setResponseCommand(cmd)
监听回调机制,如果该消费者的消费队列发生变化,就需要回调
PullAPIWrapper
创建
负载均衡组件
RebalanceService
invokeAsync
DefaultMQProducer#start()调用生产者内部的start()
Set<MessageQueue> mqSet
ConsumerManageProcessor#updateConsumerOffset
processConsumeResult
UPDATE_CONSUMER_OFFSET
updateConsumeOffsetToBroker
同步调用
SendCallback
ConsumerManageProcessor#processRequest
submit
6 获取消息
channelRead0
DefaultMQProducerImpl
take()
ConsumeMessageConcurrentlyService
processResponseCommand
1 请求与master建立连接
HAClient
9 向从节点同步消息
8 向从节点同步消息
offsetTable
NettyRemotingAbstract
pullRequestQueue
定时任务 in BrokerController
sendDefaultImpl
executePullRequestImmediately(pullRequest)
定时任务不断的更新订阅主题的路由信息
responseTable
被放进
RemotingClient
处理消费者更新消费偏移量的请求和更新偏移量到内存的过程
DefaultMQPushConsumerImpl
主Broker
生成
Channel
Request
4 更新偏移量
scheduledThreadPoolExecutor线程池
DefaultMQProducer
更新主题路由的机制
10 从节点保存同步消息化
int (4)
3 从节点向发送消息偏移量
startPullTask
ConsumerOffsetManager#persist()
offset
consumeExecutor
ConsumerOffsetManager#commitOffset
sendKernelImpl
updateTopicRouteInfoFromNameServer
updateConsumerOffsetOneway
DefaultMQProducerImp#start()DefaultMQProducerImp#start(true)1 检查生产者的组名2 设置实例名称:当生产者组名等于默认的组名的时候则不进行更改,否则进行下一步操作;当实例名为Default的时候,则将实例名更改为进程ID,这里就保证了一个JVM里面就只存在一个实例。要么由使用者显示的进行管理,要么就设置为进程ID,能够保证唯一。Instance只有一个节约资源。3
PullAPIWrapper
consumeMessage
DefaultMQProducerImpl
pullMessage(pullRequest)
slaveAckOffset
PullMessageService
pullRequest
updateTopicSubscribeInfo
updatePullTask
submitConsumeRequest
MQClientAPIImpl
2 主节点接受连接并创建爱网络通道
取出
persistConsumerOffset
processMessageReceived
put
从Broker
MasterPhyOffset
ReadSocketService
重新平衡后
pullMessageAsync
获取
persistAllConsumerOffset
WriteSocketService
PullMessageRequestHeader
rebalanceByTopic
persistAll
询问是否已经同步了?
5 更新偏移量
消费者内部客户端创建拉取消息任务
MessageListenerConcurrently
invokeAsyncImpl
7 返回消息
返回同步结果
SocketChannel轮询OP_READ
获取mq对应的offset
NettyClientHandler
将偏移量持久化到磁盘
0 条评论
回复 删除
下一页