RocketMQ
2023-11-06 16:10:49 0 举报
AI智能生成
RocketMQ详解,具体异步,削峰,解耦等特点
作者其他创作
大纲/内容
含义
是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点;
消费形式
拉取式消费(Pull Consumer)
Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。
一旦获取了批量消息,应用就会启动消费过程。
一旦获取了批量消息,应用就会启动消费过程。
推动式消费(Push Consumer)
该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。
特性
订阅与发布
消息的发布是指某个生产者向某个topic发送消息;
消息的订阅是指某个消费者关注了某个topic中带有某些tag的消息,进而从该topic消费数据
消息的订阅是指某个消费者关注了某个topic中带有某些tag的消息,进而从该topic消费数据
消息顺序
消息有序指的是一类消息消费时,能按照发送的顺序来消费。
全局顺序
对于指定的一个Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。
分区顺序
对于指定的一个Topic,所有消息根据 sharding key 进行区块分区。
同一个分区内的消息按照严格的FIFO顺序进行发布和消费。
同一个分区内的消息按照严格的FIFO顺序进行发布和消费。
消息过滤
RocketMQ的消费者可以根据Tag进行消息过滤,也支持自定义属性过滤
消息可靠性
RocketMQ支持消息的高可靠。RocketMQ从3.0版本开始支持同步双写。
至少一次
至少一次(At least Once)指每个消息必须投递一次
回溯消费
回溯消费是指Consumer已经消费成功的消息,由于业务上需求需要重新消费。
RocketMQ支持按照时间回溯消费,时间维度精确到毫秒。
RocketMQ支持按照时间回溯消费,时间维度精确到毫秒。
事务消息
指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。
RocketMQ的事务消息提供类似X/Open XA的分布事务功能,通过事务消息能达到分布式事务的最终一致
RocketMQ的事务消息提供类似X/Open XA的分布事务功能,通过事务消息能达到分布式事务的最终一致
定时消息
定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。
broker有配置项messageDelayLevel,有18个level,msg.setDelayLevel(level)。
定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,
即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。
broker有配置项messageDelayLevel,有18个level,msg.setDelayLevel(level)。
定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,
即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。
消息重试
Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次。
RocketMQ会为每个消费组都设置一个Topic名称为“%RETRY%+consumerGroup”的重试队列
(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的)
RocketMQ会为每个消费组都设置一个Topic名称为“%RETRY%+consumerGroup”的重试队列
(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的)
消息重投
生产者在发送消息时,同步消息失败会重投,异步消息有重试
设置消息重试策略
retryTimesWhenSendFailed:同步发送失败重投次数,默认为2,
不会选择上次失败的broker,尝试向其他broker发送,最大程度保证消息不丢。
不会选择上次失败的broker,尝试向其他broker发送,最大程度保证消息不丢。
retryTimesWhenSendAsyncFailed:异步发送失败重试次数,异步重试不会选择其他broker,
仅在同一个broker上做重试,不保证消息不丢
仅在同一个broker上做重试,不保证消息不丢
retryAnotherBrokerWhenNotStoreOK:消息刷盘(主或备)超时或slave不可用(返回状态非SEND_OK),
是否尝试发送到其他broker,默认false。十分重要消息可以开启。
是否尝试发送到其他broker,默认false。十分重要消息可以开启。
流量控制
生产者流控
broker通过拒绝send 请求方式实现流量控制
broker每隔10ms检查send请求队列头部请求的等待时间,默认200ms, 超过,则拒绝当前send请求,返回流控
commitLog文件被锁时间超过osPageCacheBusyTimeOutMills时,参数默认为1000ms,返回流控
transientStorePoolEnable == true 并且broker为异步刷盘的主机,且transientStorePool中资源不足,拒绝当前send请求,返回流控
消费者流控
消费者流控的结果是降低拉取频率
消费者本地缓存消息数超过,默认1000
消费者本地缓存消息大小超过,默认100MB
消费者本地缓存消息跨度超过,默认2000
死信队列
死信队列用于处理无法被正常消费的消息
在RocketMQ中,可以通过使用console控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。
消息刷盘
同步刷盘
只有在消息真正持久化至磁盘后RocketMQ的Broker端才会真正返回给Producer端一个成功的ACK响应。
异步刷盘
能够充分利用OS的PageCache的优势,只要消息写入PageCache即可将成功的ACK返回给Producer端
消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。
集群模式
单Master模式
多Master模式
多Master多Slave模式(异步)
每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级)
多Master多Slave模式(同步)
每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式
DLedger 集群模式
RocketMQ-on-DLedger Group 是指一组相同名称的 Broker,至少需要 3 个节点,通过 Raft 自动选举出一个 Leader,
其余节点 作为 Follower,并在 Leader 和 Follower 之间复制数据以保证高可用。
其余节点 作为 Follower,并在 Leader 和 Follower 之间复制数据以保证高可用。
RocketMQ-on-DLedger Group 能自动容灾切换,并保证数据一致
天然弊端
RocketMQ 采用一个 consumer 绑定一个或者多个 Queue 模式,
假如某个消费者服务器挂了,则会造成部分Queue消息堆积
假如某个消费者服务器挂了,则会造成部分Queue消息堆积
集成spring boot使用
基本概念
消息模型(Message Model)
RocketMQ主要由Producer、Broker、Consumer三部分组成,其中Producer负责生产消息,Consumer负责消费消息,Broker负责存储消息。
Broker在实际部署过程中对应一台服务器,每个Broker可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的Broker。
MessageQueue用于存储消息的物理地址,每个Topic中的消息地址存储于多个MessageQueue中。ConsumerGroup由多个Consumer实例构成。
Broker在实际部署过程中对应一台服务器,每个Broker可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的Broker。
MessageQueue用于存储消息的物理地址,每个Topic中的消息地址存储于多个MessageQueue中。ConsumerGroup由多个Consumer实例构成。
消息生产者(Producer)
负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。
消息消费者(Consumer)
负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。
主题(Topic)
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。
代理服务器(Broker Server)
消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、
同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移、主题和队列消息等。
同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移、主题和队列消息等。
名字服务(Name Server)
名字服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的BrokerIP列表。
多个Namesrv实例组成集群,但相互独立,没有信息交换。
多个Namesrv实例组成集群,但相互独立,没有信息交换。
生产者组(Producer Group)
同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,
则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。
则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。
消费者组(Consumer Group)
同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。
消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。
要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。
消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。
要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。
普通顺序消息(Normal Ordered Message)
消费者通过同一个消息队列(Topic分区,称作Message Queue)收到的消息是有顺序的,
不同消息队列收到的消息则可能是无顺序的。
不同消息队列收到的消息则可能是无顺序的。
严格顺序消息(Strictly Ordered Message)
消费者收到的所有消息均是有顺序的
消息(Message)
消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。
RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。
系统提供了通过Message ID和Key查询消息的功能。
系统提供了通过Message ID和Key查询消息的功能。
标签(Tag)
为消息设置的标志,用于同一主题下区分不同类型的消息。
消息发送方式
同步发送
异步发送
顺序发送
单向发送
注意:同步和异步方式均需要Broker返回确认信息,单向发送不需要
消费模式
集群消费(Clustering)
集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。
广播消费(Broadcasting)
广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息
消息存储整体架构
CommitLog
消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。
消息主要是顺序写入日志文件,当文件满了,写入下一个文件
消息主要是顺序写入日志文件,当文件满了,写入下一个文件
ConsumeQueue
消息消费队列,引入的目的主要是提高消息消费的性能,
ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,
消息大小size和消息Tag的HashCode值。consumequeue文件采取定长设计,每一个条目共20个字节
消息大小size和消息Tag的HashCode值。consumequeue文件采取定长设计,每一个条目共20个字节
IndexFile
IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法
IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现为hash索引。
架构图
说明
RocketMQ采用的是混合型的存储结构,针对Producer和Consumer分别采用了数据和索引部分相分离的存储结构
1、Producer发送消息至Broker端,然后Broker端使用同步或者异步的方式对消息刷盘持久化,保存至CommitLog中
集群工作流程
流程图
流程说明
1、启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
2、Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。
注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
3、发送消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
4、Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,
轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
5、Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,
然后直接跟Broker建立连接通道,开始消费消息。
然后直接跟Broker建立连接通道,开始消费消息。
问题
可以一直增加客户端的数量提升消费能力吗?
当然不可以,因为 Queue 数量有限,客户端数量一旦达到 Queue 数量,
再扩容新节点无法提升消费能力,因为会有节点分配不到 Queue 而无法消费。
再扩容新节点无法提升消费能力,因为会有节点分配不到 Queue 而无法消费。
comsumer 在启动时会和comsumer queue绑定,这个绑定策略是咋样的?
默认策略
queue 个数大于 Consumer个数, 那么 Consumer 会平均分配 queue,不够平均,会根据clientId排序来拿取余数
queue个数小于Consumer个数,那么会有Consumer闲置,就是浪费掉了,其余Consumer平均分配到queue
一致性hash算法
就近原则,离的近的消费
每个消费者依次消费一个queue,环状
自定义方式
0 条评论
下一页