RabbitMQ
2021-08-24 10:14:39 3 举报
AI智能生成
RabbitMQ
作者其他创作
大纲/内容
RabbitMQ
1. 消息中间件
分布式架构通信
原理
SOA
使用Dubbo/Zookeeper进行服务间的RPC
Dubbo使用自定义的TCP协议,可以让请求报文体积更小,或者使用HTTP2协议,也可以减少报文体积,提高传输效率
微服务
SpringCloud使用Feign解决服务之间的RPC问题
RPC
主要基于TCP/UDP协议,HTTP协议是应用层协议,是构建在传输层协议TCP之上的,RPC效率更高,RPC长连接不必每次通信都像HTTP一样三次握手,减少网络开销
HTTP服务开发迭代更快
在接口不多,系统之间交互较少的情况下,HTTP更加方便
相反,接口比较多,系统之间交互较多的情况下,HTTP没有RPC有优势
同步通信的问题
分布式服务中,由于业务拆分,应用也需要拆分,甚至数据库分库分表
但是完成一个业务处理,往往要涉及到多个模块之间的协调处理
此时模块之间,服务之间以及客户端/服务端之间的通信将变得非常复杂
分布式异步通信模式
生产者/消费者模式
可以跨平台、支持异构系统,通常借助消息中间件完成
优点
系统间解耦,并具有一定的可恢复性,支持异构系统,下游通常可并发执行,系统具备弹性
缺点
消息中间件存在一些瓶颈和一致性问题,对于开发来说不直观且不易调试,有额外成本
注意问题
1. 哪些业务需要同步,哪些需要异步
2. 如何保证消息的安全,消息是否会丢失/重复
3. 请求延迟如何能够减少
4. 消息接收顺序是否会影响到业务流程的正常执行
5. 消息处理失败后是否需要重发,如果重发如何保证幂等
简介
概念
面向消息的系统(消息中间件)是在分布式系统中完成消息的发送和接收的基础软件
也可以成为消息队列,是指用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成,通过提供消息传递和消息队列模型,可以在分布式环境下扩展进程的通信
消息中间件就是在通信的上下游之间截断,然后利用中间件解耦、异步的特性,构建弹性、可靠、稳定的系统
异步处理、流量削峰、限流、缓冲、排队、最终一致性、消息驱动等需求的场景都可以使用消息中间件
自定义消息中间件
BlockingQueue是java中常见的容器,在多线程编程中被广泛使用
当队列容器已满时生产者线程阻塞,直到队列未满后才可以继续put
当队列容器为空,消费者线程被阻塞,直至队列非空,才继续take
选型
选型原则
产品应该是开源的,开源意味着如果队列使用中遇到bug,可以很快修改,而不用等待开发者的更新
产品必须是近年流行的,要有一个活跃的社区,这样遇到问题很快就可以找到解决方案。流行也意味着bug较少,跟周边系统兼容性也高
作为消息队列,要具备以下特性
1. 消息传输的可靠性,保证消息不会丢失
2. 支持集群,包括横向扩展,单点故障都可以解决
3. 性能要好,要能够满足业务的性能需求
RabbidMQ
开始是用在电信业务的可靠通信,也是少有的几款支持AMQP协议的产品之一
优点
1. 轻量级,快速,部署使用方便
2. 支持灵活路由配置
RabbitMQ中,在生产者和队列之间有一个交换器模块,根据配置的路由规则,生产者发送的消息可以发送到不同的队列中
路由规则很灵活,可以自己实现
3. RabbitMQ客户端支持大多数编程语言
缺点
1. 如果有大量消息堆积在队列中,性能会急剧下降
2. RabbitMQ性能在Kafka/RocketMQ中最差
每秒处理几万到几十万的消息
如果应用要求高性能,不要选择RabbitMQ
3. RabbitMQ是Erlang开发的,功能扩展和二次开发代价很高
RocketMQ
是一个开源的消息队列,使用Java实现,借鉴了Kafka的设计并做了很多改进
主要用于有序、事务、流计算、消息推送、日志流处理、binlog分发等场景
经历历次双11考验
几乎具备了消息队列应该具备的所有特性和功能
优点
Java开发,阅读源码、扩展、二次开发很方便
对电商领域的相应延迟做了很多优化
大多数情况下,响应在毫秒级
如果应用很关注响应时间,可以使用RocketMQ
性能比RabbitMQ高一个数量级,每秒处理几十万消息
缺点
跟周边系统的整合和兼容不是很好
Kafka
可靠性、稳定性和功能特性基本满足大多数的应用场景
跟周边系统兼容性很好,尤其是大数据和流计算领域,几乎所有相关的开源软件都支持Kafka
高效、可伸缩,消息持久化,支持分区、副本、容错
是Scala和Java开发的,对批处理和异步处理做了大量设计,因此可以得到非常高的性能
异步消息发送和接收是三个中最好的
但是跟RocketMQ拉不开数量级,每秒处理几十万消息
如果是异步消息,并且开启压缩,最终可以达到每秒2000w消息处理
由于异步和批处理,延迟比较高,不适合电商场景
应用场景
秒杀
问题
秒杀开始前,用户不断刷新页面,如何应对高并发的读请求
秒杀开始时,大量并发用户瞬间向系统请求生成订单,扣减库存,如何应对高并发的写请求
高并发读
使用缓存策略将请求挡在上层的缓存中
能静态化的数据尽量做到静态化
加入限流(userid、IP、重复请求等)
高并发写
使用消息队列
流量削峰
削去场景下的峰值写流量
异步处理
通过异步处理简化请求中的业务流程
解耦
实现模块间的松耦合
流量削峰
将秒杀请求暂存于消息队列
业务服务器响应用户“秒杀结果正在处理中....”,释放系统资源去处理其他用户请求
填谷,削平短暂的流量高峰,消息堆积会造成请求延迟处理,但秒杀对于短暂延迟有一定容忍度
异步处理
先处理主要业务,异步处理次要业务
主要流程是生成订单、扣减库存;次要流程是发优惠券,增加用户积分等
解耦
将秒杀数据同步给数据团队,有两种思路
1. 使用HTTP/RPC同步调用
即提供一个接口,实时推送数据
系统耦合度高
2. 使用消息队列
数据全部发送给消息队列,然后数据服务订阅这个消息队列,接收数据进行处理
拉勾
拉勾网分为B端/C端,分别面向企业/求职者
两个模块业务处理逻辑不同,数据库结构不同,处于解耦状态
两个模块实际需要对方的数据
解决方式
1. 同步方式
两端通过RPC/WebSerbice方式发布服务,供对方调用,获取数据
2. 异步方式
使用消息队列,一端将数据发布到消息队列,另一端订阅该消息队列
JMS经典模式
介绍
JMS即Java消息服务应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信
它与具体平台无关,绝大多数MOM提供商都支持,类似于JDBC
JMS消息
消息是JMS中的一种类型对象,由两部分组成:报文头和消息主体
报文头
包括消息头字段和消息头属性,字段是JMS协议队形的字段,属性可以由用户按需添加
字段
JMSDestination
包含消息要发送到的目的地
JMSDeliveryMode
包含消息在发送时指定的投递模式
JMSMessageID
包含服务器法施工每个消息的唯一标识
JMSTimestamp
包含消息封装完成要发送往服务器的时间
不是真正向服务器发送的时间,因为真正的发送事件,可能因为事务或客户端消息排队而延后
JMSCorrelationID
客户端使用该字段值与另一条消息关联
一个经典场景是使用该字段将响应消息和请求消息关联
可以包含如下值
服务器规定的消息ID
应用指定的字符串
服务器原生的byte[]值
JMSReplyTo
包含了在客户端发送消息时指定的Destination,即对该消息的响应应该发送到该字段指定的Destination
设置了该字段值的消息一般期望收到一个响应
JMSRedelivered
如果为true,则告知消费者应用该消息已经发送,消费端应用应该小心不要重复处理
JMSType
消息发送时用于标识该消息的类型
具体类型由JMS实现厂商决定
JMSExpiration
发送消息时,其到期时间将计算为send()上指定的生存时间值与当前GMT值之和
从send()返回时,消息的JMSExpiration标头字段包含此值
收到消息后,其JMSExpiration标头字段包含相同的值
JMSPriority
JMS定义了一个十级优先极值,从0至9
客户端应将0-4视为正常优先级,5-9视为快速优先级
JMS不需要服务器严格执行消息的优先级排序,但应该尽力在普通消息之前传递加急消息
消息主体
携带应用程序的数据或有效负载
根据有效负载类型,可将消息分为
1. 简单文本 TextMessage
2. 可序列化对象 ObjectMessage
3. 属性集合 MapMessage
4. 字节流 BytesMessage
5. 原始值流 StreamMessage
6. 无有效负载的消息 Message
体系架构
1. JMS供应商产品
JMS接口的一个实现,可以是Java的JMS实现,也可以是非Java的面向消息中间件的适配器
2. JMS Client
生产或消费基于消息的Java的应用程序或对象
3. JMS Producer
创建并发送消息的JMS客户
4. JMS Consumer
接收消息的JMS客户
5. JMS Message
包括可以在JMS客户之间传递的数据的对象
6. JMS Queue
缓存消息的容器,消息的接受顺序并不一定要与消息的发送顺序相同,消息被消费后将从队列中移除
7. JMS Topic
Pub/Sub模式
对象模型
1. ConnectionFactory接口
用来创建JMS提供者的连接的被管对象
JMS客户端通过可移植的接口访问连接,这样当下层的实现改变时,代码不需要进行修改
管理员在JNDI名字空间中配置连接工厂,这样JMS客户才能查找到它们
根据消息类型不同,用户将使用队列连接工厂或主题连接工厂
2. Connection接口
代表了应用程序和消息服务器之间的通信链路
在获得连接工厂后,就可以创建一个与JMS提供者的连接
根据不同的连接类型,连接允许用户创建会话,以发送和接收队列和主题到目标
3. Destination接口
是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或是队列,或是主题
JMS管理员创建这些对象,用户通过JNDI发现它们
和连接工厂一样,管理员可以创建两种类型的目标
点对点模型的队列
发布者/订阅者模型的主题
4. Session接口
表示一个单线程的上下文,用于发送和接收消息
由于会话是单线程的,所以消息是连续的,即消息是按照发送顺序一个一个接收的
会话的好处是支持事务,如果用户选择了事务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消息
一个会话允许用户创建消息,生产者发送消息,消费者接收消息
5. MessageConsumer接口
由会话创建的对象,用于接收发送到目标的消息
消费者可以同步地(阻塞模式),或异步地(非阻塞)接收队列和主题类型的消息
6. MessageProducer接口
由会话创建的对象,用户发送消息到目标
用户可以创建某个目标的发送者,也可以创建一个通用的发送者,在发送消息时指定目标
7. Message接口
是在消费者和生产者之间传送的对象,及从一个app传送到另一个app
有三个主要部分
消息头(必须)
包含用于识别和为消息寻找路由的操作设置
一组消息属性(可选)
包含额外属性,支持其他提供者和用户的兼容
可以创建定制的字段和过滤器(消息选择器)
一个消息体(可选)
允许用户创建五种类型的消息
模式
JMS应用程序结构支持两种模式:点对点模式,发布/订阅模式
1. 点对点(队列)模式
生产者向一个特定的队列发布消息,消费者从该队列中读取消息
生产者知道消费者的队列,并直接将消息发送到消费者的队列
一条消息只有一个消费者获得
生产者无需在接收者消费该消息期间处于运行状态,接收者同样无需在消息发送时处于运行状态
每一个成功吹的消息要么自动确认,要么由接收者手动确认
2. 发布/订阅模式
特性
支持向一个特定的主题发布消息
0或多个订阅者可能对接收特定主题的消息感兴趣
发布者和订阅者彼此不知道对方
多个消费者可以获得消息
发布者和订阅者之间存在时间依赖性
发布者需要建立一个主题,以便客户能够订阅
订阅者必须保持持续的活动状态以接收消息,否则会丢失未上线的消息
对于持久订阅,订阅者未连接时发布的消息将在订阅者重连时重发
传递方式
NON_PERSISTENT
默认传递方式,最多投递一次
可能降低内存和需要的存储器,当许需要接收所有消息时使用
PERSISTENT
将使用暂存后再转送的机制投递
如果一个JMS服务下线,持久性消息不会丢失,等服务恢复时再传递
集群应用中的问题
消费者做集群时
Queue模式
创建多个一样的Queue,每个应用消费自己的Queue
弊端
浪费空间,生产者还需要关注下游有几个消费者,违反了“解耦”初衷
Topic模式
业务上做散列,或通过分布式锁来实现不同节点的竞争
弊端
对业务侵入较大,解决方案不优雅
ActiveMQ通过“虚拟主题”解决了这个问题
生产中需要结合两种模式
不同节点的相同应用间存在竞争,会部分消费(P2P)
不同的应用都需要消费到全量的消息(Topic)
总结
JMS客户端API是标准化的,因此JMS应用程序可以在供应商的实现之间移植,但是
1、底层消息传递实现未指定,因此JMS实现之间没有互操作性。除非存在桥接技术,否则想要共享消息传递的Java应用程序必须全部使用相同的JMS实现
2、如果没有供应商特定的JMS客户端库来启用互操作性,则非Java应用程序将无法访问JMS
3、AMQP_0-9-1是一种消息传递协议,而不是JMS这样的API。任何实现该协议的客户端都可以访问支持AMQP_0-9-1的代理
4、协议级的互操作性允许以任何编程语言编写且在任何操作系统上运行的AMQP_0-9-1客户端都可以参与消息传递系统,而无需桥接不兼容的服务器实现
AMQP协议
架构
AMQP全称高级消息队列协议(Advanced_Message_Queuing_Protocol),是一种标准,类似JMS,兼容JMS协议
目前RabbitMQ主流支持AMQP_0-9-1,3.8.4版本支持AMQP 1.0
概念
Publisher
消息发送者,将消息发送到Exchange并指定RoutingKey,以便queue可以接收到指定的消息
Consumer
消息消费者,从queue获取消息,一个Consumer可以订阅多个queue以从多个queue接收消息
Server
一个具体的MQ服务实例,也称为Broker
Virtual host
虚拟主机,一个Server下可以有多个虚拟主机,用于隔离不同项目
一个Virtual Host通常包含多个Exchange、Message Queue
Exchange
交换器,接收Producer发送来的消息,把消息转发到对应的Message Queue中
Routing Key
路由键,用于指定消息路由规则(Exchange将消息路由到具体的queue中)
通常需要和具体的Exchange类型、Binding的Routing Key结合使用
Bindings
制定了Exchange和Queue之间的绑定关系
Exchange根据消息的Routing Key和Binding配置(绑定关系、Binding、RoutingKey等)来决定把消息分派到哪些具体的Queue中
Message Queue
实际存储消息的容器,并把消息传递给最终的Consumer
传输层架构
简要概述
AMQP是一个二进制协议,信息被组织成数据帧,有很多类型
数据帧携带协议方法和其他信息
所有数据帧都拥有基本相同的格式:帧头、负载、帧尾
数据帧负载的格式依赖于数据帧的类型
假定有一个可靠的面向流的网络传输层(TCP/IP等价协议)
在一个单一的socket连接中,可能有多个相互独立的控制线程,称为“channel”
每个数据帧使用通道号码编号
通过数据帧交织,不同的通道共享一个连接
对于任意给定通道,数据帧严格按照序列传输
我们使用小的数据类型来构造数据帧,如bit、integer、string以及字段表
数据帧的字段做了轻微的封装,不会让传输变慢或解析困难
根据协议规范机械的生成数据帧层相对简单
线级别的格式被设计为可伸缩和足够通用,以支持任意的高层协议(不仅是AMQP)
假定AMQP会扩展,改进以及随时间变化,并要求wire-level格式支持这些变化
数据类型
Integers(数值范围1-8的十进制数字)
用于表示大小、数量、限制等,整数类型无符号的,可以在帧内不对齐
Bits(统一为8字节)
用于表示开/关值
Short Strings
用于保存简短的文本属性,个数限制为255,8个字节
Long Strings
用于保存二进制数据块
Field tables
包含键值对,字段值一般为字符串、整数等
协议协商
AMQP客户端和服务端进行协议协商
意味着当客户端连接上之后,服务端会向客户端提出一些选项,客户端必须能接收或修改
如果双方都认同协商的结果,继续进行连接的建立过程
协议协商是一个很有用的技术手段,因为它可以让我们断言假设和前置条件
在AMQP中,需要协商协议的一些特殊方面
1、真实的协议和版本,服务器可能在同一个端口支持多个协议
2、双方的加密参数和认证方式,这是功能层的一部分
3、数据帧最大大小,通道数据以及其他操作限制
最限制条件的认同可能会导致双方重新分配key的缓存,避免死锁
每个发送的数据帧要么遵守认同的限制,即安全的,要么超过限制,此时另一方出错,必须断开连接
协商双方认同限制到一个小的值,如下
1、服务端必须告诉客户端它加上了什么限制
2、客户端响应服务器,或许会要求对客户端的连接降低限制
数据帧界定
TCP/IP是流协议,没有内置的机制用于界定数据帧
现有协议解决方法
1、每个连接发送单一数据帧,简单但是慢
2、在流中添加帧的边界,简单,但是解析很慢
3、计算数据帧的大小,在每个数据帧头加上该数值,简单、快速(AMQP的选择)
实现JMS客户端
RabbitMQ的JMS客户端用RabbitMQ_Java客户端实现,既与JMS兼容,也支持AMQP_0-9-1协议
局限性
RabbitMQ JMS客户端不支持某些JMS1.1功能
JMS客户端不支持服务器会话
XA事务支持接口未实现
RabbitMQ_JMS主题选择器插件支持主题选择器,队列选择器尚未实现
支持RabbitMQ连接的SSL和套接字选项,但仅使用RabbitMQ客户端提供的(默认)SSL连接协议
RabbitMQ不支持JMS_NoLocal订阅功能,该功能禁止消费者接收通过消费者自己的连接发布的消息
RabbitMQ使用AMQP协议,JMS规范仅对于Java使用做了规定
2. RabbitMQ 架构与实战
介绍
RabbitMQ是目前非常热门的一款开源消息中间件,不管是互联网行业还是传统行业都广泛使用
特性
1、高可靠性、易扩展、高可用、功能丰富
2、支持大多数编程语言客户端
3、遵循AMQP协议,自身采用Erlang编写
4、支持MQTT等其他协议
具有很强大的插件扩展能力,官方和社区提供了非常丰富的插件可供使用
整体逻辑架构
Producer
Broker
Exchange
Direct
Topic
Fanout
Bindings
Queues
Consumer
Exchange类型
fanout
会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中
direct
会把消息路由到那些BindingKey和RoutingKey完全匹配的队列中
topic
在direct匹配规则上进行了扩展,也是将消息路由到BindingKey和RoutingKey相匹配的队列中,这里的匹配规则稍微不同
BindingKey和RoutingKey一样都是由“."分隔的字符串
BindingKey中可以存在两种特殊字符“*”和“#”,用于模糊匹配
“*” 匹配一个单词
“#” 匹配多个单词(可以是0个)
headers
该类型不依赖于路由键的匹配规则来路由信息,而是根据发送的消息内容中的headers属性进行匹配
在绑定队列和交换器时指定一组键值对,当发送的消息到交换器时,
RabbitMQ会获取到该消息的headers,对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,匹配,消息就路由到该队列
该类型性能很差,不实用
数据存储
存储机制
持久化消息
在达到队列时写入磁盘,同时在内存中保存一份备份,当内存吃紧时,消息从内存中清除,这会提高一定的性能
非持久化消息
一般只存于内存中,当内存压力大时数据刷盘处理,以节省内存空间
存储层组成
队列索引 rabbit_queue_index
维护队列的落盘消息的信息,如存储地点、是否已被给消费者接收、是否已被消费者ack等
每个队列都有相对应的索引
索引使用顺序的段文件来存储,后缀为.idx,文件名从0开始累加
每个段文件中包含固定的segment_entry_count条记录,默认16384
每个index从磁盘中读取消息时,至少要在内存中维护一个段文件
设置queue_inde_embed_msgs_below要格外谨慎,一点点增大也可能会引起内存爆炸式增长
消息存储 rabbit_msg_store
消息以键值对的形式存储到文件中,一个虚拟主机上的所有队列使用同一块存储,每个节点只有一个
存储分为持久化存储(msg_store_persistent)和短暂存储(msg_store_transient)
持久化存储的内容在broker重启后不会丢失,短暂存储的内容在broker重启后丢失
store使用文件来存储,后缀为.rdq,经过store处理的所有消息都会以追加的方式写入到该文件中
当该文件的大小超过指定的限制(file_size_limit)后,将会关闭该文件并创建一个新文件以供新消息写入
文件名从0开始累加
在进行消息存储时,RabbitMQ会在ETS表中记录消息在文件中的位置映射和文件相关信息
消息可以直接存储在index中,也可以存储在store中
最佳的方式是较小的消息存在index中,较大的存在strore中
大小的界定由queue_index_embed_msgs_below来配置,默认4096B
一个完整的消息大小小于该值,就放到索引中,否则放到持久化消息文件中
读取消息时,先根据消息ID找到对应的存储文件
如果文件存在且未被锁住,则打开文件,从指定位置读取消息内容
否则,发送请求由store进行处理
删除消息时,只是从ETS表删除指定消息的相关信息,同时更新消息对应的存储文件和相关信息
在执行消息删除操作时,并不立即对文件中的消息进行删除,仅仅是标记为垃圾数据而已
当一个文件中都是垃圾数据时,可以将这个文件删除
垃圾回收/文件合并
当检测到前后两个文件中的有效数据可以合并,且所有垃圾数据大小和所有文件(至少3个文件存在的情况下)数据大小的比值超过设置的阈值(garbage_fraction,默认0.5),会触发垃圾回收,将这两个文件合并
执行合并的两个文件一定是逻辑上相邻的两个文件
合并逻辑
1、锁定这两个文件
2、整理前面的文件的有效数据,再整理后面文件的有效数据
3、将后面文件的有效数据写入前面文件中
4、更新消息在ETS表中的记录
5、删除后面的文件
队列结构
组成
rabbit_amqqueue_process
负责协议相关的消息处理,即接收生产者发布的消息、向消费者交付消息、处理消息的确认(包括生产端的confirem和消费端的ack)等
backing_queue
消息存储的具体形式和引擎,向rabbit_amqqueue_process提供相关的接口以供调用
如果消息投递的目的队列为空,且有消费者订阅了这个队列,则该消息会直接发送给消费者,不经过队列
当消息无法直接投递给消费者时,需要暂时将消息存入队列,以便重新投递
消息存入队列中,不是固定不变的,它会随着系统的负载在队列中不断流动,消息状态会不断发生变化
RabbitMQ队列的消息状态
1、alpha
消息索引和消息内容都存内存,最耗内存,很少消耗CPU
2、beta
消息索引存内存,消息内容存磁盘
3、gama
消息索引内存和磁盘都有,消息内容存磁盘
持久化消息时的状态
4、delta
消息索引和内容都存磁盘,基本不消耗内存,消耗更多CPU/IO操作
运行时,RabbitMQ会根据消息传递的速度定期计算一个当前内存中能够保存的最大消息数量(target_ram_count),如果alpha状态的消息数量大于此值,则会引起消息的状态转换,多余的消息可能会转换到另外三种状态
区分4中状态的主要作用是满足不同的内存和CPU需求
对于普通没有设置优先级和镜像的队列,backing_queue的默认实现是rabbit_variable_queue
其内部通过5个子队列Q1、Q2、delta、Q3、Q4来体现消息的各个状态
消费者获取消息会引起消息状态转换
1、首先从Q4中获取消息,如果成功则返回
2、如果Q4为空,则尝试从Q3中获取消息
系统首先会判断Q3是否为空,为空则返回队列为空,表明队列中无消息
3、Q3不为空,则取出Q3中的消息
进而判断此时Q3和Delta中的长度,如果都为空,则认为Q2、Delta、Q3、Q4全部为空,此时将Q1中的消息直接转移至Q4,下次直接从Q4获取消息
4、Q3为空,Delta不为空
将Delta的消息转移至Q3中,下次可以直接从Q3获取消息
转移过程中,是按照索引分段读取的
消息堆积
系统负载较高时,消息若不能很快消费掉,这些消息就会进入到很深的队列中,这样会增加处理每个消息的平均开销
因为要花更多的时间资源来处理“堆积”的消息,如此用来处理新流入消息能力就会降低,使得后流入的消息又被积压到深队列中,持续恶化,使系统处理能力大大降低
处理方法
1、增加prefetch_count的值
即一次发送多条消息给消费者,加快消费速度
2、采用multiple_ack,降低处理ack带来的开销
3、流量控制
安装/配置
1、安装socat依赖
2、安装Erlang
3、安装RabbitMQ
rpm安装
启动RabbitMQ管理插件
rabbitmq-plugins enable rabbitmq_management
4、启动
systemctl start rabbitmq-server
rabbitmq-server -detached
后台启动
5、添加用户/授权/设置标签
rabbitmqctl add_user root 123456
rabbitmqctl set_permissions root -p / ".*" ".*" ".*"
rabbitmqctl set_user_tags root administrator
用户权限
None
没有访问management插件的权限
management
可以使用消息协议做任何操作的权限
加上
1、可以使用AMQP协议登录虚拟主机的权限
2、查看它们能登录的所有虚拟主机中所有队列、交换器和绑定的权限
3、查看和关闭它们自己的通道和连接的权限
4、查看它们能访问的虚拟主机中的全局统计信息,包括其他用户的活动
policymaker
所有management的权限
加上
1、在自己能通过AMQP协议登录的虚拟主机上查看、创建、删除策略以及虚拟主机参数的权限
monitoring
所有management的权限
加上
1、列出所有虚拟主机,包括列出不能使用消息协议访问的虚拟主机的权限
2、查看其他用户连接和通道的权限
3、查看节点级别的数据,如内存使用和集群的权限
4、查看真正的全局所有虚拟主机统计数据的权限
administrator
所有policymaker和monitoring的权限
加上
1、创建删除虚拟主机的权限
2、查看、创建、删除用户的权限
3、查看、创建、删除权限的权限
4、关闭其他用户连接的权限
常用命令
rabbitmq-server
前台启动
rabbitmq-server -detached
后台启动
rabbitmqctl stop
停止RabbitMQ和Erlang VM
rabbitmqctl list_queues
查看所有队列
rabbitmqctl list_vhosts
查看所有虚拟主机
rabbitmqctl start_app
在Erlang VM运行时启动RabbitMQ应用
rabbitmqctl status
查看节点状态
rabbitmq-plugins list
查看所有可用插件
rabbitmq-plugins enable/disable
启动/停用 插件
rabbitmqctl add_user
添加用户
rabbitmqctl list_users
列出所有用户
rabbitmqctl delete_user
删除用户
rabbitmqctl clear_permissions -p
清除用户权限
rabbitmqctl list_user_permissions
列出用户权限
rabbitmqctl change_password
修改密码
rabbitmqctl set_permissions -p ".*" ".*" ".*"
设置用户权限
rabbitmqctl add_vhost
创建虚拟主机
rabbitmqctl list_vhosts
列出所有虚拟主机
rabbitmqctl list_permissions -p
列出虚拟主机上所有权限
rabbitmqctl delete_vhost
删除虚拟主机
rabbitmqctl reset
移除所有数据,要在rabbitmqctl stop_app 之后使用
工作流程
发送消息
1、生产者连接RabbitMQ,建立TCP连接(Connection),开启信道(Channel)
2、生产者声明一个Exchange,并设置相关属性,如交换器类型,是否持久化等
3、生产者声明一个队列并设置相关属性,如是否排他、是否持久化、是否自动删除等
4、生产者通过routingkey将交换器和队列绑定起来
5、生产者发送消息至RabbitMQ_Broker,其中包含routingKey、交换器等信息
6、相应交换器根据接收到的routingKey查找相匹配的队列
7、如果找到,则将从生产者发送过来的消息存入相应的队列中
8、如果没找到,则根据生产者配置的属性选择丢弃还是回退
9、关闭信道
10、关闭连接
接收消息
1、消费者连接到RabbitMQ_Broker,建立一个连接(Connection),开启一个信道(Channel)
2、消费者向RabbitMQ_Broker请求消费相应队列中的消息,可能会设置相应的回调函数,以及做一些准备工作
3、等待RabbitMQ_Broker回应并投递相应队列中的消息,消费者接收消息
4、消费者确认(ack)接收到的消息
5、RabbitMQ从队列中删除相应已经被确认的消息
6、关闭信道
7、关闭连接
Connection和Channel
生产者和消费者,需要与RabbitMQ_Broker建立TCP连接,即Connection
一旦TCO连接建立起来,客户端紧接着创建一个AMQP信道(Channel),每个信道都会指派一个唯一ID
信道是建立在Connection上的虚拟连接,RabbitMQ处理的每条AMQP指令都是通过信道完成
RabbitMQ采用类似NIO的做法,复用TCP连接,减少性能开销,便于管理
每个信道流量不是很大时,复用单一的Connection可以在产生性能瓶颈时有效地节省TCP连接资源
信道本身流量很大时,一个Connection就会产生性能瓶颈,流量被限制,需要建立多个Connection,分摊信道
工作模式
Work Queue
生产者发消息,启动多个消费者实例来消费消息,每个消费者仅消费部分消息,可达到负载均衡的效果
发布/订阅模式
使用fanout类型,routingkey忽略。每个消费者定义生成一个队列并绑定到同一个Exchange,每个消费者都可以消费到完整的消息
在RabbitMQ中,生产者不是将消息直接发送给消息队列,实际上生产者根本不知道一个消息被发送到哪个队列
生产者将消息发送给交换器
交换器从生产者接收消息,将消息推送给消息队列
交换器必须清除知道要如何处理接收到的消息
是追加到一个指定的队列
还是追加到多个队列
还是丢弃
临时队列
无论何时连接RabbitMQ,都需要一个新的空队列,可以使用随机的名字创建队列,也可以让服务器生成随机的消息队列名字
一旦断开到消费者的连接,该队列应该自动删除
channel.queueDeclare().getQueue()
声明一个非持久化的、排他的、自动删除的队列
队列名字是服务器随机生成的
绑定
本模式中,Exchange在声明时不需要指定bindingKey
消息推拉
实现RabbitMQ的消费者有两种模式:Push/Pull
实现Push模式推荐的方式是继承DefaultConsumer基类,也可以使用Spring_AMQP的SimpleMessageListenerContainer
实现Pull模式RabbitMQ的Channel提供了basicGet()
默认交换器
如果不指定交换器,有时也可以将消息发送到指定队列,此时使用的是默认交换器
默认交换器没有名字
channel.basicPublish("", "log", null, message.getBytes())
第一个参数是交换器名称,为空字符串
路由模式
使用direct类型的Exchange,发N条消息并使用不同的routingKey,消费者定义队列并将队列、routingKey、Exchange绑定,此时必须要routingKey完全匹配的情况下消息才会被转发
绑定
消费者通过queueBind获取自己想要的消息
channel.queueBind(queueName, EXCHANGE_NAME, "black")
主题模式
使用topic类型的交换器,队列绑定到交换器,bindingkey时使用通配符,交换器将消息路由转发到具体队列时会根据消息routingkey模糊匹配,比较灵活
topic类型,routingkey/bindingkey必须是点分单词
如:stock.usd.nyse
该点分单词字符串最长255字节
只要队列的bindingkey和消息的routingkey匹配,队列就可以收到该消息
1、“*” 匹配一个单词
2、“#” 匹配0到多个单词
如果在topic类型中bindingkey使用 # ,则是 fanout 类型交换器的行为
如果在topic类型中bingdingkey不使用 * 和 # ,则是 direct 类型交换器的行为
整合Spring
spring-amqp是对AMQP的一些概念的抽像,spring-rabbit是对RabbitMQ操作的封装实现
主要核心类
RabbitAdmin
完成对Exchange、Queue、Binding的操作,在容器中管理了RabbitAdmin的时候,可以对Exchange、Queue、Binding进行自动声明
RabbitTemplate
发送和接收消息的工具类
SimpleMessageListenerContainer
消费消息的容器
可以基于注解或配置文件开发
3. Rabbit高级特性
消息可靠性
场景
支付平台必须保证数据正确性,保证数据并发安全性,保证数据最终一致性
方式
1、分布式锁
能够保证数据强一致性
高并发场景可能有性能问题
2、消息队列
异步、高并发
有一定延时、数据弱一致性
可以从以下几方面来保证消息的可靠性
1、客户端代码中的异常捕获,包括生产者/消费者
2、AMQP/RabbitMQ的事务机制
3、发送端确认机制
4、消息持久化机制
5、Broker端的高可用集群
6、消费者确认机制
7、消费端限流
8、消息幂等性
异常捕获机制
先执行业务操作,操作成功后执行消息发送,消息发送过程通过try/catch方式捕获异常,在异常处理的代码块中执行回滚或重发操作等
最大努力确保的方式,并无法保证100%绝对可靠
可以通过配置开启发送端重试
spring.rabbitmq.template.retry.enabled=true
AMQP/RabbitMQ事务机制
没有捕获到异常并不能代表消息一定投递成功
一直到事务提交后都没有异常,确实就说明消息是投递成功了
这种方式在性能方面开销很大,一般不推荐使用
发送端确认机制
RabbitMQ后来引入了一种轻量级的方式,叫做发送方确认(publisher_confirm)机制
生产者将信道设置成confirm模式,一旦信道进入该模式,所有在该信道上发布的消息都会被指派一个唯一ID(从1开始)
一旦消息被投递到所有匹配的队列之后(如果消息和队列是持久化的,那么确认消息会在消息持久化后发出),RabbitMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这样生产者就知道消息已经正确送达了
RabbitMQ回传给生产者的确认消息中的deliveryTag字段包含了确认消息的序号
另外,通过设置channel.BasicAck()中的multiple参数,表示到这个序号之前的所有消息是否都已经得到了处理
生产者投递消息后不需要一直阻塞着,可以继续投递下一条消息并通过回调方式处理ACK响应
如果RabbitMQ因为自身内部错误导致消息丢失等异常消息发生,就会响应一条nack(Basic.Nack)命令,生产者应用程序同样可以在回调方式中处理该命令
waiForConfirm()
有个重载,可以自定义timeout超时时间,超时后会抛出TimeoutException
类似还有waitForConfirmsOrDie(),Broker端在返回nack后,方法会抛出IOException
该方式运行中是同步阻塞模式的,性能并不是太好
实际中,可以通过“批处理”的方式来改善整体性能
即批量发送消息后仅调用一次waitForConfirms()
还可以通过异步回调的方式来处理Broker的响应
addConfirmListener()可以添加ConfirmListener回调接口参数
ConfirmListener包含handleAck()和handleNack()
持久化存储机制
持久化是提高RabbitMQ可靠性的基础,否则当RabbitMQ遇到异常时,数据将会丢失
主要从以下几个方面保障消息的持久性
1、Exchange持久化
通过定义时设置durable参数来保证Exchange相关的元数据不丢失
2、Queue持久化
通过定义时设置durable参数来保证Queue相关的元数据不丢失
3、消息的持久化
通过将消息的投递模式设置为2即可实现消息的持久化,保证消息自身不丢失
deliveryMode = 2
RabbitMQ中的持久化消息都需要写入磁盘
当系统内存不足时,非持久化的消息也会被刷盘处理
所有的处理动作都是在“持久层”中完成的
持久层是一个逻辑概念,包含
1、队列索引
2、消息存储
RabbiMQ通过配置queue_index_embed_msgs_below可以根据消息大小决定存储位置,默认4096字节(包含消息体、属性及headers)
小于该值的存在队列索引中
消费者ACK
发送端确认和消息持久化存储,依然无法保证整个过程的可靠性
因为消息可能在消费过程中业务处理失败
RabbitMQ在消费端会有ACK机制,即消费端消费消息后需要发送Ack确认报文给Broker端,告知自己是否已消费完成,否则可能会一直重发消息直到消息过期(AUTO默认)
处理模式
1、NONE模式
消费的过程中自行捕获异常,引发异常后直接记录日志并落到异常恢复表,再通过后台定时任务扫描异常恢复表尝试做重试动作
如果业务不自行处理则有丢失数据的风险
2、AUTO模式
不主动捕获异常,当消费过程中出现异常会将消息放回Queue中
然后消息会被重新分配到其他消费者节点重新被消费
默认会一直重发消息并直到消费完成返回Ack或一直到过期
3、MANUAL模式
消费者自行控制流程并手动调用channel相关的方法返回Ack
相关配置
spring.rabbitmq.listener.simple
retry.max-attempts = 5
最大重试次数
retry.enabled = true
是否开启消费者重试
retry.initial-interval = 5000
重试间隔时间(ms)
default-requeue-rejected = false
重试超过最大次数后是否拒绝
acknowledge-mode = manual
ack模式
消费端限流
当消息投递速度远快于消费速度时,随着时间积累就会出现“消息积压”
消息中间件本身具备一定的缓冲能力,但这个能力是有容量限制的,如果长期运行并没有任何处理,最终会导致Broker崩溃
解决方案(QOS/限流)
1、RabbitMQ可以对内存和磁盘使用量设置阈值
说明
当达到阈值后,生产者将被阻塞,直到对应项指标回复正常
全局上可以防止超大流量,消息积压等导致的Broker被压垮
当内存受限或磁盘可用空间受限时,服务器都会暂时阻止连接,服务器将暂停从发布消息的已连接客户端的socket读取数据,连接心跳监视也将被禁用
所有网络连接将在rabbitmqctl和管理插件中显示“已阻止”
配置 rabbitmq.conf
disk_free_limit.absolute = 50000
设置磁盘可用空间大小,单位字节
当磁盘可用空间低于这个值时,发出磁盘警告,触发限流
如果设置了相对大小,则忽略此绝对大小
disk_free_limit.relative = 2.0
设置磁盘可用空间相对于总可用内存的相对值
当相对值低于此值时,触发限流
此相对值不能低于1.0
vm_memory_high_watermark.absolute = 1073741824
设置内存可用大小的绝对值
如果设置了相对值,则忽略该值
vm_memory_high_watermark.relative = 0,4
设置内存限流阈值,表示阈值和总可用内存的比值
总可用内存表示操作系统给每个进程分配的大小,或实际内存大小
如32位windows,系统给每个进程最大2G的内存,则此比值表示阈值为800MB
注:RabbitMQ 3.6.0开始,绝对值支持计量单位
k,kiB:2^10 - 1024bytes
M,MiB:2^20 - 1048576bytes
G,GiB:2^30 - 1073741824bytes
kb:10^3 - 1000bytes
MB:10^6 - 1000000bytes
GB:10^9 - 1000000000bytes
2、RabbitMQ默认提供一种基于credit_flow的流控机制,面向每一个连接进行流控
当单个队列达到最大流速时,或多个队列达到总流速时,都会触发流控
触发单个连接的流控可能是因为connection、channel、queue的某一个过程处于flow状态,这些状态都可以在监控平台看到
3、RabbitMQ中有一种QoS保证机制,可以限制channel上接收到的未被Ack的消息数量
超过这个数量限制RabbitMQ将不会再往消费端推送消息,这是一种流控手段,可以防止大量消息瞬时从Broker送达消费端造成消费端巨大压力(甚至压垮消费端)
QoS机制仅对于消费端推模式有效,对拉模式无效,且不支持NONE_ACK模式
执行channel.basicConsume()前通过channel.basicQoS()可以设置该数量
消息发送时异步的,消息确认也是异步的,在消费者消费慢时,可以设置QoS的prefetchCount,它表示broker在向消费者发送消息时,一旦发送prefetchCount个消息而没有一个消息确认时就停止发送
消费者确认多少,broker就发送多少,消费者等待处理的个数永远限制在prefetchCount个
如果对于每个消息都发送确认,增加了网络流量,此时可以批量确认消息,设置multiple为true
消费者确认时,如果id为8的消息确认了,则表明8之前的消息都确认了
在下游,期望消费端能尽快消费完消息,且还要防止瞬时大量消息压垮消费端(推模式),所以期望消费端处理速度是最快、最稳定且相对均匀的(理想化)
提升下游应用的吞吐量和缩短消费过程的耗时,主要优化方式
1、优化应用程序的性能,缩短响应时间
2、增加消费者节点实例
成本增加,且底层数据库操作可能成为瓶颈
3、调整并发消费的线程数
线程数并非越大越好,需要大量压测调优
消息可靠性保障
1、消息可靠传输
业务系统接入消息中间件时首要考虑的问题
一般消息中间件的消息传输保障分为三层,RabbitMQ支持其中的“最多一次”和“最少一次”
1、At most once
最多一次,消息可能会丢失,但绝不会重复传输
2、At least once
最少一次,消息绝不会丢失,但可能会重复传输
3、Exactly once
恰好一次,每条消息肯定会被传输一次且仅传输一次
最少一次的投递实现需要考虑
消息生产者需要开启事务机制或发送端确认机制,以确保消息可以可靠地传输到RabbitMQ中
消息生产者需要配合使用mandatory参数或备份交换器来确保消息能够从交换器路由到队列中,进而能够保存下来而不会被丢弃
消息和队列都需要进行持久化处理,以确保RabbitMQ服务器在遇到异常情况时不会造成消息丢失
消费者在消费消息时需要将autoAck设置为false,然后通过手动确认的方式确认已经正确消费的消息,以避免在消费端引起不必要的消息丢失
恰好一次RabbitMQ目前无法保障
2、各种限流、应急手段
3、业务层面的一些容错、补偿、异常重试等
消息幂等性处理
追求高性能就无法保证消息的顺序,追求可靠性就可能产生重复消息
解决重复消息的方式是,在消费端让消费消息的操作具备幂等性
一个幂等操作的的特点是:其任意多次致性所产生的影响均与一次执行的影响相同
对于幂等的方法:不用担心重复执行会对系统造成任何改变
常见做法
1、借助数据库唯一索引,重复插入直接报错,事务回滚
2、前置检查机制
排他锁、乐观锁、CAS机制
3、唯一Id机制
比较通用的做法,对于每条消息生成唯一Id,消费前判断Tair中是否存在(Id做Tair排他锁的Key),消费成功后将状态写入Tair
可靠性分析
在RabbitMQ中可以使用Firehose功能来实现消息追踪,Firehose可以记录每一次发送或消费消息的记录,方便RabbitMQ的使用者进行调试、排错等
Firehose
原理
将生产者投递给RabbitMQ的消息,或RabbitMQ投递给消费者的消息按照指定的格式发送到默认的交换器上
这个默认的交换器是amp.rabbitmq.trace,是一个topic类型的交换器
发送到这个交换器上的消息的路由键是
publish.{exchangename}
对应生产者投递到交换器的消息
deliver.{queuename}
对应消费者从队列中获取的消息
命令
开启
rabbitmqctl trace_on [-p host]
关闭
rabbitmqctl trace_off [-p host]
说明
默认情况下处于关闭状态,且它的状态是非持久化的,会在RabbitMQ重启时还原成默认状态
它开启后多少会影响RabbitMQ整体服务性能,因为它会引起额外的消息生成、路由和存储
rabbitmq_tracing插件
说明
相当于Firehose的GUI版本,同样能跟踪RabbitMQ中消息的流入流出情况
同样会对流入流出消息进行封装,然后将封装后的消息日志存入相应的trace文件中
命令
开启
rabbitmq-plugins enable rabbitmq_tracing
关闭
rabbitmq-plugins disable rabbitmq_tracing
TTL机制
Time_to_Live,即过期时间,RabbitMQ可以对消息和队列两个维度来设置TTL
任何消息中间件的容量和堆积能力都是有限的,如果有一些消息总是不被消费掉,那么需要有一种过期的机制来兜底
目前有两种方法可以设置消息的TTL
1、通过Queue属性设置,队列中所有消息都有相同的过期时间
2、对消息自身进行单独设置,每条消息的TTL可以不同
两种方法一起使用,则消息的TTL以两者之间较小数据为准
通常来讲,消息在队列中的生存时间一旦超过设置的TTL值时,就会变成“死信”,消费者默认就无法再收到消息
设置方式
通过命令行方式设置全局TTL
rabbitmqctl set_policy TTL '.*' '{"message-ttl": 30000}' --apply-to queues
代码方式
通过restful api方式
默认规则
1、如果不设置TTL,则表示消息不会过期
2、如果TTl设置为0,则表示除非此时可以直接将消息投递给消费者,否则该消息会被立即丢弃
死信队列
在定义业务队列时可以考虑指定一个死信交换器,并绑定一个死信队列,当消息变成死信时,该消息就会被发送到该死信队列上,这样方便看消息失败的原因
DLX,全称为Dead-Letter-Exchange,死信交换器
消息在一个队列中变成死信之后,被重新发送到一个特殊的交换器(DLX)中,同时,绑定DLX的队列就称为”死信队列“
导致消息变为死信的原因
1、消息被拒绝,且设置request参数为false
Basic.Reject
Basic.Nack
2、消息过期
3、队列达到最大长度
对于RabbitMQ来说,DLX是一个非常有用的特性
它可以处理异常情况下,消息不能够被消费者正确消费而被置入死信队列中的情况
后续分析程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善和优化系统
延迟队列
延迟消息指消息发送出去后并不想立即就被消费,而是需要等一段时间后才触发消费
在AMQP和RabbitMQ都没有相关的规定和实现,但是可以借助”死信队列“变相实现
实现:rabbit_delayed_message_exchange插件
步骤
1、生产者将消息和路由键发送到指定的延时交换器
2、延时交换器存储消息并等待至消息过期,根据路由键找到绑定的队列并把消息给它
3、队列再把消息发送给监听它的消费者
4. Rabbit集群
原理
集群实践
对于无状态应用很容易实现负载均衡、高可用集群,而对于有状态的系统就比较复杂
主备模式、主从模式、主主模式、分片集群、异地多活
常用负载均衡算法
随机、轮询、加权轮询、最少活跃连接、hash(一致性hash)
集群经典问题
脑裂、网络分区、一致性、可用性
架构模式
主备模式
也叫Warren模式,同一时刻只有一个节点在工作(备份节点不能读写),当主节点发生故障后会将请求切换到备份节点上(主恢复后成为备份节点)
需要借助HAProxy之类的负载均衡器来做健康检查和主备切换,底层需要借助共享存储(如SAN设备)
这不是RabbitMQ官方或开源社区的推荐方案,适用于访问压力不是特别大但是又有高可用架构需求的中小规模系统来使用
首先有一个节点闲置,本身就是资源浪费
其次共享存储往往需要借助硬件存储,或分布式文件系统
Shovel铲子模式
Shovel是一个插件,用于实现跨机房数据复制/数据迁移,故障转移与恢复等
使用Shovel后,架构模型成为了近端同步确认,远端异步确认的方式
支持WAN传输,且Broker实例的RabbitMQ、Erlang版本不要求完全一致
Shovel的配置分静态模式(修改RabbitMQ配置)和动态模式(在控制台直接部署,重启后失效)
RabbitMQ集群
功能
允许消费者和生产者在RabbitMQ单个节点崩溃的情况下继续运行,并可以通过添加更多的节点来线性扩展消息通信的吞吐量,当失去一个节点时,客户端能够重新连接到集群中的任何其他节点并继续生产/消费
元数据
RabbiMQ集群中的所有节点都会备份所有的元数据信息
1、队列元数据:队列的名称和属性
2、交换器:交换器的名称和属性
3、绑定关系元数据:交换器和队列或交换器之间的绑定关系
4、vhost元数据:为vhost内的队列、交换器和绑定提供命名空间及安全属性
说明
基于存储空间和性能的考虑,RabbitMQ集群中的各节点存储的消息是不同的(有点类似分片集群,各节点数据并不是全量对等的),各节点之间同步备份的仅仅是上述元数据以及Queue_Owner(队列所有者,即实际创建Queue并保存消息数据的节点)的指针
集群中某个节点崩溃后,该节点的队列进程和关联的绑定都会消息,关联的消费者也会丢失订阅信息,节点恢复后消息可以重新被消费
总的来说,该集群模式只能保证集群中的某个Node挂掉后应用程序还可以切换到其他Node继续发送/消费消息,但并无法保证原有的消息不丢失,所以并不是一个真正意义的高可用集群
如果需要消费节点a中队列A中的消息
1、应用程序直接连接节点a或通过负载均衡将请求路由到节点a,这和单机无区别
2、应用连接节点b或被路由到节点b,则节点b会通过Owner指针找到节点a获取实际消息数据,相当于担任了代理角色
Erlang语言天然具备分布式特性,所以不需要借助类似ZK之类的组件来实现集群
集群节点间使用Cookie来进行通信验证,所有节点必须使用相同的.erlang.cookie文件内容
不同节点的Erlang、RabbitMQ版本必须一致
镜像队列模式
说明
RabbitMQ内置的集群模式有丢失消息的风险,镜像队列可以看作是对内置默认集群模式的一种高可用架构的补充
可以将队列镜像(同步)到集群中的其他Broker上,相当于时多副本冗余,如果集群中的一个节点失效,队列能自动地切换到集群中的另一个镜像节点上以保证服务的可用性,且消息不丢失
原理
RabbitMQ镜像队列中的master和slave都仅仅是对某个queue而言的,而不是针对节点
一个queue第一次创建所在的节点是它的master节点,其他节点为slave节点
如果master由于某种原因失效,最先加入的slave会被提升为新的master
无论客户端请求到达master还是slave,最终数据都是从master获取
当请求到达master节点时,master节点直接将消息返回client,同时master会通过GM协议(Guaranteed_Multicast)将queue的最新状态广播到slave节点
GM保证了广播消息的原子性,即要么都更新要么都不更新
当请求到达slave节点时,slave节点需要将请求先重定向到master节点,master返回client
Federation联邦模式
类似Shovel,也是一个实现跨集群、节点消息同步的插件
支持联邦交换器、联邦队列(作用在不同级别)
Federation插件允许配置一个exchanges_federation或queues_federation
一个exchanges/queues federation允许从一个或多个upstream接收消息,即远程的exchanges/queues
Federation/Shovel都只是解决消息数据传输的问题,但是跨机房跨城市的网络延迟问题是客观存在的
异地多活
单机多实例部署
注意事项
多个RabbitMQ使用的端口号不能冲突
多个RabbitMQ使用的磁盘存储路径不能冲突
多个RabbitMQ的配置文件也不能冲突
集群管理
1、搭建多台RabbitMQ主机
2、通过拷贝同步所有主机的.erlang.cookie文件
如果没有此文件,手动创建/var/lib/rabbitmq/.erlang.cookie,生成Cookie字符串,或启动一次RabbitMQ自动生成
生产中推荐使用第三方工具生成.erlang.cookie文件
拷贝后应将文件所有者改为rabbitmq:rabbitmq
3、启动rabbitmq
systemctl start rabbitmq-server
4、选择一个节点,其他节点操作加入集群
rabbitmqctl stop_app
停止RabbitMQ应用,保持ErlangVM的运行
rabbitmqctl reset
加入集群前清除当前主机的所有数据
rabbitmqctl join_cluster rabbitmq@mainnode
将当前主机加入到指定集群
注意集群主机之间要能ping通
join_cluster 默认使用disk模式,可加入参数--ram启用内存模式
rabbitmqctl start_app
启动当前主机的RabbitMQ应用
5、将当前节点移除集群
rabbitmqctl forget_cluster_node rabbit@mainnode
移出集群后,节点还保留集群信息,启动还是会尝试加入集群,但是会被拒绝,可以通过重置解决
6、操作
rabbitmqctl set_cluster_name
修改集群名称
rabbitmqctl cluster_status
查看集群状态
镜像集群配置
RabbitMQ中队列的内容是保存在单个节点本地的,跟交换器和绑定不同,它们是对于集群中所有节点的
如此,队列内容存在单点故障,解决方式之一是使用镜像队列,在多个节点上拷贝队列的副本
每个镜像队列包含一个master,若干个镜像
master存在于称为master的节点上,所有操作都是首先对master执行,之后广播到镜像
镜像意味着集群,不应该WAN使用
发布到队列的消息会拷贝到该队列所有的镜像,消费者连接到master,当消费者对消息确认后,镜像删除master确认的消息
队列的镜像提供了高可用,但是没有负载均衡
可以使用策略随时更改队列的类型,可以首先创建一个非镜像队列,然后使用策略将其配置为镜像队列或反过来
非镜像队列没有额外的基础设施,因此可以提供更高的吞吐率
master选举策略
1、最长的运行镜像升级为主镜像
前提是假定它于主镜像完全同步
如果没有与主服务器同步的镜像,则仅存在于主服务器上的消息将丢失
2、镜像认为所有以前的消费者都已突然断开连接
它重新排队已传递给客户端但正在等待确认的所有消息,这包括客户端已为其发出确认的消息
如,确认实在到达节点托管队列主节点之前在线路上丢失了,还是在从主节点广播到镜像时丢失了
这两种情况下,新的主服务器都别无选择,只能重新排队它尚未收到确认的所有消息
3、队列故障转移时请求通知的消费者将收到取消通知
当镜像队列发生了master的故障转移,系统就不知道向哪些消费者发送了哪些消息
已经发送的等待确认的消息会重新排队
4、重新排队的结果
从队列重新使用的客户端必须意识到,它们很可能随后会收到已经收到的消息
5、当所选镜像成为主镜像时
在此期间发布到镜像队列的消息将不会丢失(除非在提升的节点上发生后续故障)
发布到承载队列镜像的节点的消息将路由到队列主服务器,然后复制到所有镜像
如果主服务器发生故障,则消息将继续发送到镜像,并在完成向主服务器的镜像升级后将其添加到队列中
6、即使主服务器(或任何镜像)在正在发布的消息和发布者收到的确认之间失败,有客户端使用发布者确认发布的消息仍将得到确认
从发布者的角度来看,发布到镜像队列与发布到非镜像队列没有什么不同
负载均衡 HAProxy
HAProxy是一款开源免费,并提供高可用、负载均衡以及基于TCP和HTTP协议的代理软件,可以支持四层、七层负载均衡
LVS是工作在内核模式(IPVS),支持四层负载均衡
Nginx支持七层的负载均衡(后期版本也支持四层了)
使用HAProxy来做RabbitMQ的负载均衡,通过暴露VIP给上游的应用程序直接连接,上游不感知底层的RabbitMQ的实例节点信息
监控
RabbitMQ自带的管理控制台功能比较丰富,且自带的命令行工具比较强大
但是这些工具不具备告警功能,实际生产环境中,需要知道负载情况和运行监控状态,且发生问题后需要触发告警
传统的监控平台Nagios、Zabbix等均提供了RabbitMQ相关的插件支持
Prometheus监控平台也提供了Rabbitmq_exporter,介个Grafana的dashboard
5. 源码剖析
队列
交换器
持久化
rabbit_channel进程确定了消息将要投递的目标队列,rabbit_amqqueue_process是队列进程,每个队列都有一个对应的进程,实际上rabbit_amqqueue_process进程只是提供了逻辑上对队列的相关操作,它的真正操作是通过backing_queue模块提供的相关接口实现的
backing_queue的实现模块是rabbit_variabel_queue
启动过程
消息发送
消息消费
0 条评论
下一页