RabbitMQ
2023-06-12 19:17:27 1 举报
AI智能生成
RabbitMQ
作者其他创作
大纲/内容
基于AMQP协议,erlang语言开发,开源消息中间件
高可靠性、易扩展、高可用、功能丰富等
支持大多数(甚至冷门)的编程语言客户端
RabbitMQ遵循AMQP协议,自身采用Erlang
特点(Feature)
简介
AMQP全称高级消息队列协议(Advanced Message Queuing Protocol),是一种标准,类似于 JMS,兼容JMS协议。目前RabbitMQ主流支持AMQP 0-9-1,3.8.4版本支持AMQP 1.0。
架构协议
Publisher:消息发送者,将消息发送到Exchange并指定RoutingKey,以便queue可以接收到指定的消息。
Consumer:消息消费者,从queue获取消息,一个Consumer可以订阅多个queue以从多个 queue中接收消息。
Server:一个具体的MQ服务实例,也称为Broker。
Virtual host:虚拟主机,一个Server下可以有多个虚拟主机,用于隔离不同项目,一个Virtual host通常包含多个Exchange、Message Queue。
Exchange:交换器,接收Producer发送来的消息,把消息转发到对应的Message Queue中。 (Exchange只是做转发,Exchange 也可以将消息转发到另外一个Excchange )
Routing key:路由键,用于指定消息路由规则(Exchange将消息路由到具体的queue中),通常需要和具体的Exchange类型、Binding的Routing key结合起来使用。
Bindings:指定了Exchange和Queue之间的绑定关系。Exchange根据消息的Routing key和 Binding配置(绑定关系、Binding、Routing key等)来决定把消息分派到哪些具体的queue中。这依赖于Exchange类型。
Message Queue:实际存储消息的容器,并把消息传递给最终的Consumer。
AMQP概念
AMQP是一个二进制的协议,信息被组织成数据帧,有很多类型。数据帧携带协议方法和其他信息。所有数据帧都拥有基本相同的格式:帧头,负载,帧尾。数据帧负载的格式依赖于数据帧的类型。
TCP/IP是流协议,没有内置的机制用于界定数据帧。现有的协议从以下几个方面来解决每个连接发送单一数据帧。简单但是慢。在流中添加帧的边界。简单,但是解析很慢。 计算数据帧的大小,在每个数据帧头部加上该数据帧大小。这简单,快速,AMQP的选择。(Netty)`LTV` `TLV`
数据帧界定
传输层架构
AMQP协议
Rabbit MQ 消息中间件模型
Rabbit MQ支持的消息模型
会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。
fanout(广播)
direct类型的交换器路由规则很简单,它会把消息路由到那些BindingKey和RoutingKey完全匹配的队列中,
Direct
topic类型的交换器在direct匹配规则上进行了扩展,也是将消息路由到BindingKey和RoutingKey 相匹配的队列中,这里的匹配规则稍微不同,它约定:BindingKey和RoutingKey一样都是由\".\"分隔的字符串;BindingKey中可以存在两种特殊字符“* ”和 “#”,用于模糊匹配,其中\"* \"用于匹配一个单词,\"#\"用于匹配多个单词(可以是0个)
Topic
headers类型的交换器不依赖于路由键的匹配规则来路由信息,而是根据发送的消息内容中的 headers属性进行匹配。在绑定队列和交换器时指定一组键值对,当发送的消息到交换器时, RabbitMQ会获取到该消息的headers,对比其中的键值对是否完全匹配队列和交换器绑定时指定的键 值对,如果匹配,消息就会路由到该队列。headers类型的交换器性能很差,不实用。
Headers
Exchange 类型
持久化消息
非持久化消息
消息类型
队列索引存储
rabbitmq.conf中的配置信息
读取消息
合并逻辑
删除消息
消息存储
rabbit_amqqueue_process:负责协议相关的消息处理,即接收生产者发布的消息、向消费者交付消息、处理消息的确认(包括生产端的confirm和消费端的ack)等。backing_queue:是消息存储的具体形式和引擎,并向rabbit_amqqueue_process提供相关的接口以供调用
通常队列由rabbit_amqqueue_process和 backing_queue 这两部分组成
alpha:消息索引和消息内容都存内存,最耗内存,很少消耗CPU
beta:消息索引存内存,消息内存存磁盘
gama:消息索引内存和磁盘都有,消息内容存磁盘
delta:消息索引和内容都存磁盘,基本不消耗内存,消耗更多CPU和I/O操作
rabbit_variable_queue.erl源码中定义了RabbitMQ队列的4种状态
消息存入队列后,不是固定不变的,它会随着系统的负载在队列中不断流动,消息的状态会不断发送变化
持久化的消息,索引和内容都必须先保存在磁盘上,才会处于上述状态中的一种。gama状态只有持久化消息才会有的状态。在运行时,RabbitMQ会根据消息传递的速度定期计算一个当前内存中能够保存的最大消息数量 (target_ram_count),如果alpha状态的消息数量大于此值,则会引起消息的状态转换,多余的消息可能会转换到beta、gama或者delta状态。区分这4种状态的主要作用是满足不同的内存和CPU需求。
对于普通没有设置优先级和镜像的队列来说,backing_queue的默认实现是 rabbit_variable_queue,其内部通过5个子队列 Q1、Q2、delta、Q3、Q4来体现消息的各个状态
消费者获取消息也会引起消息的状态转换
消息进入队列及消费流程
在系统负载较高时,消息若不能很快被消费掉,这些消息就会进入到很深的队列中去,这样会增加处理每个消息的平均开销。因为要花更多的时间和资源处理“堆积”的消息,如此用来处理新流入的消息的能力就会降低,使得后流入的消息又被积压到很深的队列中,继续增大处理每个消息的平均开销,继而情况变得越来越恶化,使得系统的处理能力大大降低。
增加prefetch_count的值,即一次发送多条消息给消费者,加快消息被消费的速度。
采用multiple ack,降低处理 ack 带来的开销
流量控制
应对这样问题的策略
消息的堆积会导致性能下降 ?
队列结构
RabbitMQ 存储层
数据存储
经典队列可以选择是否持久化(Durability)以及是否自动删除(Auto delete)两个属性
Classic 经典队列
仲裁队列,是RabbitMQ从3.8.0版本,引入的一个新的队列类型,整个3.8.X版本,也都是在围绕仲裁队列进行完善和优化。仲裁队列相比Classic经典队列,在分布式环境下对消息的可靠性保障更高。官方文档中表示,未来会使用Quorum仲裁队列代替传统Classic队列
Quorum是基于Raft一致性协议实现的一种新型的分布式消息队列,他实现了持久化,多备份的FIFO队列,主要就是针对RabbitMQ的镜像模式设计的。简单理解就是quorum队列中的消息需要有集群中多半节点同意确认后,才会写入到队列中。这种队列类似于RocketMQ当中的DLedger集群。这种方式可以保证消息在集群内部不会丢失。同时,Quorum是以牺牲很多高级队列特性为代价,来进一步保证消息在分布式环境下的高可靠。
Quorum队列大部分功能都是在Classic队列基础上做减法,比如Non-durable queues表示是非持久化的内存队列。Exclusivity表示独占队列,即表示队列只能由声明该队列的Connection连接来进行使用,包括队列创建、删除、收发消息等,并且独占队列会在声明该队列的Connection断开后自动删除。
其中有个特例就是这个Poison Message(有毒的消息)。所谓毒消息是指消息一直不能被消费者正常消费(可能是由于消费者失败或者消费逻辑有问题等),就会导致消息不断的重新入队,这样这些消息就成为了毒消息。这些读消息应该有保障机制进行标记并及时删除。Quorum队列会持续跟踪消息的失败投递尝试次数,并记录在\"x-delivery-count\"这样一个头部参数中。然后,就可以通过设置 Delivery limit参数来定制一个毒消息的删除策略。当消息的重复投递次数超过了Delivery limit参数阈值时,RabbitMQ就会删除这些毒消息。当然,如果配置了死信队列的话,就会进入对应的死信队列
从整体功能上来说,Quorum队列是在Classic经典队列的基础上做减法,因此对于RabbitMQ的长期使用者而言,其实是会影响使用体验的。他与普通队列的区别:
Quorum队列更适合于队列长期存在,并且对容错、数据安全方面的要求比低延迟、不持久等高级队列更能要求更严格的场景。例如电商系统的订单,引入MQ后,处理速度可以慢一点,但是订单不能丢失
一些临时使用的队列:比如transient临时队列,exclusive独占队列,或者经常会修改和删除的队列。对消息低延迟要求高: 一致性算法会影响消息的延迟。对数据安全性要求不高:Quorum队列需要消费者手动通知或者生产者手动确认。队列消息积压严重 : 如果队列中的消息很大,或者积压的消息很多,就不要使用Quorum队列。Quorum队列当前会将所有消息始终保存在内存中,直到达到内存使用极限
不适合使用的场景
Quorum 仲裁队列
Stream队列是RabbitMQ自3.9.0版本开始引入的一种新的数据队列类型,也是目前官方最为推荐的队列类型。这种队列类型的消息是持久化到磁盘并且具备分布式备份的,更适合于消费者多,读消息非常频繁的场景
Stream队列的核心是以append-only只添加的日志来记录消息,整体来说,就是消息将以append-only的方式持久化到日志文件中,然后通过调整每个消费者的消费进度offset,来实现消息的多次分发。下方有几个属性也都是来定义日志文件的大小以及保存时间。如果你熟悉Kafka或者RocketMQ,会对这种日志记录消息的方式非常熟悉
当想要向多个订阅者发送相同的消息时,以往的队列类型必须为每个消费者绑定一个专用的队列。如果消费者的数量很大,这就会导致性能低下。而Stream队列允许任意数量的消费者使用同一个队列的消息,从而消除绑定多个队列的需求
large fan-outs 大规模分发
RabbitMQ已有的这些队列类型,在消费者处理完消息后,消息都会从队列中删除,因此,无法重新读取已经消费过的消息。而Stream队列允许用户在日志的任何一个连接点开始重新读取数据。
Replay/Time-travelling 消息回溯
Strem队列的设计以性能为主要目标,对消息传递吞吐量的提升非常明显
Throughput Performance 高吞吐性能
RabbitMQ一直以来有一个让人诟病的地方,就是当队列中积累的消息过多时,性能下降会非常明显。但是Stream队列的设计目标就是以最小的内存开销高效地存储大量的数据
Large logs 大日志
这种队列提供了RabbitMQ已有的其他队列类型不太好实现的四个特点
整体上来说,RabbitMQ的Stream队列,其实有很多地方借鉴了其他MQ产品的优点,在保证消息可靠性的基础上,着力提高队列的消息吞吐量以及消息转发性能。因此,Stream也是在视图解决一个RabbitMQ一直以来,让人诟病的缺点,就是当队列中积累的消息过多时,性能下降会非常明显的问题。RabbitMQ以往更专注于企业级的内部使用,但是从这些队列功能可以看到,Rabbitmq也在向更复杂的互联网环境靠拢,未来对于RabbitMQ的了解,也需要随着版本推进,不断更新。
Stream队列
RabbitMQ从3.6.0版本开始,就引入了懒队列的概念。懒队列会尽可能早的将消息内容保存到硬盘当中,并且只有在用户请求到时,才临时从硬盘加载到RAM内存当中
懒队列的设计目标是为了支持非常长的队列(数百万级别)。队列可能会因为一些原因变得非常长-也就是数据堆积消费者服务宕机了有一个突然的消息高峰,生产者生产消息超过消费者消费者消费太慢了
默认情况下,RabbitMQ接收到消息时,会保存到内存以便使用,同时把消息写到硬盘。但是,消息写入硬盘的过程中,是会阻塞队列的。RabbitMQ虽然针对写入硬盘速度做了很多算法优化,但是在长队列中,依然表现不是很理想,所以就有了懒队列的出现。懒队列会尝试尽可能早的把消息写到硬盘中。这意味着在正常操作的大多数情况下,RAM中要保存的消息要少得多。当然,这是以增加磁盘IO为代价的。
给队列指定参数
rabbitmqctl set_policy Lazy \"^lazy-queue$\" '{\"queue-mode\":\"default\"}' --apply-to queues
设定一个策略,在策略中指定queue-mode 为 lazy
声明懒队列有两种方式
懒队列适合消息量大且长期有堆积的队列,可以减少内存使用,加快消费速度。但是这是以大量消耗集群的网络及磁盘IO为代价的
懒队列 Lazy Queue
队列类型
生产者和消费者,需要与RabbitMQ Broker 建立TCP连接,也就是Connection 。一旦TCP 连接建立起来,客户端紧接着创建一个AMQP 信道(Channel),每个信道都会被指派一个唯一的ID。信道是建立在Connection 之上的虚拟连接, RabbitMQ 处理的每条AMQP 指令都是通过信道完成的。
RabbitMQ 采用类似NIO的做法,复用TCP连接,减少性能开销,便于管理。 当每个信道的流量不是很大时,复用单一的Connection 可以在产生性能瓶颈的情况下有效地节省 TCP 连接资源。当信道本身的流量很大时,一个Connection 就会产生性能瓶颈,流量被限制。需要建立多个 Connection ,分摊信道。具体的调优看业务需要。 信道在AMQP 中是一个很重要的概念,大多数操作都是在信道这个层面进行的。
为什么不直接使用TCP连接,而是使用信道 ?
Connection 和 Channel 关系
RabbitMQ的常用操作命令
Rabbit MQ 概述
systemctl start|restart|stop|status rabbitmq-server
服务启动相关
rabbitmqctl help 可以查看更多命令
管理命令行(用来在不使用web管理界面情况下命令操作RabbitMQ)
rabbitmq-plugins enable|list|disable
插件管理命令行
管理命令行
overview概览
添加用户
创建虚拟主机
绑定虚拟主机和用户
Admin用户和虚拟主机管理
web管理页面
RabbitMQ 配置
工作流程详解
参数说明
Producer
Consumer
直连(简单)模式
在这种情况下,我们需要考虑消息丢失的问题
消息自动确认机制
work queue(工作)模式
不需要指定Route key
每个队列都能拿到消息
消息的推拉
fanout(发布订阅/广播)模式
Routing Direct直连模式
Routing Topic通配符模式
RabbitMQ的工作模式
简单模式
工作模式
Fanout广播模式
Route 路由模式
SpringBoot 集成
异常捕获机制
RabbitMQ事务
消息确认(Confirm)机制:确认消息提供者是否成功发送消息到交换机return 机制:确认消息是否成功的从交换机分发到队列
普通Confirm
批量Confirm
异步Confirm
Maven项目中的消息确认
Maven项目中的Return机制
配置Yaml
创建confirm和 return 监听
SpringBoot的消息确认和Return机制
生产方消息确认和Return机制
重复消费消息,会对非幂等性操作造成问题 重复消费消息的原因是,消费者没有给RabbitMQ一个ack
工作原理
Maven项目
SpringBott 项目
利用Redis避免消息重复
消息的幂等性
通过定义时设置durable = ture
Exchange的持久化
Queue的持久化
将消息的投递模式 (BasicProperties 中的 deliveryMode 属性)设置为 2
消息的持久化
RabbitMQ中的持久化消息都需要写入磁盘(当系统内存不不足时,非持久化的消息也会被刷盘处理),这些处理理动作都是在“持久层”中完成的。持久层是一个逻辑上的概念,实际包含两个部分:队列索引(rabbit_queue_index),rabbit_queue_index 负责维护Queue中消息的信息(如果消息小于),包括 消息的存储位置、是否已交给消费者、是否已被消费及Ack确认等,每个Queue都有与之对应 的rabbit_queue_index消息存储(rabbit_msg_store),rabbit_msg_store 以键值对的形式存储消息,它被所有队列列 共享,在每个节点(虚拟主机)中有且只有一个。
持久化机制
消费的过程中自行捕获异常,引发异常后直接记录日志并落到异常恢复表, 再通过后台定时任务扫描异常恢复表尝试做重试动作。如果业务不自行处理则有丢失数据的风险
NONE模式
不主动捕获异常,当消费过程中出现异常时会将消息放回 Queue中,然后消息会被重新分配到其他消费者节点(如果没有则还是选择当前节点)重新被消费,默认会一直重发消息并直到消费完成返回Ack或者一直到过期
AUTO(自动ACK)模式
消费者自行控制流程并手动调用channel相关的方法返回 Ack
MANUAL(手动Ack)模式
确认模式
Springboot 相关配置
Consumer ACK(消费方确认)
RabbitMQ 可以对内存和磁盘使用量设置阈值,当达到阈值后,生产者将被阻塞(block),直到对应项指标恢复正常。全局上可以防止超大流量、消息积压等导致的Broker被压垮。当内存受限或磁盘可用空间受限的时候,服务器都会暂时阻止连接,服务器将暂停从发布消息的已连接客户端的套接字读取数据。连接心跳监视也将被禁用。所有网络连接将在rabbitmqctl和 管理插件中显示为“已阻止”,这意味着它们尚未尝试发布,因此可以继续或被阻止,这意味着 它们已发布,现在已暂停。兼容的客户端被阻止时将收到通知。
RabbitMQ 还默认提供了一种基于credit flow的流控机制,面向每一个连接进行流控。当单个队列达到最大流速 时,或者多个队列达到总流速时,都会触发流控。触发单个链接的流控可 能是因为connection、channel、queue的某一个过程处于flow状态,这些状态都可以从监控平台看到
connection
channel
queue
credit flow 的流控机制
生产者:生产者往往是希望自己产生的消息能快速投递出去,而当消息投递太快且超过了下游的消费速度时 就容易出现消息积压/堆积,所以,从上游来讲我们应该在生产端应用程序中也可以加入限流、应急开关等控制手段,避免超过Broker端的极限承载能力或者压垮下游消费者。消费者:再看看下游,我们期望下游消费端能尽快消费完消息,而且还要防止瞬时大量消息压垮消费端(推模式),我们期望消费端处理速度是最快、最稳定而且还相对均匀(比较理想化)。
优化应用程序的性能,缩短响应时间(需要时间)
增加消费者节点实例(成本增加,而且底层数据库操作这些也可能是瓶颈)
调整并发消费的线程数(线程数并非越大越好,需要大量压测调优至合理值)
提升下游应用的吞吐量和缩短消费过程的耗时
QoS保证机制:可以限制Channel上接收到的未被Ack的消息数量
消费端限流 / QoS(Quality of Service,服务质量)限流
At most once:最多一次。消息可能会丢失,但绝不会重复传输
At least once:最少一次。消息绝不会丢失,但可能会重复传输
Exactly once:恰好一次。每条消息肯定会被传输一次且仅传输一次
消息传输保障三个层级
RabbitMQ 支持其中的“最多一次”和“最少一次”
消息可靠传输一般是业务系统接入消息中间件时首要考虑的问题
消息的可靠性保障
消息的可靠性
AMQP协议和RabbitMQ队列本身没有直接支持延迟队列功能,但是可以通过`TTL(Time To Live)特性`模拟出延迟队列的功能
Maven Java 使用
SpringBoot 使用
rabbitmqctl set_policy TTL \".*\" '{\"message-ttl\":30000}' --apply-to queues
通过命令行方式设置全局TTL
TTL就是消息的存活时间,RabbitMQ可以分别对队列和消息设置存活时间
设置规则
TTL延迟机制
消息被拒绝(Basic.Reject/Basic.Nack),并且设置requeue参数为false
消息过期
队列达到最大长度
消息变为死信情况
原生API
Spring Boot
TTL消息过期是基于FIFO特性的
原生TTL缺点:
TTL存放消息在死信队列(delayqueue)里,而基于插件存放消息 在延时交换机里(x-delayed-message exchange)。
插件下载/安装
Spring Boot代码
rabbitmq_delayed_message_exchange插件
死信队列
流程图
使用延迟队列实现订单支付监控
延迟机制
单活,容量对等,可以实现故障转移。使用独立存储时需要借助复制、镜像同步等技术,数据会有延迟、不一致等问题(CAP定律),使用共享存储时就不会有状态同步这个问题。
主备模式
一定程度的双活,容量对等,最常见的是`读写分离`。通常也需要借助复制技术,或者要求上游实现双写来保证节点数据一致。
主从模式
两边都可以读写,互为主备。如果两边同时写入很容易冲突,所以通常实现的都是“伪主主模式”,或者说就是主从模式的升级版,只是`新增了主从节点的选举和切换。`
主主模式(双主单写)
不同节点保存不同的数据,上游应用或者代理节点做路由,`突破存储容量限制,分摊读写负载`;典型的如MongoDB的分片、MySQL的分库分表、Redis集群。
分片集群
“两地三中心”是金融行业经典的容灾模式(`有资源闲置的问题`),“异地多活”才是王道。
异地多活
业界实现
随机
轮询
[加权]轮询
最少活跃连接
一致性Hash
常用的负载均衡算法
也叫Warren(兔子窝)模式,同一时刻只有一个节点在工作(备份节点不能读写),当主节点发 生故障后会将请求切换到备份节点上(主恢复后成为备份节点)。需要借助HAProxy之类的(VIP模式)负载均衡器来做健康检查和主备切换,底层需要借助共享存储(如SAN设备)。
主备模式(不推荐)
Shovel是一个插件,用于实现跨机房数据复制,或者数据迁移,故障转移与恢复等。
Slovel 铲子模式
总的来说,该集群模式只能保证集群中的某个Node挂掉后应用程序还可以切换到其他Node上继续地发送和消费消息,但并无法保证原有的消息不丢失,所以并不是一个真正意义的高可用集群。
RabbitMQ集群
Master 选取策略
队列添加镜像
镜像队列模式
Federation联邦模式
异地多活架构
RabbitMQ分布式架构模式
HAProxy是一款开源免费,并提供高可用性、负载均衡以及基于TCP和HTTP协议的代理软件,可以支持四层、七层负载均衡,经过测试单节点可以支持10W左右并发连接。
使用HAProxy来做RabbitMQ的负载均衡,通过暴露VIP给上游的应用程序直接连接,上游应用程序不感知底层的RabbitMQ的实例节点信息。
可以搭建多个HA Proxy节点,在借助keepalive来在进行虚拟出来一个 VIP,当其中某一个节点不可用的时候,通过keeplive的节点健康探测来进行节点的切换。
HA Proxy 也是一个单体,如何避免Ha Proxy的单点故障问题?
Spring Boot 有可以配置多个RabbitMQ的节点,这和 HAProxy 有什么区别?
负载均衡 HA proxy
多实例部署
集群与运维
如何保证消息的可靠性?
MSWT
RabbitMQ
0 条评论
回复 删除
下一页