rocketmq 消费消息
2023-10-19 17:32:17 3 举报
rocketmq 消费消息流程
作者其他创作
大纲/内容
font color=\"#323232\
构建 MessageFilter 主要是支持过滤消息,通过sql或者tag进行实现
OffsetStore#load如果是广播模式则从本地文件中加载
MQClientInstance#updateTopicRouteInfoFromNameServer请求 ns 获取 topic 信息
设置 opaque ,保证请求和返回是同一个
DefaultMQPushConsumerImpl#checkConfig校验各种配置
DefaultMessageStore#getMessage从 commitLog 中读取消息
PullAPIWrapper#processPullResult处理返回的消息
RemoteBrokerOffsetStoreoffset保存到broker中
MessageListenerOrderly#consumeMessageMessageListenerOrderly#consumeMessage调用客户端实现的接口,消费数据
各种校验如,是否有权限,topic是否存在
PullMessageService#run开启线程
MQClientAPIImpl#pullMessageAsync
PullMessageProcessor#processRequest在启动是会注册一个处理器,处理拉取的请求
FilterMessageHook#filterMessage
MessageDecoder#decodesBatch解码消息
PullCallback#onSuccess异步消息的回调方法
.MQClientAPIImpl#pullMessage发送请求
所有消息都按照一定的格式存放,消息体里面通过版本号,sysFlag来决定消息属性的长度
ConsumeMessageService#submitConsumeRequest提交消费请求,有不同的实现类,取决于客户端的调用方式
MQClientInstance#findBrokerAddressInSubscribe存缓存里面找
MessageDecoder#decode
PullAPIWrapper#pullKernelImpl去broker拉取消息
PullAPIWrapper#executeHook调用过滤器过滤消息
LocalFileOffsetStore当使用广播模式时offset保存在本地
DefaultMQPushConsumerImpl#pullMessage
CommitLog#getMessage从文件的偏移量拉取数据
GetMessageResult#addMessage把消息放到返回对象上
RemotingClient#invokeAsync发送请求
DefaultMQPushConsumerImpl#start开始消费数据
通过SubscriptionData里面设置的 Tag 过滤消息
.MQPushConsumer#start
获取offsetStore
ConsumeQueueInterface#iterateFrom搞个迭代器,把topic的位置啥的映射到 commitLog 文件上。就是说按照topic的index的顺序读取commitLog文件
.DefaultMessageStore#getMessageAsync
0 条评论
回复 删除
下一页