RocketMQ源码以及关键流程分析
2024-08-01 18:37:17 9 举报
AI智能生成
"RocketMQ是一款开源的消息中间件,支持高并发、高可用、高可靠、低延迟等特性。源码分析主要关注关键流程,如消息的生产、存储、消费、事务等。消息的生产过程包括生产者将消息发送到Broker,Broker存储消息,Consumer从Broker获取消息。存储环节涉及到消息的分发策略、消息刷盘、索引文件构建等。消费环节包括Push和Pull模型,Pull模型中Consumer主动拉取消息。事务消息涉及到本地事务和分布式事务的处理。RocketMQ提供了多种配置和优化策略,以满足不同场景的需求。"
作者其他创作
大纲/内容
消费进度保存机制
消费者启动时会同时启动位点管理器,RocketMQ设计了远程位点管理和本地位点管理
两种位点管理方式.
集群消费时,位点由客户端提交给Broker保存.
广播消费时,位点保存在消费者本地磁盘上
两种位点管理方式.
集群消费时,位点由客户端提交给Broker保存.
广播消费时,位点保存在消费者本地磁盘上
OffsetStore接口核心方法
void load():加载位点信息
void updateOffset():更新缓存位点信息
long readOffset():读取本地位点信息
void persistAll():持久化全部队列的位点信息
void persist():持久化某一个队列的位点信息
void remove():删除某一个队列的位点信息
Map<MessageQueue,Long> cloneOffsetTable():复制一份缓存位点信息
void updateConsumeOffsetToBroker():将本地消费位点持久化到Broker中
客户端消费进度保存也叫消费进度持久化,RocketMQ4.2.0支持定时持久化和不定时持久化两种方式
定时持久化位点org.apache.rocketmq.client.impl.factory.MQClientInstance#startScheduledTask,
定时持久化位点逻辑是通过定时任务来实现的,在启动程序10s后,会定时调用持久化方法,
MQClientInstance.this.persistAllConsumerOffset(),持久化每一个消费者消费的每一个MessageQueue的进度
定时持久化位点逻辑是通过定时任务来实现的,在启动程序10s后,会定时调用持久化方法,
MQClientInstance.this.persistAllConsumerOffset(),持久化每一个消费者消费的每一个MessageQueue的进度
不定时持久化也叫Pull-And-Commit,也就是在执行Pull方法同时,把队列最新消费位点信息发给Broker,
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage(),该方法有两处持久化位点消息
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage(),该方法有两处持久化位点消息
1.在拉取完成后,如果拉取位点非法,则此时客户端会主动提交一次最新的消费位点信息给Broker,以便下次能使用正确的位点拉取消息
2.在执行消息拉取动作时,如果是集群消费,并且本地位点值大于0,那么把最新的位点上传给Broker,
代码中通过commitOffsetEnable、sysFlag两个字段表示是否可以上报消费位点给Broker,在执行Pull
请求时,将sysFlag作为网络请求的消息头传递给Broker,Broker中处理该字段的逻辑在
代码中通过commitOffsetEnable、sysFlag两个字段表示是否可以上报消费位点给Broker,在执行Pull
请求时,将sysFlag作为网络请求的消息头传递给Broker,Broker中处理该字段的逻辑在
以上Broker处理代码中有3个核心变量.
hasCommitOffsetFlag:Pull请求中的sysFlag参数是决定Broker是否执行持久化消费位点的一个因素.
brokerAllowSuspend:Broker是否能挂起。如果Broker是挂起状态,将不能持久化位点。
storeOffsetEnable:true表示Broker需要持久化消费位点,false则不用持久化位点
hasCommitOffsetFlag:Pull请求中的sysFlag参数是决定Broker是否执行持久化消费位点的一个因素.
brokerAllowSuspend:Broker是否能挂起。如果Broker是挂起状态,将不能持久化位点。
storeOffsetEnable:true表示Broker需要持久化消费位点,false则不用持久化位点
3.还有一种持久化位点的机制,那就是消费者在关闭时持久化位点信息,
以Push消费者程序关闭为例,org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#shutdown(long)
以Push消费者程序关闭为例,org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#shutdown(long)
实现过程是从Rebalance服务中获取全部消费的队列信息,再调用persistAll()方法持久化全部队列的位点信息
理论上位点信息越是及时上报Broker,越能减少消息重复的可能性,RocketMQ再设计时并不完全支持Exactly-Once类型的幂等语义,
因为实现该语义的代价颇大,并且使用该场景极少,再加上用户侧实现幂等的代价更小,故而RocketMQ在设计时将幂等操作交由用户处理
因为实现该语义的代价颇大,并且使用该场景极少,再加上用户侧实现幂等的代价更小,故而RocketMQ在设计时将幂等操作交由用户处理
消费者的Rebalance机制
客户端是通过Rebalance服务做到高可靠的。当发生Broker掉线、消费者实例掉线、
Topic扩容等各种突发情况时,消费者组中的消费者实例是怎么重平衡的,以支持全部队列的
正常消费的?
Topic扩容等各种突发情况时,消费者组中的消费者实例是怎么重平衡的,以支持全部队列的
正常消费的?
Rebalance服务的类图
RebalanceImpl的核心属性
ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable:记录MessageQueue和ProcessQueue的关系,
MessageQueue可以简单地理解为ConsumeQueue的客户端实现;ProcessQueue是保存Pull消息的本地容器
MessageQueue可以简单地理解为ConsumeQueue的客户端实现;ProcessQueue是保存Pull消息的本地容器
ConcurrentMap<String, Set<MessageQueue>> topicSubscribeInfoTable:Topic路由信息。保存Topic和MessageQueue的关系
ConcurrentMap<String(Topic), SubscriptionData> subscriptionInner:真正的订阅关系,保存当前消费者组订阅了哪些Topic的哪些Tag
AllocateMessageQueueStrategy allocateMessageQueueStrategy:MessageQUeue消息分配策略的实现
MQClientInstance mQClientFactory:client实例对象
RebalanceImpl的核心方法
boolean lock():为MessageQueue加锁
void doRebalance():执行Rebalance操作
void messageQueueChanged():通知Message发生变化,这个方法在Push和Pull两个类中被重写
boolean removeUnnecessaryMessageQueue():去掉不再需要的MessageQueue
void dispatchPullRequest():执行消息拉取请求
boolean updateProcessQueueTableInRebalance():在Rebalance中更新processQueue
Rebalance过程
消费者实例在收到Broker通知后是怎么执行Reblance的?这个操作是通过调用
MQClientInstance.rebalanceImmediately()来实现的
MQClientInstance.rebalanceImmediately()来实现的
这种设计是RocketMQ种典型的锁方式,执行wakeup命令后,this.waitForRunning()就会暂停,
再执行this.mqClientFactory.doRebalance()
再执行this.mqClientFactory.doRebalance()
doRebalance()方法主要有以下几个步骤
1.查找当前clientId对应的全部的消费者组,全部执行一次Rebalance.
虽然消费者实现分别为Pull消费和Push消费两种默认实现,调用的是不同实现类的Rebalance方法,
但是实现逻辑都差不多
1.查找当前clientId对应的全部的消费者组,全部执行一次Rebalance.
虽然消费者实现分别为Pull消费和Push消费两种默认实现,调用的是不同实现类的Rebalance方法,
但是实现逻辑都差不多
2.判断Rebalance开关,如果没有被暂停,则调用RebalancePushImpl.rebalance()方法
3.在RebalancePushImpl.rebalance()方法中,获取当前消费者全部订阅关系中的Topic,
循环对每个Topic进行Rebalance.待全部的Rebalance都执行完之后,将不属于当前
消费者的队列删除
循环对每个Topic进行Rebalance.待全部的Rebalance都执行完之后,将不属于当前
消费者的队列删除
4.Topic队列重新分配,这里也就是客户端Rebalance的核心逻辑之处,根据是集群消费还是广播消费分别执行
MessageQueue重新分配的逻辑,以集群消费为例分析
MessageQueue重新分配的逻辑,以集群消费为例分析
1.获取当前Topic的全部MessageQueue(代码中是mqSet)和该Topic的所有消费者的clientId(代码中是cidAll)
只有当两者都不为空时,才执行Rebalance
只有当两者都不为空时,才执行Rebalance
2.将全部的MessageQueue(代码中时mqAll)和消费者客户端(cidAll)进行排序。
由于不是所有消费者的客户端都能彼此通信,所以将mqAll和cidAll排序的目的在于,
保证所有消费者客户端在做Rebalance的时候,看到的MessageQueue列表和消费者
客户端都是一样的试图,做Rebalance时才不会分配错
由于不是所有消费者的客户端都能彼此通信,所以将mqAll和cidAll排序的目的在于,
保证所有消费者客户端在做Rebalance的时候,看到的MessageQueue列表和消费者
客户端都是一样的试图,做Rebalance时才不会分配错
3.按照当前设置的队列分配策略执行Queue分配。队列分配策略接口AllocateMessageQueueStrategy,
该接口中,有两个方法allocate()和getName()
该接口中,有两个方法allocate()和getName()
allocate():执行队列分配操作,该方法必须满足全部队列都能分配到消费者
getName():获取当前分配算法的名字
目前队列分配策略有五种实现:
AllocateMessageQueueAveragely:平均分配,也就是默认使用的策略(强烈推荐)
AllocateMessageQueueAveragelyByCircle:环形分配策略
AllocateMessageQueueByConfig:手动配置
AllocateMessageQueueConsistentHash:一致性Hash分配
AllocateMessageQueueByMachineRoom:机房分配策略
AllocateMessageQueueAveragely:平均分配,也就是默认使用的策略(强烈推荐)
AllocateMessageQueueAveragelyByCircle:环形分配策略
AllocateMessageQueueByConfig:手动配置
AllocateMessageQueueConsistentHash:一致性Hash分配
AllocateMessageQueueByMachineRoom:机房分配策略
4.动态更新ProcessQueue,在队列重新分配后,当前消费者消费的队列可能不会发生变化,
也可能发生变化,不管时新增加了队列需要消费,还是减少了队列,都需要执行
updateProcessQueueTableInRebalance()方法来更新ProcessQueue,如果有MessageQueue
不再分配给当前的消费者消费,则设置ProcessQueue.setDropped(true),表示放其当前MessageQueue的
Pull消息,
也可能发生变化,不管时新增加了队列需要消费,还是减少了队列,都需要执行
updateProcessQueueTableInRebalance()方法来更新ProcessQueue,如果有MessageQueue
不再分配给当前的消费者消费,则设置ProcessQueue.setDropped(true),表示放其当前MessageQueue的
Pull消息,
如果在重新分配MessageQueue后,新增加了MessageQueue,
则添加一个对应的ProcessQueue,查询Queue拉取位点,包装一个新的PullRequest
来Pull消息;同理如果减少了MessageQueue,则将其对应的ProcessQueue删除,
不管MessageQueue时新增还是减少,都会设置changed为True,表示当前消费者
消费的MessageQueue有变化,,源码中是分别两个集合遍历来判断是新增还是减少的。
则添加一个对应的ProcessQueue,查询Queue拉取位点,包装一个新的PullRequest
来Pull消息;同理如果减少了MessageQueue,则将其对应的ProcessQueue删除,
不管MessageQueue时新增还是减少,都会设置changed为True,表示当前消费者
消费的MessageQueue有变化,,源码中是分别两个集合遍历来判断是新增还是减少的。
PullRequest初始化的具体实现,新增的PullRequest对象将被分配出去拉取MessageQueue中的消息。
5.执行messageQueueChanged()方法,如果有MessageQueue订阅关系发生变化,
则更新本地订阅关系版本,修改本地消费者有限流的一些参数,然后发送心跳,
通知所有Broker,当前订阅关系发生了改变
则更新本地订阅关系版本,修改本地消费者有限流的一些参数,然后发送心跳,
通知所有Broker,当前订阅关系发生了改变
消息过滤
RocketMQ设计了消息过滤,来解决大量无意义流量的传输:即对于客户端不需要的消息,
Broker就不会传输给客户端,以免浪费宽带,RocketMQ4.2.0支持Tag过滤、SQL92过滤、Filter Server过滤
Broker就不会传输给客户端,以免浪费宽带,RocketMQ4.2.0支持Tag过滤、SQL92过滤、Filter Server过滤
Tag过滤流程
第一步:用户发送一个带Tag的消息
第二步:用户订阅一个Topic的Tag,RocketMQ Broker会保存订阅关系
第三步:在Broker端做Tag过滤。消费者在Pull消息时,RocketMQ Broker会根据Tag的HasCode进行对比,
如果不满足条件,消息不会返回给消费者,以节约带宽
也许你们会问,为什么不直接用字符串进行对比和过滤呢?原因是HashCode对比存在Hash碰撞而导致过滤失败,
字符串比较的速度相较HashCode慢。HashCode对比是数字比较,Java底层可以直接通过位运算进行对比,
而字符串对比需要按照字符顺序比较,相比位运算更加耗时。由于HashCode对比有Hash碰撞的危险,所以才
引出第四步
如果不满足条件,消息不会返回给消费者,以节约带宽
也许你们会问,为什么不直接用字符串进行对比和过滤呢?原因是HashCode对比存在Hash碰撞而导致过滤失败,
字符串比较的速度相较HashCode慢。HashCode对比是数字比较,Java底层可以直接通过位运算进行对比,
而字符串对比需要按照字符顺序比较,相比位运算更加耗时。由于HashCode对比有Hash碰撞的危险,所以才
引出第四步
第四步:客户端Tag过滤。Hash碰撞相信大家都有所了解,就是不同的Tag计算出来的Hash值可能是一样的,
在这种情况下过滤的消息是错误的,所以RocketMQ设计了客户端字符串对比功能,用来做第二次Tag过滤
在这种情况下过滤的消息是错误的,所以RocketMQ设计了客户端字符串对比功能,用来做第二次Tag过滤
Tag过滤为什么设计成Broker端使用Hash过滤,而客户端使用Tag字符串进行对比过滤呢?
Broker端使用Hash过滤可以快速过滤海量消息,即使偶尔有"漏网之鱼",在客户端字符串
过滤后也能被成功过滤。这种层次设计 的过滤方式在做系统时可以参考
Broker端使用Hash过滤可以快速过滤海量消息,即使偶尔有"漏网之鱼",在客户端字符串
过滤后也能被成功过滤。这种层次设计 的过滤方式在做系统时可以参考
SQL过滤流程
第一步:消费订阅Topic,上传过滤SQL语句,RocketMQ Broker编译SQL保存
第二步:消费者Pull消息
第一次过滤:使用Bloom过滤器的isHit()方法做第一次过滤。Bloom过滤器效率高,但是也存在
缺陷,即只能判断不需要的消息,过滤后的消息也不保证都是需要消费的。
缺陷,即只能判断不需要的消息,过滤后的消息也不保证都是需要消费的。
第二次过滤:执行编译后的SQL方法evaluate()即可过滤出最终的结果
在使用SQL过滤前,需要在启动Broker时配置如下几个参数:
enableConsumeQueueExt=true
filterSupportRetry=true
enablePropertyFilter=true
enableCalcFilterBitMap=true
在使用SQL过滤前,需要在启动Broker时配置如下几个参数:
enableConsumeQueueExt=true
filterSupportRetry=true
enablePropertyFilter=true
enableCalcFilterBitMap=true
FilterServer过滤流程
这是一种不常用但是非常灵活的过滤方式,要使用Filter Server过滤必须在启动Broker时,添加如下配置:
filterServerNums=大于0的数字.这样就可以启动一个或多个过滤服务器,每个过滤服务在启动时会自动
注册到Namesrv中
filterServerNums=大于0的数字.这样就可以启动一个或多个过滤服务器,每个过滤服务在启动时会自动
注册到Namesrv中
第一步:用户消费者从Namesrv获取Topic路由信息,同时上传自定义的过滤器实现类源代码到FilterServer中,
FilterServer编译并实例化过滤器类
FilterServer编译并实例化过滤器类
第二步:用户发送拉取消息请求到FilterServer,FilterServer通过Pull consumer从Broker拉取消息,
执行过滤类中的过滤方法,返回过滤后的消息
执行过滤类中的过滤方法,返回过滤后的消息
事务消息
事务消息机制。
事务消息的发送和处理总结为四个过程:
1.生产者发送事务消息和执行本地事务
2.Broker存储事务消息
3.Broker回查事务消息
4.Broker提交或回滚事务消息
事务消息的发送和处理总结为四个过程:
1.生产者发送事务消息和执行本地事务
2.Broker存储事务消息
3.Broker回查事务消息
4.Broker提交或回滚事务消息
生产者发送事务消息和执行本地事务。
发送过程分为两个阶段:
第一阶段,发送事务消息
第二阶段,发送endTransaction消息
事务消息发送过程的实现类TransactionMQProducer,该类继承鱼DefaultMQProducer,
不仅能发送事务消息,还能发送其他消息。虽然4.2.0版本有事务消息代码,但实际是4.3.0
版本才全面支持事务消息。
发送过程分为两个阶段:
第一阶段,发送事务消息
第二阶段,发送endTransaction消息
事务消息发送过程的实现类TransactionMQProducer,该类继承鱼DefaultMQProducer,
不仅能发送事务消息,还能发送其他消息。虽然4.2.0版本有事务消息代码,但实际是4.3.0
版本才全面支持事务消息。
TransactionMQProducer的核心属性和方法:
transactionListener:事务监听器,主要功能是执行本地事务和执行事务回查。
事务监听器包含executeLocalTransaction()和checkLocalTransaction()两个方法。
executeLocalTransaction()方法执行本地事务,checkLocalTransaction()方法是
当生产者由于各种问题导致未发送Commit或Rollback消息给Broker时,Broker
回调生产者查询本地事务专改的处理方法
事务监听器包含executeLocalTransaction()和checkLocalTransaction()两个方法。
executeLocalTransaction()方法执行本地事务,checkLocalTransaction()方法是
当生产者由于各种问题导致未发送Commit或Rollback消息给Broker时,Broker
回调生产者查询本地事务专改的处理方法
executorService:Broker回查请求处理的线程池
start():事务消息生产者启动方法,与普通启动方法不同,增加了this.defaultMQProducerImpl.initTransactionEnv()
的调用,即增加了初始化事务消息的环境信息
的调用,即增加了初始化事务消息的环境信息
事务消息的环境初始化主要用于初始化Broker回查请求处理的线程池,
在初始化事务消息生产者时我们可以指定初始化对象,如果不指定初始化对象,
那么这里会初始化一个单线程的线程池
在初始化事务消息生产者时我们可以指定初始化对象,如果不指定初始化对象,
那么这里会初始化一个单线程的线程池
shutdown():关闭生产者,回收生产者资源。该方法时启动方法的逆过程,
功能时关闭生产者、销毁事务环境。销毁事务环境是指销毁事务回查线程池,
清楚回查任务队列
功能时关闭生产者、销毁事务环境。销毁事务环境是指销毁事务回查线程池,
清楚回查任务队列
生产者发送事务消息主要分为如下两个阶段:
1.发送Half消息的过程
2.发送Commit或Rollback消息
1.发送Half消息的过程
2.发送Commit或Rollback消息
发送Half消息的过程。
事务消息的发送是通过sendMessageInTransaction()方法来完成的
事务消息的发送是通过sendMessageInTransaction()方法来完成的
第一步,数据校验,判断TransactionListener的值是否为null、消息Topic为空检查、消息体为空检查等
第二步:消息预处理。预处理的主要功能是在消息扩展字段中设置消息类型。
MessageConst.PROPERTY_TRANSACTION_PREPARED表示当前消息是事务Half消息。
MessageConst.PROPERTY_PRODUCER_GROUP用于设置发送消息的生产者组名,以及
设置事务消息的扩展字段
MessageConst.PROPERTY_TRANSACTION_PREPARED表示当前消息是事务Half消息。
MessageConst.PROPERTY_PRODUCER_GROUP用于设置发送消息的生产者组名,以及
设置事务消息的扩展字段
第三步:发送事务消息,调用同步发送消息的方法将事务消息发送出去
发送Commit或Rollback消息
在本地事务处理完成后,根据本地事务的执行结果调用DefaultMQProducerImpl.endTransaction()方法
通知Broker进行Commit或Rollback
当前Half消息发送完成后,会返回生产者消息发送到哪个Broker、消息位点是多少、再根据本地事务的执行
结果封装EndTransactionRequestHeader对象,最后调用MQClientAPIimpl.endTransactionOneway()方法
通知Broker进行Commit或Rollback
在本地事务处理完成后,根据本地事务的执行结果调用DefaultMQProducerImpl.endTransaction()方法
通知Broker进行Commit或Rollback
当前Half消息发送完成后,会返回生产者消息发送到哪个Broker、消息位点是多少、再根据本地事务的执行
结果封装EndTransactionRequestHeader对象,最后调用MQClientAPIimpl.endTransactionOneway()方法
通知Broker进行Commit或Rollback
brokerAddr:存储当前Half消息的Broker服务器的socket地址
localTransactionState:本地事务执行结果
transactionId:事务消息的事务id
endTranactionOneway():以发送oneway消息的方式发送该RPC请求给Broker.
Broker存储事务消息。
在Broker中,事务消息的初始化是通过BrokerController.initialTransaction()方法执行的。
3个核心的初始化变量
在Broker中,事务消息的初始化是通过BrokerController.initialTransaction()方法执行的。
3个核心的初始化变量
1.TransactionalMessageService.
事务消息主要用于处理服务,默认实现类是TransactionalMessageServiceImpl.如果想自定义事务消息
处理实现类,需要实现TransactionMessageService接口,然后通过ServiceProvider.loadClass()方法进行加载。
TransactionalMessageService接口的基本操作定义如下
事务消息主要用于处理服务,默认实现类是TransactionalMessageServiceImpl.如果想自定义事务消息
处理实现类,需要实现TransactionMessageService接口,然后通过ServiceProvider.loadClass()方法进行加载。
TransactionalMessageService接口的基本操作定义如下
prepareMessage():用于保存Half事务消息,用户可以对其进行Commit或Rollback
deletePrepareMessage():用于删除事务消息,一般用于Broker回查失败的Half消息。
commitMessage():用于提交事务消息,使消费者可以正常地消费事务消息
rollbackMessage():用于回滚事务消息,回滚后消费者将不能够消费该消息。
通常用于生产者主动进行Rollback时,以及Broker回查的生产者本地事务失败时
通常用于生产者主动进行Rollback时,以及Broker回查的生产者本地事务失败时
open():用于打开事务服务
close():用于关闭事务服务
2.transactionMessageCheckListener.
事务消息回查监听器,默认实现类是DefaultTransactionalMessageCheckListener.如果想自定义回查监听处理,
需要继承AbstractTransactionalMessageCheckListener接口,然后通过ServiceProvider.loadClass()方法被加载
事务消息回查监听器,默认实现类是DefaultTransactionalMessageCheckListener.如果想自定义回查监听处理,
需要继承AbstractTransactionalMessageCheckListener接口,然后通过ServiceProvider.loadClass()方法被加载
3.transactionalMessageCheckService.
事务消息回查服务是一个线程服务,定时调用transactionalMessageService.check()方法,检查超时的Half消息
是否需要回查
事务消息回查服务是一个线程服务,定时调用transactionalMessageService.check()方法,检查超时的Half消息
是否需要回查
上面三个事务处理类完成初始化后,Broker就可以处理事务消息了。
Broker存储事务消息和普通消息都是通过SendMessageProcessor类进行处理的,只是在存储消息时有两处事务
消息需要单独处理。
Broker存储事务消息和普通消息都是通过SendMessageProcessor类进行处理的,只是在存储消息时有两处事务
消息需要单独处理。
第一个单独处理,sendMessage()
这里获取消息中的扩展字段MessageConst.PROPERTY_TRANSACTION_PREPARED的值,
如果该值为True则返回当前消息是事务消息;再判断当前Broker的配置是否支持事务消息,
如果不支持就返回生产者不支持事务消息的信息;如果支持,则调用TransactionalMessageService
#prepareMessage()方法保存Half消息
这里获取消息中的扩展字段MessageConst.PROPERTY_TRANSACTION_PREPARED的值,
如果该值为True则返回当前消息是事务消息;再判断当前Broker的配置是否支持事务消息,
如果不支持就返回生产者不支持事务消息的信息;如果支持,则调用TransactionalMessageService
#prepareMessage()方法保存Half消息
第二个单独处理:存储前事务消息预处理,处理方法是TransactionalMessageBridge.praseHalfMessageInner()
该方法的功能是将原消息的Topic、queueId、susFlg存储在消息的扩展字段中,
并且修改Topic的值为RMQ_SYS_TRANS_HALF_TOPIC,修改queueId的值为0,
然后,与其他消息一样,调用DefaultMessageStore.putMessage()方法保存到
CommitLog中,CommitLog存储成功后,通过CommitLog.DefaultAppendMessageCallback.doAppend()
方法单独对事务消息进行处理
并且修改Topic的值为RMQ_SYS_TRANS_HALF_TOPIC,修改queueId的值为0,
然后,与其他消息一样,调用DefaultMessageStore.putMessage()方法保存到
CommitLog中,CommitLog存储成功后,通过CommitLog.DefaultAppendMessageCallback.doAppend()
方法单独对事务消息进行处理
Prepared消息其实就是Half消息,其实现逻辑是,设置当前Half消息的
queueOffset值为0,而不是其真实的位点值。这样该位点就不会建立ConsumeQueue
索引,自然也不能被消费者消费
queueOffset值为0,而不是其真实的位点值。这样该位点就不会建立ConsumeQueue
索引,自然也不能被消费者消费
Broker回查事务消息。
如果用户由于某种原因,在第二阶段中没有将endTransaction消息发送给Broker,
Broker的Half消息又将如何处理。
RocketMQ在设计时已经考虑到这个问题,通过"回查机制"处理第二阶段既未发送Commit
也没有发送Rollback的消息。回查是Broker发起的,Broker认为在接收Half消息后的一段时间内,
如果生产者都没有发送Commit或Rollback消息给Broker,那么Broker会主动"询问"生产者该
事务消息对应的本地事务执行结果,以此来决定事务是否要Commit.
TransactionalMessageCheckService是回查服务的实现类
如果用户由于某种原因,在第二阶段中没有将endTransaction消息发送给Broker,
Broker的Half消息又将如何处理。
RocketMQ在设计时已经考虑到这个问题,通过"回查机制"处理第二阶段既未发送Commit
也没有发送Rollback的消息。回查是Broker发起的,Broker认为在接收Half消息后的一段时间内,
如果生产者都没有发送Commit或Rollback消息给Broker,那么Broker会主动"询问"生产者该
事务消息对应的本地事务执行结果,以此来决定事务是否要Commit.
TransactionalMessageCheckService是回查服务的实现类
TransactionalMessageCheckService是一个线程服务,它在后台一直执行run()方法,
run()方法一直调用waitForRunning()方法。关于waitForRunning()方法,这是RocketMQ
的Broker中典型的"sleep"实现方式。该方式可以大致理解为"休息"一段时间再执行onWaitEnd()
方法,而TransactionalMessageCheckService服务重写了onWaitEnd()方法.
接下来分析下代码中的核心变量。
run()方法一直调用waitForRunning()方法。关于waitForRunning()方法,这是RocketMQ
的Broker中典型的"sleep"实现方式。该方式可以大致理解为"休息"一段时间再执行onWaitEnd()
方法,而TransactionalMessageCheckService服务重写了onWaitEnd()方法.
接下来分析下代码中的核心变量。
timeout:事务消息超时时间,如果消息在这个事件内没有进行Commit或Rollback,
则执行第一次回查,默认6000ms
则执行第一次回查,默认6000ms
checkMax:最大回查次数,如果回查超过这个次数,事务消息将被忽略。
回查的实现逻辑是每间隔一定时间执行TransactionalMessageServiceImpl
#check()方法,判断哪些消息超时,对超时的消息开始执行回查
回查的实现逻辑是每间隔一定时间执行TransactionalMessageServiceImpl
#check()方法,判断哪些消息超时,对超时的消息开始执行回查
事务消息的最大回查次数默认15次
发送Half事务消息、执行Commit/Rollback命令、事务回查过程简图
RMQ_SYS_TRANS_HALF_TOPIC:保存事务消息的Topic,它存储
用户发送的Half消息,有的消息已经进行了Rollback,有的消息
状态是未知的
用户发送的Half消息,有的消息已经进行了Rollback,有的消息
状态是未知的
RMQ_SYS_TRANS_OP_HALF_TOPIC:也叫OP主题,当事务消息被Commit或
Rollback后,会将原始事务消息的offset保存在该OP主题中
Rollback后,会将原始事务消息的offset保存在该OP主题中
RMQ_SYS_TRANS_HALF_TOPIC和RMQ_SYS_TRANS_OP_HALF_TOPIC配合流程。
首先,取出RMQ_SYS_TRANS_HALF_TOPIC中达到回查条件但没有回查过的消息,
到RMQ_SYS_TRANS_OP_HALF_TOPIC主题中确认是否已经会回查,如果没有回查过
则发起回查操作。
首先,取出RMQ_SYS_TRANS_HALF_TOPIC中达到回查条件但没有回查过的消息,
到RMQ_SYS_TRANS_OP_HALF_TOPIC主题中确认是否已经会回查,如果没有回查过
则发起回查操作。
然后具体分析会回查方法TransactinalMessageServiceImpl.check()的实现过程。
获取RMQ_SYS_TRANS_HALF_TOPIC主题的全部队列,依次循环每一个队列中的
全部未消费的消息,确认是否需要回查。
对于每一条消息又是如何确认是否需要回查的呢?具体逻辑在TransactionalMessageServiceImpl
#check()方法中的while(true)代码中
获取RMQ_SYS_TRANS_HALF_TOPIC主题的全部队列,依次循环每一个队列中的
全部未消费的消息,确认是否需要回查。
对于每一条消息又是如何确认是否需要回查的呢?具体逻辑在TransactionalMessageServiceImpl
#check()方法中的while(true)代码中
check()
第一步:回查前校验。如果当前回查执行的时间超过了最大允许的回查时间,
(默认为60s)则跳出当前回查过程,如果当前回查的消息已经执行了Commit/Rollback,
则忽略当前消息,直接回查下一条消息
校验代码中的核心变量:
MAX_PROCESS_TIME_LIMIT:回查时间限制,默认是60s且不能配置
removeMap:该变量用于存储已经执行Commit/Rollback的Half消息位点
i:当前回查的Half消息的位点值。
如果当前Half消息在回查时,即在允许的回查时间内,又没有被生产者进行Commit/Rollback
(默认为60s)则跳出当前回查过程,如果当前回查的消息已经执行了Commit/Rollback,
则忽略当前消息,直接回查下一条消息
校验代码中的核心变量:
MAX_PROCESS_TIME_LIMIT:回查时间限制,默认是60s且不能配置
removeMap:该变量用于存储已经执行Commit/Rollback的Half消息位点
i:当前回查的Half消息的位点值。
如果当前Half消息在回查时,即在允许的回查时间内,又没有被生产者进行Commit/Rollback
填充removeMap的过程
第二步:检查是否有消息需要回查。如果从RMQ_SYS_TRANS_HALF_TOPIC主题中获取
Half消息为空的次数超过允许的最大次数或者没有消息,那么表示目前没有需要再回查的
消息了,可以结束本次回查过程,当然如果传入的位点是非法的,则继续下一个回查的位点。
代码中的核心参数:
msgExt:Half消息对象
getMessageNullCount:当前空消息的次数
MAX_RETRY_COUNT_WHEN_HALF_NULL:表示可以允许的最大Half消息为空的次数,超过
则结束回查,默认为1次,并且不能配置。
messageQueue:RMQ_SYS_TRANS_HALF_TOPIC主题中正在被检查的队列
如果RMQ_SYS_TRANS_HALF_TOPIC中已经没有待回查的消息,则立即终止当前的回查过程
Half消息为空的次数超过允许的最大次数或者没有消息,那么表示目前没有需要再回查的
消息了,可以结束本次回查过程,当然如果传入的位点是非法的,则继续下一个回查的位点。
代码中的核心参数:
msgExt:Half消息对象
getMessageNullCount:当前空消息的次数
MAX_RETRY_COUNT_WHEN_HALF_NULL:表示可以允许的最大Half消息为空的次数,超过
则结束回查,默认为1次,并且不能配置。
messageQueue:RMQ_SYS_TRANS_HALF_TOPIC主题中正在被检查的队列
如果RMQ_SYS_TRANS_HALF_TOPIC中已经没有待回查的消息,则立即终止当前的回查过程
第三步,回查次数校验,消息是否过期校验。如果Half消息回查次数已经超过了允许的
最大回查次数,则不再回查,实现该校验的方法是TransactionMessageServiceImpl.needDisard();
如果Half消息对应的CommitLog已经过期,那么也不回查,该校验实现的方法是
TransactionalMessageServiceImpl.needSkip()
最大回查次数,则不再回查,实现该校验的方法是TransactionMessageServiceImpl.needDisard();
如果Half消息对应的CommitLog已经过期,那么也不回查,该校验实现的方法是
TransactionalMessageServiceImpl.needSkip()
第四步:新发送的Half消息不用回查,对于不是新发送的Half消息,如果在免疫回查时间(免疫期)内,
也不用回查。免疫期是指生产者在发送Half消息后、执行Commit/Rollback前,Half消息都不需要
回查,这段时间就是这个Half消息的回查免疫期。免疫期的判断逻辑如图。
代码中的核心变量如下:
valueOfCurrentMinusBorn:当前时间减去消息的发送时间
checkImmunityTimeStr:用户设置的消息回查免疫时间,换言之,就是生产者本地事务的最长执行时间,
也就是默认6s.
当checkImmunityTimeStr和transactionTimeout同时存在时,免疫时间将通过
getImmunityTime(checkImmunityTimeStr, transactionTimeout)方法计算后可以得出最终的免疫期,进而进行
免疫期回查判断
也不用回查。免疫期是指生产者在发送Half消息后、执行Commit/Rollback前,Half消息都不需要
回查,这段时间就是这个Half消息的回查免疫期。免疫期的判断逻辑如图。
代码中的核心变量如下:
valueOfCurrentMinusBorn:当前时间减去消息的发送时间
checkImmunityTimeStr:用户设置的消息回查免疫时间,换言之,就是生产者本地事务的最长执行时间,
也就是默认6s.
当checkImmunityTimeStr和transactionTimeout同时存在时,免疫时间将通过
getImmunityTime(checkImmunityTimeStr, transactionTimeout)方法计算后可以得出最终的免疫期,进而进行
免疫期回查判断
第五步:最终判断是否需要回查生产者本地事务执行结果
满足图中条件之一就可以进行回查:
1.如果没有OP消息,并且当前Half消息在免疫期外
2.当前Half消息存在OP消息,并且最后一个本批次OP消息中的最后一个消息在免疫期外,
也就是满足回查时间
3.Broker与客户端有时间差
4.重新将当前Half消息存储在RMQ_SYS_TRANS_HALF_TOPIC主题中,原因是回查是一个
异步过程,Broker不确定回查的结果是成功还是失败,所以RocketMQ做最坏的打算,如果
回查失败则下次继续回查;如果本地回查成功则写入OP消息,下次再读取Half消息时也不会回查
满足图中条件之一就可以进行回查:
1.如果没有OP消息,并且当前Half消息在免疫期外
2.当前Half消息存在OP消息,并且最后一个本批次OP消息中的最后一个消息在免疫期外,
也就是满足回查时间
3.Broker与客户端有时间差
4.重新将当前Half消息存储在RMQ_SYS_TRANS_HALF_TOPIC主题中,原因是回查是一个
异步过程,Broker不确定回查的结果是成功还是失败,所以RocketMQ做最坏的打算,如果
回查失败则下次继续回查;如果本地回查成功则写入OP消息,下次再读取Half消息时也不会回查
第六步:执行回查。在当前批次的Half消息回查完毕后,更新Half主题和OP主题的消费位点,
推进回查进度。Broker将回查消息通过回查线程池异步地发送给生产者,执行事务结果回查
推进回查进度。Broker将回查消息通过回查线程池异步地发送给生产者,执行事务结果回查
Broker提交或回滚事务消息。
当生产者本地事务处理完成并且Broker回查事务消息后,不管执行Commit还是Rollback,
都会根据用户本地事务的执行结果发送一个End_transaction的RPC请求给Broker,Broker
端处理该请求的类是EndTransactionProcessor
当生产者本地事务处理完成并且Broker回查事务消息后,不管执行Commit还是Rollback,
都会根据用户本地事务的执行结果发送一个End_transaction的RPC请求给Broker,Broker
端处理该请求的类是EndTransactionProcessor
第一步,End_Transaction请求校验,主要检查项如下
1.Broker角色检查。Slave Broker不处理事务消息
2.事务消息类型检查。EndTransactionProcessor只处理
Commit或Rollback类型的事务消息,其余消息不处理,
这里区分了事务回查
1.Broker角色检查。Slave Broker不处理事务消息
2.事务消息类型检查。EndTransactionProcessor只处理
Commit或Rollback类型的事务消息,其余消息不处理,
这里区分了事务回查
第二步,进行Commit或Rollback。根据生产者请求头中的参数
判断,是Commit请求还是Rollback请求,然后分别进行处理
判断,是Commit请求还是Rollback请求,然后分别进行处理
commitMessage():提交Half消息/这是事务消息服务接口中的一个方法,
根据消息位点查询了Half消息,并将Half消息返回
根据消息位点查询了Half消息,并将Half消息返回
checkPrepareMessage():Half消息数据校验。校验内容包括发送消息的
生产者组与当前执行Commit/Rollback的生产者是否一致,当前Half消息是否与
请求Commit/Rollback的消息是否是同一条消息
生产者组与当前执行Commit/Rollback的生产者是否一致,当前Half消息是否与
请求Commit/Rollback的消息是否是同一条消息
endMessageTransaction():消息对象类型转化,将MessageExt对象转化为
MessageExtBrokerInner对象,并且还原消息之前的Topic和ConsumeQueue等信息
MessageExtBrokerInner对象,并且还原消息之前的Topic和ConsumeQueue等信息
sendFinalMessage():将还原后的事务消息最终发送到CommitLog中,一旦发送成功,
消费者就可以正常拉取消息并消费
消费者就可以正常拉取消息并消费
deletePrepareMessage():在sendFinalMessage()执行成功后,删除Half消息。
其实RocketMQ是不能真正删除消息的,其实质是顺序写磁盘,相当于做了一个
"假删除"。"假删除"通过putOpMessage()方法将消息保存到TransactionMessageUtil.
buildOpTopic()的Topic中,并且做上标记TransactionalMessageUtil.REMOVETAG,
表示消息已删除
其实RocketMQ是不能真正删除消息的,其实质是顺序写磁盘,相当于做了一个
"假删除"。"假删除"通过putOpMessage()方法将消息保存到TransactionMessageUtil.
buildOpTopic()的Topic中,并且做上标记TransactionalMessageUtil.REMOVETAG,
表示消息已删除
如果消息被标记为已删除,则调用addRemoveTagInTransactionOp()方法,
利用标记为已删除的OP消息构造Message消息对象,并且调用存储方法保存
消息
利用标记为已删除的OP消息构造Message消息对象,并且调用存储方法保存
消息
TransactionalMessageUtil.buildOpTopic()方法跟保存Half消息时的逻辑类似
Half消息保存在名为MixAll.RMQ_SYS_TRANS_HALF_TOPIC的Topic中,
执行Commit和Rollback后的消息都保存在MixAll.RMQ_SYS_TRANS_OP_HALF_TOPIC
对象中,以便Broker判断是否需要回查生产者事务的执行状态
执行Commit和Rollback后的消息都保存在MixAll.RMQ_SYS_TRANS_OP_HALF_TOPIC
对象中,以便Broker判断是否需要回查生产者事务的执行状态
调用存储层方法,真正地将OP消息保存到了CommitLog中
Rollback实现逻辑。
Rollback并没有真正删除消息,而是标记Half消息为删除,
在Broker回查时机会跳过不检查
Rollback并没有真正删除消息,而是标记Half消息为删除,
在Broker回查时机会跳过不检查
rollbackMessage():该方法与CommitMessage()方法一样,都是查询Half消息并返回消息对象。
checkPrepareMessage():消息校验,与Commit调用的是同一个方法
deletePrepareMessage():删除Half消息,与Commit调用的是同一个方法
Consumer启动机制
RocketMQ客户端中有两个独立的消费者实现类分别为DefaultMQPullConsumer和DefaultMQPushConsumer
DefaultMQPullConsumer,该消费者使用时需要用户主动从Broker中Pull消息和消费消息,
提交消费位点
提交消费位点
DefualtMQPullConsumer的继承关系
核心属性
namesrvAddr:继承自ClientConfig,表示RocketMQ集群的Namesrv地址,如果是多个,则用逗号分开
如:127.0.0.1:9876,127.0.0.2:9876
如:127.0.0.1:9876,127.0.0.2:9876
clientIP:使用客户端的程序所在机器的IP地址,目前支持IPV4和IPV6,同时排除了本地环会地址(127.0.xxx.xxx)
和私有内网地址(192.168.xxx.xxx),如果在Docket中运行,获取的IP地址是容器所在的IP地址,而非宿主主机的IP地址
和私有内网地址(192.168.xxx.xxx),如果在Docket中运行,获取的IP地址是容器所在的IP地址,而非宿主主机的IP地址
instanceName:实例名,顾名思义每个实例都需要取不一样的名字,加入要在多个机器上部署
多个程序进程,那么每个进程的实例名必须不相同,否则程序会启动失败,因为在创建MQClient时,
会用到IP和instancename名称来
多个程序进程,那么每个进程的实例名必须不相同,否则程序会启动失败,因为在创建MQClient时,
会用到IP和instancename名称来
vipChannelEnabled:这是一个boolean值,表示是否开启VIP通道。VIP通道和非VIP通道的区别是使用不同的端口号进行通信
clientCallbackExecutorThreads:客户端回调线程数。该线程数等于Netty通信层回调线程的个数,默认值为
Runtime.getRuntime().availableProcessors();表示当前有效的CPU个数
Runtime.getRuntime().availableProcessors();表示当前有效的CPU个数
pollNameServerInterval:获取Topic路由信息间隔,单位为ms,默认为30000ms(30s)
heartbeatBrokerInterval:客户端和Broker心跳间隔,单位为ms,默认30000ms(30s)
persistCOnsumerOffsetInterval:持久化消费位点时间间隔,单位为ms,默认为5000ms(5s)
defaultMQPullConsumer:默认pull消费者的具体实现
consumerGroup:消费者组名字
brokerSuspendMaxTimeMills:在长轮询模式下,Broker的最大挂起请求时间,建议不要修改此值
consumerTimeoutMillsWhenSuspend:在长轮询模式下,消费者的最大请求超时时间,必须比brokerSuspendMaxTimeMills大,不建议修改
messageModel:消费模式,现在支持集群模式消费和广播模式消费
messageQueueListener:消息路由信息变化时回调处理监听器,一般在重新平衡时被调用
offsetStore:位点存储模块。集群模式位点会持久化到Broker中,广播模式持久化到本地文件中(某个实例消费失败,生产者也不会重发),
位点存储模块有两个实现类,分别为RemoteBrokerOffsetStore和LocalFileOffsetStore
位点存储模块有两个实现类,分别为RemoteBrokerOffsetStore和LocalFileOffsetStore
allocateMessageQUeueStrategy:消费Queue分配策略管理器,默认是平均分配策略
private AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
private AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
maxReconsumeTimes:最大重试次数,可以配置
核心方法
registerMessageQueueListener():注册队列变化监听器,当队列发生变化是会被监听到
pull():从Broker中Pull消息,如果有PullCallback参数,则表示异步拉取
pullBlockIfNotFound():长轮询方式拉取,如果没有拉取到消息,
那么Broker会讲请求Hold住一段时间,
当有消息来临时再发送pull请求
那么Broker会讲请求Hold住一段时间,
当有消息来临时再发送pull请求
updateConsumeOffset():更新某一个Queue的消费位点
fetchConsumeOffset():查找某个Queue的消费位点
sendMessageBack():如果消费发送失败,则可以讲消息重新发回Broker,
这个消费者组延迟一段时间后可以再消费(也就是重试)
这个消费者组延迟一段时间后可以再消费(也就是重试)
fetchSubscribeMessageQueues():获取一个Topic的全部Queue信息
Pull消费启动流程
1.最初创建defaultMQPullConsumerImpl时的状态为ServiceState.CREATE_JSUT
然后设置消费者的默认启动状态为失败
然后设置消费者的默认启动状态为失败
2.检查消费者的配置比,如消费者组名、消费类型、Queue的分配策略等参数是否
符合规范,将订阅关系数据发给Rebalance服务对象
符合规范,将订阅关系数据发给Rebalance服务对象
3.校验消费者实例名,如果时默认的名字,则更改为当前的程序进程id
4.获取一个MQClientInstance,如果MQClientInstance已经初始化,则直接返回初始化的实例。
这是核心对象,每个ClientID缓存一个实例
这是核心对象,每个ClientID缓存一个实例
5.设置Rebalance对象消费组、消费类型、Queue分配策略、MQClientInstance等参数
6.对BrokerAPI的封装类pullAPIWrapper进行初始化,同时注册消息,过滤filter
7.初始化位点管理器并加载位点信息,位点管理器分为本地管理和远程管理,集群消费时
消费位点保存在Broker中,由远程管理器管理,广播消息时位点在本地,由本地管理其管理
消费位点保存在Broker中,由远程管理器管理,广播消息时位点在本地,由本地管理其管理
8.本地注册消费者实例,如果注册成功,则表示消费者启动成功
DefaultMQPushConsumer,大部分属性、方法和DefaultMQPullConsumer是一样的
核心属性和方法
defaultMQPushConsumerImpl:默认的Push消费者具体实现类
consumeFromWhere:一个枚举,表示从什么位点开始消费,
CONSUME_FROM_LAST_OFFSET:默认从上次消费的位点开始消费,相当于断点继续
CONSUME_FROM_TIMESTAMP:从指定时间开始消费
CONSUME_FROM_FIRST_OFFSET:从ConsumeQueue的最小位点开始消费
CONSUME_FROM_LAST_OFFSET:默认从上次消费的位点开始消费,相当于断点继续
CONSUME_FROM_TIMESTAMP:从指定时间开始消费
CONSUME_FROM_FIRST_OFFSET:从ConsumeQueue的最小位点开始消费
consumeTimestamp:表示从哪一时刻开始消费,时间格式为yyyyMMDDHHmmss,默认半小时前,
当consumeFromWhere=CONSUME_FROM_TIMESTAMP时,consumeTimestamp设置的值才生效
当consumeFromWhere=CONSUME_FROM_TIMESTAMP时,consumeTimestamp设置的值才生效
allocateMessageQueueStrategy:消费者订阅topic-queue策略
subscription:订阅关系,表示当前消费者订阅了哪些Topic的哪些Tag
messageListener:消息Push回调监听器
consumeThreadMin:最小消费线程数,必须小于consumeThreadMax
consumeThreadMax:最大线程数,必须大于consumeThreadMin
consumeThreadMax:最大线程数,必须大于consumeThreadMin
adjustThreadPoolNumsThreshold:动态调整消费线程池的线程数大小,开源版本不支持
consumeConcurrentlyMaxSpan:并发消息的最大位点差,,如果Pull消息的位点差超过该值,拉取变慢
pullThresholdForQueue:一个Queue能缓存的最大消息数,超过该值则采取拉取流控措施,默认是1000
pullThresholdSizeForQueue:一个Queue最大能缓存的消息字节数,单位是MB,默认是10MB
pullThresholdForTopic:一个Topic最大能缓存的消息数。超过该值则采取拉取流控措施,该字段值默认是-1,
该值根据pullThresholdForQueue的配置决定是否生效,pullThresholdForTopic的优先级低于pullThresholdForQueue
该值根据pullThresholdForQueue的配置决定是否生效,pullThresholdForTopic的优先级低于pullThresholdForQueue
pullThreasholdSizeForTopic:一个Topic最大能缓存的消息字节数,单位是MB,默认为-1,结合pullThresholdSizeForQueue
配置项生效,该配置项的优先级低于pullThresholdSizeForQueue
配置项生效,该配置项的优先级低于pullThresholdSizeForQueue
pullInterval:拉取间隔,单位为ms
consumeMessageBatchMaxSize:消费者每次批量消费时,最多消费多少条消息,默认是1
pullBatchSize:一次最大拉取多少条消息,默认32条
postSubscriptionWhenPull:每次拉取消息时是否更新订阅关系,默认false
maxReconsumeTimes:最大重试次数,默认-1,表示最大重试次数为16次
suspendCurrentQueueTimeMillis为段轮询场景设置的挂起时间,比如顺序消息场景
consumeTimeout:消费超时时间,单位为min,默认是15
Push消费启动流程
1-7和Pull模式类似
8.初始化消费服务并启动,之所以用户"感觉"消息是Broker主动推送给自己的,
是因为DefaultMQPushConsumer通过Pull服务将消息
拉取到本地,再通过Callbakc的形式,将本地消息Push给用户的消费代码,
DefaultMQPushConsumer和DefaultMQPullConsumer
获取消息的方式一样,本质上都是拉取。
消费服务分为两种,即并行消费服务和顺序消费服务,对应的实现类分别是
ConsumeMessageConcurrentlyService和ConsumeMessageOrderlyService
根据用户监听器继承的不同接口初始化不同的消费服务程序
是因为DefaultMQPushConsumer通过Pull服务将消息
拉取到本地,再通过Callbakc的形式,将本地消息Push给用户的消费代码,
DefaultMQPushConsumer和DefaultMQPullConsumer
获取消息的方式一样,本质上都是拉取。
消费服务分为两种,即并行消费服务和顺序消费服务,对应的实现类分别是
ConsumeMessageConcurrentlyService和ConsumeMessageOrderlyService
根据用户监听器继承的不同接口初始化不同的消费服务程序
9.启动MQClientInstance实例
10.更新本地订阅关系和路由信息,通过Broker检查是否支持消费者的过滤类型;
向集群中的所有Broker发送消费者组的心跳信息
向集群中的所有Broker发送消费者组的心跳信息
11.立即执行一次Rebalance
this.mQClientFactory.rebalanceImmediately();
this.mQClientFactory.rebalanceImmediately();
Consumer消费方式
RocketMQ的消费方式包含Pull和Push两种
Pull方式。
用户主动Pull消息,自主管理位点,可以灵活地掌控消费进度和消费速度,适合流计算、消费特别耗时等特殊的消费场景。
缺点也显而易见,需要从代码层面精准地控制消费,对开发人员有一定要求,再RocketMQ中
org.apache.rocketmq.client.consumer.DefaultMQPullConsume是默认的Pull消费者实现类
用户主动Pull消息,自主管理位点,可以灵活地掌控消费进度和消费速度,适合流计算、消费特别耗时等特殊的消费场景。
缺点也显而易见,需要从代码层面精准地控制消费,对开发人员有一定要求,再RocketMQ中
org.apache.rocketmq.client.consumer.DefaultMQPullConsume是默认的Pull消费者实现类
Pull消费流程
1.fetchSubscribeMessageQueues(String topic).拉取全部可以消费的Queue.如果某一个
Broker下线,这里也可以实时感知到
Broker下线,这里也可以实时感知到
2.遍历全部Queue,拉取每个Queue可以消费的消息
3.如果拉取到消息,则执行用户编写的消费代码
4.保存消费进度。消费进度可以执行updateConsumeOffset()方法,将消息位点上报给Broker,
也可以自行保存消费位点。比如流计算平台Flink使用Pull方式拉取消息消费,通过Checkpoint管理消费进度
也可以自行保存消费位点。比如流计算平台Flink使用Pull方式拉取消息消费,通过Checkpoint管理消费进度
Push方式。
代码介入非常简单,适合大部分业务场景。缺点灵活度差,在了解消费原理后,排查消费问题可简单快捷.
在RocketMQ中org.apache.rocketmq.client.consumer.DefaultMQPushConsumer是默认的Push消费者实现类
代码介入非常简单,适合大部分业务场景。缺点灵活度差,在了解消费原理后,排查消费问题可简单快捷.
在RocketMQ中org.apache.rocketmq.client.consumer.DefaultMQPushConsumer是默认的Push消费者实现类
Push消费流程
1.初始化Push消费者实例。业务代码初始化DefaultMQPushConsumer实例,启动Push服务PullMessageService.
该服务是一个线程服务,不断执行run()方法拉取已经订阅Topic的全部队列的消息,将消息保存在本地的缓存队列中
该服务是一个线程服务,不断执行run()方法拉取已经订阅Topic的全部队列的消息,将消息保存在本地的缓存队列中
启动PullMessageService的拉取服务
Push消费者拉取消息
1.PullMessageService不断拉取消息。pullRequestQueue中保存着待拉取地Topic和Queue消息,
程序不断从pullRequestQueue中获取PullRequest并执行拉取消息方法
程序不断从pullRequestQueue中获取PullRequest并执行拉取消息方法
2.消费者拉取消息并消费,org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
1.基本校验。校验ProcessQueue是否dropped;校验消费者服务状态是否正常;
校验消费者是否被挂起。
在Rebalance时,org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance
方法在运行时设置ProcessQueue.setDropped(true)的逻辑,,设置成功后,在执行拉取消息时,将不再拉取dropped为true的
ProcessQueue
校验消费者是否被挂起。
在Rebalance时,org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance
方法在运行时设置ProcessQueue.setDropped(true)的逻辑,,设置成功后,在执行拉取消息时,将不再拉取dropped为true的
ProcessQueue
2.拉取条数、字节数限制检查。如果本地缓存消息数量大于配置的最大拉取条数(默认1000,可以调整),则延迟一段时间再拉取;
如果本地缓存消息字节数大于配置的最大缓存字节数,则延迟i短时间再拉取,这两种校验方式都相当于本地流控
如果本地缓存消息字节数大于配置的最大缓存字节数,则延迟i短时间再拉取,这两种校验方式都相当于本地流控
3.并发消费和顺序消费校验。
在并发消费时,processQueue.getMaxSpan()方法是用于计算本地缓存队列中的哥消息和最后一个消息的offset差值。
顺序消费时,如果当前拉取的队列在Broker没有被锁定,说明已经由拉取正在执行,当前拉取请求晚点执行,如果不是第一次拉取,
需要先计算最新的拉取位点并修正最新的待拉取位点信息,再执行拉取
在并发消费时,processQueue.getMaxSpan()方法是用于计算本地缓存队列中的哥消息和最后一个消息的offset差值。
顺序消费时,如果当前拉取的队列在Broker没有被锁定,说明已经由拉取正在执行,当前拉取请求晚点执行,如果不是第一次拉取,
需要先计算最新的拉取位点并修正最新的待拉取位点信息,再执行拉取
本地缓存队列的Span如果大于配置的最大差值(默认2000,可以调整),
则认为本地消费过慢,需要执行本地流控
则认为本地消费过慢,需要执行本地流控
队列锁定
1.订阅关系校验。如果待拉取的Topic在本地缓存中订阅关系为空,则本地拉取不执行,
待下一个正常心跳或者Rebalance后订阅关系恢复正常,方可正常拉取
待下一个正常心跳或者Rebalance后订阅关系恢复正常,方可正常拉取
2.封装拉取请求和拉取后的回调对象PullCallback。这里主要将消息拉取请求和拉取结果处理封装成PullCallback,
并通过调用PullAPIWrapper.pullKernelImpl()方法将拉取请求发出去。
如果拉取到消息,那么将消息保存到对应的本地缓存队列ProcessQueue中,然后将这些消息交给ConsumeService服务;
并通过调用PullAPIWrapper.pullKernelImpl()方法将拉取请求发出去。
如果拉取到消息,那么将消息保存到对应的本地缓存队列ProcessQueue中,然后将这些消息交给ConsumeService服务;
ConsumeMessageService是一个通用的消费服务接口,它包含两个实现类,
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService和
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService,这两个实现类
分别用于并发消费和顺序消费
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService和
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService,这两个实现类
分别用于并发消费和顺序消费
核心方法
start()方法和shudown()方法分别在启动和关闭服务时使用
updateCorePoolSize():更新消费线程池的核心线程数
incCorePoolSize():增加一个消费线程池的核心线程数
decCorePoolSize():减少一个消费者线程池的核心线程数
getCorePoolSize():获取消费线程池的核心线程数
consumeMessageDirectly():如果一个消息已经被消费过了,但是还项再消费一次,
就需要实现这个方法
就需要实现这个方法
submitConsumeRequest():将消息封装成线程池任务,提交给消费服务,
消费服务再将消息传递给业务消费进行处理
消费服务再将消息传递给业务消费进行处理
1.ConsumeMessageService消息消费分发。ConsumeMessageService服务通过
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest
接口接收消息消费任务后,将消息按照固定条数封装成多个ConsumeRequest任务对象,并发送到
消费线程池,等待分发给业务消费;ConsumeMessageOrderlyService先将Pull的全部消息放在一个本地队列中
然后提交一个ConsumeRequest到消费者线程池
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest
接口接收消息消费任务后,将消息按照固定条数封装成多个ConsumeRequest任务对象,并发送到
消费线程池,等待分发给业务消费;ConsumeMessageOrderlyService先将Pull的全部消息放在一个本地队列中
然后提交一个ConsumeRequest到消费者线程池
ConsumeMessageConcurrentlyService
ConsumeMessageOrderlyService
2.消费消息。消费的主要逻辑再ConsumeMessageService接口的两个实现类中,以并发消费为例.
消费消息主要分为消费前预处理、消费回调、消费结构统计、消费结果处理4个步骤
消费消息主要分为消费前预处理、消费回调、消费结构统计、消费结果处理4个步骤
第一步:消费执行前进行预处理。执行消费前的hook和重试消息预处理。消费前的hook可以理解为
消费前的消息预处理(比如消息格式校验)。如果拉取的消息来自重试队列,则将Topic重置为原来的Topic,
而不用重试Topc名
消费前的消息预处理(比如消息格式校验)。如果拉取的消息来自重试队列,则将Topic重置为原来的Topic,
而不用重试Topc名
第二步:消费回调。首先设置消息开始消费时间为当前时间,再将消息列表转为不可修改的List,
然后通过status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
方法将消息传递给用户编写的业务消费代码进行处理
然后通过status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
方法将消息传递给用户编写的业务消费代码进行处理
第三步:消费结果统计和执行消费后的hook.客户端原声支持基本消费指标统计,
比如消费耗时;消费后的hook和消费前的hook要一一对应,
用户可以用消费后的hook统计与自身业务相关的指标
比如消费耗时;消费后的hook和消费前的hook要一一对应,
用户可以用消费后的hook统计与自身业务相关的指标
第四步:消费结果处理。包含消费指标统计、消费重试处理和消费位点处理。
消费指标主要是对消费成功和失败的TPS的统计;消费重试处理主要将消费重试次数+1;
消费位点处理主要根据消费结果更新消费位点记录
消费指标主要是对消费成功和失败的TPS的统计;消费重试处理主要将消费重试次数+1;
消费位点处理主要根据消费结果更新消费位点记录
4.拉取消息之前先将MessageListenerConcurrently/MessageListenerOrderly进行初始化,
并调用start()方法进行启动,由于ConsumeMessageService内部是一个线程,所以需要看run()方法
并调用start()方法进行启动,由于ConsumeMessageService内部是一个线程,所以需要看run()方法
2.核心-消费消息。由消费服务ConsumeMessageConcurrentlyService或者ConsumeMessagOrderlyService
将本地缓存队列中的消息不断放入到消费线程池,异步回调业务消费代码,此时业务代码可以消费消息
将本地缓存队列中的消息不断放入到消费线程池,异步回调业务消费代码,此时业务代码可以消费消息
3.核心-保存消费进度。业务代码消费后,将消费结果返回给消费服务,再由消费服务将消费进度保存在本地,
由消费进度管理服务定时和不定时地持久化到本地(LcoalFileOffsetStore支持)或者远程Broker(RemoteBrokerOffsetStore支持)中,
对于消费失败地消息,RocketMQ客户端处理后发回给Broker,并告知消费失败
由消费进度管理服务定时和不定时地持久化到本地(LcoalFileOffsetStore支持)或者远程Broker(RemoteBrokerOffsetStore支持)中,
对于消费失败地消息,RocketMQ客户端处理后发回给Broker,并告知消费失败
Pull和Push方式的比较
RocketMQ是一个消息队列,FIFO先进先出规则如何再消费失败时保证消息的顺序呢?
可以从消费任务实现类ConsumeRequest和本地缓存队列ProcessQueue的设计来看主要差异
可以从消费任务实现类ConsumeRequest和本地缓存队列ProcessQueue的设计来看主要差异
并发消费
顺序消费
顺序消息的ConsumeRequest中并没有保存需要消费的消息,再顺序消费时通过调用ProcessQueue.takeMessage()
方法获取需要消费的消息,而且消费也是同步进行的。
方法获取需要消费的消息,而且消费也是同步进行的。
takeMessages()方法实现
msgTreeMap:是一个TreeMap<Long, MessageExt>类型,key是物理位点值,value是消息对象,
该容器是ProcessQueue用来缓存本地顺序消息的,保存的数据是按照key(就是物理位点值)顺序排列的
该容器是ProcessQueue用来缓存本地顺序消息的,保存的数据是按照key(就是物理位点值)顺序排列的
consumingMssgOrderlyTreeMap:是一个TreeMap<Long,MessagExt>类型,key是消息物理位点值,
value是消息对象,保存当前正在处理的顺序消息集合,是msgTreeMap的一个子集,保存的数据是按照key
(就是物理位点值)顺序排列的
value是消息对象,保存当前正在处理的顺序消息集合,是msgTreeMap的一个子集,保存的数据是按照key
(就是物理位点值)顺序排列的
batchSize:一次从本地缓存中获取多少条消息回调给用户消费。顺序消息是如何通过ProcessQueue.takeMesages()
获取消息给业务代码消费的呢?
获取消息给业务代码消费的呢?
从msgTreeMap中获取batchSize数量的消息放入consumingMsgOrderlyTreeMap中,并返回给用户消费,
由于当前的MessageQueue是被Synchronized锁住的,并且获取的消费消息也是按照消费位点顺序排列的,
所以消费时用户能按照物理位点顺序消费消息
由于当前的MessageQueue是被Synchronized锁住的,并且获取的消费消息也是按照消费位点顺序排列的,
所以消费时用户能按照物理位点顺序消费消息
如果消费失败,又是怎么保证顺序的呢?来看processConsumeResult()实现
RocketMQ支持自动提交offset和手动提交offset两种方式。以自动提交offset为例,手动提交与其完全一致,
先看入参
先看入参
msg:当前处理的一批消息
status:消费结果的状态。目前支持SUCCESS和SUSPEND_CURRENT_QUEUE_A_MOMENT两种状态
消费成功后,程序会执行commit()方法提交当前位点,统计消费成功的TPS。
消费失败后,程序会统计消费失败的TPS,通过执行makeMessageToCOnsumeAgin()方法
删除消费失败的消息,通过定时任务将消费失败的消息在延迟一定时间后,重新提交到消费线程池
消费失败后,程序会统计消费失败的TPS,通过执行makeMessageToCOnsumeAgin()方法
删除消费失败的消息,通过定时任务将消费失败的消息在延迟一定时间后,重新提交到消费线程池
makeMessagToConsumeAgin()方法将消息从consumingMsgOrderlyTreeMap中删除再重新放入本地缓存度列msgTreeMap中,
等待下次被重新消费
等待下次被重新消费
submitConsumeRequestLater()方法会执行一个定时任务,延迟一定时间后重新将消费请求发送到消费线程池中,以供下一轮消费
做完这两个操作后,试想以下,消费线程在下一次消费时会发生什么事情?如果是从msgTreeMap中获取一批消息,
那么返回的消息又是那些呢?消息物理位点最小的,也就是之前未成功消费的消息,如果顺序消息消费失败,会再次投递给消费者消费,
直到消费成功,以此来保证顺序性
那么返回的消息又是那些呢?消息物理位点最小的,也就是之前未成功消费的消息,如果顺序消息消费失败,会再次投递给消费者消费,
直到消费成功,以此来保证顺序性
Producer
消息结构
public class Message implements Serializable {
private static final long serialVersionUID = 8445773977080406428L;
private String topic;
private int flag;
private Map<String, String> properties;
private byte[] body;
public void setKeys(String keys);
public void setKeys(Collection<String> keyCollection);
public void setTags(String tags);
public void setDelayTimeLevel(int level);
public void setTopic(String topic);
public void putUserProperty(final String name, final String value);
}
private static final long serialVersionUID = 8445773977080406428L;
private String topic;
private int flag;
private Map<String, String> properties;
private byte[] body;
public void setKeys(String keys);
public void setKeys(Collection<String> keyCollection);
public void setTags(String tags);
public void setDelayTimeLevel(int level);
public void setTopic(String topic);
public void putUserProperty(final String name, final String value);
}
putUserProperty():如果还有其他扩展信息,可以存放在这里,内部是一个Map,重估调用会覆盖旧值
setDelayTimeLevel():设置延迟级别,延迟多久消费者可以消费
setTags():消息过滤的标记,用户可以订阅某个Topic的某些Tag,这样Broker只会把订阅了topic-tag的消息发送给消费者
setKeys():设置消息的key,多个key可以用MessageConst.KEY_SEPARATOR(空格)分隔或者直接用另一个重载方法。
如果Broker中的messageIndexEnable=true则会根据key创建消息的hash索引,帮助用户快速过滤
如果Broker中的messageIndexEnable=true则会根据key创建消息的hash索引,帮助用户快速过滤
Body:消息体,字节数组,需要注意生产者使用什么编码,消费者也必须使用相同编码节码,否则会产生乱码
Properties:消息扩展信息,Tag、Keys、延迟级别都保存在这里
Flag:目前没用
Topic:主题名字,可以通过RocketMQ Console创建
Producer启动流程
生产者启动的流程比消费者启动的流程更加简单一般用户使用DefaultMQProducer的
构造函数构造一个生产者实例,并设置各种参数,比如,Namesrv地址、生产者组名等,
调用start()方法启动生产者实例,start()方法调用了生产者默认实现类的start()方法启动,
这里我们主要分析start()方法内部是怎么实现的
构造函数构造一个生产者实例,并设置各种参数,比如,Namesrv地址、生产者组名等,
调用start()方法启动生产者实例,start()方法调用了生产者默认实现类的start()方法启动,
这里我们主要分析start()方法内部是怎么实现的
1.构造函数初始化实例
这里初始化了namespace、producerGroup以及defaultMQProducerImpl
2.启动生产者实例start()
源码中关键点在于defualtMQProducerImpl的启动方法
调用了重载的start()方法
3.CREATE_JUST。通过switch-case判断当前生产者的服务状态,创建时默认是CREATE_JUST,
设置默认启动状态为启动失败
设置默认启动状态为启动失败
这里可以看到ServiceState第一次初始化出来的时候就是CREATE_JUST
当服务启动时状态肯定是CREATE_JUST,当再次调用start()方法启动时,
ServiceState就会发生变化,RUNNING或者SHUTDOWN,RocketMQ
会提示你曾经启动过,无需再次启动
ServiceState就会发生变化,RUNNING或者SHUTDOWN,RocketMQ
会提示你曾经启动过,无需再次启动
4.将生产者置为启动失败
5.生产者参数校验,执行checkConfig()方法,校验生产者实例设置的各种参数,
比如生产者组名是否为空,是否满足命名规则,长度是否满足等等
比如生产者组名是否为空,是否满足命名规则,长度是否满足等等
6.默认生产者组名转换为进行id,执行changeInstanceNameToPID()方法,
校验instance name,如果是默认名字则将其修改为进程id.接着调用getOrCreateMQClientInstance()
创建一个MQClientInstance实例,它与ClientId是一一对应,clientId是由ClientIP、instance及unitName
构成的,一般来讲,为了减少客户端的使用资源,如果将所有的instanceName和unitName设置为同样的值,
就只会创建一个MQClientInstance实例,而MQClientInstance实例的功能是管理本实例中全部的生产者与消费者的生产和消费行为。
同时将调用registerProducer()将生产者放入组中
校验instance name,如果是默认名字则将其修改为进程id.接着调用getOrCreateMQClientInstance()
创建一个MQClientInstance实例,它与ClientId是一一对应,clientId是由ClientIP、instance及unitName
构成的,一般来讲,为了减少客户端的使用资源,如果将所有的instanceName和unitName设置为同样的值,
就只会创建一个MQClientInstance实例,而MQClientInstance实例的功能是管理本实例中全部的生产者与消费者的生产和消费行为。
同时将调用registerProducer()将生产者放入组中
MQClientInstance中核心组件
producerTable:当前Client实例的全部生产者的内部实例
consumerTable:当前client实例的全部消费者的内部实例
adminExtTable:当前client实例的全部管理实例
mQClientAPIImpl:其实每个client也是一个NettyServer,也会支持Broker访问,这里实现了全部Client支持的接口
mQAdminImpl:管理接口的本地实现类
topicRouteTable:当前生产者、消费者中全部Topic的本地缓存路由信息
scheduledExecutorService:本地定时任务。比如定期获取当前Namesrv地址、定期同步Namesrv信息、定期更新Topic路由信息、
定期发送心跳给Broker、定期清理已下线的Broker、定期持久化消费位点、定期调整消费线程数
定期发送心跳给Broker、定期清理已下线的Broker、定期持久化消费位点、定期调整消费线程数
clientRemotingProcessor:请求的处理器,从处理方法processRequest()中我们可以直到目前支持哪些功能接口
pullMessageService:pull服务,这里是一个兼容写法,Pull服务是由一个状态变量方法this.isStopped()控制的,
这个stopped状态变量默认是false,而pullRequestQueue也是空的,所以这里之启动了pullMessageService,并没有真正地执行Pull操作
这个stopped状态变量默认是false,而pullRequestQueue也是空的,所以这里之启动了pullMessageService,并没有真正地执行Pull操作
rebalanceService:重新平衡服务,定期执行重新平衡方法this.mqClientFactory.doRebalance(),这里地mqClientFactory就是MQClientInstance实例,
通过依次调用MQClientInstance中保存地消费者实例地doRebalance()方法,来感知订阅关系地变化、集群变化等,以达到重新平衡
通过依次调用MQClientInstance中保存地消费者实例地doRebalance()方法,来感知订阅关系地变化、集群变化等,以达到重新平衡
consumerStateManager:消费监控,比如拉取RT(Response Time,响应时间)、拉取TPS(Transaction Per Second,每秒处理消息数),消费RT等都可以统计
MQClientInstance中核心方法
updateTopicRouteInfoFromNameServer:从多个Namesrv中获取最新Topic路由信息,更新本地缓存
cleanOfflineBroker:清理已下线的Broker
checkClientInBroker:检查Client是否在Broker中有效
sendHeartbeatToAllBrokerWithLock:发送客户端的心跳给所有的broker
registerConsumer:在本地注册一个消费者
unregisterConsumer:取消本地注册的消费者
registerProducer:在本地注册一个生产者
unregisterProducer:取消本地注册的生产者
registerAdminExt:注册一个管理实例
rebalanceImmediately:立即执行一次Rebalance.该操作是通过RokcetMQ的一个CountDownLatch2锁来实现的
doRebalance:对于所有已经注册的消费者实例,执行一次Rebalance
findBrokerAddressInAdmin:在本地缓存中查找Master或Slave Broker信息
findBrokerAddressInSubscribe:在本地缓存中查找Slave Broker信息
findBrokerAddressInPublish:在本地缓存中查找Master Broker地址
findConsumerIdList:查找消费者id列表
findBrokerAddressByTopic:通过Topic名字查找Broker地址
resetOffset:重置消费位点
getConsumerStastusL获取一个订阅关系中每个队列的消费速度
getTopicRouteTable:获取本地缓存Topic路由
consumeMessageDirectly:直接将消息发送给指定的消费者消费,和正常投递不同的是,指定了已经订阅的
消费者组中的一个,而不是全部已经订阅的消费者,一般使用于在消费消息后,某个消费者组想再香妃一次的场景
消费者组中的一个,而不是全部已经订阅的消费者,一般使用于在消费消息后,某个消费者组想再香妃一次的场景
consumerRunningInfo:获取消费者的消费统计信息,包含消费RT、消费TPS
7.注册本地路由信息
8.启动MQClientInstance,置为服务启动失败状态
9.如果没有配置Namesrv,则调用fetchNameServerAddr()远程获取
10.启动通信模块.this.mQClientAPIImpl.start();
11..启动各种定时任务。this.startScheduledTask();
12.启动消息拉取服务。this.pullMessageService.start();
13.启动负载均衡服务。this.rebalanceService.start();
14.启动push服务,this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
15.置为服务启动状态。this.serviceState = ServiceState.RUNNING;
Producer消息发送流程
业务层:通常指直接调用RocketMQClient发送API的业务代码
消息处理层:指RocketMQ Client获取业务发送的消息对象后,一系列的参数检查、消息发送准备、参数包装等操作
通信层:指RocketMQ基于Netty封装的一个RPC通信服务,RocketMQ的各个组件之间的通信全部使用该通信层
首先RocketMQ客户端接收业务层消息,然后通过DefaultMQProducerImpl发送一个RPC请求给Broker,
再由Broker处理请求并保存消息,以DefualtMQProducerImpl.send()接口为例,消息发送流程具体分为3步
再由Broker处理请求并保存消息,以DefualtMQProducerImpl.send()接口为例,消息发送流程具体分为3步
1.调用defaultMQProducerImpl.send()方法发送消息
2.通过设置的发送超时时间,调用defaultMQProducerImpl.send()方法发送消息,
设置的超时事件可以通过sendMsgTimeout进行变更,其默认值为3s
设置的超时事件可以通过sendMsgTimeout进行变更,其默认值为3s
3.执行defaultMQProducerImpl.sendDefaultImpl()方法.该方法是发送消息的核心方法
入参分析
msg:我们拼装好的Message
communicationMode:通信模式,同步、异步还是单向,默认调用send(Message msg)是同步调用
sendCallback:对于异步模式,需要设置发送完成后的回调
timeout:超时时间:默认3s
执行过程
1.两个检查:生产者状态this.makeSureStateOK();、消息及消息内容Validators.checkMessage(msg, this.defaultMQProducer);
没有运行的生产者不能发送消息。消息检查主要检查消息是否为空,消息的Topic的名字是否为空或者是否符合规范;消息体大小
是否符合要求,最大值为4MB,可以通过maxMessageSize进行设置
没有运行的生产者不能发送消息。消息检查主要检查消息是否为空,消息的Topic的名字是否为空或者是否符合规范;消息体大小
是否符合要求,最大值为4MB,可以通过maxMessageSize进行设置
this.makeSureStateOK()
Validators.checkMessage(msg, this.defaultMQProducer);
Validators.checkTopic(msg.getTopic());
Validators.isNotAllowedSendTopic(msg.getTopic());
2.执行this.tryToFindTopicPublishInfo(msg.getTopic());
获取Topic路由信息,如果不存在则发出异常提醒用户。如果本地缓存没有路由信息,更新到本地,再返回
获取Topic路由信息,如果不存在则发出异常提醒用户。如果本地缓存没有路由信息,更新到本地,再返回
3.计算消息发送的重试次数,同步重试和异步重试的执行方式是不同的,同步为3次,异步1次
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
4.执行队列选择方法this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);,根据队列对象中保存的上一次发送消息的
broker的名字和Topic路由,选择(轮询)一个Queue将消息发送到Broker.我们可以sendLatencyFaultEnable来设置是否总是发送到延迟级别
较低的Broker,默认为false
broker的名字和Topic路由,选择(轮询)一个Queue将消息发送到Broker.我们可以sendLatencyFaultEnable来设置是否总是发送到延迟级别
较低的Broker,默认为false
5.执行this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
该方法是发送消息的核心方法,主要用于准备通信层的入参(比如Broker地址,请求体等),将消息传递给通信层,内部实现
是基于Nettty的,再封装为通信层request对象RemotingCommand前,会设置RequestCode标识当前请求发送的单个消息还是批量消息
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage()
Netty本身是一个异步的网络通信框架,怎么实现同步的呢?
this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
this.remotingClient.invokeSync(addr, request, timeoutMillis);
this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
在每次发送同步请求后,程序会执行waitResponse()方法,直到Netty接收到Broker的返回结果,如果在一定的时间内没有得到返回,则认为
发送消息失败,当Netty中的ChannelFuture有返回结果时,会调用putResponse进行释放锁,进而让请求线程同步返回
该方法是发送消息的核心方法,主要用于准备通信层的入参(比如Broker地址,请求体等),将消息传递给通信层,内部实现
是基于Nettty的,再封装为通信层request对象RemotingCommand前,会设置RequestCode标识当前请求发送的单个消息还是批量消息
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage()
Netty本身是一个异步的网络通信框架,怎么实现同步的呢?
this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
this.remotingClient.invokeSync(addr, request, timeoutMillis);
this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
在每次发送同步请求后,程序会执行waitResponse()方法,直到Netty接收到Broker的返回结果,如果在一定的时间内没有得到返回,则认为
发送消息失败,当Netty中的ChannelFuture有返回结果时,会调用putResponse进行释放锁,进而让请求线程同步返回
注:异步发送时有很多request,每个response返回后怎么与request进行对应呢?这里有一个关键参数opaque,RocketMQ每次发送
同步请求前都会为一个request分配一个opaque,这一个原子自增的id,一个response会以opaque作为key保存在responseTable中
这样用opaque就将request和response连接起来了,无论请求发送成功与否,都执行updateFaultItem()方法,这也是第3步讲的总是
发送到延迟级别较低的Broker逻辑
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
同步请求前都会为一个request分配一个opaque,这一个原子自增的id,一个response会以opaque作为key保存在responseTable中
这样用opaque就将request和response连接起来了,无论请求发送成功与否,都执行updateFaultItem()方法,这也是第3步讲的总是
发送到延迟级别较低的Broker逻辑
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
Namesrv
Namesrv核心数据结构和API
Namesrv核心数据结构和API.
Namesrv中保存的数据被称为Topic路由信息,Topic路由决定了Topic消息发送到哪些Broker,
消费者从哪些Broker消费消息,那么路由信息都包含哪些数据呢?
路由数据结构的实现代码都在RouteInfoManager类中
Namesrv中保存的数据被称为Topic路由信息,Topic路由决定了Topic消息发送到哪些Broker,
消费者从哪些Broker消费消息,那么路由信息都包含哪些数据呢?
路由数据结构的实现代码都在RouteInfoManager类中
BROKER_CHANNEL_EXPIRED_TIME:Broker存活的事件周期,默认为120s
topicQUeueTable:保存Topic和队列的信息,也叫真正的路由信息。一个Topic全部的Queue可能分布在
不同的Broker中,也可能分布在同一个Broker中
不同的Broker中,也可能分布在同一个Broker中
brokerAddrTable:存储了Broker名字和Broker信息的对应关系
clusterAddrTable:集群和Broker的对应关系
brokerLiveTable:当前在线的Broker地址和Broker信息的对应关系
filterServerTable:过滤服务器信息
Namesrv支持的全部API在DefaultRequestProcessor类中
RequestCode.REGISTER_BROKER:Broker注册自身信息到Namesrv
RequestCode.UNREGISTER_BROKER:Broker取消注册自身信息到Namesrv
RequestCode.GET_ROUTEINFO_BY_TOPIC:获取Topic路由信息
RequestCode.WIPE_WRITE_PERM_OF_BROKER:删除Broker的写权限
RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:获取全部Topic名字
RequestCode.DELETE_TOPIC_IN_NAMESRV:删除Topic信息
RequestCode.UPDATE_NAMESRV_CONFIG:更新Namesrv配置,当前配置是实时生效的
RequestCode.GET_NAMESRV_CONFIG:获取Namesrv配置
曾几何时,RocketMQ也采用Zookeeper作为协调者,但是繁杂的运行机制和过多的依赖导致RocketMQ
最终完全重新开发了一个零依赖、更简洁的Namesrv来替换
最终完全重新开发了一个零依赖、更简洁的Namesrv来替换
Namesrv启动流程
第一步:脚本和启动参数配置。
启动命令 nohup ./bin/mqnamesrv -c ./conf/namesrv.conf > dev/null 2>&1 &
通过脚本配置启动基本参数,比如配置文件路径、JVM参数,调用NamesrvStartup.main()方法,解析命令行的参数,
将处理好的参数转化为Java实例,传递给NamesrvController实例
启动命令 nohup ./bin/mqnamesrv -c ./conf/namesrv.conf > dev/null 2>&1 &
通过脚本配置启动基本参数,比如配置文件路径、JVM参数,调用NamesrvStartup.main()方法,解析命令行的参数,
将处理好的参数转化为Java实例,传递给NamesrvController实例
第二步:new 一个NamesrvController,加载命令行传递的配置参数,调用controller.initialize()方法初始化NamesrvController。
Namesrv启动的主要初始化过程也在这个方法中,代码如图
1.加载KV配置。主要是从本地文件中加载KV配置到内存中
2.初始化Netty通信层实例。RocketMQ基于Netty实现了一个RPC服务端,即NettyRemotingServer.通过参数nettyServerConfig,
会启动9876端口监听
3.Namesrv主动检测Broker是否可用,如果不可用就剔除。生产者、消费者也能通过心跳发现被踢出的路由,从而感知Broker下线
4.Namesrv定时打印配置信息到日志中。
Namesrv启动的主要初始化过程也在这个方法中,代码如图
1.加载KV配置。主要是从本地文件中加载KV配置到内存中
2.初始化Netty通信层实例。RocketMQ基于Netty实现了一个RPC服务端,即NettyRemotingServer.通过参数nettyServerConfig,
会启动9876端口监听
3.Namesrv主动检测Broker是否可用,如果不可用就剔除。生产者、消费者也能通过心跳发现被踢出的路由,从而感知Broker下线
4.Namesrv定时打印配置信息到日志中。
第三步:NamesrvController在初始化后添加JVM Hook.Hook中会调用NamesrvController.shutdown()方法来关闭整个Namesrv服务
第四步:调用NamesrvController.start()方法,启动整个Namesrv。其实start()方法只启动了Namesrv接口处理线程池
Namesrv关闭流程
为什么需要了解停止流程呢?RocketMQ在设计之初已经考虑了很多异常情况,比如Namesrv异常退出、
突然断电、内存被打满等等,只有了解了正常的停止流程才能对异常退出导致的问题进行精确的分析和排障。
突然断电、内存被打满等等,只有了解了正常的停止流程才能对异常退出导致的问题进行精确的分析和排障。
通常Namesrv的停止是通过关闭命令./mqshutdown namesrv来实现的。这个命令通过调用kill命令将关闭进程
通知发给JVM,JVM调用观级Hook执行停止逻辑。具体实现如下
通知发给JVM,JVM调用观级Hook执行停止逻辑。具体实现如下
1.关闭Netty服务端,主要是关闭Netty事件处理器、时间监听器等全部已经初始化的组件
2.关闭Namesrv接口处理线程池
3.关闭全部已经启动的定时任务
Namesrv路由原理。
Namesrv获取的Topic路由信息来自Broker定时心跳,心跳时Broker将Topic信息和其他信息发送到Namesrv。
Namesrv通过RequestCode.REGISTER_BROKER接口将心跳中的Broker信息和Topic信息存储在Namesrv中
Namesrv获取的Topic路由信息来自Broker定时心跳,心跳时Broker将Topic信息和其他信息发送到Namesrv。
Namesrv通过RequestCode.REGISTER_BROKER接口将心跳中的Broker信息和Topic信息存储在Namesrv中
路由注册。
registerBrokerWithFilterServer()方法中的this.namesrvController.getRouteInfoManager.registerBroker()方法,
该方法的主要功能是将request解码为路由对象,保存在Namesrv中。
在路由信息注册完成后,Broker会每隔30s发送一个注册请求给集群中全部的Namesrv,俗称心跳信,会把最新的
Topic路由信息注册到Namesrv中
registerBrokerWithFilterServer()方法中的this.namesrvController.getRouteInfoManager.registerBroker()方法,
该方法的主要功能是将request解码为路由对象,保存在Namesrv中。
在路由信息注册完成后,Broker会每隔30s发送一个注册请求给集群中全部的Namesrv,俗称心跳信,会把最新的
Topic路由信息注册到Namesrv中
路由剔除
如果Broker长久没有心跳或者宕机,那么Namesrv会将这些不提供服务的Broker剔除,同时生产者和消费者与Namesrv
心跳后也会感知被踢掉的Broker,如此Broker扩容或者宕机对生产、消费无感知的情况就处理完了
Namesrv有两种剔除Broker的方式
如果Broker长久没有心跳或者宕机,那么Namesrv会将这些不提供服务的Broker剔除,同时生产者和消费者与Namesrv
心跳后也会感知被踢掉的Broker,如此Broker扩容或者宕机对生产、消费无感知的情况就处理完了
Namesrv有两种剔除Broker的方式
第一种:Broker主动关闭时,会调用Namesrv的取消注册Broker的接口RequestCode=RequestCode.UNREGISTER_BROKER,
将自身从集群中删除
将自身从集群中删除
第二种:Namesrv通过定时扫描已经下线的Broker,将其主动剔除,实现过程在NamesrvController.initialize()方法中,
这里定时执行scanNotActiveVroker(),该方法会扫描全部已经注册的Broker,依次将每一个Broker心跳的最后更新时间
和当前时间做对比,如果Broker心跳的最后更新时间超过BROKER_CHANNEL_EXPIRED_TIME(1000 * 60 * 2= 120s)
则将Broker剔除,从此没有心跳的Broker从路由中被剔除,而客户端无任何感知
这里定时执行scanNotActiveVroker(),该方法会扫描全部已经注册的Broker,依次将每一个Broker心跳的最后更新时间
和当前时间做对比,如果Broker心跳的最后更新时间超过BROKER_CHANNEL_EXPIRED_TIME(1000 * 60 * 2= 120s)
则将Broker剔除,从此没有心跳的Broker从路由中被剔除,而客户端无任何感知
Broker
Broker存储目录结构
Broker启动流程
BrokerStartup.java类主要负责为真正的启动过程做准备,解析脚本传过来的参数,初始化Broker配置,创建BrokerController实例等工作。
BrokerController.java类是Broker的掌控者,它管理和控制Broker的各个模块,包含通信模块、存储模块、索引模块、定时任务等。
在BrokerController全部模块初始化并启动成功后,将在日志中输出info信息"boot success"
BrokerStartup.java类主要负责为真正的启动过程做准备,解析脚本传过来的参数,初始化Broker配置,创建BrokerController实例等工作。
BrokerController.java类是Broker的掌控者,它管理和控制Broker的各个模块,包含通信模块、存储模块、索引模块、定时任务等。
在BrokerController全部模块初始化并启动成功后,将在日志中输出info信息"boot success"
第一步:初始化启动环境。
这是由./bin/mqbroker和./bin/runbroker.sh两个脚本来完成的,/bin/mqbroker脚本主要用于设置RocketMQ根目录环境变量
if [ -z "$ROCKETMQ_HOME" ] ; then
....
fi
export ROCKETMQ_HOME
sh ${ROCKETMQ_HOME}/bin/runbroker.sh org.apache.rocketmq.broker.BrokerStartup $@
./bin/runbroker.sh脚本的主要功能是检测JDK的环境配置和JVM的参数配置。JDK的环境配置的检查逻辑的实现代码如下:
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"
export JAVA_HOME
export JAVA="$JAVA_HOME/bin/java"
export BASE_DIR=$(dirname $0)/..
export CLASSPATH=.:${BASE_DIR}/conf:${BASE_DIR}/lib/*:${CLASSPATH}
下面是JVM的参数配置,通常-Xms -Xmx -Xmn -XX:MaxDirectMemorySize这四个参数会随着部署RocketMQ服务器的物理内存大小的变化
而进行相应的改变
choose_gc_options()
{
JAVA_MAJOR_VERSION=$("$JAVA" -version 2>&1 | head -1 | cut -d'"' -f2 | sed 's/^1\.//' | cut -d'.' -f1)
if [ -z "$JAVA_MAJOR_VERSION" ] || [ "$JAVA_MAJOR_VERSION" -lt "8" ] ; then
JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
else
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
fi
if [ -z "$JAVA_MAJOR_VERSION" ] || [ "$JAVA_MAJOR_VERSION" -lt "9" ] ; then
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy"
JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
else
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
JAVA_OPT="${JAVA_OPT} -Xlog:gc*:file=${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log:time,tags:filecount=5,filesize=30M"
fi
}
choose_gc_log_directory
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g"
choose_gc_options
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch"
JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=15g"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
这是由./bin/mqbroker和./bin/runbroker.sh两个脚本来完成的,/bin/mqbroker脚本主要用于设置RocketMQ根目录环境变量
if [ -z "$ROCKETMQ_HOME" ] ; then
....
fi
export ROCKETMQ_HOME
sh ${ROCKETMQ_HOME}/bin/runbroker.sh org.apache.rocketmq.broker.BrokerStartup $@
./bin/runbroker.sh脚本的主要功能是检测JDK的环境配置和JVM的参数配置。JDK的环境配置的检查逻辑的实现代码如下:
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"
export JAVA_HOME
export JAVA="$JAVA_HOME/bin/java"
export BASE_DIR=$(dirname $0)/..
export CLASSPATH=.:${BASE_DIR}/conf:${BASE_DIR}/lib/*:${CLASSPATH}
下面是JVM的参数配置,通常-Xms -Xmx -Xmn -XX:MaxDirectMemorySize这四个参数会随着部署RocketMQ服务器的物理内存大小的变化
而进行相应的改变
choose_gc_options()
{
JAVA_MAJOR_VERSION=$("$JAVA" -version 2>&1 | head -1 | cut -d'"' -f2 | sed 's/^1\.//' | cut -d'.' -f1)
if [ -z "$JAVA_MAJOR_VERSION" ] || [ "$JAVA_MAJOR_VERSION" -lt "8" ] ; then
JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
else
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
fi
if [ -z "$JAVA_MAJOR_VERSION" ] || [ "$JAVA_MAJOR_VERSION" -lt "9" ] ; then
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy"
JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
else
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
JAVA_OPT="${JAVA_OPT} -Xlog:gc*:file=${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log:time,tags:filecount=5,filesize=30M"
fi
}
choose_gc_log_directory
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g"
choose_gc_options
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch"
JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=15g"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
第二步:初始化BrokerController
该初始化主要包含ROcketMQ启动命令行参数解析、Broker各个模块配置参数解析、Broker各个模块初始化、进程关机Hook初始化等过程.
RocketMQ启动命令行参数解析。其代码在BrokerStartup.createBrokerController()方法中,RocketMQ的启动参数支持启动命令指定,
也可以在配置文件中进行配置。通常,命令行参数的优先级大于配置文件。
通过第三方库将命令行输入参数解析为commandLine对象,再获取输入参数值。命令行参数的启动比较简单,如果大量的RocketMQ配置
项放在启动命令中,就会导致启动命令较长,难以维护,一般推荐启动RocketMQ使用配置文件的方式。配置文件在createBrokerController()
方法中被解析的代码如图所示
在brokerConfig、nettyServerConfig、nettyClientConfig、messageStoreConfig这些基本配置对象初始化完毕后,还有后续代码依据
各种启动条件重新调整部分参数。在各个配置对象初始化完毕后,程序会调用BrokerController.initialize()方法对Broker的各个模块进行
初始化
xxxConfigManager.load()方法的功能是加载Broker基础数据配置,包含Broker中的Topic、消费位点、订阅关系、消费过滤(无实际数据需要加载).
这些配置加载成功后,初始化存储层服务对象messageStore和Broker监控统计对象brokerStats.
然后,Broker会初始化通信层服务和一些列定时任务,通信层服务主要初始化正常通信通道、VIP通信通道和通信线程池。
这里以VIP通道为例,分析通信层服务初始化,以消费进度定时持久化为例,分析定时任务初始化。
fastConfig就是VIP通信层的配置,其配置对象"克隆"自正常通信的配置对象,唯独通信端口是nettyServerConfig.getListenPort()-2,
也就是10911-2.利用fastConfig初始化fastRemotingServer的结果也就是我们常用的VIP通道.
从fastConfig和fastRemotingServer的实现类命名来看,RocketMQ的通信层实现本质上是基于Netty的,那么通信层又是如何处理客户端
发送的Netty请求的呢?
通信层对象初始化完成后,会调用this.registerProcessor()方法,这里将正常的通信层对象和VIP通道的通信层对象与各个请求处理器进行
关联,比如将发送消息的请求交给接收消息的请求处理器进行处理
消费进度定时持久化。
Broker在接收到消费者上报的消费进度后,会定期持久化到物理文件中,当消费者因为重新发布或者宕机而重启时,能从消费进度中得知恢复,
不至于重复消费,持久化周期可以通过参数flushConsumerOffsetInterval(以ms为单位)进行配置
该初始化主要包含ROcketMQ启动命令行参数解析、Broker各个模块配置参数解析、Broker各个模块初始化、进程关机Hook初始化等过程.
RocketMQ启动命令行参数解析。其代码在BrokerStartup.createBrokerController()方法中,RocketMQ的启动参数支持启动命令指定,
也可以在配置文件中进行配置。通常,命令行参数的优先级大于配置文件。
通过第三方库将命令行输入参数解析为commandLine对象,再获取输入参数值。命令行参数的启动比较简单,如果大量的RocketMQ配置
项放在启动命令中,就会导致启动命令较长,难以维护,一般推荐启动RocketMQ使用配置文件的方式。配置文件在createBrokerController()
方法中被解析的代码如图所示
在brokerConfig、nettyServerConfig、nettyClientConfig、messageStoreConfig这些基本配置对象初始化完毕后,还有后续代码依据
各种启动条件重新调整部分参数。在各个配置对象初始化完毕后,程序会调用BrokerController.initialize()方法对Broker的各个模块进行
初始化
xxxConfigManager.load()方法的功能是加载Broker基础数据配置,包含Broker中的Topic、消费位点、订阅关系、消费过滤(无实际数据需要加载).
这些配置加载成功后,初始化存储层服务对象messageStore和Broker监控统计对象brokerStats.
然后,Broker会初始化通信层服务和一些列定时任务,通信层服务主要初始化正常通信通道、VIP通信通道和通信线程池。
这里以VIP通道为例,分析通信层服务初始化,以消费进度定时持久化为例,分析定时任务初始化。
fastConfig就是VIP通信层的配置,其配置对象"克隆"自正常通信的配置对象,唯独通信端口是nettyServerConfig.getListenPort()-2,
也就是10911-2.利用fastConfig初始化fastRemotingServer的结果也就是我们常用的VIP通道.
从fastConfig和fastRemotingServer的实现类命名来看,RocketMQ的通信层实现本质上是基于Netty的,那么通信层又是如何处理客户端
发送的Netty请求的呢?
通信层对象初始化完成后,会调用this.registerProcessor()方法,这里将正常的通信层对象和VIP通道的通信层对象与各个请求处理器进行
关联,比如将发送消息的请求交给接收消息的请求处理器进行处理
消费进度定时持久化。
Broker在接收到消费者上报的消费进度后,会定期持久化到物理文件中,当消费者因为重新发布或者宕机而重启时,能从消费进度中得知恢复,
不至于重复消费,持久化周期可以通过参数flushConsumerOffsetInterval(以ms为单位)进行配置
MixAll.properties2Object()方法的主要功能是,按照properties中配置的key与目标对象字段名是否相同来设置
对应的值
对应的值
BrokerStartup中调用BrokerController#initialize()
第三步,启动RocketMQ的各个组件
组件启动在BrokerController.start()方法中
组件启动在BrokerController.start()方法中
this.messageStore():存储层服务,比如CommitLog、ConsumeQueue存储管理
this.remotingServer:普通通道请求处理服务。一般的请求都是在这里被处理的
this.fastRemotingServer:VIP通道请求处理服务,如果普通通道比较忙,那么可以使用VIP通道,
一般作为客户端降级使用
一般作为客户端降级使用
this.brokerOuterAPI:Broker访问对外接口的封装对象
this.pullRequestHoldService:Pull长轮询服务
this.clientHouseKeepingService:清理心跳超时的生产者、消费者、过滤服务器
this.filterServerManager:过滤服务器管理
接下来将Broker信息注册到Namesrv,并处理Master与Slave的关系
this.brokerStatsManager:Broker监控数据统计管理
this.brokerFastFailure:Broker快速失败处理
Broker关闭流程
Broker关闭只是调用BrokerStartup.java中注册JVM Hook 的BrokerController.shutdown()方法,
该方法再调用各个模块关闭方法,最后关闭整个进程
Broker关闭只是调用BrokerStartup.java中注册JVM Hook 的BrokerController.shutdown()方法,
该方法再调用各个模块关闭方法,最后关闭整个进程
Broker消息存储机制
存储概述
Commit目录下有多个CommitLog文件,其实CommitLog只有一个文件,
为了方便保存和读写,被切分为多个子文件,所有的子文件通过其保存的
第一个和最后一个消息的物理位点进行连接。
Broker按照时间和物理的offset顺序写CommitLog文件,每次写的时候需要
加锁。每个CommitLog子文件的大小默认是1GB,可以通过mappedFileSizeCommitLog
进行配置。当一个CommitLog写满后,创建一个新的CommitLog,继续上一个CommitLog的
Offset写操作,直到写满换成下一个文件,所有的CommitLog子文件之间的Offset是连续的,
所以最后一个CommitLog总是被写入的
为了方便保存和读写,被切分为多个子文件,所有的子文件通过其保存的
第一个和最后一个消息的物理位点进行连接。
Broker按照时间和物理的offset顺序写CommitLog文件,每次写的时候需要
加锁。每个CommitLog子文件的大小默认是1GB,可以通过mappedFileSizeCommitLog
进行配置。当一个CommitLog写满后,创建一个新的CommitLog,继续上一个CommitLog的
Offset写操作,直到写满换成下一个文件,所有的CommitLog子文件之间的Offset是连续的,
所以最后一个CommitLog总是被写入的
为什么写文件这么快?
RocketMQ的存储涉及中,很大一部分是基于Kafka的涉及进行优化的。
PageCache:现代操作系统内核被设计为按照Page读取文件,每个Page默认4KB,
因为程序一般符合局部性原理,所以操作系统在读取一段文件内容时,会将该段
内容和附件的文件内容都读取到内核Page中(预读),下次读取的内容如果命中PageCache
就可以直接返回内容,不用再次读取磁盘
PageCache机制也不是完全无缺点的,当遇到操作系统进行脏回写、内存回收、内存交换等情况,
就会引起较大的消息读写延迟。对于这些情况,RocketMQ采用了多种优化技术,比如内存预分配、文件预热、
mlock系统调用等,以保证再最大限度地发徽PageCache机制的优点的同时,尽可能地减少消息读写延迟。
所以在生产环境部署RocketMQ的时候,尽量采用SSD独享磁盘,这样就可以最大限度地保证读写性能
Virtual Memory(虚拟内存):为了保证每个程序有足够地运行空间和编程空间,可以将一些暂时不用的内存
数据保存到交换区(其实是磁盘)中,这样就可以运行更多的程序,这种"内存"被称为虚拟内存(因为不是真的存在)
操作系统的可分配内存大小=虚拟内存大小+物理内存大小
零拷贝和Java文件映射:从文件读取流程可以看到,读取到内核态的数据会经历两次拷贝,第一次从内核态内存
拷贝到用户态内存,第二次从用户态内存拷贝到Java进程的某个变量地址,这样Java变量才能读取数据。
为了提高读写文件的效率,IBM实现了零拷贝技术,它是世界上最早实现该技术的公司,后来各个厂商(如甲骨文等)
也纷纷实现了该技术。
java.nio.MappedByteBuffer.java文件中实现了零拷贝技术,即Java进程映射到内核态内存,原来内核态内存与用户态
内存的互相拷贝过程就消失了。在消息系统中,用户关心的往往都是最新的数据,理论上,基本的操作都在PageCache
中,PageCache的操作速度和内存基本持平,所以速度非常快。当然,也存在读取历史消息而历史消息不再PageCache
中的情况,比如在流处理和批处理中,经常将消费重置到历史消息位点,以重新计算全部结果。这种情况只是在第一次
拉取消息时会读取磁盘,以后可以利用磁盘预读,几乎可以做到不再直接读取磁盘,其性能与利用PageCache相比,
只在第一次有差异
RocketMQ的存储涉及中,很大一部分是基于Kafka的涉及进行优化的。
PageCache:现代操作系统内核被设计为按照Page读取文件,每个Page默认4KB,
因为程序一般符合局部性原理,所以操作系统在读取一段文件内容时,会将该段
内容和附件的文件内容都读取到内核Page中(预读),下次读取的内容如果命中PageCache
就可以直接返回内容,不用再次读取磁盘
PageCache机制也不是完全无缺点的,当遇到操作系统进行脏回写、内存回收、内存交换等情况,
就会引起较大的消息读写延迟。对于这些情况,RocketMQ采用了多种优化技术,比如内存预分配、文件预热、
mlock系统调用等,以保证再最大限度地发徽PageCache机制的优点的同时,尽可能地减少消息读写延迟。
所以在生产环境部署RocketMQ的时候,尽量采用SSD独享磁盘,这样就可以最大限度地保证读写性能
Virtual Memory(虚拟内存):为了保证每个程序有足够地运行空间和编程空间,可以将一些暂时不用的内存
数据保存到交换区(其实是磁盘)中,这样就可以运行更多的程序,这种"内存"被称为虚拟内存(因为不是真的存在)
操作系统的可分配内存大小=虚拟内存大小+物理内存大小
零拷贝和Java文件映射:从文件读取流程可以看到,读取到内核态的数据会经历两次拷贝,第一次从内核态内存
拷贝到用户态内存,第二次从用户态内存拷贝到Java进程的某个变量地址,这样Java变量才能读取数据。
为了提高读写文件的效率,IBM实现了零拷贝技术,它是世界上最早实现该技术的公司,后来各个厂商(如甲骨文等)
也纷纷实现了该技术。
java.nio.MappedByteBuffer.java文件中实现了零拷贝技术,即Java进程映射到内核态内存,原来内核态内存与用户态
内存的互相拷贝过程就消失了。在消息系统中,用户关心的往往都是最新的数据,理论上,基本的操作都在PageCache
中,PageCache的操作速度和内存基本持平,所以速度非常快。当然,也存在读取历史消息而历史消息不再PageCache
中的情况,比如在流处理和批处理中,经常将消费重置到历史消息位点,以重新计算全部结果。这种情况只是在第一次
拉取消息时会读取磁盘,以后可以利用磁盘预读,几乎可以做到不再直接读取磁盘,其性能与利用PageCache相比,
只在第一次有差异
1.Broker存储流程
RocketMQ首先将消息数据写入操作系统PageCache,然后定时将数据刷入磁盘。
接下来主要分析RocketMQ是如何接收发送消息请求并将消息写入PageCache的,
整个过程如图
RocketMQ首先将消息数据写入操作系统PageCache,然后定时将数据刷入磁盘。
接下来主要分析RocketMQ是如何接收发送消息请求并将消息写入PageCache的,
整个过程如图
1.Broker接收客户端发送消息的请求并做预处理。
SendMessageProcessor.processRequest()方法会自动被调用接收、解析客户端请求为消息实例。
该方法执行分为四个过程:解析请求参数、执行发送处理前的Hook、调用保存方法存储消息、
执行发送处理后的Hook
随着RocketMQ版本的迭代更新,通信层的协议也出现了不兼容的变化,比如解析请求需要根据
不同的客户端请求协议版本做不同处理
SendMessageProcessor.processRequest()方法会自动被调用接收、解析客户端请求为消息实例。
该方法执行分为四个过程:解析请求参数、执行发送处理前的Hook、调用保存方法存储消息、
执行发送处理后的Hook
随着RocketMQ版本的迭代更新,通信层的协议也出现了不兼容的变化,比如解析请求需要根据
不同的客户端请求协议版本做不同处理
2.Broker存储前预处理消息.
预处理方法为SendMessageProcessor.sendMessage()
Netty是异步执行的,也就是说,请求发送到Broker被处理后,返回结果时,在客户端的处理线程
已经不再时发送亲贵的线程,那么客户端如何确定返回结果对应哪个请求呢?很简单,我们可以
通过返回标志来判断。
其次,做一系列存储前发送请求的数据检查,比如死信消息处理、Broker是否拒绝事务消息处理、
消息基本检查等。消息基本检查方法为AbstractSendMessageProcessor.msgCheck():
该方法的主要功能如下:
a.校验Broker是否配置可写
b.校验Topic名字是否为默认值
c.校验Topic配置是否存在
d.校验queueId与读写队列数是否匹配
e.校验Broker是否支持事务消息(msgCheck之后进行的校验)
预处理方法为SendMessageProcessor.sendMessage()
Netty是异步执行的,也就是说,请求发送到Broker被处理后,返回结果时,在客户端的处理线程
已经不再时发送亲贵的线程,那么客户端如何确定返回结果对应哪个请求呢?很简单,我们可以
通过返回标志来判断。
其次,做一系列存储前发送请求的数据检查,比如死信消息处理、Broker是否拒绝事务消息处理、
消息基本检查等。消息基本检查方法为AbstractSendMessageProcessor.msgCheck():
该方法的主要功能如下:
a.校验Broker是否配置可写
b.校验Topic名字是否为默认值
c.校验Topic配置是否存在
d.校验queueId与读写队列数是否匹配
e.校验Broker是否支持事务消息(msgCheck之后进行的校验)
3.执行DefaultMessageStore.putMessage()方法进行消息校验和存储模块检查。
在真正保存消息前,会对消息数据做基本检查、对存储服务做可用性检查、对Broker做是否Slave的检查等
总结如下:
a.校验存储模块是否已经关闭
b.校验Broker是否是Slave
c.校验存储模块运行标记
d.校验Topic长度
e.校验扩展信息的长度
f.校验操作系统Page Cache是否繁忙
在真正保存消息前,会对消息数据做基本检查、对存储服务做可用性检查、对Broker做是否Slave的检查等
总结如下:
a.校验存储模块是否已经关闭
b.校验Broker是否是Slave
c.校验存储模块运行标记
d.校验Topic长度
e.校验扩展信息的长度
f.校验操作系统Page Cache是否繁忙
begin:CommitLog加锁开始时间,写CommitLog成功后,该值为0
diff:当前时间和CommitLog持有锁时间的差值
如果isOSPageCacheBusy()方法返回true,则表示当前有消息正在写入CommitLog,
并且持有锁的时间超过设置的阈值
diff:当前时间和CommitLog持有锁时间的差值
如果isOSPageCacheBusy()方法返回true,则表示当前有消息正在写入CommitLog,
并且持有锁的时间超过设置的阈值
4.执行CommitLog.putMessage()方法,后面版本中将默认异步保存。
存储消息的核心处理过程如下:
a.设置消息保存时间为当前时间戳,设置消息完整性校验码CRC(循环冗余码)
b.延迟消息处理.如果发送的消息是延迟消息,这里会单独设置延迟消息的
数据字段,比如修改Topic为延迟消息特有的Topic--SCHEDULE_TOPIC_XXX,
并且备份原来的Topic和queueId,以便延迟消息在投递后被消费者消费
c.获取最后一个CommitLog文件实例MappedFile。锁住该MappedFile.默认
为自旋锁,也可以通过useReetrantLockWhenPutMessage进行配置、修改
和使用ReentrantLock
d:校验最后一个MappedFile,如果结果为空或已写满,则新创建一个MappedFile返回
e:调用MappedFile.appendMEssage()方法,将消息写入MappedFile
存储消息的核心处理过程如下:
a.设置消息保存时间为当前时间戳,设置消息完整性校验码CRC(循环冗余码)
b.延迟消息处理.如果发送的消息是延迟消息,这里会单独设置延迟消息的
数据字段,比如修改Topic为延迟消息特有的Topic--SCHEDULE_TOPIC_XXX,
并且备份原来的Topic和queueId,以便延迟消息在投递后被消费者消费
c.获取最后一个CommitLog文件实例MappedFile。锁住该MappedFile.默认
为自旋锁,也可以通过useReetrantLockWhenPutMessage进行配置、修改
和使用ReentrantLock
d:校验最后一个MappedFile,如果结果为空或已写满,则新创建一个MappedFile返回
e:调用MappedFile.appendMEssage()方法,将消息写入MappedFile
根据消息是单个消息还是批量消息来调用AppendMessageCallback.doAppend()方法,
并将消息写入PageCache,该方法的功能包含以下几点:
1.查找即将写入的消息物理Offset
2.事务消息单独处理。这里主要处理Prepared类型和Rollback类型的消息,设置消息queueOffset为0
3.序列化消息,并将序列化结果保存到ByteBuffer中(文件内存映射的PageCache或Direct Memory,
简称DM).特别地,如果将输盘设置为异步刷盘,那么当transientStorePoolEnable=true时,会先写入DM,
DM中地数据再异步写入文件内存映射地PageCache中,因为消费者始终时从PageCache中读取消息消费的,
所以这个机制也称为"读写分离"
4.更新消息所在Queue的位点
并将消息写入PageCache,该方法的功能包含以下几点:
1.查找即将写入的消息物理Offset
2.事务消息单独处理。这里主要处理Prepared类型和Rollback类型的消息,设置消息queueOffset为0
3.序列化消息,并将序列化结果保存到ByteBuffer中(文件内存映射的PageCache或Direct Memory,
简称DM).特别地,如果将输盘设置为异步刷盘,那么当transientStorePoolEnable=true时,会先写入DM,
DM中地数据再异步写入文件内存映射地PageCache中,因为消费者始终时从PageCache中读取消息消费的,
所以这个机制也称为"读写分离"
4.更新消息所在Queue的位点
以上代码中,CommitLog.this.TopicQueueTable类型是HashMap<String/* topic-queueid */, Long/* offset */>,
CommitLog.this.TopicQueueTable的key是Topic名字与消息所在的Queue的QueueId的构成,value是消息位点值
CommitLog.this.TopicQueueTable的key是Topic名字与消息所在的Queue的QueueId的构成,value是消息位点值
在消息存储完成后,会处理刷盘逻辑和主从同步逻辑,分别调用(有些版本是handleDiskFlush()方法和handleHA()方法)
CommitLog.submitFlushRequest()和submitReplicaRequest()
在Broker处理发送消息时,由于处理器SendMessageProcessor本身是一个线程池服务,所以涉及了快速失败逻辑,
方便在高峰时自我保护。实现代码在BrokerFastFailure.cleanExpiredRequest()方法中
在BrokerController启动BrokerFailure服务时,会启动一个定时任务处理快速失败的的异常
CommitLog.submitFlushRequest()和submitReplicaRequest()
在Broker处理发送消息时,由于处理器SendMessageProcessor本身是一个线程池服务,所以涉及了快速失败逻辑,
方便在高峰时自我保护。实现代码在BrokerFastFailure.cleanExpiredRequest()方法中
在BrokerController启动BrokerFailure服务时,会启动一个定时任务处理快速失败的的异常
从以上代码可以看到,每间隔10ms会执行一次cleanExpiredRequest()方法,
清理一些非法过期的请求。
第一种,系统繁忙时发送消息请求快速失败处理。
当操作系统PageCache繁忙时,会将发送消息请求从发送消息请求线程池工作
队列中取出来,直接返回SYSTEM_BUSY。如果此种情况持续发生说明系统已经
不堪重负,需要增加系统资源或者扩容来减轻当前Broker的压力
第二种,发送请求超时处理
第三种,拉取消息请求超时处理
第二种和第三种的代码逻辑与第一种代码逻辑的处理类似,如果出现了,说明请求
在线程池的工作队列中排队时间超过预期配置的时间,那么增加排队等待时间即可。
如果请求持续超时,说明系统可能达到瓶颈,那么需要增加系统资源或者扩容
清理一些非法过期的请求。
第一种,系统繁忙时发送消息请求快速失败处理。
当操作系统PageCache繁忙时,会将发送消息请求从发送消息请求线程池工作
队列中取出来,直接返回SYSTEM_BUSY。如果此种情况持续发生说明系统已经
不堪重负,需要增加系统资源或者扩容来减轻当前Broker的压力
第二种,发送请求超时处理
第三种,拉取消息请求超时处理
第二种和第三种的代码逻辑与第一种代码逻辑的处理类似,如果出现了,说明请求
在线程池的工作队列中排队时间超过预期配置的时间,那么增加排队等待时间即可。
如果请求持续超时,说明系统可能达到瓶颈,那么需要增加系统资源或者扩容
2.内存映射机制与高效磁盘。
RocketMQ在存储涉及中通过内存映射、顺序写文件等方式实现了高吞吐。
RocketMQ的基本数据结构:
CommitLog:RocketMQ对存储消息的物理文件的抽象实现,也就是对物理CommitLog文件的具体实现。
MappedFile:CommitLog文件在内存中的映射文件,映射文件同时具有内存的写入速度和与磁盘一样可靠的
持久化方式.
MappedFileQueue:映射文件队列中有全部的CommitLog映射文件,第一个映射文件为最先过期的文件,
最后一个文件是最后过期的文件,最新的消息总是写入最后一个映射文件。
RocketMQ在存储涉及中通过内存映射、顺序写文件等方式实现了高吞吐。
RocketMQ的基本数据结构:
CommitLog:RocketMQ对存储消息的物理文件的抽象实现,也就是对物理CommitLog文件的具体实现。
MappedFile:CommitLog文件在内存中的映射文件,映射文件同时具有内存的写入速度和与磁盘一样可靠的
持久化方式.
MappedFileQueue:映射文件队列中有全部的CommitLog映射文件,第一个映射文件为最先过期的文件,
最后一个文件是最后过期的文件,最新的消息总是写入最后一个映射文件。
每个MappedFileQueue包含多个MappedFile,就是真是的物理CommitLog文件.
在Java中通过java.nio.MappedByteBuffer来实现文件的内存映射,即文件读写
都是通过MappedByteBuffer(其实是PageCache)来操作的。
写入数据时先加锁,然后通过Append方式写入最新MappedFile。对于读取消息,
大部分情况下用户只关心最新数据,而这些数据都在PageCache中,也就是说,
读写文件就是在PageCache中进行的,其速度几乎等于直接操作内存的速度
在Java中通过java.nio.MappedByteBuffer来实现文件的内存映射,即文件读写
都是通过MappedByteBuffer(其实是PageCache)来操作的。
写入数据时先加锁,然后通过Append方式写入最新MappedFile。对于读取消息,
大部分情况下用户只关心最新数据,而这些数据都在PageCache中,也就是说,
读写文件就是在PageCache中进行的,其速度几乎等于直接操作内存的速度
3.文件刷盘机制
消息存储完成后,会被操作系统持久化到磁盘,也就是刷盘。
RocketMQ支持两种刷盘方式,在Broker启动时配置flushDiskType=SYNC_FLUSH表示同步刷盘.
配置flushDiskType=ASYNC_FLUSH表示异步刷盘
刷盘涉及以下3个线程服务。如图所示
消息存储完成后,会被操作系统持久化到磁盘,也就是刷盘。
RocketMQ支持两种刷盘方式,在Broker启动时配置flushDiskType=SYNC_FLUSH表示同步刷盘.
配置flushDiskType=ASYNC_FLUSH表示异步刷盘
刷盘涉及以下3个线程服务。如图所示
GroupCommitService就是CommitLog.GroupCommitService--同步刷盘任务。
在Broker存储消息到PageCache后,在Broker存储消息到PageCache后,同步将PageCache刷到
磁盘,在返回客户端消息并写入结果
在Broker存储消息到PageCache后,在Broker存储消息到PageCache后,同步将PageCache刷到
磁盘,在返回客户端消息并写入结果
FlushRealTimeService就是CommitLog.FlushRealTimeService--异步刷盘服务。
在Broker存储消息到PageCache后,立即返回客户端写入结果,然后异步刷盘服务将PageCache
异步刷到磁盘,
在Broker存储消息到PageCache后,立即返回客户端写入结果,然后异步刷盘服务将PageCache
异步刷到磁盘,
CommitRealTimeService就是CommitLog.CommitRealTimeService--异步转存服务。
Broker通过读写分离将消息写入直接内存(Direct Meomory,简称DM),然后通过异步
转存服务,将DM中的数据再次存储到PageCache中,以供异步刷盘服务将PageCache
刷到磁盘中,转存服务过程如上
Broker通过读写分离将消息写入直接内存(Direct Meomory,简称DM),然后通过异步
转存服务,将DM中的数据再次存储到PageCache中,以供异步刷盘服务将PageCache
刷到磁盘中,转存服务过程如上
将消息成功保存到CommitLog映射文件后,调用submitFlushRequest()/handleDiskFlush()方法处理刷盘逻辑,
同步刷盘、异步刷盘都是在这个方法中发起的
同步刷盘、异步刷盘都是在这个方法中发起的
3.1同步刷盘和异步刷盘
同步刷盘
存储消息线程:主要负责将消息存储到PageCache或者DM中,存储成功后通过调用handleDiskFlush()/submitFlushRequest()
方法将同步刷盘请求"发送给"GroupCommitService服务,并在该刷盘请求上执行锁等待
方法将同步刷盘请求"发送给"GroupCommitService服务,并在该刷盘请求上执行锁等待
同步刷盘服务线程:通过GroupCommitService类实现的同步刷盘服务
正常同步刷盘线程会间隔10ms执行一次CommitLog.GroupService.doCommit()方法,
该方法循环每一个同步刷盘请求,如果刷盘成功,那么唤醒等待刷盘请求锁的存储消息
线程,并告知刷盘成功
该方法循环每一个同步刷盘请求,如果刷盘成功,那么唤醒等待刷盘请求锁的存储消息
线程,并告知刷盘成功
由于操作系统刷盘耗时及每次刷多少字节数据到磁盘等,都不是RocketMQ进程能掌控的,
所以在每次刷盘前都需要做必要的检查,以确认当前同步刷盘请求对应位点是否已经被刷盘
,如果已经被刷盘,当前刷盘请求就不需要执行,
RocketMQ进程正常关闭时,如果有同步刷盘请求未执行完时,那么数据会丢失吗?
答案是:不会的,在关闭刷盘服务时,会执行Thread.sleep(10)等待所有的同步刷盘请求
保存到刷盘请求队列中后,交换保存刷盘请求的队列,再执行doCommit()方法
所以在每次刷盘前都需要做必要的检查,以确认当前同步刷盘请求对应位点是否已经被刷盘
,如果已经被刷盘,当前刷盘请求就不需要执行,
RocketMQ进程正常关闭时,如果有同步刷盘请求未执行完时,那么数据会丢失吗?
答案是:不会的,在关闭刷盘服务时,会执行Thread.sleep(10)等待所有的同步刷盘请求
保存到刷盘请求队列中后,交换保存刷盘请求的队列,再执行doCommit()方法
异步刷盘。
如果Broker配置读写分离,则异步刷盘过程包含异步转存数据和真正的异步刷盘操作。
如果Broker配置读写分离,则异步刷盘过程包含异步转存数据和真正的异步刷盘操作。
异步转存数据是通过CommitRealTimeService.run()方法实现的
1.获取转存参数。整个转存过程的参数都是可配置的。
interval:对应的配置项名字是commitIntervalCommitLog,转存操作线程两次执行操作
的时间间隔默认为200ms
的时间间隔默认为200ms
commitDataLeastPages:最小转存PageCache的Page数,默认为4
commitDataThoroughInterval:对应的配置项名字是commitComitLogThoroughInterval,
两次转存操作的最长间隔时间默认为200ms
如果距离上次转存操作时间超过commitCommitLogThoroughInterval,则设置commitDataLeastPages=0
表示继续将上次未完成的数据刷盘
两次转存操作的最长间隔时间默认为200ms
如果距离上次转存操作时间超过commitCommitLogThoroughInterval,则设置commitDataLeastPages=0
表示继续将上次未完成的数据刷盘
2.执行转存数据,转存实现代码。
转存过程主要调用CommitLog.this.mappedFileQueue.commit()方法转存数据,并且统计了
转存耗时,如果转存耗时特别大,说明系统繁忙,应该考虑增加系统资源或者扩容
转存过程主要调用CommitLog.this.mappedFileQueue.commit()方法转存数据,并且统计了
转存耗时,如果转存耗时特别大,说明系统繁忙,应该考虑增加系统资源或者扩容
转存代码
org.apache.rocketmq.store.MappedFileQueue#commit
org.apache.rocketmq.store.MappedFile#commit
org.apache.rocketmq.store.MappedFile#commit0
CommitLog.this.mappedFileQueue.commit()方法最终会调用MappedFile.commit0()方法
进行真正的数据转存
MappedFile.commit0()方法的作用就是将writeBuffer(DM)中的数据读取出来,写入fileChannel
(CommitLog映射文件)
CommitLog.this.mappedFileQueue.commit()方法最终会调用MappedFile.commit0()方法
进行真正的数据转存
MappedFile.commit0()方法的作用就是将writeBuffer(DM)中的数据读取出来,写入fileChannel
(CommitLog映射文件)
writePosition:DM中已写入的消息位置
committedPosition:已经转存的消息位置
writeBuffer:配置Broker读写分离后,当存储消息流传到ByteBuffer时,会优先写入writeBuffer
(实际是DM,不是真正的PageCache,也可以叫作内存缓冲区)
fileChannel:CommitLog映射文件的读写通道
(实际是DM,不是真正的PageCache,也可以叫作内存缓冲区)
fileChannel:CommitLog映射文件的读写通道
3.转存失败.唤醒异步刷盘线程。转存数据失败,并不代表没有数据被转存到PageCache中,
而是说明有部分数据转存成功,部分数据转存失败。所以可以唤醒刷盘线程执行刷盘操作,
而如果转存成功,则正常进行异步刷盘即可
而是说明有部分数据转存成功,部分数据转存失败。所以可以唤醒刷盘线程执行刷盘操作,
而如果转存成功,则正常进行异步刷盘即可
异步刷盘操作。
在异步转存服务和存储服务把消息写入Page Cache后,由异步刷盘服务将消息刷入
磁盘中,过程如图。
异步刷盘服务的主要功能是将PageCache中的数据异步刷入磁盘,并记录Checkpoint信息.
异步刷盘的实现代码主要在CommitLog.FlushRealTimeService.run()方法中
在异步转存服务和存储服务把消息写入Page Cache后,由异步刷盘服务将消息刷入
磁盘中,过程如图。
异步刷盘服务的主要功能是将PageCache中的数据异步刷入磁盘,并记录Checkpoint信息.
异步刷盘的实现代码主要在CommitLog.FlushRealTimeService.run()方法中
1.获取刷盘参数.
当前刷盘操作距离上次刷盘时间大于flushPhysicQueueThoroughInterval时,
设置flushPhysicQueueLeastPages=0,表示继续将上次未完成的数据进行刷盘
当前刷盘操作距离上次刷盘时间大于flushPhysicQueueThoroughInterval时,
设置flushPhysicQueueLeastPages=0,表示继续将上次未完成的数据进行刷盘
flushCommitLogTimed:是否定时刷盘,设置为True表示定时刷盘;设置为False表示实时刷盘,默认为False.
即实时刷盘
即实时刷盘
interval:在Broker中配置项名是flushIntervalCommitLog,刷盘间隔默认为500ms
flushPhysicQueueLeastPages:每次刷盘的页数,默认为4页
flushPhysicQueueThoroughInterval:两次刷盘操作的最长间隔时间,默认为10s
2.等待刷盘间隔。Broker是如何实现定时和实时刷盘的呢?
this.waitForRunning()方法是RocketMQ通过自定义锁实现的线程等待,
如果没有通知过刷盘线程,则调用waitPoint.reset()方法重置count,调用
waitPoint.await()方法让当前刷盘线程等待interval时间(或者被唤醒)后,
再执行刷盘
异步刷盘线程是如何被唤醒的呢?当数据存储到Page Cache后,通过调用
CommitLog.handleDiskFlush()/submitFlushRequest()方法唤醒异步刷盘线程
this.waitForRunning()方法是RocketMQ通过自定义锁实现的线程等待,
如果没有通知过刷盘线程,则调用waitPoint.reset()方法重置count,调用
waitPoint.await()方法让当前刷盘线程等待interval时间(或者被唤醒)后,
再执行刷盘
异步刷盘线程是如何被唤醒的呢?当数据存储到Page Cache后,通过调用
CommitLog.handleDiskFlush()/submitFlushRequest()方法唤醒异步刷盘线程
3.执行刷盘。最终刷盘逻辑是在MappedFile.flush()
下面进行两个数据校验:this.isAbleToFlush(flushLeastPages)和this.hold()
在配置读写分离的场景下,writeBuffer和fileChannel总是不为空。此时要
调用this.fileChannel.force(false)方法刷盘;而正常刷盘则是调用
this.mappedByteBuffer.force()方法
下面进行两个数据校验:this.isAbleToFlush(flushLeastPages)和this.hold()
在配置读写分离的场景下,writeBuffer和fileChannel总是不为空。此时要
调用this.fileChannel.force(false)方法刷盘;而正常刷盘则是调用
this.mappedByteBuffer.force()方法
this.isAbleToFlush(flushLeastPages)方法校验需要刷盘的页码中的数据是否被刷入磁盘,
如果被刷入磁盘,则不用再执行刷盘操作;反之,则需要计算是否还有数据需要刷盘
如果被刷入磁盘,则不用再执行刷盘操作;反之,则需要计算是否还有数据需要刷盘
this.hold()方法的功能是,在映射文件被销毁时尽量不要对在读写的数据造成困扰。所以
MappedFile自己实现了引用计数功能,只有存在引用时才会执行刷盘操作
MappedFile自己实现了引用计数功能,只有存在引用时才会执行刷盘操作
4.记录Checkpoint和耗时日志。这里主要记录最后刷盘成功时间和刷盘耗时超过500ms的情况
同步刷盘、异步刷盘对比
Broker读写分离机制
在RocketMQ中,有两处地方使用了"读写分离"机制
Broker Master-Slave读写分离:写消息到Master Broker,从Slave Broker读取消息。Broker配置为Slave Broker读取消息。
Broker配置为slaveReadEnable=True(默认False),消息占用内存百分比配置为accessMessageInMemoryMaxRatio=40(默认40)
Broker Direct Memory-Page Cache读写分离:写消息到Direct Memory(直接内存,简称DM),从操作系统的PageCache中读取消息。
Master Broker配置读写分离开关为transientStorePoolEnable=True(默认False),写入DM存储数量,配置transientStorePoolSize
至少大于0(默认为5,建议不修改),刷盘类型配置为flushDiskType=FlushDiskType.ASYNC_FLUSH,即异步刷盘
读写分离能够最大限度地提供吞吐量,同时会增加数据不一致的风险
在RocketMQ中,有两处地方使用了"读写分离"机制
Broker Master-Slave读写分离:写消息到Master Broker,从Slave Broker读取消息。Broker配置为Slave Broker读取消息。
Broker配置为slaveReadEnable=True(默认False),消息占用内存百分比配置为accessMessageInMemoryMaxRatio=40(默认40)
Broker Direct Memory-Page Cache读写分离:写消息到Direct Memory(直接内存,简称DM),从操作系统的PageCache中读取消息。
Master Broker配置读写分离开关为transientStorePoolEnable=True(默认False),写入DM存储数量,配置transientStorePoolSize
至少大于0(默认为5,建议不修改),刷盘类型配置为flushDiskType=FlushDiskType.ASYNC_FLUSH,即异步刷盘
读写分离能够最大限度地提供吞吐量,同时会增加数据不一致的风险
Master-Slave读写分离机制。通常Master提供读写处理,如果Master负载较高就从Slave读取
第一步:Broker在处理Pull消息时,计算下次是否从Slave拉取消息,是通过DefaultMessageStore.getMessage()
方法实现的,
diff>memory 表示没有拉取的消息比分配的内存大,如果diff > memory的值为True,则说明此时Master Broker
内存繁忙,应该选择从Slave拉取消息
方法实现的,
diff>memory 表示没有拉取的消息比分配的内存大,如果diff > memory的值为True,则说明此时Master Broker
内存繁忙,应该选择从Slave拉取消息
maxOffsetPy:表示当前Master Broker存储的所有消息的最大物理位点
maxPhyOffsetPulling:表示拉取的最大消息位点
diff:是上面两者的差值,表示还有多少消息没有拉取
StoreUtil.TOTAL_PYHSICAL_MEMORY_SIZE:表示当前Master Broker全部的物理内存
memory:Broker认为可使用的最大内存,该值可以通过accessMessageInMemoryMaxRatio配置项决定,
默认accessMessageInMemoryMaxRatio=40,如果物理内存为100MB,那么memory=40MB
默认accessMessageInMemoryMaxRatio=40,如果物理内存为100MB,那么memory=40MB
第二步:通知客户端下次从哪个Broker拉取消息。在消费者Pull消息返回结果时,根据第一步设置的suggestPullingFromSlave
值返回给消费者,该过程通过PullMessageProcessor.processRequest()方法实现。
通过查看以上代码,我们直到要想从Slave读取消息,需要设置slaveReadEnable=True,此时会根据第一步返回的suggestPullingFromSlave
值告诉客户端下次可以从哪个Broker拉取消息。suggestPullingFromSlave=1表示从Slave拉取,suggestPullingFromSlave=0
表示从Master拉取。
值返回给消费者,该过程通过PullMessageProcessor.processRequest()方法实现。
通过查看以上代码,我们直到要想从Slave读取消息,需要设置slaveReadEnable=True,此时会根据第一步返回的suggestPullingFromSlave
值告诉客户端下次可以从哪个Broker拉取消息。suggestPullingFromSlave=1表示从Slave拉取,suggestPullingFromSlave=0
表示从Master拉取。
Direct Memory-Page Cache的读写分离机制
以上逻辑通过MappedFile.appendMessagesInner()方法来实现,核心代码如图
以上逻辑通过MappedFile.appendMessagesInner()方法来实现,核心代码如图
这段代码中,writeBuffer表示从DM中申请的缓存;mappedByteBuffer表示从PageCache中申请的缓存,
如果Broker设置transientStorePoolEnable=true,并且异步刷盘,则存储层DefaultMessageStore在
初始化会调用TransientStorePool.init()方法(按照配置的Buffer个数)初始化writeBuffer
如果Broker设置transientStorePoolEnable=true,并且异步刷盘,则存储层DefaultMessageStore在
初始化会调用TransientStorePool.init()方法(按照配置的Buffer个数)初始化writeBuffer
初始化writeBuffer后,当生产者将消息发送到Broker时,Broker将消息写入writeBuffer,然后被异步转存服务
不断地从DM中Commit到Page中,消费者此时从哪儿读取数据呢?消费者拉取消息的实现在MappedFile.selectMappedBuffer()方法中
不断地从DM中Commit到Page中,消费者此时从哪儿读取数据呢?消费者拉取消息的实现在MappedFile.selectMappedBuffer()方法中
从代码中可以看到,消费者始终从mappedByteBuffer(即Pagecache)读取消息。
Broker CommitLog索引机制
索引的数据结构
ConsumerQueue消费队列。
主要用于消费拉取消息、更新消费位点等所用的索引。
源代码参考org.apache.rocketmq.store.ConsumerQueue.
该文件内保存了消息的物理位点、消息体大小、消息Tag的Hash值
物理位点:消息在CommitLog中的位点值
消息体大小:包含消息Topic值大小、CRC值大小、消息体大小等全部数据的总大小,单位是字节
Tag的Hash值:由MessageExtBrokerInner.tagsString2tagsCode()方法计算得来。如果消息有
Tag值,那么该值可以通过String的Hashcode获得
主要用于消费拉取消息、更新消费位点等所用的索引。
源代码参考org.apache.rocketmq.store.ConsumerQueue.
该文件内保存了消息的物理位点、消息体大小、消息Tag的Hash值
物理位点:消息在CommitLog中的位点值
消息体大小:包含消息Topic值大小、CRC值大小、消息体大小等全部数据的总大小,单位是字节
Tag的Hash值:由MessageExtBrokerInner.tagsString2tagsCode()方法计算得来。如果消息有
Tag值,那么该值可以通过String的Hashcode获得
Index File:是一个RocketMQ实现的Hash索引,主要在用户用消息key查询时使用,该索引是通过IndexFile类实现的。
在RocketMQ中同时存在多个IndexFile文件,这些文件按照消息产生的时间顺序排列。
每个INdex File文件包含文件头、Hash槽位、索引数据。每个文件的Hash槽位个数、索引数据个数都是固定的。
Hash槽位可以通过Broker启动参数maxHashSlotNum进行配置,默认值为500万;索引数据可以通过Broker启动参数
maxIndexNum进行配置,默认值为500万,一个Index File约为400MB.
IndexFile的索引设计在一定程度上参考了Java中的HashMap设计,只是当IndexFile遇到Hash碰撞时只会用链表。
而Java8中在一定情况下链表会转化为红黑树。
在RocketMQ中同时存在多个IndexFile文件,这些文件按照消息产生的时间顺序排列。
每个INdex File文件包含文件头、Hash槽位、索引数据。每个文件的Hash槽位个数、索引数据个数都是固定的。
Hash槽位可以通过Broker启动参数maxHashSlotNum进行配置,默认值为500万;索引数据可以通过Broker启动参数
maxIndexNum进行配置,默认值为500万,一个Index File约为400MB.
IndexFile的索引设计在一定程度上参考了Java中的HashMap设计,只是当IndexFile遇到Hash碰撞时只会用链表。
而Java8中在一定情况下链表会转化为红黑树。
在Hash碰撞时,Hash槽位中保存的总是最新消息的指针,这是因为在消息队列中,
用户最关心的总是最新的数据
用户最关心的总是最新的数据
索引的构建过程
创建ConsumeQueue和IndexFile。
ConsumeQueue和IndexFile两个索引都是由ReputMessageService类创建的
ConsumeQueue和IndexFile两个索引都是由ReputMessageService类创建的
RequestMessageService类图
ReputMessageService服务启动后的执行过程。
doReput()方法用于创建索引的入口,通常通过以下几个步骤来创建索引:
doReput()方法用于创建索引的入口,通常通过以下几个步骤来创建索引:
第一步:从CommitLog中查找未创建索引的消息,将消息组装成DispatchRequest对象.该逻辑主要在
CommitLog.checkMesageAndReturnSize()方法中实现
CommitLog.checkMesageAndReturnSize()方法中实现
第二步:调用doDispatch()方法,该方法会循环多个索引处理器(这里初始化了
CommitLogDispatcherBuildConsumeQueue和CommitLogDispatcherBuildIndex两个索引处理器)
并调用索引处理器的dispatch()方法来处理DispatchRequest
CommitLogDispatcherBuildConsumeQueue索引处理器用于构建ConsumeQueue,CommitLogDispatcherBuildIndex
用于构建IndexFile
ConsumeQueue是必须创建的,IndexFile是否需要创建则是通过设置messageIndexEnable为True或False来实现的,默认
为True.ConsumeQueue的索引信息被保存到PageCache后,其持久化的过程和CommitLog异步刷盘的过程类似,
执行DefaultMessageStore.FlushConsumeQueueService服务
CommitLogDispatcherBuildConsumeQueue和CommitLogDispatcherBuildIndex两个索引处理器)
并调用索引处理器的dispatch()方法来处理DispatchRequest
CommitLogDispatcherBuildConsumeQueue索引处理器用于构建ConsumeQueue,CommitLogDispatcherBuildIndex
用于构建IndexFile
ConsumeQueue是必须创建的,IndexFile是否需要创建则是通过设置messageIndexEnable为True或False来实现的,默认
为True.ConsumeQueue的索引信息被保存到PageCache后,其持久化的过程和CommitLog异步刷盘的过程类似,
执行DefaultMessageStore.FlushConsumeQueueService服务
索引创建失败怎么办?
如果消息写入CommitLog后Broker宕机了,那么ConsumeQueue和IndexFile索引肯定就创建失败了,
此时ReputMessageService如何保证创建索引的可靠性呢?
ConsumeQueue和IndexFile每次刷盘时都会做Checkpoint操作,Broker每次重启的时候可以根据Checkpoint
信息得知哪些消息还未创建索引,
如果消息写入CommitLog后Broker宕机了,那么ConsumeQueue和IndexFile索引肯定就创建失败了,
此时ReputMessageService如何保证创建索引的可靠性呢?
ConsumeQueue和IndexFile每次刷盘时都会做Checkpoint操作,Broker每次重启的时候可以根据Checkpoint
信息得知哪些消息还未创建索引,
索引如何使用?
1.按照位点查消息
RocketMQ支持Pull和Push两种消费模式,Push模式是基于Pull模式的,两种模式
都是通过拉取消息进行消费和提交位点的。这里我们主要讲Broker在处理客户端拉取
消息请求时是怎么查询消息的。
RocketMQ支持Pull和Push两种消费模式,Push模式是基于Pull模式的,两种模式
都是通过拉取消息进行消费和提交位点的。这里我们主要讲Broker在处理客户端拉取
消息请求时是怎么查询消息的。
group:消费者组名
Topic:主题名字,group订阅了Topic才能拉取到消息
queueId:一般一个Topic会有很多分区,客户端轮询全部分区
拉取并消费消息
拉取并消费消息
offset:拉取位点大于等于该值的消息
maxMsgNums:一次拉取多少消息,在客户端由pullBatchSize进行配置
messageFilter:消息过滤器
getMessage()方法查询消息的过程
第一步:拉取前校验DefaultMessageStore服务是否已经关闭(正常关闭进程时会被关闭)
校验
校验
第二步:根据Topic和queueId查找ConsumeQueue索引映射文件。判断根据查找到的
ConsumeQueue索引文件校验传入的待查询的位点值是否核里,如果不合理,重新计算
下一次可以拉取的位点值
ConsumeQueue索引文件校验传入的待查询的位点值是否核里,如果不合理,重新计算
下一次可以拉取的位点值
第三步:循环查询满足maxMsgNums条数的消息。循环从ConsumeQueue中读取消息管理位点、
消息大小和消息Tag的Hash值。先做Hash过滤,再使用过滤后的消息物理位点到CommitLog中
查找消息体,并放入结果列表中
消息大小和消息Tag的Hash值。先做Hash过滤,再使用过滤后的消息物理位点到CommitLog中
查找消息体,并放入结果列表中
第四步:监控指标统计,返回拉取的消息结果
2.按照时间段查消息。
这是社区提供的管理平台的功能,输入Topic、起始时间、结束时间可以查到这段时间内的消息,这是一个根据
Consume Queue索引查询消息的扩展查询
这是社区提供的管理平台的功能,输入Topic、起始时间、结束时间可以查到这段时间内的消息,这是一个根据
Consume Queue索引查询消息的扩展查询
第一步:查找这个Topic下的所有Queue
第二步:在每一个队列中查找起始时间、结束时间对应的起始offset和最后消息的offset
如何根据时间查找物理位点呢?
主要在于构建Consume Queue,这个文件是按照时间顺序写的,每条消息的索引数据结构大小
是固定20字节。可以根据时间做二分折半搜索,找到与时间最接近的一个位点。
主要在于构建Consume Queue,这个文件是按照时间顺序写的,每条消息的索引数据结构大小
是固定20字节。可以根据时间做二分折半搜索,找到与时间最接近的一个位点。
第三步:根据起始位点、最后消息位点和Topic,循环拉取所有Queue就可以拉取到消息
3.按照key查询消息。
如果通过设置messageIndexEnable=True(默认是True)来开启Index索引服务,那么在写入消息时
会根据key自动构建IndexFile索引。用户可以通过Topic和key查询消息,查询方法为
DefaultMessageStore#queryMessage()
如果通过设置messageIndexEnable=True(默认是True)来开启Index索引服务,那么在写入消息时
会根据key自动构建IndexFile索引。用户可以通过Topic和key查询消息,查询方法为
DefaultMessageStore#queryMessage()
第一步:调用indexService.queryOffset()方法,通过Topic、key查找目标消息的物理位点信息
第二步:根据物理位点信息在CommitLog中循环查找消息体内容
第三步:返回查询结果
Broker过期文件删除机制。
RocketMQ中主要保存了CommitLog、ConsumeQueue、IndexFile三种数据文件。
由于内存和磁盘都是有限的资源,Broker不可能永久地保存所有数据,所以一些超过
保存期限的数据会被定期删除。RocketMQ通过设置数据过期时间来删除额外的数据
文件,具体的实现逻辑是通过DefaultMessageStore.start()方法中的this.addScheduleTask();
来实现的
RocketMQ中主要保存了CommitLog、ConsumeQueue、IndexFile三种数据文件。
由于内存和磁盘都是有限的资源,Broker不可能永久地保存所有数据,所以一些超过
保存期限的数据会被定期删除。RocketMQ通过设置数据过期时间来删除额外的数据
文件,具体的实现逻辑是通过DefaultMessageStore.start()方法中的this.addScheduleTask();
来实现的
1.CommitLog文件的删除过程
定时执行
先看commitLog文件删除
DefaultMessageStore.CleanCommitLogService类提供的一个线程服务周期性地执行删除操作
this.deleteExpiredFiles()的功能是删除过期文件
this.deleteExpiredFiles()的功能是删除过期文件
当满足三个条件之中的任一条件时执行删除操作
第一,当前时间等于已经配置的删除时间
第二,磁盘使用空间超过85%
第三,手动执行删除(开源版本RocketMQ4.2.0不支持)
核心逻辑,DefaultMessageStore.this.commitLog.deleteExpiredFile()方法直接调用了
this.mappedFileQueue.deleteExpiredFileByTime()方法
this.mappedFileQueue.deleteExpiredFileByTime()方法
deleteExpiredFileByTime()方法的实现分为如下两步
第一步:克隆全部的CommitLog文件。CommitLog文件可能随时有数据写入,
为了不影响正常写入,所以克隆一份来操作
为了不影响正常写入,所以克隆一份来操作
第二步:检查每一个CommitLog文件是否过期,如果已过期则立即通过调用
destroy()方法进行删除。在删除前会做一系列检查:检查文件被引用的次数、
清理映射的所有内存数据对象、释放对象.
destroy()方法进行删除。在删除前会做一系列检查:检查文件被引用的次数、
清理映射的所有内存数据对象、释放对象.
this.redeleteHangedFiel()方法表示再次删除被挂起的过期文件,为什么会有被挂起的文件呢?
第一次删除有可能失败,比如有线程引用该过期文件,内存映射清理失败等,都可能导致删除失败,
如果文件已经关闭,删除前检查没有通过,则可以通过第二次删除来处理。
第一次删除有可能失败,比如有线程引用该过期文件,内存映射清理失败等,都可能导致删除失败,
如果文件已经关闭,删除前检查没有通过,则可以通过第二次删除来处理。
ConsumeQueue、IndexFile文件的删除过程。
ConsumeQueue和IndexFile都是索引文件,在CommitLog文件被删除后,对应的
索引文件起始没有存在的意义,并且占用磁盘空间,所以这些文件应该被删除。
RocketMQ的删除策略是定时检查,满足删除条件时会删除过期或者无意义的文件。
最终程序调用CleanConsumeQueueService.deleteExpiredFiles()方法来删除索引文件
ConsumeQueue和IndexFile都是索引文件,在CommitLog文件被删除后,对应的
索引文件起始没有存在的意义,并且占用磁盘空间,所以这些文件应该被删除。
RocketMQ的删除策略是定时检查,满足删除条件时会删除过期或者无意义的文件。
最终程序调用CleanConsumeQueueService.deleteExpiredFiles()方法来删除索引文件
核心变量:
minOffset:CommitLog全部文件中的最小物理位点
lastPhysicalMinOffset:上次检查到的最小物理位点
当minOffset > this.lastPyysicalMinOffset时,说明有新数据没有被检查过,
就会调用MappedFileQueue.deleteExpiredFileByOffset()方法进行检查及删除
minOffset:CommitLog全部文件中的最小物理位点
lastPhysicalMinOffset:上次检查到的最小物理位点
当minOffset > this.lastPyysicalMinOffset时,说明有新数据没有被检查过,
就会调用MappedFileQueue.deleteExpiredFileByOffset()方法进行检查及删除
maxOffsetInLogicQueue时ConsumeQueue中最大的位点值,
offset是检查的最小位点,
如果maxOffsetInLogicQueue < offset 说明该Consume Queue已经过期了,可以删除
如果mappedFile.isAvailable()方法false,说明存储服务已经被关闭(或者该文件曾经被删除,
但是删除失败),这种文件也是可以被删除的
DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);删除过程类似
offset是检查的最小位点,
如果maxOffsetInLogicQueue < offset 说明该Consume Queue已经过期了,可以删除
如果mappedFile.isAvailable()方法false,说明存储服务已经被关闭(或者该文件曾经被删除,
但是删除失败),这种文件也是可以被删除的
DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);删除过程类似
Broker主从同步机制。
主从同步概述
Broker有两种角色Master和Slave.Master主要用于处理生产者、消费者的请求和存储数据。
Slave从Master同步所有数据到本地,具体作用体现在两个方面。
第一,Broker服务高可用。一般生产环境会部署两个主Broker节点和两个从Broker节点(也叫2m2s),
一个Master宕机后,另一个Master可以接管工作;如果两个Master都宕机,消费者可以通过连接
Slave继续消费。这样可以保证服务的高可用
第二,提高服务性能。如果消费从Master Broker拉取消息时,发现拉取消息的offset和commitLog的
物理offset相差太多,会转向Slave拉取消息,这样可以减轻Master的压力,从而提高性能
Broker同步数据的方式有两种:同步复制、异步复制。
Broker有两种角色Master和Slave.Master主要用于处理生产者、消费者的请求和存储数据。
Slave从Master同步所有数据到本地,具体作用体现在两个方面。
第一,Broker服务高可用。一般生产环境会部署两个主Broker节点和两个从Broker节点(也叫2m2s),
一个Master宕机后,另一个Master可以接管工作;如果两个Master都宕机,消费者可以通过连接
Slave继续消费。这样可以保证服务的高可用
第二,提高服务性能。如果消费从Master Broker拉取消息时,发现拉取消息的offset和commitLog的
物理offset相差太多,会转向Slave拉取消息,这样可以减轻Master的压力,从而提高性能
Broker同步数据的方式有两种:同步复制、异步复制。
同步复制是指客户端发送消息到Master,Master将消息同步复制到Slave的过程,可以通过
设置参数brokerRole=BrokerRole.SYNC_MASTER来实现。这种消息配置的可靠性很强,但是
效率比较低,适用于金融、在线教育等对消息有强可靠需求的场景
设置参数brokerRole=BrokerRole.SYNC_MASTER来实现。这种消息配置的可靠性很强,但是
效率比较低,适用于金融、在线教育等对消息有强可靠需求的场景
异步复制是指客户端发送消息到Master,再由异步线程HAService异步同步到Slave的过程,可以
通过设置参数brokerRole=BrokerRole.ASYNC_MASTER来实现。这种消息配置的效率非常高,
可靠性比同步复制差,适用于大部分业务场景
通过设置参数brokerRole=BrokerRole.ASYNC_MASTER来实现。这种消息配置的效率非常高,
可靠性比同步复制差,适用于大部分业务场景
Broker主从同步的逻辑是通过SlaveSynchronize.syncAll()方法来实现的。
该方法在BrokerController.start()方法中被调用,每隔60s同步一次,并且
同步周期不能修改,该实例在BrokerController的构造方法中被初始化
消息数据是生产者发送的消息,保存在CommitLog中,由HAService服务实时
同步到SlaveBroker中,所有实现类都在org.apache.rocketmq.store.ha包下
该方法在BrokerController.start()方法中被调用,每隔60s同步一次,并且
同步周期不能修改,该实例在BrokerController的构造方法中被初始化
消息数据是生产者发送的消息,保存在CommitLog中,由HAService服务实时
同步到SlaveBroker中,所有实现类都在org.apache.rocketmq.store.ha包下
实例初始化
方法调用
主从同步流程
名词解释。
配置数据同步流程。
配置数据包含4种类型:Topic配置、消费者位点、延迟位点、订阅关系配置。每种配置数据由一个
继承自ConfigManager的类来管理,继承关系如图。
Slave如何从Master同步这些配置呢?我们先来看一下初始化服务的步骤
配置数据包含4种类型:Topic配置、消费者位点、延迟位点、订阅关系配置。每种配置数据由一个
继承自ConfigManager的类来管理,继承关系如图。
Slave如何从Master同步这些配置呢?我们先来看一下初始化服务的步骤
第一步:Master Broker在启动时,初始化一个BrokerOuterAPI,这个服务的功能包含Broker注册到Namesrv、
Broker从Namesrv解绑、获取Topic配置信息、获取消费者位点信息、获取延迟位点信息及订阅关系等。
Broker从Namesrv解绑、获取Topic配置信息、获取消费者位点信息、获取延迟位点信息及订阅关系等。
第二步:Slave Broker在初始化Controller的定时任务时,会初始化SlaveSynchronize服务,每60s调用
一次SlaveSynchronize.syncAll()方法
一次SlaveSynchronize.syncAll()方法
第三步:syncAll()方法依次调用4种配置数据(Topic配置、消费者位点、延迟位点、订阅关系配置)的
同步方法同步全量数据
同步方法同步全量数据
第四步:syncAll()中执行的4个方法都通过Remoting模块同步调用BrokerOuterAPI,
并从Master Broker获取数据,保存到Slave中
并从Master Broker获取数据,保存到Slave中
第五步:Topic配置和订阅关系配置随着保存内存信息的同时持久化到磁盘上;消费者位点通过
BrokerController初始化定时任务持久化到磁盘上;延迟位点信息通过ScheduleMessageService
定时将内存持久化到磁盘上
BrokerController初始化定时任务持久化到磁盘上;延迟位点信息通过ScheduleMessageService
定时将内存持久化到磁盘上
CommitLog数据同步流程。
CommitLog的数据同步分为同步复制和异步复制两种。
同步复制是指生产者生产消息后,等待Master Broker将数据同步到Slave Broker后,再返回生产者数据存储状态;
异步复制是指生产者在生产消息后,不用等待Slave同步,直接返回Master存储结果
CommitLog的数据同步分为同步复制和异步复制两种。
同步复制是指生产者生产消息后,等待Master Broker将数据同步到Slave Broker后,再返回生产者数据存储状态;
异步复制是指生产者在生产消息后,不用等待Slave同步,直接返回Master存储结果
异步复制。
Master Broker启动时会启动HAService.AcceptSocketService服务,当监听到来自Slave的注册请求时
会创建一个HAConnection,同时HAConnection会创建ReadSocketService和WriteSocketService
两个服务并启动,开始主从数据同步。
ReadSocketService接收Slave同步数据请求,并将这些信息保存在HAConnection中
WriteSocketService根据HAConnection中保存的Slave同步请求,从CommitLog中查询数据,并发送给Slave.
注:ReadSocketService和WriteSocketService是两个独立工作的线程服务,它们通过HAConnection中的
公共变量将CommitLog同步给Slave
slaveRequestOffset表示Slave请求同步的位点值;
slaveAckOffset表示slave已经保存的位点值
Master Broker启动时会启动HAService.AcceptSocketService服务,当监听到来自Slave的注册请求时
会创建一个HAConnection,同时HAConnection会创建ReadSocketService和WriteSocketService
两个服务并启动,开始主从数据同步。
ReadSocketService接收Slave同步数据请求,并将这些信息保存在HAConnection中
WriteSocketService根据HAConnection中保存的Slave同步请求,从CommitLog中查询数据,并发送给Slave.
注:ReadSocketService和WriteSocketService是两个独立工作的线程服务,它们通过HAConnection中的
公共变量将CommitLog同步给Slave
slaveRequestOffset表示Slave请求同步的位点值;
slaveAckOffset表示slave已经保存的位点值
同步复制。
在CommitLog将消息存储到PageCache后,会调用CommitLog的handleHA()/submitReplicaRequest方法处理同步复制。
当BrokerRole配置为SYNC_MASTER时表示当前Master Broker需要同步将消息"发送"到Slave.根据Master Broker CommitLog
的存储结果构造一个GroupCommitRequest放入HAService中,再将GroupComitRequest放入GroupTransferService服务中,
等待GroupTransferService同步成功的锁。如果同步成功那么GroupCommit中的锁会被唤醒,并设置flushOK为True,表示生产
者发送的消息被Master Broker和Slave Broker 同时保存。
一个Master Broker可以配置多个Slave Broker,当需要同步数据时,通过service.getWaitNotifyObject().wakeupAll()来唤醒
全部的Slave同步。虽然多个Slave都同步了数据,但是一旦Master Broker不可用时,消费者只会从一个Slave中拉取消息,
所以生产环境建议Slave不要配置太多。
注:Slave在发送请求数据的Request时,会带上Slave请求的位点HAConnection.slaveRequestOffset,该值如果等于-1(默认),
则表示没有Slave请求过位点数据
在CommitLog将消息存储到PageCache后,会调用CommitLog的handleHA()/submitReplicaRequest方法处理同步复制。
当BrokerRole配置为SYNC_MASTER时表示当前Master Broker需要同步将消息"发送"到Slave.根据Master Broker CommitLog
的存储结果构造一个GroupCommitRequest放入HAService中,再将GroupComitRequest放入GroupTransferService服务中,
等待GroupTransferService同步成功的锁。如果同步成功那么GroupCommit中的锁会被唤醒,并设置flushOK为True,表示生产
者发送的消息被Master Broker和Slave Broker 同时保存。
一个Master Broker可以配置多个Slave Broker,当需要同步数据时,通过service.getWaitNotifyObject().wakeupAll()来唤醒
全部的Slave同步。虽然多个Slave都同步了数据,但是一旦Master Broker不可用时,消费者只会从一个Slave中拉取消息,
所以生产环境建议Slave不要配置太多。
注:Slave在发送请求数据的Request时,会带上Slave请求的位点HAConnection.slaveRequestOffset,该值如果等于-1(默认),
则表示没有Slave请求过位点数据
ReadSocketService后台服务不断接收Slave Broker上报的offset,每上报一次都通知HAService.notifyTransferSome()方法,
判断Slave同步的位点是否大于Master标记的已同步位点,如果大于则更新标记值,同时通知同步复制服务GroupTransferService.
GroupTransferService扫描所有的同步请求,依次判断哪些GroupCommitRequest的待同步复制的位点是比已同步位点小的,
释放GroupCommitRequest中的锁,消息处理线程可以将消息存储成功的结果返回给生产者
判断Slave同步的位点是否大于Master标记的已同步位点,如果大于则更新标记值,同时通知同步复制服务GroupTransferService.
GroupTransferService扫描所有的同步请求,依次判断哪些GroupCommitRequest的待同步复制的位点是比已同步位点小的,
释放GroupCommitRequest中的锁,消息处理线程可以将消息存储成功的结果返回给生产者
消费队列文件(ConsumeQueue)和索引文件(IndexFile)这两个文件是在SlaveBroker上追加CommitLog后
由ReputMessageService进行创建的,所以不需要同步
由ReputMessageService进行创建的,所以不需要同步
Broker的关机恢复机制
概述。
Broker关机恢复是指恢复CommitLog、Consume Queue、Index File等数据文件。
Broker关机分为正常调用命令关机和异常被迫进程终止关机两种情况。恢复过程的设计目标是
使正常停止的进程实现零数据丢失,异常停止的进程实现最少量的数据丢失,与关机恢复相关
的主要文件有两个:abort和checkpoint.
Broker关机恢复是指恢复CommitLog、Consume Queue、Index File等数据文件。
Broker关机分为正常调用命令关机和异常被迫进程终止关机两种情况。恢复过程的设计目标是
使正常停止的进程实现零数据丢失,异常停止的进程实现最少量的数据丢失,与关机恢复相关
的主要文件有两个:abort和checkpoint.
abort是一个空文件,标记当前Broker是否正常关机,Broker进程正常启动的时候,创建该文件。
Broker进程正常停止后,该文件就会删除;如果异常退出,则文件依旧存在,创建和删除的过程
如图
Broker进程正常停止后,该文件就会删除;如果异常退出,则文件依旧存在,创建和删除的过程
如图
abort文件创建流程
aboirt文件删除流程
checkpoint是检查点文件,保存Broker最后正常存储各种数据的时间,在重启Broker时,恢复程序
知道从什么时候恢复数据。检查点逻辑由StoreCheckpoint类实现。
在StoreCheckpoint类中保存了3个时间,更新过程如图.
知道从什么时候恢复数据。检查点逻辑由StoreCheckpoint类实现。
在StoreCheckpoint类中保存了3个时间,更新过程如图.
physicMsgTimestamp:最后一条已存储CommitLog的消息的存储时间
logicsMsgTimestamp:最后一条已存储Consume Queue的消息的存储时间
indexMsgTimestamp:最后一条已存储IndexFile的消息的存储时间
physicMsgTimestamp和logicsMsgTimestamp的更新都是在数据存储成功后进行的,过程比较简单。
而indexMsgTimestamp的逻辑是在Index File刷盘时被更新的,Index File刷盘方法IndexService.flush()。
从上述代码可以看到,在IndexFile刷盘后,已刷盘文件文件的最后存储消息时间被赋值给indexMsgTimestamp,
并对Checkpoint文件进行刷盘。
注:IndexFile的刷盘设计和CommitLog、Consume Queue刷盘的方式不同,容易被忽略
而indexMsgTimestamp的逻辑是在Index File刷盘时被更新的,Index File刷盘方法IndexService.flush()。
从上述代码可以看到,在IndexFile刷盘后,已刷盘文件文件的最后存储消息时间被赋值给indexMsgTimestamp,
并对Checkpoint文件进行刷盘。
注:IndexFile的刷盘设计和CommitLog、Consume Queue刷盘的方式不同,容易被忽略
Broker关机恢复流程。
Broker在启动时会初始化abort、checkpoint两个文件。正常关闭进程时会删除abort文件,
将checkpoint文件刷盘;异常关闭时,通常来不及删除abort文件。由此,在重新启动Broker时
会根据abort判断是否需要异常停止进程,而后恢复数据。
Broker启动时,会启动存储服务DefaultMessageStore.存储服务在初始化时执行load方法加载
全部数据,这里主要分析数据加载流程。Broker关机的恢复过程可以分为以下几步.
Broker在启动时会初始化abort、checkpoint两个文件。正常关闭进程时会删除abort文件,
将checkpoint文件刷盘;异常关闭时,通常来不及删除abort文件。由此,在重新启动Broker时
会根据abort判断是否需要异常停止进程,而后恢复数据。
Broker启动时,会启动存储服务DefaultMessageStore.存储服务在初始化时执行load方法加载
全部数据,这里主要分析数据加载流程。Broker关机的恢复过程可以分为以下几步.
第一步:Broker异常退出检查。如果abort文件存在,说明上次是异常退出的。
第二步:加载延迟消息的位点信息。ScheduleMessageService服务通过继承和重写
ConfigManager,调用load()方法从磁盘加载延迟位点文件的内容,并根据配置项
messageDelayLevel初始化延迟级别
ConfigManager,调用load()方法从磁盘加载延迟位点文件的内容,并根据配置项
messageDelayLevel初始化延迟级别
第三步:加载全部CommitLog文件(#1部分)。通过读取CommitLog目录下的所有文件,
依次加载每个CommitLog为MappedFile,并且设置写指针、已刷盘指针、已提交指针,
使所有指针都指向该文件的最末位.
CommitLog文件加载代码如图。如果文件大小已配置的大小不一致,恢复时
就直接被忽略,所以,在重启时不要修改mappedFileSizeCommitLog(默认是1G)
参数的值,否则数据无法恢复
依次加载每个CommitLog为MappedFile,并且设置写指针、已刷盘指针、已提交指针,
使所有指针都指向该文件的最末位.
CommitLog文件加载代码如图。如果文件大小已配置的大小不一致,恢复时
就直接被忽略,所以,在重启时不要修改mappedFileSizeCommitLog(默认是1G)
参数的值,否则数据无法恢复
第四步:加载全部Consume Queue文件及数据(如图#2、#3)。调用loadConsumeQueue方法,
读取./consumequeue/Topic/queueId/目录,加载全部Topic、queueId作为ConsumeQueue对象,
再调用load()方法初始化每一个ConsumeQueue
读取./consumequeue/Topic/queueId/目录,加载全部Topic、queueId作为ConsumeQueue对象,
再调用load()方法初始化每一个ConsumeQueue
第五步:初始化Checkpoint文件为StoreCheckpoint对象,并且初始化三个数据:
physicMsgTimestamp、logicsMsgTimestsamp和indexMsgTimestamp.
physicMsgTimestamp、logicsMsgTimestsamp和indexMsgTimestamp.
初始化StoreCheckpoint对象
在StoreCheckpoint构造方法中初始化三个时间戳
第六步:加载IndexFile索引(#4部分)。加载./index目录下的全部索引文件,如果上次进程
异常退出并且索引文件操作的最后时间戳大于Checkpoint中保存的时间,则说明当前文件
有部分数据可能存在错误,须立即销毁文件
异常退出并且索引文件操作的最后时间戳大于Checkpoint中保存的时间,则说明当前文件
有部分数据可能存在错误,须立即销毁文件
第七步:恢复全部数据(#5部分)
lastExitOK=True,表示上次进程正常退出。全部恢复数据主要恢复ConsumeQueue、
CommitLog、内存中的consumeQueueTable,并纠正Consume Queue中的最新位点值。
lastExitOK=True,表示上次进程正常退出。全部恢复数据主要恢复ConsumeQueue、
CommitLog、内存中的consumeQueueTable,并纠正Consume Queue中的最新位点值。
recoverCOnsumeQueue()方法通过循环所有Topic对应的ConsumeQueue,依次调用
ConsumeQUeue.recover()方法执行数据恢复
ConsumeQUeue.recover()方法执行数据恢复
recoverNormally()方法在Broker正常关闭后重启执行CommitLog恢复(#5,2)
对于CommitLog恢复数据,这里有一个小技巧,正常恢复是从倒数第三个文件
开始直到最后一个文件。正常恢复是假定数据都是正常的,大部分场景都关心最新的
消息,所以恢复最新的三个文件到内存中,消息量大小为3GB,当然,如果恢复文件
个数做成可配置的就更好了
对于CommitLog恢复数据,这里有一个小技巧,正常恢复是从倒数第三个文件
开始直到最后一个文件。正常恢复是假定数据都是正常的,大部分场景都关心最新的
消息,所以恢复最新的三个文件到内存中,消息量大小为3GB,当然,如果恢复文件
个数做成可配置的就更好了
recoverAbnormally()方法在Broker异常关闭后重启时执行CommitLog恢复(#5.3)
CommitLog异常恢复是从最后一个文件开始反向恢复到第一个文件。因为当进程异常
停止后最容易出错的是最新的某些文件。所以异常恢复时,RocketMQ从最后一个文件
开始,倒序找第一个正常的文件开始恢复。
CommitLog.isMappedFileMatchedRecover()方法判断文件是否正常,整个方法的
重点在于,只要文件的最后消息的存储时间都小于在Checkpoint保存的对应时间,
那么该文件并未损坏。
CommitLog恢复完毕,会将该文件中的消息重新分发,创建ConsumeQueue和IndexFile。
分发全部消息还是部分消息时根据duplicationEnable的值(默认为False)来判断的
CommitLog异常恢复是从最后一个文件开始反向恢复到第一个文件。因为当进程异常
停止后最容易出错的是最新的某些文件。所以异常恢复时,RocketMQ从最后一个文件
开始,倒序找第一个正常的文件开始恢复。
CommitLog.isMappedFileMatchedRecover()方法判断文件是否正常,整个方法的
重点在于,只要文件的最后消息的存储时间都小于在Checkpoint保存的对应时间,
那么该文件并未损坏。
CommitLog恢复完毕,会将该文件中的消息重新分发,创建ConsumeQueue和IndexFile。
分发全部消息还是部分消息时根据duplicationEnable的值(默认为False)来判断的
recoverTopicQueueTable():纠正Consume Queue中最小消费位点和恢复ComitLog内存中的TopicTable(#5.4)
延迟消息
概述。
什么是延迟消息呢?延迟消息也叫定时消息,一般地,生产者在发送消息后,消费者
希望在指定的一段时间后再消费。常规做法是,把信息存储在数据库中,使用定时
任务扫描,符合条件的数据再发送给消费者。典型的业务场景春节买票30分钟内完成
订单支付。
RocketMQ延迟消息是通过ScheduleMessageService类实现的
什么是延迟消息呢?延迟消息也叫定时消息,一般地,生产者在发送消息后,消费者
希望在指定的一段时间后再消费。常规做法是,把信息存储在数据库中,使用定时
任务扫描,符合条件的数据再发送给消费者。典型的业务场景春节买票30分钟内完成
订单支付。
RocketMQ延迟消息是通过ScheduleMessageService类实现的
核心属性
SCHEDULE_TOPIC:一个系统内置的Topic,用来保存所有定时消息。RocketMQ全部
未执行的延迟消息保存在这个内部Topic中(现如今保存在TopicValidator中)
未执行的延迟消息保存在这个内部Topic中(现如今保存在TopicValidator中)
FIRST_DELAY_TIME:第一次执行定时任务的延迟时间,默认为1000ms
DELAY_FOR_A_WHILE:第二次及以后的定时任务检查间隔时间,默认为100ms
DELAY_FOR_A_PERIOD:如果延迟消息到时间投递时却失败了,会在DELAY_FOR_A_PERIOD
中设置的ms后重新尝试投递,默认为10 000ms
中设置的ms后重新尝试投递,默认为10 000ms
delayLevelTable:保存延迟队列和延迟时间的映射关系
offsetTable:保存延迟级别及相应的消费位点
timer:用于执行定时任务,线程名叫ScheduleMessageTImerThread
核心方法
queueId2DelayLevel():将queueid转化为延迟级别
delayLevel2QueueId():将延迟级别转化为queueId
一个延迟级别保存在一个Queue中,延迟级别和Queue之间的转化关系为
queueId = delayLevel -1
delayLevel2QueueId():将延迟级别转化为queueId
一个延迟级别保存在一个Queue中,延迟级别和Queue之间的转化关系为
queueId = delayLevel -1
updateOffset():更新延迟消息的Topic的消费位点
computeDeliverTimestamp():根据延迟级别和消息的存储时间计算该延迟消息的投递时间
start():启动延迟消息服务。启动第一次延迟消息投递的检查定时任务和持久化消费位点的定时任务
shutdown():关闭start()方法中启动的timer任务
load():加载延迟消息的消费位点信息和全部延迟级别信息,延迟级别可以通过messageDelayLevel
字段进行设置,默认1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
字段进行设置,默认1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
parseDelayLevel();格式化所有延迟级别信息,并保存到内存中
DeliverDelayedMessageTimerTask内部类用于检查延迟消息是否可以投递,
DeliverDelayedMessageTImerTask是TimerTask的一个扩展实现
DeliverDelayedMessageTImerTask是TimerTask的一个扩展实现
延迟消息存储机制。
在延迟消息的发送流程中,消息体中会设置一个delayTimeLevel,其他发送流程也是如此。
Broker在接收延迟消息时会有几个地方单独处理再存储,其余过程和普通消息存储一致.
延迟消息在保存到CommitLog中的单独处理。CommitLog.putMessage()/asyncPutMessage
方法存储延迟消息的实现逻辑如图
在延迟消息的发送流程中,消息体中会设置一个delayTimeLevel,其他发送流程也是如此。
Broker在接收延迟消息时会有几个地方单独处理再存储,其余过程和普通消息存储一致.
延迟消息在保存到CommitLog中的单独处理。CommitLog.putMessage()/asyncPutMessage
方法存储延迟消息的实现逻辑如图
msg.getDelayTimeLevel()是发送消息时可以设置的延迟级别,如果该值大于0,则表示
当前处理的消息是一个延迟消息,将对该消息做如下修改:
1.将原始Topic、queueId备份在消息的扩展字段中,全部的延迟消息都保存在
SCHEDULE_TOPIC的Topic中
2.备份原始Topic、queueId为延迟消息的Topic、queueId。备份的目的是当消息到达
投递时间时会恢复原始的Topic和queueId,继而被消费者拉取并消费
当前处理的消息是一个延迟消息,将对该消息做如下修改:
1.将原始Topic、queueId备份在消息的扩展字段中,全部的延迟消息都保存在
SCHEDULE_TOPIC的Topic中
2.备份原始Topic、queueId为延迟消息的Topic、queueId。备份的目的是当消息到达
投递时间时会恢复原始的Topic和queueId,继而被消费者拉取并消费
经过处理后,该消息会被正常保存到CommitLog中,然后创建ConsumeQueue和IndexFile两个索引。
在创建ConsumeQueue时,从CommitLog中获取的消息内容会单独进行处理,单独处理的逻辑方法是
CommitLog.checkMessageAndReturnSize().
有一个很精巧的设计:在CommitLog中查询出消息后,调用computeDeliverTimestamp()方法计算消息
具体的投递时间,再将该时间保存在ConsumeQueue的tagCode中。
这样设计的好处是,不需要检查CommitLog大文件,在定时任务检查消息是否需要投递时,只需要检查
ConsumeQueue中的tagCode(不再是Tag的Hash值,而是消息可以投递的时间,单位是ms),如果满足
条件再通过查询CommitLog将消息投递出去即可,如果每次都查询CommitLog,那么可想而知,效率会很低
在创建ConsumeQueue时,从CommitLog中获取的消息内容会单独进行处理,单独处理的逻辑方法是
CommitLog.checkMessageAndReturnSize().
有一个很精巧的设计:在CommitLog中查询出消息后,调用computeDeliverTimestamp()方法计算消息
具体的投递时间,再将该时间保存在ConsumeQueue的tagCode中。
这样设计的好处是,不需要检查CommitLog大文件,在定时任务检查消息是否需要投递时,只需要检查
ConsumeQueue中的tagCode(不再是Tag的Hash值,而是消息可以投递的时间,单位是ms),如果满足
条件再通过查询CommitLog将消息投递出去即可,如果每次都查询CommitLog,那么可想而知,效率会很低
延迟消息投递机制。
RocketMQ在存储延迟消息时,将其保存在一个系统的Topic中,在创建ConsumeQueue时,
tagCode字段中保存着延迟消息需要被投递的时间,通过这个存储实现的思路,我们可以总结出
延迟消息的投递过程:通过定时服务定时扫描ConsumeQueue,满足投递时间条件的消息再通过
CommitLog将消息重新投递到原始的Topic中,消费者就可以接收消息了。
在存储模块初始化时,初始化延迟消息处理类ScheduleMessageService,通过依次调用start()
方法来启动延迟消息定时扫描任务,start()方法核心逻辑如图
RocketMQ在存储延迟消息时,将其保存在一个系统的Topic中,在创建ConsumeQueue时,
tagCode字段中保存着延迟消息需要被投递的时间,通过这个存储实现的思路,我们可以总结出
延迟消息的投递过程:通过定时服务定时扫描ConsumeQueue,满足投递时间条件的消息再通过
CommitLog将消息重新投递到原始的Topic中,消费者就可以接收消息了。
在存储模块初始化时,初始化延迟消息处理类ScheduleMessageService,通过依次调用start()
方法来启动延迟消息定时扫描任务,start()方法核心逻辑如图
核心字段和方法
timer:定时检查延迟消息是否可以投递的定时器
delayLevelTable:该字段用于保存全部的延迟级别
level:延迟级别
timeDelay:延迟时间
offset:延迟级别对应的ConsumeQueue的消费位点,扫描时从这个位点开始
timeDelay:参数表示延迟时间
从代码中的for循环可以知道,每个延迟级别都有一个定时任务进行扫描,每个延迟级别
在第一次扫描时会延迟1000ms,再开始执行扫描。随着延迟消息不断被重新投递,内置
Topic的全部ConsumeQueue的消费位点offset不断向前推进,也会定时执行
ScheduleMessageService.this.persist()方法来持久化消费位点,以便进程重启后从上次
开始扫描检查。
在第一次扫描时会延迟1000ms,再开始执行扫描。随着延迟消息不断被重新投递,内置
Topic的全部ConsumeQueue的消费位点offset不断向前推进,也会定时执行
ScheduleMessageService.this.persist()方法来持久化消费位点,以便进程重启后从上次
开始扫描检查。
this.timer.schedule()定时任务只执行一次,那么之后发送的消息是如何进行投递的呢?
在DeliverDelayedMessageTimeTask.executeOnTimeup()方法中,DeliverDelayed-
MessageTimerTask类是ScheduleMessageService类的一个内部类,同时也是
this.timer.schedule()方法的输入参数
在DeliverDelayedMessageTimeTask.executeOnTimeup()方法中,DeliverDelayed-
MessageTimerTask类是ScheduleMessageService类的一个内部类,同时也是
this.timer.schedule()方法的输入参数
核心属性和方法
delayLevel:延迟级别。
offset:待检查消息的ConsumeQueue的位点值
correctDeliverTimestamp():纠正投递时间
executeOnTimeup():定时扫描核心方法
DeliverDelayedMessageTimerTask默认执行run()方法,run()方法直接调用
executeOnTimeup()方法扫描当前位点的消息是否满足投递条件
executeOnTimeup()方法扫描当前位点的消息是否满足投递条件
核心方法的执行步骤
第一步:查找Consume Queue.其中涉及到了queueId2DelayLevel()
和delayLevel2QueueId(),RocketMQ设计的延迟级别和延迟Topic的
queueId有关系,可以进行互相转化
和delayLevel2QueueId(),RocketMQ设计的延迟级别和延迟Topic的
queueId有关系,可以进行互相转化
第二步:找到投递时间。真正的投递时间deliverTimestamp被存储在
ConsumeQueue的tagCode中,所以我们可以通过offset查找ConsumeQueue中
保存的deliverTimestamp,再通过调用correctDeliverTimestamp()
计算当前消息的真正投递时间deliverTimestamp
ConsumeQueue的tagCode中,所以我们可以通过offset查找ConsumeQueue中
保存的deliverTimestamp,再通过调用correctDeliverTimestamp()
计算当前消息的真正投递时间deliverTimestamp
第三步:如果满足投递时间条件,则重新发送消息到原始Topic中,
在重新投递前调用messageTimeup()方法,将消息的原始Topic、
queueId、tagCode等还原,清除扩展字段中延迟消息的标志
(MessageConstant.PROPERTY_DELAY_TIME_LEVEL),然后被重新
投递、更新消费位点。
重新投递后,消息会正常创建Consume Queue索引、IndexFile索引,
然后被消费者拉取消费,达到定时消费的目的。
在重新投递前调用messageTimeup()方法,将消息的原始Topic、
queueId、tagCode等还原,清除扩展字段中延迟消息的标志
(MessageConstant.PROPERTY_DELAY_TIME_LEVEL),然后被重新
投递、更新消费位点。
重新投递后,消息会正常创建Consume Queue索引、IndexFile索引,
然后被消费者拉取消费,达到定时消费的目的。
第四步:如果第三步投递失败,或者消息没有达到投递时间条件,则
重新提交一个定时任务到timer中,以供下次检查
重新提交一个定时任务到timer中,以供下次检查
0 条评论
下一页