消息队列与RocketMQ总结
2024-08-30 22:37:34 0 举报
AI智能生成
消息队列与RocketMQ总结,基于官方文档 5.0 进行整理
作者其他创作
大纲/内容
消息队列基础
基础
概述
消息模型
点对点模型
消息只能被一个消费者消费一次
发布/订阅模型
多个消费者可以重复消费消息
使用场景
异步
提高响应速度、吞吐量
解耦
减少服务之间的依赖,提供系统稳定性和可拓展性
削峰
可以应对突发的流量冲击
使用消息队列的问题
系统可用性降低、复杂性提高、可能有一致性问题
可靠性
生产者可靠性
消费者可靠性
JMS与AMQP
常见消息队列
RabbitMQ
Kafka
RocketMQ
RabbitMQ
基础
RabbitMQ架构
Producer与Consumer
Exchange
交换器
消息需要发送给交换器,由交换器将消息路由转发到不同的消息队列中
Queue
消息队列
用于存储消息
Broker
Kafka
概述
应用场景
消息队列
数据处理
日志收集
数据处理
日志收集
Kafka架构模型
Producer
生产者
Consumer
消费者
Consumer Group
消费者组
一个分区只能由消费者组消费,消费者组之间互不影响
Broker
相当于 Kafka 服务器实例
Topic
主题
Partition
分区
可以理解为消息队列
一个 Topic 有多个 Partition,不同 Partition 可以分布到不同的 Broker 上
Replica
副本数量
生产者
发消息流程
Kafka 每次发送数据都是向 Leader 分区发送数据, 并顺序写入到磁盘, 然后 Leader 分区会将数据同步到各个 Follower 分区
发消息模型
消费者
消费者与消费者组
多个消费者可以组成一个消费组, 同一个消费组者的消费者可以消费同一个 topic 下不同分区的数据, 但一个分区的消息只会被一个消费组内的一个消费者消费, 防止出现重复消费的问题。建议消费者组的消费者数量与分区数量保持一致
一个 Topic 两个消费者组的例子
分区重平衡
新的消费者加入消费者组,或者有消费者宕机,就会造成分区的重新分配,这个就是重平衡
重平衡期间消费者都不能消费消息
Kafka消费模式
Push 模式
Broker 给消费者推消息
可能造成消费者来不及处理消息
Pull 模式
消费者主动从 Broker 拉取消息
更适合 Kafka,按照自己的节奏消费消息
消费位置
每个消费者组都会维护一个已经消费到消息的 offset
分区从不删除消息,除非消息到期
消息消费顺序
Kafka 只会保证在同一个分区内消息是有序的, 而不能保证 Topic(主题) 中所有 Partition(分区) 的全局有序性。只能通过参数让消息发送到指定的 Partition 中
offset的维护
group + topic + partition 才能确定一个 offset
offset 会存储在 Kafka 本地的一个 Topic 中
常见消费问题解决
保证消息不丢失/可靠性
生产者丢失消息
添加发送失败的回调方法,并在回调后重试
消费者丢失消息
关闭自动提交 offset 机制,消费完成后自己提交 offset
Kafka自身丢失消息
设置好数据同步的参数
保证消息不被重复消费
重复消费问题
保证幂等性
可靠性
概述
多副本机制
副本数据同步策略
Kafka与Zookeeper
使用 ZK 提供元数据管理功能
Broker 注册
Topic 注册
负载均衡
Kafka与存储实现
Kafka与文件系统
任何发布到 Partition 的消息都会被追加到 Partition 数据文件的尾部, 这样的顺序写磁盘操作让 Kafka 的效率非常高
底层存储细节
分为多个文件(Segment)顺序存储消息
Kafka高效读写数据
顺序写磁盘
追加到文件末尾的顺序写更加高效
零拷贝技术
消息可能被多个消费者组消费,采用零拷贝将消息从磁盘拷贝到页面缓冲区,然后直接发送到网络中
Kafka事务
生产者事务
消费者事务
基本使用
生产者API
信息发送流程
异步发送API
不带回调函数的异步
带回调函数的异步
同步发送API
同步发送
消费者API
自动提交offset
手动提交offset
同步提交commitSync offset
异步提交commitAsync offset
数据漏消费与重复消费分析
自定义存储offset
RocketMQ
基础
基本概念与名词定义
领域模型
领域模型概述
RocketMQ领域模型
分为三部分:消息生成、消息存储、消息消费
通信方式
同步RPC调用模型
异步通信模型
消息传输模型
点对点模型
发布订阅模型
RocketMQ就是此模型
RocketMQ部署架构
基本架构
集群架构
跟基本架构差不多,只不过各个组件基本都变成了分布式部署
Broker模型
基础
Broker 就是消息服务器
Broker角色
Broker 角色分为 ASYNC_MASTER(异步主机), SYNC_MASTER(同步主机)以及 SLAVE(从机)
Broker参数配置
最佳实践
Broker崩溃以后有什么影响?
master结点崩溃
消息不能再发送到该 Broker 集群, 但是如果有另一个可用的 Broker 集群, 那么在主题存在的条件下仍然可以发送消息
一些salve结点崩溃
只要有另一个工作的 Slave, 就不会影响发送消息
所有slave结点崩溃
向 Master 发送消息不会有任何影响, 但如果 Master是 SYNC_MASTER, Producer会得到一个 SLAVE_NOT_AVAILABLE, 表示消息没有发送给任何 Slave
NameServer
RocketMQ 自研的注册中心,而非使用 ZK。它是去中心化的,没有主节点,彼此之间不会信息同步。单个 Broker 会和所有 NameServer 保持长连接
主题
定义与模型关系
定义
用于标识同一类业务逻辑的消息
将不同业务类型的数据拆分到不同的主题中管理, 通过主题实现存储的隔离性和订阅隔离性
主题内部由多个队列组成, 消息的存储和水平扩展能力最终是由队列实现的; 并且针对主题的所有约束和属性设置, 最终也是通过主题内部的队列来实现.
内部属性
主题名称
队列列表
队列作为主题的组成单元, 是消息存储的实际容器, 一个主题内包含一个或多个队列, 消息实际存储在主题的各队列内
消息类型
每个主题只允许发送一种类型的消息。主题支持的消息类型:普通消息、顺序消息、延时消息、事务消息
行为约束
使用示例
使用建议
按照业务分类合理拆分主题
将相同业务域内同一功能属性的消息划分为同一主题
拆分方式
信息类型是否一致
消息业务是否关联
消息量级是否一样
单一主题只收发一种类型消息, 避免混用
通过主题隔离业务, 不同业务逻辑的消息建议使用不同的主题
队列
定义与模型关系
队列是 RocketMQ 中消息存储和传输的实际容器, 也是 RocketMQ 消息的最小存储单元. RocketMQ 的所有主题都是由多个队列组成, 以此实现队列数量的水平拆分和队列内部的流式存储
队列的作用
存储顺序性、流式操作语义
RocketMQ 的队列与 Kafka 的 partition 模型类似
RocketMQ 确保所有消息至少传递一次,因为还可以重置消费位点进行重复消费。RocketMQ 中通过修改队列数量, 以此实现横向的水平扩容和缩容
使用建议
按照实际业务消耗设置队列数
遵循少用够用原则
常见需要队列增加场景
需要增加队列实现物理节点负载均衡
消息
定义与模型关系
定义
消息是 RocketMQ 中的最小数据传输单元. 生产者将业务数据的负载和拓展属性包装成消息发送到 RocketMQ 服务端, 服务端按照相关语义将消息投递到消费端进行消费
消息的特点
消息不可变性
一旦产生后, 消息的内容不会发生改变
消息持久化
将接收到的消息存储到 RocketMQ 服务端的存储文件中, 保证消息的可回溯性和系统故障场景下的可恢复性
内部属性
消息内部包含:主题名称、消息类型、消息队列、消息位点、消息ID、过滤标签 Tag、定时 时间、消费重试次数、消息负载等属性
使用建议
单条消息不建议传输超大负载
大负载按照固定大小做报文拆分, 或者结合文件存储等方法进行传输
消息中转时做好不可变设计
生产者
定义与模型关系
定义
生产者是 RocketMQ 系统中用来构建并传输消息到服务端的运行实体
生产者与主题关系为多对多
内部属性
生产者内部包含:客户端ID、通信参数、发送重试策略等属性
使用建议
不建议单一进程创建大量生产者
生产者和主题是多对多的关系, 支持同一个生产者向多个主题发送消息;如果有需要发送消息到多个主题的场景, 无需为每个主题都创建一个生产者
不建议频繁创建和销毁生产者
消费者组
定义与模型关系
定义
消费者分组是 RocketMQ 系统中承载多个消费行为一致的消费者的负载均衡分组。通过消费者分组内初始化多个消费者实现消费性能的水平扩展以及高可用容灾
消费行为
订阅关系
投递顺序性
消费重试策略
内部属性
消费者组内部包含:订阅关系、投递顺序性、消费重试策略(最大重试次数、重试间隔)等参数
行为约束
使用建议
按照业务合理拆分分组
消费者投递顺序一致
消费者业务类型一致
消费者
定义与模型关系
定义
消费者是 RocketMQ 中用来接收并处理消息的运行实体
内部属性
行为约束
使用建议
不建议在单一进程内创建大量消费者
大部分场景下, 单一进程内同一个消费分组只需要初始化唯一的一个消费者即可
不建议频繁创建和销毁消费者
订阅关系
定义与模型关系
定义
订阅关系是 RocketMQ 系统中消费者获取消息, 处理消息的规则和状态配置
订阅关系由消费者分组动态注册到服务端系统, 并在后续的消息传输中按照订阅关系定义的过滤规则进行消息匹配和消费进度维护
订阅关系可以控制如下传输行为
消息过滤规则
消费状态
订阅关系判断原则
一个订阅关系指的是指定某个消费者分组对于某个主题的订阅
判断原则
不同消费者分组对于同一个主题的订阅相互独立
同一个消费者分组对于不同主题的订阅也相互独立
内部属性
内部包含属性有:过滤类型、过滤表达式
行为约束
同一消费者分组内的消费者在消费逻辑上必须保持一致, 否则会出现消费冲突, 导致部分消息消费异常
使用建议
建议不要频繁修改订阅关系
订阅关系关联了过滤规则, 消费进度等元数据和相关配置, 同时系统需要保证消费者分组下的所有消费者的消费行为, 消费逻辑, 负载策略等一致, 整体运算逻辑比较复杂
订阅关系一致
订阅关系一致是指, 同一个消费者组下所有消费者实例所订阅的 Topic, Tag 必须完全一致
在服务端视角看来一个 Group 下的所有 Consumer 都应该是相同的副本逻辑
正确订阅关系示例
订阅关系不一致的排查
常见订阅关系不一致问题
同一ConsumerGroup下的Consumer实例订阅的Topic不同(3.x, 4.x SDK适用)
同一ConsumerGroup下的Consumer实例订阅的Topic相同,但订阅的Tag不一致
功能特性
普通消息
定义
普通消息一般应用于微服务解耦, 事件驱动, 数据集成等场景, 这些场景大多数要求数据传输通道具有可靠传输的能力, 且对消息的处理时机, 处理顺序没有特别要求
功能原理
普通消息生命周期
初始化
消息被生产者构建并完成初始化, 待发送到服务端的状态
待消费
消息被发送到服务端, 对消费者可见, 等待消费者消费的状态
消费中
消息被消费者获取, 并按照消费者本地的业务逻辑进行处理的过程. 此时服务端会等待消费者完成消费并提交消费结果, 如果一定时间后没有收到消费者的响应, RocketMQ 会对消息进行重试处理
消费提交
消费者完成消费处理, 并向服务端提交消费结果, 服务端标记当前消息已经被处理(包括消费成功和失败). RocketMQ 默认支持保留所有消息, 此时消息数据并不会立即被删除, 只是逻辑标记已消费. 消息在保存时间到期或存储空间不足被删除前, 消费者仍然可以回溯消息重新消费
消息删除
RocketMQ 按照消息保存机制滚动清理最早的消息数据, 将消息从物理文件中删除
使用建议
普通消息支持设置消息索引键, 消息过滤标签等信息, 用于消息过滤和搜索查找
设置全局唯一业务索引键, 方便问题追踪
应用场景
微服务异步解耦
数据集成传输
离线日志收集
定时延时消息
定义
服务端根据消息设置的定时时间在某一固定时刻将消息投递给消费者消费
功能原理
定时消息精度为秒级
若存储系统异常重启, 可能会导致定时消息投递出现一定延迟.
生命周期
相比普通消息,多了一个【定时中】状态
和普通消息不同的是, 服务端不会直接构建消息索引, 而是会将定时消息单独存储在定时存储系统中, 等待定时时刻到达
定时时刻到达后, 服务端将消息重新写入普通存储引擎, 此时才变化为【待消费】状态
使用建议
避免创建大量相同定时时刻的消息
会造成系统压力过大, 导致消息分发延迟, 影响定时精度
应用场景
分布式定时调度
任务超时处理
比如订单超时未支付进行取消的场景
顺序消息
基础
定义
支持消费者按照发送消息的先后顺序获取消息
强调多条消息间的先后顺序关系
特点
顺序消息的顺序关系通过消息组(MessageGroup)判定和识别, 发送顺序消息时需要为每条消息设置归属的消息组, 相同消息组的多条消息之间遵循先进先出的顺序关系, 不同消息组, 无消息组的消息之间不涉及顺序性
功能原理
生产顺序性
通过生产者和服务端的协议保障单个生产者串行地发送消息, 并按序存储和持久化
可以使用 hash 法将同一个业务 ID 的消息发送到同一个队列中
顺序消息发送必须要设置消息组
服务端存储逻辑
相同消息组的消息按照先后顺序被存储在同一个队列.
不同消息组的消息可以混合在同一个队列中, 且不保证连续.
消费顺序性
按照投递的顺序进行消费
顺序消息投递仅在重试次数限定范围内, 即一条消息如果一直重试失败, 超过最大重试次数后将不再重试, 跳过这条消息消费, 不会一直阻塞后续消息处理。顺序消息消费失败进行消费重试时, 为保障消息的顺序性, 后续消息不可被消费, 必须等待前面的消息消费完成后才能被处理
使用建议
建议串行消费, 避免批量消费导致乱序
消息组尽可能打散, 避免集中导致热点
一般建议的消息组设计会采用订单 ID, 用户 ID 作为顺序参考, 即同一个终端用户的消息保证顺序, 不同用户的消息无需保证顺序
建议将业务以消息组粒度进行拆分
应用场景
在有序事件处理, 撮合交易, 数据实时增量同步等场景下, 异构系统间需要维持强一致的状态同步, 上游的事件变更需要按照顺序传递到下游进行处理的场景
股票的撮合交易:先出价的先交易
数据实时增量同步
上游源端数据库按需执行增删改操作, 将二进制操作日志作为消息
事务消息
功能原理
用于保障消息生产和本地事务的最终一致性
处理流程
1. 生产者将消息发送至 RocketMQ 服务端.
2. RocketMQ 服务端将消息持久化成功之后, 向生产者返回 Ack 确认消息已经发送成功, 此时消息被标记为 "暂不能投递", 这种状态下的消息即为半事务消息.
3. 生产者开始执行本地事务逻辑.
4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit 或是 Rollback), 服务端收到确认结果后处理逻辑如下:
二次确认结果为 Commit: 服务端将半事务消息标记为可投递, 并投递给消费者.
二次确认结果为 Rollback: 服务端将回滚事务, 不会将半事务消息投递给消费者.
5. 在断网或者是生产者应用重启的特殊情况下, 若服务端未收到发送者提交的二次确认结果, 或服务端收到的二次确认结果为 Unknown 未知状态, 经过固定时间后, 服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查. 服务端回查的间隔时间默认值: 60秒.
6. 生产者收到消息回查后, 需要检查对应消息的本地事务执行的最终结果.
7. 生产者根据检查到的本地事务的最终状态再次提交二次确认, 服务端仍按照步骤 4 对半事务消息进行处理.
2. RocketMQ 服务端将消息持久化成功之后, 向生产者返回 Ack 确认消息已经发送成功, 此时消息被标记为 "暂不能投递", 这种状态下的消息即为半事务消息.
3. 生产者开始执行本地事务逻辑.
4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit 或是 Rollback), 服务端收到确认结果后处理逻辑如下:
二次确认结果为 Commit: 服务端将半事务消息标记为可投递, 并投递给消费者.
二次确认结果为 Rollback: 服务端将回滚事务, 不会将半事务消息投递给消费者.
5. 在断网或者是生产者应用重启的特殊情况下, 若服务端未收到发送者提交的二次确认结果, 或服务端收到的二次确认结果为 Unknown 未知状态, 经过固定时间后, 服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查. 服务端回查的间隔时间默认值: 60秒.
6. 生产者收到消息回查后, 需要检查对应消息的本地事务执行的最终结果.
7. 生产者根据检查到的本地事务的最终状态再次提交二次确认, 服务端仍按照步骤 4 对半事务消息进行处理.
事务生命周期
相比普通消息,多了事务待提交、消息回滚、提交待消费的状态
初始化
生产者发送的是半消息
事务待提交
并不会直接被服务端持久化, 而是会被单独存储到事务存储系统中, 等待第二阶段本地事务返回执行结果后再提交. 此时消息对下游消费者不可见
消息回滚
如果事务执行结果明确为回滚, 服务端会将半事务消息回滚, 该事务消息流程终止
提交待消费
如果事务执行结果明确为提交, 服务端会将半事务消息重新存储到普通存储系统中, 此时消息对下游消费者可见, 等待被消费者获取并消费
使用限制
消费事务性
事务消息保证本地主分支事务和下游消息发送事务的一致性, 但不保证消息消费结果和上游事务的一致性
中间状态可见性
RocketMQ 事务消息为最终一致性, 即在消息提交到下游消费端处理完成之前, 下游分支和上游事务之间的状态会不一致. 因此, 事务消息仅适合接受异步执行的事务场景
使用示例
发送事务消息前, 需要开启事务并关联本地的事务执行
为保证事务一致性, 在构建生产者时, 必须设置事务检查器和预绑定事务消息发送的主题列表, 客户端内置的事务检查器会对绑定的事务主题做异常状态恢复
使用建议
避免大量未决事务导致超时
RocketMQ 支持在事务提交阶段异常的情况下发起事务回查, 保证事务一致性. 但生产者应该尽量避免本地事务返回未知结果. 大量的事务检查会导致系统性能受损, 容易导致事务处理延迟
需要正确处理"进行中"的事务
消息回查时, 对于正在进行中的事务不要返回 Rollback 或 Commit 结果, 应继续保持 Unknown 的状态
应用场景
分布式事务的诉求
如何保证核心业务和多个下游业务的执行结果完全一致, 是分布式事务需要解决的主要问题
基于 RocketMQ 分布式事务消息: 支持最终一致性
基于 RocketMQ 实现的分布式事务消息功能, 在普通消息基础上, 支持二阶段的提交能力. 将二阶段提交和本地事务绑定, 实现全局提交结果的一致性
消息发送重试和流控机制
消息发送重试机制
消息发送重试机制主要解答如下问题
部分节点异常是否影响消息发送?
请求重试是否会阻塞业务调用?
请求重试会带来什么不足?
请求重试是否会阻塞业务调用?
请求重试会带来什么不足?
重试基本概念
定义
RocketMQ 客户端连接服务端发起消息发送请求时, 可能会因为网络故障, 服务异常等原因导致调用失败. 为保证消息的可靠性, RocketMQ 在客户端 SDK 中内置请求重试逻辑, 尝试通过重试发送达到最终调用成功的效果
重试流程
生产者初始化最大重试次数,后续发送消息失败会一直重试,直到最大次数
重试间隔
除服务端返回系统流控错误场景, 其他触发条件触发重试后, 均会立即进行重试, 无等待间隔
若由于服务端返回流控错误触发重试, 系统会按照指数退避策略进行延迟重试
可以配置退避策略
功能约束
需要合理评估每次调用请求的超时时间以及最大重试次数, 避免影响全链路的耗时
RocketMQ 客户端内置的发送请求重试机制并不能保证消息发送一定成功,业务方需要做好兜底
消息流控机制
流控机制主要解答如下问题
系统在什么情况下会触发流控?
触发流控时客户端行为是什么?
应该如何避免触发流控, 以及如何应对突发流控?
触发流控时客户端行为是什么?
应该如何避免触发流控, 以及如何应对突发流控?
定义
消息流控指的是系统容量或水位过高, RocketMQ 服务端会通过快速失败返回流控错误来避免底层资源承受过高压力.
触发条件
存储压力大
业务上新等需要回溯到指定时刻前开始消费, 此时队列的存储压力会瞬间飙升, 触发消息流控
服务端请求任务排队溢出
消费者消费能力不足,大量消息堆积,超过一定数量触发流控
流控行为
系统触发消息发送流控时, 客户端会收到系统限流错误和异常
reply-code: 530 reply-text: TOO_MANY_REQUESTS
客户端收到系统流控错误码后, 会根据指数退避策略进行消息发送重试.
处理建议
通过监控避免触发流控
消费者分类
基础
RocketMQ 支持 PushConsumer, SimpleConsumer 以及 PullConsumer
功能概述
消费者处理消息时主要经过以下阶段: 消息获取->消息处理->消费状态提交。不同类型消费者处理方式有所不同
PullConsumer 仅推荐在流处理框架中集成使用, 大多数消息收发场景使用 PushConsumer 和 SimpleConsumer 就可以满足需求
PushConsumer
PushConsumers 是一种高度封装的消费者类型, 消费消息仅通过消费监听器处理业务并返回消费结果。PushConsumer 的使用方式比较固定, 在消费者初始化时注册一个消费监听器, 并在消费监听器内部实现消息处理逻辑. 由 RocketMQ 的 SDK 在后台完成消息获取, 触发监听器调用以及进行消息重试处理
内部原理
基于 SDK 内部的典型 Reactor 线程模型实现
SDK 内置了一个长轮询线程, 先将消息异步拉取(pull)到 SDK 内置的缓存队列中, 再分别提交到消费线程中, 触发监听器执行本地消费逻辑
使用方式
所有消息必须以同步方式进行消费处理, 并在监听器接口结束时返回调用结果, 不允许再做异步化分发
适用场景
PushConsumer 严格限制了消息同步处理及每条消息的处理超时时间
消息处理时间可预估场景
无异步化, 高级定制场景
SimpleConsumer
基础
消息的获取, 消费状态提交以及消费重试都是通过消费者业务逻辑主动发起调用完成。由业务逻辑按需调用接口获取消息, 然后分发给业务线程处理消息, 最后按照处理的结果调用提交接口, 返回服务端当前消息的处理结果
适用场景
SimpleConsumer 提供原子接口, 用于消息获取和提交消费结果, 相对于 PushConsumer 方式更加灵活
消息处理时长不可控场景
需要异步化, 批量消费等高级定制场景
需要自定义消费速率场景
PullConsumer
使用建议
PushConsumer合理控制消费耗时,避免无限阻塞
消息过滤
定义
若消费者只需要关注部分消息, 可通过设置过滤条件在 RocketMQ 服务端进行过滤, 只获取到需要关注的消息子集, 避免接收到大量无效的消息
过滤的含义指的是 RocketMQ 服务端将符合条件的消息投递给消费者, 而不是消费者将匹配到的消息过滤掉
应用场景
多个不同的下游业务方处理, 各下游的处理逻辑不同, 只关注自身逻辑需要的消息子集
比如交易系统会产生订单、支付、物流相关的消息。不同的业务线可以对消息进行过滤,只消费自己感兴趣的消息
功能概述
消息过滤原理
生产者
生产者在初始化消息时预先为消息设置一些属性和标签, 用于后续消费时指定过滤目标
消费者
消费者在初始化及后续消费流程中通过调用订阅关系注册接口, 向服务端上报需要订阅指定主题的哪些消息, 即过滤条件
服务端
消费者获取消息时会触发服务端的动态过滤计算, RocketMQ 服务端根据消费者上报的过滤条件的表达式进行匹配, 并将符合条件的消息投递给消费者
支持 Tag 标签过滤和 SQL 属性过滤两种过滤方式
Tag标签过滤
生产者在发送消息时, 设置消息的 Tag 标签, 消费者需指定已有的 Tag 标签来进行匹配订阅
每条消息允许设置一个 Tag 标签
Tag 标签过滤为精准字符串匹配
SQL属性过滤
生产者为消息设置的属性(Key)及属性值(Value)进行匹配
生产者发送消息时可以自定义消息属性, 每个属性都是一个自定义的键值对(Key-Value). 每条消息支持设置多个属性
由于 SQL 属性过滤是生产者定义消息属性, 消费者设置 SQL 过滤条件, 因此过滤条件的计算结果具有不确定性
使用建议
合理划分主题和Tag标签
业务消息的拆分可以基于主题进行筛选, 也可以基于主题内消息的 Tag 标签及属性进行筛选
消费者负载均衡
基础
消费者获取消息消费时, 通过消费者负载均衡策略, 可将主题内的消息分配给指定消费者分组中的多个消费者共同分担, 提高消费并发能力和消费者的水平扩展能力
可以解决的问题
消息消费处理的容灾策略
消息消费的顺序性机制
消息分配的水平拆分策略
广播消费和共享消费
消费组间广播消费
每个消费者分组只初始化唯一一个消费者, 每个消费者可消费到消费者分组内所有的消息, 各消费者分组都订阅相同的消息, 以此实现单客户端级别的广播一对多推送效果. 该方式一般可用于网关推送, 配置推送等场景
消费组内共享消费
每个消费者分组下初始化了多个消费者, 这些消费者共同分担消费者分组内的所有消息, 实现消费者分组内流量的水平拆分和均衡负载. 该方式一般可用于微服务解耦场景
什么事消费者负载均衡
广播消费场景下,消费者组中仅一个消费者,不涉及负载均衡
消费组内共享消费场景下, 消费者分组内多个消费者共同分担消息, 消息按照哪种逻辑分配给哪个消费者, 就是由消费者负载均衡策略所决定的
分类
消息粒度负载均衡: PushConsumer 和 SimpleConsumer 默认负载策略
队列粒度负载均衡: PullConsumer 默认负载策略
消息粒度负载均衡
消息粒度的负载均衡策略从 RocketMQ 服务端 5.0 版本开始支持
之前的版本都是队列粒度的负载均衡
使用范围:对于 PushConsumer 和 SimpleConsumer 类型的消费者, 默认且仅使用消息粒度负载均衡策略
基本原理:同一消费者分组内的多个消费者将按照消息粒度平均分摊主题中的所有消息, 即同一个队列中的消息, 可被平均分配给多个消费者共同消费。这里分配算法的结果是随机的
消费者获取某条消息后, 服务端会将该消息加锁, 保证这条消息对其他消费者不可见, 直到该消息消费成功或消费超时. 因此, 即使多个消费者同时消费同一队列的消息, 服务端也可保证消息不会被多个消费者重复消费
顺序消息场景下, 消息粒度负载均衡策略还需要保证同一消息组内的消息, 按照服务端存储的先后顺序进行消费. 不同消费者处理同一个消息组内的消息时, 会严格按照先后顺序锁定消息状态, 确保同一消息组的消息串行消费
策略特点(为什么变成5.0的默认负载均衡粒度?)
消费分摊更均衡
对于传统队列级的负载均衡策略, 如果队列数量和消费者数量不均衡, 则可能会出现部分消费者空闲, 或部分消费者处理过多消息的情况. 消息粒度负载均衡策略无需关注消费者和队列的相对数量, 能够更均匀地分摊消息
对非对等消费者更友好
生产环境中, 由于网络机房分区延迟, 消费者物理资源规格不一致等原因, 消费者的处理能力可能会不一致, 如果按照队列分配消息, 则可能出现部分消费者消息堆积, 部分消费者空闲的情况. 消息粒度负载均衡策略按需分配, 消费者处理任务更均衡
队列分配运维更方便
传统基于绑定队列的负载均衡策略必须保证队列数量大于等于消费者数量, 以免产生部分消费者获取不到队列出现空转的情况, 而消息粒度负载均衡策略则无需关注队列数
适用场景
同一队列内的消息离散地分布于多个消费者, 适用于绝大多数在线事件处理的场景
队列粒度负载均衡
使用范围:5.0 之前历史版本的消费者,默认都是队列粒度负载均衡
策略原理
同一消费者分组内的多个消费者将按照队列粒度消费消息, 即每个队列仅被一个消费者消费
这里三个队列被分给两个消费者,所以有一个消费者被分配了两个队列的消息。如果队列数量小于消费者数量,则有的消费者无法绑定队列
策略特点
队列粒度负载均衡策略分配粒度较大, 不够灵活. 但该策略在流式处理场景下有天然优势, 能够保证同一队列的消息被相同的消费者处理, 对于批量处理, 聚合处理更友好
适用场景
适用于流式计算, 数据聚合等需要明确对消息进行聚合, 批处理的场景
使用建议
针对消费逻辑做消息幂等
无论是消息粒度负载均衡策略还是队列粒度负载均衡策略, 在消费者上线或下线, 服务端扩缩容等场景下, 都会触发短暂的重新负载均衡动作. 此时可能会存在短暂的负载不一致情况, 出现少量消息重复的现象. 因此, 需要在下游消费逻辑中做好消息幂等去重处理
消费进度管理
基础
RocketMQ 的消费进度管理机制可以解答以下问题
消费者启动后从哪里开始消费消息?
消费者每次消费成功后如何标记消息状态, 确保下次不会再重复处理该消息?
某消息被指定消费者消费过一次后, 如果业务出现异常需要做故障恢复, 该消息能否被重新消费?
消费者每次消费成功后如何标记消息状态, 确保下次不会再重复处理该消息?
某消息被指定消费者消费过一次后, 如果业务出现异常需要做故障恢复, 该消息能否被重新消费?
消费进度原理
消息位点(Offset)
消息是按到达服务端的先后顺序存储在指定主题的多个队列中, 每条消息在队列中都有一个唯一的 Long 类型坐标, 这个坐标被定义为消息位点
任意一个消息队列在逻辑上都是无限存储, 即消息位点会从 0 到 Long.MAX 无限增加。RocketMQ 定义队列中最早一条消息的位点为最小消息位点(MinOffset); 最新一条消息的位点为最大消息位点(MaxOffset). 虽然消息队列逻辑上是无限存储, 但由于服务端物理节点的存储空间有限, RocketMQ 会滚动删除队列中存储最早的消息. 因此, 消息的最小消费位点和最大消费位点会一直递增变化
消费位点(ConsumerOffset)
每个主题的队列都可以被多个消费者分组订阅. 若某条消息被某个消费者消费后直接被删除, 则其他订阅了该主题的消费者将无法消费该消息
RocketMQ 通过消费位点管理消息的消费进度. 每条消息被某个消费者消费完成后不会立即在队列中删除, RocketMQ 会基于每个消费者分组维护一份消费记录, 该记录指定消费者分组消费某一个队列时, 消费过的最新一条消息的位点, 即消费位点
当消费者客户端离线, 又再次重新上线时, 会严格按照服务端保存的消费进度继续处理消息. 如果服务端保存的历史位点信息已过期被删除, 此时消费位点向前移动至服务端存储的最小位点
每个消费组在每个队列上维护一个消费位置 offset, 因为在发布订阅模式中一般会涉及到多个消费者组, 而每个消费者组在每个队列中的消费位置都是不同的
消费位点初始值
消费位点初始值指的是消费者分组首次启动消费者消费消息时, 服务端保存的消费位点的初始值
RocketMQ 定义消费位点的初始值为消费者首次获取消息时, 该时刻队列中的最大消息位点. 相当于消费者将从队列中最新的消息开始消费
新创建的ConsumerGroup从哪里开始消费消息?
在首次上线时会从服务器中的最新消息开始消费, 也就是从队列的尾部开始消费; 再次重新启动后, 会从最后一次的消费位置继续消费
重置消费位点(回溯消费)
定义
回溯消费是指已经消费成功的消息, 由于业务上需求需要重新消费
若消费者分组的初始消费位点或当前消费位点不符合业务预期, 可以通过重置消费位点调整消费进度
支持按照时间回溯消费, 时间维度精确到毫秒
适用场景
初始消费位点不符合需求
消费堆积快速清理
下游来不及消费时,可以重置消费点位到最新的位置,就绕过此部分堆积的消息
业务回溯,纠正处理
业务执行异常,重置消费点位后重新消费消息,以纠正异常的业务
使用限制
对于回溯重置类场景, 重置后的历史消息大多属于存储冷数据, 可能会造成系统压力上升, 一般称为冷读现象
需要谨慎操作
只能重置对消费者可见的消息, 不能重置定时中, 重试等待中的消息
消息堆积问题
产生堆积的原因
生产者生产太快或消费者消费太慢
生产太快
限流降级
消费过慢
看看是否是消费者出现大量消费错误,是否遇到线程卡死、资源死锁等
解决方法:增加消费者
使用建议
重置消费位点会给系统带来额外处理压力, 可能会影响新消息的读写性能. 因此该操作请在适用场景下谨慎执行, 并提前做好合理性和必要性评估.
消息重试
基础
消费者出现异常, 消费某条消息失败时, RocketMQ 会根据消费重试策略重新投递该消息进行故障恢复
消费重试主要解决的是业务处理逻辑失败导致的消费完整性问题, 是一种为业务兜底的策略, 不应该被用做业务流程控制
推荐场景
业务处理失败, 且失败原因跟当前的消息内容相关
消费失败的原因不会导致连续性, 即当前消息消费失败是一个小概率事件, 不是常态化的失败, 后面的消息大概率会消费成功
可以解决的问题
如何保证业务完整处理消息
系统异常时处理中的消息状态如何恢复
消费重试策略概述
消费者在消费某条消息失败后, RocketMQ 服务端会根据重试策略重新消费该消息, 超过一次定数后若还未消费成功, 则该消息将不再继续重试, 直接被发送到死信队列中
消息重试触发条件
消费失败
消息处理超时
PushConsumer消费重试策略
几种状态
已就绪状态
服务端已就绪, 可以被消费者消费
处理中状态
消息被消费者客户端获取, 处于消费中还未返回消费结果的状态
待重试状态
PushConsumer 独有的状态. 当消费者消息处理失败或消费超时, 会触发消费重试逻辑判断. 如果当前重试次数未达到最大次数, 则该消息变为待重试状态, 经过重试间隔后, 消息将重新变为已就绪状态可被重新消费. 多次重试之间, 可通过重试间隔进行延长, 防止无效高频的失败
提交状态
消费成功的状态
死信状态
最终兜底机制, 若消息一直处理失败并不断进行重试, 直到超过最大重试次数还未成功, 此时消息不会再重试, 会被投递至死信队列
SimpleConsumer消费重试策略
几种状态
与 PushConsumer 一致,只是少了 待重试状态
使用建议
消息重试适用业务处理失败且当前消费为小概率事件的场景, 不适合在连续性失败的场景下使用, 例如消费限流场景
建议通过减少重试次数 + 延长重试间隔来降低系统压力, 避免出现无限重试或大量重试的情况
消息存储和清理机制
基础
理论上每个队列都支持无限存储,但服务端节点的物理存储空间有限, 消息无法做到永久存储
解决的问题
消息在服务端中的存储以什么维度为判定条件? 消息存储以什么粒度进行管理? 消息存储超过限制后如何处理? 这些问题都是由消息存储和过期清理机制来定义的
消息存储机制
原理机制
RocketMQ 使用存储时长作为消息存储的依据, 即每个节点对外承诺消息的存储时长. 在存储时长范围内的消息都会被保留, 无论消息是否被消费; 超过时长限制的消息则会被清理掉
消息存储管理粒度说明
按照服务端节点粒度管理存储时长而非队列或主题
这种机制可以带来顺序读写, 高吞吐, 高性能等优势, 但缺点是不支持按主题和队列单独管理
消息存储和消费状态关系说明
RocketMQ 统一管理消息的存储时长, 无论消息是否被消费
当消费者不在线或消息消费异常时, 会造成队列中大量消息堆积, 且该现象暂时无法有效控制. 若此时按照消费状态考虑将未消费的消息全部保留, 则很容易导致存储空间不足, 进而影响到新消息的读写速度
消息存储文件结构
默认存储在本地磁盘中
消息过期清理机制
本地磁盘空间不足时, 为保证服务稳定性消息仍然会被强制清理, 导致消息的实际保存时长小于设置的保存时长
存储的消息将最多保存 3 天, 超过 3 天未使用的消息将被删除
使用建议
消息存储时长建议适当增加
最佳实践
生产者
发送消息注意事项
Tag的使用
一个应用尽可能用一个 Topic, 而消息子类型则可以用 tags 来标识
Keys的使用
每个消息在业务层面一般建议映射到业务的唯一标识并设置到 keys 字段, 方便将来定位消息丢失问题
一定要打印日志
消息发送失败处理方式
重试方式没有集成到 MQ 客户端内部做, 而是要求应用自己去完成
消费者
消费过程幂等
RocketMQ 无法避免消息重复(Exactly-Once), 所以如果业务对消费重复非常敏感, 务必要在业务层面进行去重处理
首先需要确定消息的唯一键, 可以是 msgId, 也可以是消息内容中的唯一标识字段, 例如订单 Id 等. 在消费之前判断唯一键是否在关系数据库中存在. 如果不存在则插入, 并消费, 否则跳过
也可以使用写入 Redis 来保证, 因为 Redis 的 key 和 value 就是天然支持幂等的
消费速度慢的处理方式
提高消费并行度
增加Consumer实例数量
提高单个Consumer的消费并行线程数
批量方式消费
某些业务流程如果支持批量方式消费, 则可以很大程度上提高消费吞吐量
重置位点跳过非重要消息
发生消息堆积时, 如果消费速度一直追不上发送速度, 如果业务对数据要求不高的话, 可以选择丢弃不重要的消息
优化每条消息消费过程
看看消费过程能否从逻辑上优化,降低消费时间
消费打印日志
如果消息量较少, 建议在消费入口方法打印消息, 消费耗时等, 方便后续排查问题
其他
参数约束和建议
参数默认值
请求超时时间
3000ms
消息大小
不超过 4M
过大可以用OSS
消息发送重试次数
3次
建议取值不要过大, 避免阻塞业务线程
消息消费重试次数
默认 16 次
部署与运维
快速安装与消息发送实例
如何快速部署一个单节点单副本 RocketMQ 服务, 并完成简单的消息收发
安装RocketMQ、启动NameServer、启动Broker+Proxy,测试消息收发
部署方式
Local模式部署
Proxy 和 Broker 是同进程部署, Proxy 本身无状态, 因此主要的集群配置仍然以 Broker 为基础进行即可
一般没有特殊要求就选这个
Cluster模式部署
主备自动切换模式部署
实现自动主从切换的 RocketMQ 集群
Controller部署
Controller 组件提供选主能力, 若需要保证 Controller 具备容错能力, Controller 部署需要三副本及以上(遵循 Raft 的多数派协议)
部署方式
Controller嵌入NameServer部署
Controller独立部署
Broker部署
RocketMQ Dashboard
基础
提供客户端和应用程序的各种事件, 性能的统计信息, 支持以可视化工具代替 Topic 配置, Broker 管理等命令行操作
基本功能
创建主题 Topic、创建消费者组、重置消费位点、扩容 Topic队列、扩容 Broker、发送消息等
RocketMQ Prometheus Exporter
基础
Rocketmq-exporter 是用于监控 RocketMQ broker 端和客户端所有相关指标的系统
监控指标获取流程
Expoter 通过 MQAdminExt 向 MQ 集群请求数据, 请求到的数据通过 MetricService 规范化成 Prometheus 需要的格式, 然后通过 /metics 接口暴露给 Promethus
Metric结构
Metric 类位于 org.apache.rocketmq.expoter.model.metrics 包下, 实质上是一些实体类, 每个实体类代表一类指标, 总共 14 个 Metric 类
Prometheus拉取metrics的过程
RocketMQ-exporter 项目和 Prometheus 相当于服务器和客户端的关系, RocketMQ-exporter 项目引入了 Prometheus 的 client 包, 该包中规定了需要获取的信息的类型即项目中的 MetricFamilySamples 类, Prometheus 向 expoter 请求 metrics, expoter 将信息封装成相应的类型之后返回给 Prometheus
可观测性指标
服务端指标和客户端指标, 服务端指标由服务端直接生成, 客户端指标在客户端产生, 由服务端通过 rpc 请求客户端获取到
Dledger
基础
DLedger 是一套基于 Raft 协议的分布式日志存储组件, 部署 RocketMQ 时可以根据需要选择使用 DLeger 来替换原生的副本存储机制
基于 DLedger 的可以自动容灾切换 RocketMQ 集群
集群搭建
RocketMQ-on-DLedger Group 是指一组相同名称的 Broker, 至少需要 3 个节点, 通过 Raft 自动选举出一个 Leader, 其余节点 作为 Follower, 并在 Leader 和 Follower 之间复制数据以保证高可用. RocketMQ-on-DLedger Group 能自动容灾切换, 并保证数据一致
权限控制
权限控制(ACL)主要为 RocketMQ 提供 Topic 资源级别的高级访问控制功能,服务端通过权限控制参数实现各个资源的权限管理和校验
一般仅建议在网络环境不安全, 业务数据敏感, 多部门租户混用的场景下使用
JVM与OS配置
JVM选项
建议配置最大堆内存不要超过 32G, 否则会影响 JVM 的指针压缩技术, 浪费内存
因为 Broker 是重度依赖内存 PageCache 做性能优化的, 内存过小可能造成性能不稳定
建议使用 JDK 1.8 自带的 G1 收集器
Linux内核参数
os.sh 脚本在 bin 文件夹中列出了许多内核参数, 可以进行微小的更改然后用于生产用途
客户端SDK
演进历程&选型对比
底层通信协议的差异主要支持两个系列的客户端 SDK, 分别是 Remoting 协议和 gRPC 协议,早期的是 Remoting 协议,5.0 推出 gRPC 协议
Remoting 协议 SDK
老版本的协议
gRPC 协议 SDK
自 5.0 版本全新推出, 旨在以云原生主流技术演进更加轻量, 标准, 易扩展的客户端服务端通信协议
支持更多语言
Disruptor
收藏
收藏
0 条评论
下一页