RocketMQ
2024-02-14 14:26:53 4 举报
AI智能生成
RocketMQ是一款开源的分布式消息中间件,具有高性能、高可靠、高扩展性等特点。它采用发布/订阅模式,支持多种消息类型,如文本消息、二进制消息等。RocketMQ通过消息存储和传输的优化,实现了低延迟、高吞吐量的消息传递。同时,它还提供了丰富的监控和管理功能,方便用户进行系统运维。RocketMQ广泛应用于电商、金融、物流等领域,为企业提供了稳定可靠的消息服务。 希望这个描述能够满足您的需求。如果您还有其他问题,请随时问我。
作者其他创作
大纲/内容
5.0
为什么选择MQ
孕育RocketMQ雏形的应用场景
异步通信
搜索
社交活动流
数据管道
贸易流程
贸易业务吞吐量上升,消息传递集群的压力变大
选型
随着队列和虚拟主题使用的增加,ActiveMQ IO模块达到了一个瓶颈,尽力通过节流、断路器或降级来解决这个问题,但效果并不理想
Kafka不能满足我们的要求,其尤其表现在低延迟和高可靠性方面
决定
发明一个新的消息传递引擎,覆盖从传统的pub/sub场景到高容量的实时零误差的交易系统。
效果
历经十余年的大规模场景打磨,RocketMQ 已经成为业内共识的金融级可靠业务消息首选方案,被广泛应用于互联网、大数据、移动互联网、物联网等领域的业务场景。
基本该念
主题
消息传递和存储的顶层容器,用于标识同一类业务逻辑的消息。通过TopicName标识和区分
消息类型(按传输特性不同而分类)
普通消息
顺序消息
事务消息
定时/延时消息
消息队列
消息存储和传输的实际容器,也是消息的最小存储单元
消息
最小数据传输单元
消息视图
面向开发视角提供的一种消息只读接口。通过消息视图可以读取消息内部的多个属性和负载信息,但是不能对消息本身做任何修改。
消息标签
细粒度消息分类属性,可以在主题层级之下做消息类型的细分
消息位点(MessageQueueOffset)
消息是按到达Apache RocketMQ 服务端的先后顺序存储在指定主题的多个队列中,
每条消息在队列中都有一个唯一的Long类型坐标,这个坐标被定义为消息位点。
每条消息在队列中都有一个唯一的Long类型坐标,这个坐标被定义为消息位点。
消费位点(ConsumerOffset)
一条消息被某个消费者消费完成后不会立即从队列中删除,Apache RocketMQ 会基于每个消费者分组记录消费过的最新一条消息的位点,即消费位点
消息索引
面向消息的索引属性
生产者
构建并传输消息到服务端的运行实体
事务检查器(TransactionChecker)
生产者执行本地事务检查和异常事务恢复的监听器
事务状态(TransactionResolution)
事务消息的发送过程中,事务提交的状态标识
服务端通过事务状态控制事务消息是否提交和投递
3种状态
事务提交
事务回滚
事务未决
消费者分组
是什么
承载多个消费行为一致的消费者负载均衡分组
消费者分组不是一个运行实体,是一个逻辑资源
作用
初始化多个消费者,实现消费性能的水平扩展以及高可用容灾
消费者
接收并处理消息的运行实体
消费结果
PushConsumer消费监听器处理消息完成后返回的处理结果
消费结果包含消费成功和消费失败
订阅关系
消费者获取消息、处理消息的规则和状态配置
消费过滤
消费者可以通过订阅指定消息标签(Tag)对消息进行过滤,确保最终只接收被过滤后的消息合集
滤规则的计算和匹配在Apache RocketMQ 的服务端完成
重置消费位点
以时间轴为坐标,在消息持久化存储的时间范围内,重新设置消费者分组对已订阅主题的消费进度,设置完成后消费者将接收设定时间点之后,由生产者发送到Apache RocketMQ 服务端的消息
消息轨迹
从生产者到消费的完整链路
消息堆积
消费者的消费能力有限,未能在短时间内将所有消息正确消费掉,此时在服务端保存着未被消费的消息,该状态即消息堆积
事务消息
支持在分布式场景下保障消息生产和本地事务的最终一致性
定时/延时消息
消息被发送至服务端后,在指定时间后才能被消费者消费。通过设置一定的定时时间可以实现分布式场景的延时调度触发效果。
顺序消息
支持消费者按照发送消息的先后顺序获取消息,从而实现业务场景中的顺序处理。
参数约束和建议
请求超时时间
默认值:3000毫秒。
取值范围:该参数为客户端本地行为,取值范围建议不要超过30000毫秒。
取值范围:该参数为客户端本地行为,取值范围建议不要超过30000毫秒。
请求超时时间是客户端本地同步调用的等待时间,请根据实际应用设置合理的取值,避免线程阻塞时间过长。
消息大小
默认值:不超过4 MB。不涉及消息压缩,仅计算消息体body的大小。
取值范围:建议不超过4 MB。
取值范围:建议不超过4 MB。
消息传输应尽量压缩和控制负载大小,避免超大文件传输。若消息大小不满足限制要求,可以尝试分割消息或使用OSS存储,用消息传输URL。
消息自定义属性
长度建议:属性的Key和Value总长度不超过16 KB。
系统保留属性:不允许使用以下保留属性作为自定义属性的Key。 保留属性Key
系统保留属性:不允许使用以下保留属性作为自定义属性的Key。 保留属性Key
消息发送重试次数
默认值:3次。
取值范围:无限制。
取值范围:无限制。
消息发送重试是客户端SDK内置的重试策略,对应用不可见,建议取值不要过大,避免阻塞业务线程。 如果消息达到最大重试次数后还未发送成功,建议业务侧做好兜底处理,保证消息可靠性。
消息消费重试次数
默认值:16次。
消费重试次数应根据实际业务需求设置合理的参数值,避免使用重试进行无限触发。重试次数过大容易造成系统压力过量增加。
领域模型
概述
Apache RocketMQ 产品主要应用于异步解耦,流量削峰填谷等场景
消息的生命周期
如图所示,生命周期主要分为消息生产、消息存储、消息消费
消息生产
生产者(Producer)
消息存储
主题(Topic)
主题内部由多个队列组成,消息的存储和水平扩展实际是通过主题内的队列实现的
队列(MessageQueue)
消息传输和存储的实际单元容器,类比于其他消息队列中的分区。
Apache RocketMQ 通过流式特性的无限队列结构来存储消息,消息在队列内具备顺序性存储特征。
Apache RocketMQ 通过流式特性的无限队列结构来存储消息,消息在队列内具备顺序性存储特征。
消息(Message)
消息消费
消费者分组(ConsumerGroup)
用于统一管理底层运行的多个消费者(Consumer)
同一个消费组的多个消费者必须保持消费逻辑和配置一致,共同分担该消费组订阅的消息,实现消费能力的水平扩展
消费者(Consumer)
消费者必须被指定到某一个消费组中。
订阅关系(Subscription)
发布订阅模型中消息过滤、重试、消费进度的规则配置
订阅关系以消费组粒度进行管理,消费组通过定义订阅关系控制指定消费组下的消费者如何实现消息过滤、消费重试及消费进度恢复等。
订阅关系除过滤表达式之外都是持久化的,即服务端重启或请求断开,订阅关系依然保留
通信方式介绍
背景:分布式系统下,复杂系统被拆分为多个独立子模块,例如微服务模块,此时就需要考虑模块间的远程通信
典型的通信方式
同步的RPC远程调用
基于中间件代理的异步通信方式
异步通信的优势
系统拓扑简单
由于调用方和被调用方统一和中间代理通信,系统是星型结构,易于维护和管理
上下游耦合性弱
上下游系统之间弱耦合,结构更灵活,由中间代理负责缓冲和异步恢复。 上下游系统间可以独立升级和变更,不会互相影响。
容量削峰填谷
基于消息的中间代理往往具备很强的流量缓冲和整形能力,业务流量高峰到来时不会击垮下游
消息传输模型介绍
主流的消息中间件的传输模型
点对点模型
特点
消费匿名:消息上下游沟通的唯一的身份就是队列,下游消费者不具备身份
一对一通信:下游消费者即使有多个,但都没有自己独立的身份,因此共享队列中的消息,每一条消息都只会被唯一一个消费者处理
发布订阅模型
特点
消费独立:发布订阅模型中消费方都会具备的身份,一般叫做订阅组(订阅关系),不同订阅组之间相互独立不会相互影响。
一对多通信:基于独立身份的设计,同一个主题内的消息可以被多个订阅组处理,每个订阅组都可以拿到全量消息。
总结
点对点模型和发布订阅模型各有优势,点对点模型更为简单,而发布订阅模型的扩展性更高。
Apache RocketMQ 使用的传输模型为发布订阅模型,因此也具有发布订阅模型的特点。
Apache RocketMQ 使用的传输模型为发布订阅模型,因此也具有发布订阅模型的特点。
主题
定义
消息传输和存储的顶层容器,用于标识同一类业务逻辑的消息
作用
定义数据的分类隔离: 建议将不同业务类型的数据拆分到不同的主题中管理,通过主题实现存储的隔离性和订阅隔离性
定义数据的身份和权限: 消息本身是匿名无身份的,同一分类的消息使用相同的主题来做身份识别和权限管理。
模型关系
主题所处的流程和位置如上所示
主题是 Apache RocketMQ 的顶层存储,所有消息资源的定义都在主题内部完成,但主题是一个逻辑概念,并不是实际的消息容器。
主题内部由多个队列组成,消息的存储和水平扩展能力最终是由队列实现的;并且针对主题的所有约束和属性设置,最终也是通过主题内部的队列来实现。
内部属性
主题名称
用于标识主题,主题名称集群内全局唯一
队列列表
一个主题内包含一个或多个队列,消息实际存储在主题的各队列内
消息类型
普通消息
顺序消息
通过消息分组MessageGroup标记一组特定消息的先后顺序,可以保证消息的投递顺序严格按照消息发送时的顺序。
定时/延时消息
通过指定延时时间控制消息生产后不要立即投递,而是在延时间隔后才对消费者可见
事务消息
行为约束
消息类型强制校验
消息类型必须一致:发送的消息的类型,必须和目标主题定义的消息类型一致
主题类型必须单一:每个主题只支持一种消息类型,不允许将多种类型的消息发送到同一个主题中
队列
作用
天然的顺序性
能从任意位点读任意数量的消息
默认提供消息可靠存储机制,配合生产者和消费者客户端的调用可实现至少投递一次的可靠性语义
读写权限
是什么:当前队列是否可以读写数据,取值由服务端定义
枚举值
6(110):可读写
4(100):只读
2(010):只写
0(000):不可读写
队列的读写权限属于运维侧操作,不建议频繁修改
常见队列增加场景
需要增加队列实现物理节点负载均衡(为了保证集群流量的负载均衡,建议在新的服务节点上新增队列,或将旧的队列迁移到新的服务节点上)
需要增加队列实现顺序消息性能扩展(顺序消息的顺序性在队列内生效的,因此顺序消息的并发度会在一定程度上受队列数量的影响,因此建议仅在系统性能瓶颈时再增加队列)
消息
行为约束
普通和顺序消息不能超过4MB
事务和延时消息不能超过64KB
生产者
版本兼容性
5.x版本开始,生产者是匿名的,无需管理生产者分组(ProducerGroup)
对于历史版本服务端3.x和4.x版本,已经使用的生产者分组可以废弃无需再设置,且不会对当前业务产生影响
消费者分组
内部属性
投递顺序性
是什么:消费者消费消息时,Apache RocketMQ 向消费者客户端投递消息的顺序
根据不同的消费场景,Apache RocketMQ 提供顺序投递和并发投递两种方式
默认是并发投递
消费重试策略
最大重试次数(默认16次)
重试间隔(频率递减的方式)
消费者
内部属性
消费者监听器
使用PushConsumer类型的消费者消费消息时,消费者客户端必须设置消费监听器
行为约束
同一分组内所有消费者的投递顺序和消费重试策略需要保持一致
使用建议
不建议在单一进程内创建大量消费者
不建议频繁创建和销毁消费者
订阅关系
是什么:订阅关系由消费者分组动态注册到服务端系统,并在后续的消息传输中按照订阅关系定义的过滤规则进行消息匹配和消费进度维护
通过配置订阅关系,可控制如下传输行为
消息过滤规则
消费状态:消费者分组在服务端注册订阅关系后,当消费者离线并再次上线后,可以获取离线前的消费进度并继续消费。
订阅关系判断原则
不同消费者分组对于同一个主题的订阅相互独立如上图所示
同一个消费者分组对于不同主题的订阅也相互独立如上图所示
订阅关系的位置和流程
功能特性
普通消息
应用场景
概述:普通消息一般应用于微服务解耦、事件驱动、数据集成等场景,这些场景大多数要求数据传输通道具有可靠传输的能力
典型场景一:微服务异步解耦
如上图所示,以在线的电商交易场景为例,上游订单系统将用户下单支付这一业务事件封装成独立的普通消息并发送至Apache RocketMQ服务端,下游按需从服务端订阅消息并按照本地消费逻辑处理下游任务。每个消息之间都是相互独立的,且不需要产生关联。
典型场景二:数据集成传输
如上图所示,以离线的日志收集场景为例,通过埋点组件收集前端应用的相关操作日志,并转发到 Apache RocketMQ 。每条消息都是一段日志数据,Apache RocketMQ 不做任何处理,只需要将日志数据可靠投递到下游的存储系统和分析系统即可,后续功能由后端应用完成。
延申思考:超店中的PV表页面浏览路径,也是通过埋点收集前端操作日志并发给后端,后端发给MQ异步处理
功能原理
普通消息生命周期
初始化
待消费
消费中
消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ会对消息进行重试处理。具体信息,请参见消费重试。
消费提交
消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 Apache RocketMQ默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。
消息删除
Apache RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。更多信息,请参见消息存储和清理机制。
使用建议
设置全局唯一业务索引键,方便问题追踪(例如,订单ID,用户ID)
定时/延时消息
备注:定时消息和延时消息本质相同,都是服务端根据消息设置的定时时间在某一固定时刻将消息投递给消费者消费。因此,下文统一用定时消息描述
应用场景
概述:分布式定时调度触发、任务超时处理等场景,需要实现精准、可靠的定时事件触发
典型场景一:分布式定时调度
在分布式定时调度场景下,需要实现各类精度的定时任务,例如每天5点执行文件清理,每隔2分钟触发一次消息推送等需求。传统基于数据库的定时调度方案在分布式场景下,性能不高,实现复杂。基于 Apache RocketMQ 的定时消息可以封装出多种类型的定时触发器。
典型场景二:任务超时处理
以电商交易场景为例,订单下单后暂未支付,此时不可以直接关闭订单,而是需要等待一段时间后才能关闭订单。使用 Apache RocketMQ 定时消息可以实现超时任务的检查触发。
功能原理
定时时间设置原则
Apache RocketMQ 定时消息设置的定时时间是一个预期触发的系统时间戳,延时时间也需要转换成当前系统时间后的某一个时间戳,而不是一段延时时长。
定时时长最大值默认为24小时,不支持自定义修改
定时消息生命周期
初始化
定时中:消息被发送到服务端,和普通消息不同的是,服务端不会直接构建消息索引,而是会将定时消息单独存储在定时存储系统中,等待定时时刻到达。
待消费
定时时刻到达后,服务端将消息重新写入普通存储引擎,对下游消费者可见,等待消费者消费的状态。
消费中
消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ会对消息进行重试处理。具体信息,请参见消费重试。
消费提交
消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 Apache RocketMQ默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。
消息删除
Apache RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。更多信息,请参见消息存储和清理机制。
使用限制
消息类型一致性
定时精度约束(默认精度为1000ms,即毫秒级)
使用建议
避免大量相同定时时刻的消息
定时消息的实现逻辑需要先经过定时存储等待触发,定时时间到达后才会被投递给消费者。因此,如果将大量定时消息的定时时间设置为同一时刻,则到达该时刻后会有大量消息同时需要被处理,会造成系统压力过大,导致消息分发延迟,影响定时精度。
顺序消息
应用场景
概述:在有序事件处理、撮合交易、数据实时增量同步等场景下,异构系统间需要维持强一致的状态同步,上游的事件变更需要按照顺序传递到下游进行处理。
典型场景一:撮合交易
以证券、股票交易撮合场景为例,对于出价相同的交易单,坚持按照先出价先交易的原则,下游处理订单的系统需要严格按照出价顺序来处理订单
典型场景二:数据实时增量同步
以数据库变更增量同步场景为例,上游源端数据库按需执行增删改操作,将二进制操作日志作为消息,通过 Apache RocketMQ 传输到下游搜索系统,下游系统按顺序还原消息数据,实现状态数据按序刷新。如果是普通消息则可能会导致状态混乱,和预期操作结果不符,基于顺序消息可以实现下游状态和上游操作结果一致。
功能原理
如何保证消息的顺序性
生产顺序性
单一生产者
串行发送
消费顺序性
投递顺序(消费者要严格按照接收-处理-应答的语义处理消息,避免因异步处理导致消息乱序)
注意:消费者类型为PushConsumer时, Apache RocketMQ 保证消息按照存储顺序一条一条投递给消费者,若消费者类型为SimpleConsumer,则消费者有可能一次拉取多条消息。此时,消息消费的顺序性需要由业务方自行保证
有限重试
生命周期
与普通消息的生命周期一致
备注
消息消费失败或消费超时,会触发服务端重试逻辑,重试消息属于新的消息,原消息的生命周期已结束
顺序消息消费失败进行消费重试时,为保障消息的顺序性,后续消息不可被消费,必须等待前面的消息消费完成后才能被处理。
使用示例
和普通消息发送相比,顺序消息发送必须要设置消息组(MessageGroup)。
消息组的粒度建议按照业务场景,尽可能细粒度设计,以便实现业务拆分和并发扩展。
消息组的粒度建议按照业务场景,尽可能细粒度设计,以便实现业务拆分和并发扩展。
使用建议
串行消费,避免批量消费导致乱序
消息组尽可能打散,避免集中导致热点(如采用订单ID、用户ID,将不同消息组的消息均匀分布到队列中)
事务消息
应用场景
背景:一个核心事务需要调用多个下游业务,同时要保证他们的执行结果一致
以电商交易场景为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更
解决方案一:传统XA事务方案:性能不足
为了保证上述四个分支的执行结果一致性,典型方案是基于XA协议的分布式事务系统来实现。将四个调用分支封装成包含四个独立事务分支的大事务。基于XA分布式事务的方案可以满足业务处理结果的正确性,但最大的缺点是多分支环境下资源锁定范围大,并发度低,随着下游分支的增加,系统性能会越来越差。(概括为:有一个事务协调者,多个事务参与者,由协调者统一调度commit或者rollback。协调者有著名的alibaba的seata)
解决方案二:基于普通消息方案:一致性保障困难
将上述基于XA事务的方案进行简化,将订单系统变更作为本地事务,剩下的系统变更作为普通消息的下游来执行,事务分支简化成普通消息+订单表事务,充分利用消息异步化的能力缩短链路,提高并发度。该方案中消息下游分支和订单系统变更的主分支很容易出现不一致的现象。
如:
消息发送成功,订单没有执行成功,需要回滚整个事务。
订单执行成功,消息没有发送成功,需要额外补偿才能发现不一致。
消息发送超时未知,此时无法判断需要回滚订单还是提交订单变更。
如:
消息发送成功,订单没有执行成功,需要回滚整个事务。
订单执行成功,消息没有发送成功,需要额外补偿才能发现不一致。
消息发送超时未知,此时无法判断需要回滚订单还是提交订单变更。
解决方案三:基于Apache RocketMQ分布式事务消息:支持最终一致性
上述普通消息方案中,普通消息和订单事务无法保证一致的原因,本质上是由于普通消息无法像单机数据库事务一样,具备提交、回滚和统一协调的能力。
而基于Apache RocketMQ实现的分布式事务消息功能,在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。
而基于Apache RocketMQ实现的分布式事务消息功能,在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。
功能原理
子主题
使用限制
消费事务性
Apache RocketMQ 事务消息保证本地主分支事务和下游消息发送事务的一致性,但不保证消息消费结果和上游事务的一致性。因此需要下游业务分支自行保证消息正确处理,建议消费端做好消费重试,如果有短暂失败可以利用重试机制保证最终处理成功。
事务超时机制
Apache RocketMQ 事务消息的生命周期存在超时机制,即半事务消息被生产者发送服务端后,如果在指定时间内服务端无法确认提交或者回滚状态,则消息默认会被回滚。事务超时时间,默认值是4小时
使用建议
避免大量未决事务导致超时
正确处理"进行中"的事务(不要返回Rollback或Commit结果,应继续保持Unknown的状态)
一般出现消息回查时事务正在处理的原因为:事务执行较慢,消息回查太快。解决方案如下:
将第一次事务回查时间设置较大一些,但可能导致依赖回查的事务提交延迟较大。
程序能正确识别正在进行中的事务。
将第一次事务回查时间设置较大一些,但可能导致依赖回查的事务提交延迟较大。
程序能正确识别正在进行中的事务。
消息发送重试和流控机制
消息发送重试机制
重试基本概念
在客户端SDK中内置请求重试逻辑
同步发送和异步发送模式均支持消息发送重试
重试流程
生产者在初始化时设置消息发送最大重试次数,生产者发送失败则进行重试
重试间隔
除服务端返回系统流控错误场景,其他触发条件触发重试后,均会立即进行重试,无等待间隔
若由于服务端返回流控错误触发重试,系统会按照指数退避策略进行延迟重试(第一次失败重试前后需等待多久,默认值:1秒)
功能约束
链路耗时阻塞评估。(重试可能会对链路造成阻塞延时,如果后续操作对实时性要求比较高,需要调整合理超时时间以及重试次数)
最终异常兜底(重试不一定最终发送成功,需要业务方做兜底保证数据一致性)
消息重复问题(出现假超时,其实是发送成功并到达服务端,只需拿到响应报文太慢,又重试发送了一次)
流控机制
流控概念:消息流控指的是系统容量或水位过高, Apache RocketMQ 服务端会通过快速失败返回流控错误来避免底层资源承受过高压力。
触发条件
存储压力大(消费位点为最大位点 || 需回溯到指定时刻前开始消费)
服务端请求任务排队溢出(消息堆积)
流控行为
客户端会收到
reply-code:530
reply-text:TOO_MANY_REQUESTS
处理建议
监控系统容量,保证底层资源充足
如果因为突发原因触发消息流控,且客户端内置的重试流程执行失败,则建议业务方将请求调用临时替换到其他系统进行应急处理。
消费者分类
3种类型
PushConsumer
SimpleConsumer
PullConsumer
背景信息
如何实现并发消费
如何实现同步、异步消息处理
如何实现消息可靠处理
功能概述
处理消息的几个阶段
消息获取
消息处理
消息状态提交
注意
若您的业务场景发生变更,或您当前使用的消费者类型不适合当前业务,您可以选择在 PushConsumer 和SimpleConsumer 之间变更消费者类型。变更消费者类型不影响当前Apache RocketMQ 资源的使用和业务处理。
在实际使用场景中,PullConsumer 仅推荐在流处理框架中集成使用,大多数消息收发场景使用 PushConsumer 和 SimpleConsumer 就可以满足需求。
生产环境中相同的 ConsumerGroup 下严禁混用 PullConsumer 和其他两种消费者,否则会导致消息消费异常。
三种类型之间的区别
PushConsumer
注意
PushConsumer 消费消息时,若消息处理逻辑出现预期之外的阻塞导致消息处理一直无法执行成功,SDK会按照消费超时处理强制提交消费失败结果,并按照消费重试逻辑进行处理。消息超时,请参见PushConsumer消费重试策略。
出现消费超时情况时,SDK虽然提交消费失败结果,但是当前消费线程可能仍然无法响应中断,还会继续处理消息。
内部原理
在PushConsumer类型中,消息的实时处理能力是基于SDK内部的典型Reactor线程模型(底层使用队列)实现的。如下图所示,SDK内置了一个长轮询线程,先将消息异步拉取到SDK内置的缓存队列中,再分别提交到消费线程中,触发监听器执行本地消费逻辑。
可靠性重试
客户端SDK严格按照监听器的返回结果判断消息是否消费成功,并做可靠性重试。
所有消息必须以同步方式进行消费处理,并在监听器接口结束时返回调用结果,不允许再做异步化分发
不允许使用以下方式处理消息,否则 Apache RocketMQ 无法保证消息的可靠性
消息还未处理完成,就提前返回消费成功结果
在消费监听器内将消息再次分发到自定义的其他线程,消费监听器提前返回消费结果
顺序性保障
如果消费者分组设置了顺序消费模式,则PushConsumer在触发消费监听器时,严格遵循消息的先后顺序。
适用场景
PushConsumer严格限制了消息同步处理及每条消息的处理超时时间,适用于以下场景
消息处理时间可预估
无异步化、高级定制场景
SimpleConsumer
概述:SimpleConsumer 是一种接口原子型的消费者类型,消息的获取、消费状态提交以及消费重试都是通过消费者业务逻辑主动发起调用完成。
使用方式
try {
// SimpleConsumer 需要主动获取消息,并处理。
List<MessageView> messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
// 消费处理完成后,需要主动调用 ACK 提交消费结果。
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
logger.error("Failed to ack message, messageId={}", messageView.getMessageId(), e);
}
});
} catch (ClientException e) {
// 如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
logger.error("Failed to receive message", e);
}
// SimpleConsumer 需要主动获取消息,并处理。
List<MessageView> messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
// 消费处理完成后,需要主动调用 ACK 提交消费结果。
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
logger.error("Failed to ack message, messageId={}", messageView.getMessageId(), e);
}
});
} catch (ClientException e) {
// 如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
logger.error("Failed to receive message", e);
}
可靠性重试
客户端SDK如果处理消息成功则调用AckMessage接口;如果处理失败只需要不回复ACK响应,即可在定义的消费不可见时间到达后触发消费重试流程
顺序性保障
基于 Apache RocketMQ 顺序消息的定义,SimpleConsumer在处理顺序消息时,会按照消息存储的先后顺序获取消息。即需要保持顺序的一组消息中,如果前面的消息未处理完成,则无法获取到后面的消息
适用场景
消息处理时长不可控
需要异步化、批量消费等高级定制场景
需要自定义消费速率
PullConsumer
待补充
使用建议
PushConsumer合理控制消费耗时,避免无限阻塞
对于PushConsumer消费类型,需要严格控制消息的消费耗时,尽量避免出现消息处理超时导致消息重复。如果业务经常会出现一些预期外的长时间耗时的消息,建议使用SimpleConsumer,并设置好消费不可见时间。
消息过滤
应用场景
在实际业务场景中,同一个主题下的消息往往会被多个不同的下游业务方处理,各下游的处理逻辑不同,只关注自身逻辑需要的消息子集
一般是基于同一业务下更具体的分类进行过滤匹配
功能概述
消息过滤定义
将符合条件的消息投递给消费者,而不是将匹配到的消息过滤掉
过滤的原理
子主题
订阅关系一致性
同一消费者分组内的多个消费者的订阅关系包括过滤表达式,必须保持一致,否则可能会导致部分消息消费不到
使用建议
合理规划topic和tag
消息类型是否一致:普通消息、顺序消息需要按topic拆分
业务域是相同
物流消息、支付消息采用不同topic
普通物流消息、加急物流消息采用不同的tag
消息的量级和重要性是否一致
消息量级存在巨大差异、消息链路的重要程度存在差异
消费者负载均衡
概述
费者从 Apache RocketMQ 获取消息消费时,通过消费者负载均衡策略,可将主题内的消息分配给指定消费者分组中的多个消费者共同分担,提高消费并发能力和消费者的水平扩展能力
广播消费和共享消费
广播:每个消费者可以消费到消费者组内所有的消息
共享:消费者共同分担消费者组内的消息(因此只有共享消费才有负载均衡)
2种负载策略
消息粒度
PushConsumer和SimpleConsumer默认负载策略
策略原理
同一个队列中的消息,可被平均分配给多个消费者共同消费
该策略使用的消息分配算法结果是随机的,并不能指定消息被哪一个特定的消费者处理
策略特点
消费分摊更均衡
对非对等消费者更友好(资源较差的消费者只需分担一小部分消息,无需分担一整个队列)
队列分配运维更方便(无需关注队列数量,而队列粒度则必须保证队列数量大于等于消费者数量)
顺序消息负载机制
不同消费者处理同一个消息分区(即Message Queue)的消息时,会严格按照先后顺序锁定消息状态,确保同一消息分区的消息串行消费
适用场景
消息粒度消费负载均衡策略下,同一队列内的消息离散地分布于多个消费者,适用于绝大多数在线事件处理的场景。只需要基本的消息处理能力,对消息之间没有批量聚合的诉求。而对于流式处理、聚合计算场景,需要明确地对消息进行聚合、批处理时,更适合使用队列粒度的负载均衡策略。
队列粒度
PullConsumer默认负载策略
策略原理
每个队列仅被一个消费者消费
若队列数小于消费者数量,可能会出现部分消费者无绑定队列的情况。
策略特点
队列粒度负载均衡策略分配粒度较大,不够灵活。但该策略在流式处理场景下有天然优势,能够保证同一队列的消息被相同的消费者处理,对于批量处理、聚合处理更友好
适用场景
适用于流式计算、数据聚合等需要明确对消息进行聚合、批处理的场景。
适用示例
队列粒度负载均衡策略不需要额外设置,对于历史版本(服务端4.x/3.x版本)的消费者类型PullConsumer默认启用。
使用建议
针对消费逻辑做消息幂等
无论是消息粒度负载均衡策略还是队列粒度负载均衡策略,在消费者上线或下线、服务端扩缩容等场景下,都会触发短暂的重新负载均衡动作。此时可能会存在短暂的负载不一致情况,出现少量消息重复的现象。因此,需要在下游消费逻辑中做好消息幂等去重处理
消费进度管理
原理
消息位点
是什么:每条消息在队列中都有一个唯一的Long类型坐标,这个坐标被定义为消息位点
最早一条消息的位点为最小消息位点(MinOffset);最新一条消息的位点为最大消息位点(MaxOffset)
消息队列逻辑上是无限存储,但由于服务端物理节点的存储空间有限, Apache RocketMQ 会滚动删除队列中存储最早的消息
消息的最小消费位点和最大消费位点会一直递增变化
消费位点
Apache RocketMQ 通过消费位点管理消息的消费进度
RocketMQ会维护一份记录,记录某个消费者组消费某个队列的最新消息的位点
当消费者客户端离线,又再次重新上线时,会严格按照服务端保存的消费进度继续处理消息。
如果服务端保存的历史位点信息已过期被删除,此时消费位点向前移动至服务端存储的最小位点
如果服务端保存的历史位点信息已过期被删除,此时消费位点向前移动至服务端存储的最小位点
消费位点的保存和恢复是基于 Apache RocketMQ 服务端的存储实现,和任何消费者无关。
消费位点之间的关系
消费位点初始值
消费位点初始值指的是消费者分组首次启动消费者消费消息时,服务端保存的消费位点的初始值
消费位点的初始值为消费者首次获取消息时,该时刻队列中的最大消息位点。相当于消费者将从队列中最新的消息开始消费。
重置消费位点
是什么
若消费者分组的初始消费位点或当前消费位点不符合您的业务预期,您可以通过重置消费位点调整您的消费进度。
适用场景
初始消费位点不符合需求
因初始消费位点为当前队列的最大消息位点,即客户端会直接从最新消息开始消费。若业务上线时需要消费部分历史消息,您可以通过重置消费位点功能消费到指定时刻前的消息。
消费堆积快速清理
当下游消费系统性能不足或消费速度小于生产速度时,会产生大量堆积消息。若这部分堆积消息可以丢弃,您可以通过重置消费位点快速将消费位点更新到指定位置,绕过这部分堆积的消息,减少下游处理压力
业务回溯,纠正处理
由于业务消费逻辑出现异常,消息被错误处理。若您希望重新消费这些已被处理的消息,可以通过重置消费位点快速将消费位点更新到历史指定位置,实现消费回溯。
重置功能
重置到队列中的指定位点
重置到某一时刻对应的消费位点,匹配位点时,服务端会根据自动匹配到该时刻最接近的消费位点
使用建议
严格控制消费位点重置的权限
重置消费位点会给系统带来额外处理压力,可能会影响新消息的读写性能。 因此该操作请在适用场景下谨慎执行,并提前做好合理性和必要性评估。
消费重试
是什么
消费者在消费某条消息失败后,Apache RocketMQ 服务端会根据重试策略重新消费该消息,超过一次定数后若还未消费成功,则该消息将不再继续重试,直接被发送到死信队列中
消费重试的触发条件
消费失败
消息处理超时
重试策略的主要行为
重试过程状态机
控制消息在重试流程中的状态和变化逻辑
重试间隔
消费失败或者超时后,下一次重新消费的间隔
最大重试次数
重试策略的差异
PushConsumer
状态机
最大重试次数
最大重试次数为3次,则该消息最多可被投递4次,1次为原始消息,3次为重试投递次数。
重试间隔时间
无序消息
重试间隔为阶梯时间,若重试次数超过16次,后面每次重试间隔都为2小时。
顺序消息
重试间隔为固定时间,默认3000毫秒
使用示例
PushConsumer触发消息重试只需要返回消费失败的状态码即可,当出现非预期的异常时,也会被SDK捕获。
SimpleConsumer simpleConsumer = null;
//消费示例:使用PushConsumer消费普通消息,如果消费失败返回错误,即可触发重试。
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView);
//返回消费失败,会自动重试,直至到达最大重试次数。
return ConsumeResult.FAILURE;
}
};
//消费示例:使用PushConsumer消费普通消息,如果消费失败返回错误,即可触发重试。
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView);
//返回消费失败,会自动重试,直至到达最大重试次数。
return ConsumeResult.FAILURE;
}
};
SimpleConsumer
状态机
和PushConsumer消费重试策略不同的是,SimpleConsumer消费者的重试间隔是预分配的,每次获取消息消费者会在调用API时设置一个不可见时间参数 InvisibleDuration,即消息的最大处理时长。若消息消费失败触发重试,不需要设置下一次重试的时间间隔,直接复用不可见时间参数的取值。
总结:PushConsumer有一个重试间隔,而SimpleConsumer有一个不可见时间,无需再设一个重试间隔
您预设消息处理耗时最多20 ms,但实际业务中20 ms内消息处理不完,您可以修改消息不可见时间,延长消息处理时间,避免消息触发重试机制
消息不可见时间修改后立即生效,即从调用API时刻开始,重新计算消息不可见时间
最大重试次数
默认16次
消息重试间隔
消息重试间隔=不可见时间-消息实际处理时长
例如,消息不可见时间为30 ms,实际消息处理用了10 ms就返回失败响应,则距下次消息重试还需要20 ms,此时的消息重试间隔即为20 ms
若直到30 ms消息还未处理完成且未返回结果,则消息超时,立即重试,此时重试间隔即为0 ms
使用示例
SimpleConsumer 触发消息重试只需要等待即可
//消费示例:使用SimpleConsumer消费普通消息,如果希望重试,只需要静默等待超时即可,服务端会自动重试。
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
//如果处理失败,希望服务端重试,只需要忽略即可,等待消息再次可见后即可重试获取。
});
} catch (ClientException e) {
//如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
e.printStackTrace();
}
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
//如果处理失败,希望服务端重试,只需要忽略即可,等待消息再次可见后即可重试获取。
});
} catch (ClientException e) {
//如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
e.printStackTrace();
}
使用建议
合理重试,避免因限流等诉求触发消费重试
消息重试适用业务处理失败且当前消费为小概率事件的场景,不适合在连续性失败的场景下使用,例如消费限流场景。
错误示例:如果当前消费速度过高触发限流,则返回消费失败,等待下次重新消费。
正确示例:如果当前消费速度过高触发限流,则延迟获取消息,稍后再消费。
正确示例:如果当前消费速度过高触发限流,则延迟获取消息,稍后再消费。
合理控制重试次数,避免无限重试
建议通过减少重试次数+延长重试间隔来降低系统压力,避免出现无限重试或大量重试的情况。
消息存储和清理机制
原理机制
Apache RocketMQ 使用存储时长作为消息存储的依据,即每个节点对外承诺消息的存储时长。在存储时长范围内的消息都会被保留,无论消息是否被消费;超过时长限制的消息则会被清理掉。
存储机制主要定义以下关键问题
消息存储管理粒度:Apache RocketMQ 按存储节点管理消息的存储时长,并不是按照主题或队列粒度来管理
消息存储判断依据:消息存储按照存储时间作为判断依据,相对于消息数量、消息大小等条件,使用存储时间作为判断依据,更利于业务方对消息数据的价值进行评估
消息存储和是否消费状态无关:Apache RocketMQ 的消息存储是按照消息的生产时间计算,和消息是否被消费无关。按照统一的计算策略可以有效地简化存储机制。
消息在队列中的存储情况
消息存储和消费状态关系说明
当消费者不在线或消息消费异常时,会造成队列中大量消息堆积,且该现象暂时无法有效控制。若此时按照消费状态考虑将未消费的消息全部保留,则很容易导致存储空间不足,进而影响到新消息的读写速度。
消息过期清理机制
消息保存时长并不能完整控制消息的实际保存时间,因为消息存储仍然使用本地磁盘,本地磁盘空间不足时,为保证服务稳定性消息仍然会被强制清理,导致消息的实际保存时长小于设置的保存时长。
使用建议
消息存储时长建议适当增加
建议在存储成本可控的前提下,尽可能延长消息存储时长。延长消息存储时长,可以为紧急故障恢复、应急问题排查和消息回溯带来更多的可操作空间。
4.x
初始RocketMQ
部署模型
子主题
生产者
Producer通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败和重试。
消费者
支持以推(push),拉(pull)两种模式对消息进行消费
支持集群方式和广播方式的消费
提供实时消息订阅机制
名字服务器 NameServer
是什么:是一个简单的 Topic 路由注册中心,支持 Topic、Broker 的动态注册与发现
功能
broker管理(心跳检测机制)
路由信息管理(提供broker集群的路由信息给producer、consumer)
设计
可以部署多台NameServer,各实例间不进行通信
broker向每一台NameServer注册自己的路由信息
代理服务器 Broker
Broker 分为 Master 与 Slave。一个Master可以对应多个Slave,但是一个Slave只能对应一个Master
Master 与 Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave
Master也可以部署多个
小结
每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer。
Producer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取Topic路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态。
Consumer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave发送心跳。Consumer 既可以从 Master 订阅消息,也可以从Slave订阅消息。
生产者
基础概念
消息的构成
topic
body:存储消息体
properties:消息属性
注意
不管是 RocketMQ 的 Tag 过滤还是延迟消息等都会利用 Properties 消息属性机制,
这些特殊信息使用了系统保留的属性Key,设置自定义属性时需要避免和系统属性Key冲突。
这些特殊信息使用了系统保留的属性Key,设置自定义属性时需要避免和系统属性Key冲突。
transactionId:事务消息中使用
编程中,Message可以设置的属性值有
topic(必填)
body(必填)
tags
用于消息过滤,只支持一个消息设置一个tag
keys
这条消息的业务关键词
服务器会根据 keys 创建哈希索引,设置后,可以在 Console 系统根据 Topic、Keys 来查询消息,
由于是哈希索引,请尽可能保证 key 唯一,例如订单号,商品 Id 等。
由于是哈希索引,请尽可能保证 key 唯一,例如订单号,商品 Id 等。
系统保留字段
TRACE_ON、MSG_REGION、KEYS、TAGS、DELAY、RETRY_TOPIC、REAL_TOPIC、REAL_QID、TRAN_MSG、PGROUP、MIN_OFFSET、MAX_OFFSET、BUYER_ID、ORIGIN_MESSAGE_ID、TRANSFER_FLAG、CORRECTION_FLAG、MQ2_FLAG、RECONSUME_TIME、UNIQ_KEY、MAX_RECONSUME_TIMES、CONSUME_START_TIME、POP_CK、POP_CK_OFFSET、1ST_POP_TIME、TRAN_PREPARED_QUEUE_OFFSET、DUP_INFO、EXTEND_UNIQ_INFO、INSTANCE_ID、CORRELATION_ID、REPLY_TO_CLIENT、TTL、ARRIVE_TIME、PUSH_REPLY_TIME、CLUSTER、MSG_TYPE、INNER_MULTI_QUEUE_OFFSET、_BORNHOST
Flag
默认值0,完全由应用设置
DelayTimeLevel
默认值0,0表示不延迟;
大于0会延时特定时间(有个延迟级别对应的延迟时间)才被消费
大于0会延时特定时间(有个延迟级别对应的延迟时间)才被消费
原理是利用了properties属性机制
WaitStoreMsgOK
默认true,表示消息是否在服务器落盘后才应答
Tag
是什么
Topic 与 Tag 都是业务上用来归类的标识,区别在于 Topic 是一级分类,而 Tag 可以理解为是二级分类。使用 Tag 可以实现对 Topic 中的消息进行过滤。
如何判断是否需要使用tag
消息类型是否一致
普通消息、事务消息、定时(延时)消息、顺序消息,不同的消息类型使用不同的 Topic,无法通过 Tag 进行区分
业务是否有关联
同样是天猫交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用 Tag 进行区分
消息优先级是否一致
同样是物流消息,盒马必须小时内送达,天猫超市 24 小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的 Topic 进行区分
消息量级是否相当
有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级的消息使用同一个 Topic,则有可能会因为过长的等待时间而“饿死”,此时需要将不同量级的消息进行拆分,使用不同的 Topic
Keys
是什么
每个消息可以在业务层面的设置唯一标识码 keys 字段,方便将来定位消息丢失问题
原理以及应用场景
Broker 端会为每个消息创建索引(哈希索引),应用可以通过 topic、key 来查询这条消息内容,以及消息被谁消费。
由于是哈希索引,请务必保证 key 尽可能唯一,这样可以避免潜在的哈希冲突。
由于是哈希索引,请务必保证 key 尽可能唯一,这样可以避免潜在的哈希冲突。
队列
是什么
为了支持高并发和水平扩展,需要对 Topic 进行分区,在 RocketMQ 中这被称为队列,
一个 Topic 可能有多个队列,并且可能分布在不同的 Broker 上
一个 Topic 可能有多个队列,并且可能分布在不同的 Broker 上
设计与实现
一般来说一条消息,如果没有重复发送(比如因为服务端没有响应而进行重试),则只会存在在 Topic 的其中一个队列中,消息在队列中按照先进先出的原则存储,每条消息会有自己的位点,每个队列会统计当前消息的总条数,这个称为最大位点 MaxOffset;队列的起始位置对应的位置叫做起始位点 MinOffset。队列可以提升消息发送和消费的并发度。
生产者
是什么
消息的发送者,在不同的场景中,需要使用不同类型的消息进行发送
应用场景
延时消息
在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则关闭订单。如已完成支付则忽略,此时就需要用到延迟消息
顺序消息
电商场景中,业务上要求同一订单的消息保持严格顺序,此时就要用到顺序消息
批量消息
在日志处理场景中,可以接受的比较大的发送延迟,但对吞吐量的要求很高,希望每秒能处理百万条日志,此时可以使用批量消息
事务消息
在银行扣款的场景中,要保持上游的扣款操作和下游的短信通知保持一致,此时就要使用事务消息
注意
生产环境中不同消息类型需要使用不同的主题,不要在同一个主题内使用多种消息类型,这样可以避免运维过程中的风险和错误
普通消息发送
3种发送方式
同步
是什么 & 优缺点:收到服务端同步响应之后才发下一条消息,阻塞的
应用场景:重要的通知消息、短消息通知
注意:同步发送方式请务必捕获发送异常,并做业务侧失败兜底逻辑,如果忽略异常则可能会导致消息未成功发送的情况。
异步
是什么 & 优缺点:不等服务端返回响应,接着发送下一条消息,不阻塞
注意:异步发送需要实现异步发送回调接口(SendCallback)。
应用场景:一般用于链路耗时较长,对响应时间较为敏感的业务场景。例如,视频上传后通知启动转码服务,转码完成后通知推送转码结果等
单向传输
优缺点:只发一次,没有响应结果,不可靠,但一般在毫秒级发送完。
应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集
顺序消息发送
介绍
是什么
对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费
支持分区顺序消息
同时满足生产顺序性和消费顺序性
RocketMQ 通过生产者和服务端的协议保障 单个生产者 串行 地发送消息,并按序存储和持久化
同一个ShardingKey的消息会被分配到同一个队列中
如何保证生产顺序性
单一生产者
串行发送
应用场景
在有序事件处理、撮合交易、数据实时增量同步等场景下,异构系统间需要维持强一致的状态同步,上游的事件变更需要按照顺序传递到下游进行处理
例如创建订单的场景,需要保证同一个订单的生成、付款和发货,这三个操作被顺序执行。如果是普通消息,订单A的消息可能会被轮询发送到不同的队列中,不同队列的消息将无法保持顺序,而顺序消息发送时将ShardingKey相同(同一订单号)的消息序路由到一个逻辑队列中。
示例代码
调用方法:
SendResult send(Message msg, MessageQueueSelector selector, Object arg);
核心是第二个入参消息队列选择器和第三个入参分片键(同一个分区键会被分配到同一个队列中)
SendResult send(Message msg, MessageQueueSelector selector, Object arg);
核心是第二个入参消息队列选择器和第三个入参分片键(同一个分区键会被分配到同一个队列中)
队列选择器:
public interface MessageQueueSelector {
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
第三个入参是分片键
public interface MessageQueueSelector {
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
第三个入参是分片键
整个具体代码:
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
建议:生产环境中建议选择最细粒度的分区键进行拆分,例如,将订单ID、用户ID作为分区键关键字,可实现同一终端用户的消息按照顺序处理,不同用户的消息无需保证顺序
一致性
背景:broker掉线了或者broker新加入了,那么队列数是否发生变化
回答:如果发生变化,同一个分区键的消息会被发到不同的队列上面,造成乱序。如果不变化,那么消息被发送到掉线broker的队列上,
必然是失败的
必然是失败的
rocketmq如何解决的?
2种模式
严格保证顺序性
创建 Topic 是要指定 -o 参数(--order)为true,表示顺序消息
其次要保证NameServer中的配置 orderMessageEnable 和 returnOrderTopicConfigToBroker 必须是 true
可用性
不满足顺序性的情况,一律采用可用性
延迟消息发送
是什么
消息发送到Apache RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费
使用场景
分布式定时调度触发、任务超时处理等场景
18个延迟等级
投递等级(delay level) 延迟时间 投递等级(delay level) 延迟时间
1 1s 10 6min
2 5s 11 7min
3 10s 12 8min
4 30s 13 9min
5 1min 14 10min
6 2min 15 20min
7 3min 16 30min
8 4min 17 1h
9 5min 18 2h
1 1s 10 6min
2 5s 11 7min
3 10s 12 8min
4 30s 13 9min
5 1min 14 10min
6 2min 15 20min
7 3min 16 30min
8 4min 17 1h
9 5min 18 2h
详情见
批量消息发送
是什么
将一些消息聚成一批以后进行发送,可以增加吞吐率,并减少API和网络调用次数。
注意
这里调用非常简单,将消息打包成 Collection<Message> msgs 传入方法中即可,需要注意的是批量消息的大小不能超过 1MiB(否则需要自行分割),其次同一批 batch 中 topic 必须相同。
事务消息发送
介绍
是什么
在一些对数据一致性有强需求的场景,可以用 Apache RocketMQ 事务消息来解决,从而保证上下游数据的一致性
设计与实现
在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。
二阶段提交全流程如上图所示
代码
使用 TransactionMQProducer 进行发送
实现TransactionListener接口
linstener接口中executeLocalTransaction的3种返回状态
LocalTransactionState.COMMIT_MESSAGE 提交事务
LocalTransactionState.ROLLBACK_MESSAGE 回滚事务
LocalTransactionState.UNKNOW
checkLocalTransaction是由于二次确认消息没有收到,Broker端回查事务状态的方法。回查规则:本地事务执行完成后,若Broker端收到的本地事务返回状态为LocalTransactionState.UNKNOW,或生产者应用退出导致本地事务未提交任何状态。则Broker端会向消息生产者发起事务回查,第一次回查后仍未获取到事务状态,则之后每隔一段时间会再次回查
注意
此外,需要注意的是事务消息的生产组名称 ProducerGroupName不能随意设置。事务消息有回查机制,回查时Broker端如果发现原始生产者已经崩溃,则会联系同一生产者组的其他生产者实例回查本地事务执行情况以Commit或Rollback半事务消息。
消费者
基础概念
消费者与消费组
下游消费能力弱,那么消息就会堆积在后端,除了优化消费者的逻辑,最简单的方式就是扩容消费
两种消费模式
集群消费模式:任意一条消息只需要被消费组内的任意一个消费者处理即可。适用于每条消息只需要被处理一次的场景
广播消费模式:RocketMQ将每条消息推送给消费组所有的消费者,保证消息至少被每个消费者消费一次。适用于每条消息需要被消费组的每个消费者处理的场景
负载均衡
背景
集群模式下,同一个消费组内的消费者会分担收到的全量消息,这里的分配策略是怎样的?如果扩容消费者是否一定能提升消费能力?
提供了多种集群模式下的分配策略
平均分配策略(默认)
不是一味地增加消费者就能提升消费能力的,比如下图中Topic的总队列数小于消费者的数量时,
消费者将分配不到队列,即使消费者再多也无法提升消费能力。
平均分配策略下消费组内的消费者会按照类似分页的策略均摊消费。
在平均分配的算法下,可以通过增加消费者的数量来提高消费的并行度。比如下图中,通过增加消费者来提高消费能力
在平均分配的算法下,可以通过增加消费者的数量来提高消费的并行度。比如下图中,通过增加消费者来提高消费能力
不是一味地增加消费者就能提升消费能力的,比如下图中Topic的总队列数小于消费者的数量时,
消费者将分配不到队列,即使消费者再多也无法提升消费能力。
机房优先分配策略
一致性hash分配策略
消费位点
如上图所示,每个队列中有最小消费位点、最大消费位点、某一消费组消费位点
在集群模式下,消费位点是由客户端提交给服务端保存的,在广播模式下,消费位点是由客户端自己保存的
重平衡
如果消费者发生崩溃或有新的消费者加入群组,就会触发重平衡
重平衡完成后,每个消费者可能会分配到新的队列,而不是之前处理的队列
为了能继续之前的工作,消费者需要读取每个队列最后一次的提交的消费位点,然后从消费位点处继续拉取消息
但在实际执行过程中,由于客户端提交给服务端的消费位点并不是实时的,所以重平衡就可能会导致消息少量重复
推拉模式
Push:Broker推给Consumer
优点:实时性高
缺点:但如果Consumer没有做好流控,Broker推送大量消息给Consumer时,就会导致Consumer消息堆积甚至崩溃
Pull:Consumer主动从Broker取消息
优点:Consumer能根据自己的能力消费
缺点:拉取频率需要自己控制,太频繁会给Broker、Consumer都造成压力;间隔太久又容易造成消费不及时
Push消费者
如何使用:需要注册回调接口编写消费逻辑来处理从Broker中收到的消息
集群和广播模式
PushConsumer默认为集群模式
consumer.setMessageModel(MessageModel.CLUSTERING); //设置集群消费
consumer.setMessageModel(MessageModel.BROADCASTING); // 设置广播消费
并发消费和顺序消费
背景:在并发消费中,可能会有多个线程同时消费一个队列的消息,
因此即使发送端通过发送顺序消息保证消息在同一个队列中按照FIFO的顺序,也无法保证消息实际被顺序消费。
因此即使发送端通过发送顺序消息保证消息在同一个队列中按照FIFO的顺序,也无法保证消息实际被顺序消费。
解决:提供顺序消费
顺序消费设置与并发消费API层面只有一处不同,在注册消费回调接口时传入MessageListenerOrderly接口的实现
消息过滤
tag过滤
是什么:用于对某个Topic下的消息进行分类,消费者需根据已经指定的Tag来进行订阅
tag表达式
Topic下所有的消息,Tag用星号(*)
阅了订单和支付两个Tag的消息,在多个Tag之间用两个竖线(||)分隔
注意:同一个消费者多次订阅某个Topic下的Tag,以最后一次订阅为准
SQL92过滤
是什么:在消息发送时设置消息的Tag或自定义属性,消费者订阅时使用SQL语法设置过滤表达式,根据自定义属性或Tag过滤消息。
注意:Tag属于一种特殊的消息属性,在SQL语法中,Tag的属性值为TAGS。 开启属性过滤首先要在Broker端设置配置enablePropertyFilter=true,该值默认为false
例子:
消息重试和私信队列
是什么
若Consumer消费某条消息失败,则RocketMQ会在重试间隔时间后,将消息重新投递给Consumer消费,若达到最大重试次数后消息还没有成功被消费,则消息将被投递至死信队列
消息重试只针对集群消费模式生效;广播消费模式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息
区别
顺序消费消费失败后会先在客户端本地重试直到最大重试次数,这样可以避免消费失败的消息被跳过,消费下一条消息而打乱顺序消费的顺序
并发消费消费失败后会将消费失败的消息重新投递回服务端,再等待服务端重新投递回来,在这期间会正常消费队列后面的消息。
死信队列
是什么:消息达到最大重试次数,若仍然消费失败,则该消息被放入死信队列中。死信队列是死信Topic下分区数唯一的单独队列
死信Topic名称为%DLQ%ConsumerGroupName
死信队列的消息将不会再被消费
Pull消费者
Pull Consumer:不提供相关的订阅方法,需要调用pull方法时指定队列进行拉取,并需要自己更新位点
Lite Pull Consumer
Subscribe方式
拉取消息调用的是轮询poll接口,默认自动提交位点
同一个消费组下的多个LitePullConsumer会负载均衡消费
Assign方式
没有自动的负载均衡机制
自由主题
0 条评论
下一页