RabbitMQ
2024-10-31 23:05:48 29 举报
AI智能生成
RabbitMQ梳理(基于RabbitMQ官方文档的Java 入门、Spring AMQP入门 + AMQP091概念文章 + Spring BootAMQP官方文档) 2024年9月3日18:13:32:调整思维导图的样式风格。 2024年10月17日18:33:41:完善内容 2024年10月24日19:23:09:堂堂大更新! 2024年10月31日23:04:41:自我认为是补充完了,工作中能cover到的基本都有了,后续应该是不考虑再继续补充了,当然如果后面要开其他MQ中间件的坑,会在这里进行重构的。
作者其他创作
大纲/内容
应用场景
解耦
流量削峰
异步
缓冲
AMQP协议模型
AMQP 091(当前用的,RabbitMQ当前版本的核心协议)
协议模型
生产者生产消息,将消息发送到交换机,接着交换机使用成为Bindings的规则,将消息副本分发到队列,然后代理将消息传送给订阅队列的消费者,或者消费者按需从队列中提取消息。
AMQP 091的消息确认概念
当消息传递给消费者时,消费者会通知代理,通知方式可以是自动的,也可以是应用程序开发人员选择通知时立即通知。使用消息确认时,代理只有在收到该消息(或一组消息)的通知时才会从队列中完全删除该消息。
AMQP 091的消息特殊处理机制
在某些情况下,例如,当无法路由消息时,消息可能会被返回给发布者、被丢弃,或者,如果代理实施了扩展,则被放入所谓的“死信队列”。发布者通过使用某些参数发布消息来选择如何处理此类情况。
Streams(原生支持)
AMQP 1.0(原生支持,4.0版本这个协议成为核心协议,不过4.0最近2024年10月31日22:17:00才发布,暂时观望,不深入了。)
MQTT
RabbitMQ通过插件拓展支持
STOMP
RabbitMQ通过拓展代理AMQP091协议的插件来支持该协议
AMQP091 VS AMQP 1.0
091
事务
AMQP 0.9.1 提供有限的支持,而 AMQP 1.0 目前不支持事务
直接回复
OAuth 2.0 令牌刷新
Channel Interceptor
1.0
更精细粒度的流量控制
实时度更高的队列位置
定义明确的类型
更好地定义消息标头
增强的消息完整性
Stream消息保真度
核心概念(基于AMQP091)
生产者
事务(代码端,没用过),不要抱太高期待,局限性很大。基本不用,被发布确认机制吊打,相较于发布确认机制吞吐量会降低 250 倍。
消费者
订阅消息(推荐,目前基本都是这种)
轮询消息(不推荐,效率低)
消息确认
问题:
代理何时应该从队列中删除消息?
确认模式
代理向应用程序发送消息后确认(自动确认模型)
basic.deliver 或 basic.get-ok
在应用程序发回确认后确认(手动确认模型)
basic.ack
特殊情况处理:
如果消费者在未发送确认的情况下死亡,则代理会将其重新交付给另一个消费者,或者,如果当时没有可用的消费者,则代理将等到至少一个消费者注册到同一队列中,然后再尝试重新交付。
消息拒绝
可以通过拒绝消息来向代理指示消息处理失败(或当时无法完成)。拒绝消息时,应用程序可以要求代理放弃或重新排队。
否定确认
basic.reject无法拒绝多条消息,但是RabbitMQ提供了一个 AMQP 0-9-1 扩展称为否定确认或 nack。
预取消息
对于多个消费者共享一个队列的情况,在发送下一个确认之前,能够指定每个消费者一次可以发送多少条消息是有用的。这可以用作一种简单的负载平衡技术,或者在消息倾向于批量发布的情况下提高吞吐量。例如,如果一个生产应用程序由于其所做工作的性质而每分钟发送一条消息。
connection(http)
RabbitMQ 支持的所有协议都是基于 TCP 的,并采用长期连接(每个协议操作不会打开新连接)以提高效率。
Channel(AMQP),就是个连接池,共享单个 TCP 连接的轻量级连接
AMQP 0-9-1 为连接提供了一种通过单个 TCP 连接进行多路复用的方法。这意味着应用程序可以在单个连接上打开多个称为通道的 “轻量级连接”。
exchange交换机,将消息路由到匹配的队列中
交换机的类型
direct(默认的也是这个类型,比较特殊的是它的交换机名的空字符串,所有队列以队列名作为路由键绑定到这个交换机上)
Routing Key==Binding Key
一对一定点发送
一对多(队列)组播
fanout
无视routing key,路由到所有与它绑定的Queue中
所用于广播(队列)
topic
Routing Key模糊匹配
多用于(队列)组播
headers
不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的header属性进行匹配。
交换机的属性
name,交换机名称
Durability持久性,MQ重启后,交换机还在
Auto-delete
当最后一个队列和交换机解绑时,交换机会自动删除
Internal
If yes, clients cannot publish to this exchange directly. It can only be used with exchange to exchange bindings.
参数
可选的各种参数,根据插件定义
alternate-exchange
如果无法以其他方式路由到此交换器的消息,请将它们发送到此处指定的备用交换器。
默认提供的交换机
Direct exchange
(Empty string)((默认交换机)
特殊属性:
创建的每个队列都使用与队列名称相同的路由键自动绑定到它
ps:默认交换不允许绑定/取消绑定操作。绑定到默认交换的操作将导致错误
amq.direct
根据消息路由键将消息传递到队列。直接交换是消息单播路由(单队列)的理想选择。它们也可用于组播路由(多队列)。
Fanout exchange
amq.fanout
将消息路由到绑定到它的所有队列,并忽略路由键。扇出交换非常适合消息的广播路由。
Topic exchange
amq.topic
根据消息路由密钥与用于将队列绑定到交换的模式之间的匹配,将消息路由到一个或多个队列。通常用于消息的组播路由
Headers exchange
忽略路由键属性。相反,用于路由的属性取自 headers 属性。如果标头的值等于绑定时指定的值,则认为消息匹配。
amq.match
amq.headers
PS:可以使用多个header值,但是需要用x-match参数指定匹配任意还是全部header
queue队列
队列的属性
name
Durable 持久化
MQ重启后队列还在
参数
各种,如消息的TTL、队列长度
Auto expire
队列在被自动删除之前可以不使用多长时间(毫秒)
参数x-expires,数值
Max length
队列在开始从其头部删除消息之前可以包含多少(就绪)消息。
队列在开始从其头部删除消息之前可以包含多少(就绪)消息。
参数x-max-length,数值
Max length bytes
队列在开始从其头部删除消息之前可以包含的就绪消息的总正文大小。
队列在开始从其头部删除消息之前可以包含的就绪消息的总正文大小。
参数x-max-length-bytes,数值
Message TTL
发布到队列的消息在被丢弃之前可以存活多长时间(毫秒)。
发布到队列的消息在被丢弃之前可以存活多长时间(毫秒)。
参数x-message-ttl,数值
Overflow behaviour
设置队列溢出行为。这决定了当达到队列的最大长度时,消息会发生什么情况。有效值为 drop-head、reject-publish 或 reject-publish-dlx。仲裁队列类型仅支持 drop-head 和 reject-publish。
参数x-overflow,字符串
Single active consumer
如果设置,请确保一次只有一个消费者从队列中消费,并故障转移到另一个注册的消费者,以防活动消费者被取消或死亡。
参数x-single-active-consumer,布尔值
Dead letter exchange
消息被拒绝或过期时将重新发布到的交换器的可选名称。
消息被拒绝或过期时将重新发布到的交换器的可选名称。
参数x-dead-letter-exchange,,字符串
Dead letter routing key
当消息是死信时使用的可选替换路由密钥。如果未设置,则将使用消息的原始路由密钥。
当消息是死信时使用的可选替换路由密钥。如果未设置,则将使用消息的原始路由密钥。
参数x-dead-letter-routing-key,字符串
Leader locator
设置在节点集群上声明时队列领导者的定位规则。有效值为客户端本地(默认)和平衡。
参数x-queue-leader-locator,字符串
延迟队列(delayed)
①基于死信队列实现延迟队列(TTL+Dead Letter Exchanges)
①声明队列时,通过x-dead-letter-exchange去指明这个队列的死信交换机,队列中出现死信后,死信会重新投递到死信交换机中
②队列不指定消费者,设置过期时间,到期后消息变死信
②开一个队列绑到死信交换机上,消费者监听这个队列去处理死信,达到一个延迟消费的效果
②安装插件,集成新的交换机类型
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
使用x-delayed-message声明交换机
消息带x-delay头
延迟时间内,消息不会进入队列,时间到了才会进入队列
懒惰队列lazy queue(默认就具备这个属性)
通过设置队列参数实现懒惰队列,消息会先落盘
声明队列时,加上x-queue-mode=lazy参数
ps:这个和延迟队列的使用场景不同,它就要是为了减轻内存的压力,不过这个模式的磁盘IO会比较多
优先级队列
默认情况:基于队列先进先出的特性,通常来说,先入队的先投递
设置优先级之后:优先级高的消息更大几率先投递
关键参数:x-max-priority
设置优先级之后:优先级高的消息更大几率先投递
关键参数:x-max-priority
队列在声明时可以指定参数:x-max-priority
默认值:0 此时消息即使设置优先级也无效
指定一个正整数值:消息的优先级数值不能超过这个
默认值:0 此时消息即使设置优先级也无效
指定一个正整数值:消息的优先级数值不能超过这个
死信交换Dead Letter Exchanges和死信队列
当消息过期或被拒绝时,RabbitMQ 支持将这些消息转发到死信队列。你可以为队列配置死信队列,通过 x-dead-letter-exchange 和 x-dead-letter-routing-key 属性来指定消息在过期或被拒绝后应该路由到的交换器和路由键。
死信消息的来源?
拒绝,直接丢弃不重新入队
队列溢出,先进先出,被挤出去的会变死信
超时
死信怎么处理?
会回到死信交换机再路由到特定的死信队列
不处理丢弃
触发告警通知
入库持久化处理
消费端监听死信队列对死信消息进行消费
使用注意:
在使用队列之前,必须声明它。声明队列将导致创建它(如果它尚不存在)。如果队列已经存在并且其属性与声明中的属性相同,则声明将不起作用。当现有队列属性与声明中的属性不同时,将引发代码为 406 (PRECONDITION_FAILED) 的通道级异常。
bindings绑定
交换机和队列之间的绑定
交换机和交换机之间的绑定(备用交换机、死信交换)
ps:如果消息无法路由到任何队列(例如,由于消息发布到的 Exchange 没有绑定),则会将其丢弃或返回给发布者,具体取决于发布者设置的消息属性。
属性:mandatory
false(默认)
消息将被丢弃或重新发布到备用交换(如果有)
true
消息将返回给该队列(必须设置返回的消息处理程序才能处理返回消息)
virtual host
用于环境隔离,把它当租户看吧
message消息
消息的属性
Content type
Content encoding
Routing key
Delivery mode
交付模式(持久性或非持久性)
Message priority
消息优先级
Message publishing timestamp
Expiration period
有效期
如果消息的 TTL 过期,它会自动被删除或成为死信。
Publisher application id
拓展:队列消息TTL和消息TTL的优先级
如果队列设置了 x-message-ttl,则队列中所有消息都受该 TTL 的限制
如果消息在发布时指定了 TTL,则消息的 TTL 优先级高于队列的 TTL
工作模式
RPC(没用过)
发布者确认模式
direct交换机(这个用的比较多)
HelloWorld(direct交换机)
Basic one-to-one messaging
work queue(direct交换机)
Load balancing between multiple consumers
routing(direct交换机+routingkey)
Selective message routing to queues based on routing keys.
fanout交换机
发布订阅(fanout交换机)
Message broadcast to multiple queues
topic交换机
topic(topic交换机+正则表达式)
Messages routed to queues based on wildcard routing keys.
headers交换机
Message routed based on header values instead of routing keys.
常见场景
消息丢失
生产者生产消息到RabbitMQ Server消息丢失(发布者确认)
生产者异步发布确认机制(Publisher Confirms)
交换机确认
效果:
用于确认消息是否成功到达 RabbitMQ 服务器。当消息被成功发送到交换器并且被确认时confirm 方法会被调用
如果消息没有被确认(例如,交换器不存在或其他错误)也可以通过 confirm 方法的相关参数获取错误信息。
实践
①配置文件publisher-confirm-type: correlated
②声明ConfirmCallBack接口,重写confirm方法,然后,绑定到RabbitMQTemplate上
总结
这个回调用于确保消息在生产者端的可靠发送,帮助开发者处理可能出现的消息丢失情况。
队列确认
效果:
如果 RabbitMQ 无法将生产者发送的消息路由到任何队列,RabbitMQ 会把这条消息“退回”给发送消息的生产者,当消息返回给生产者时,触发该回调
实践
①publisher-returns: true
②整个配置类声明ReturnsCallBack接口,重写returnMessage方法,然后,绑定到RabbitMQTemplate
PS:这个机制只确认消息成功到达交换机/队列,不关心消息是否被消费
使用备用交换机,消息投递失败,转投递到备用交换机
声明交换机的时候带alternate-exchange=exchangename参数指定备用交换机。备用交换机一般来说可以对消息作一样的消费流程,但是为了规范性,一般而言备用交换机后续对消息的处理应该是作记录或者异常通知,而不是消费。
流程
Java这边的Server端有一个自动恢复机制,不过我没去验证,补个待办把/TODO
Server存储的消息丢失
持久化(默认开启,不用配置)
配置队列的持久化(默认的,不用显示配置)
Durable = true
配置Message的持久化(默认的,不用显示配置)
rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return msg;
});
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return msg;
});
RabbitMQ Server到消费者消息丢失(消费者确认)
消费者使用消费成功使用ACK确认消息
自动ACK(默认)
手动ACK(√)
acknowledge-mode: manual
使用basicAck方法处理消息
如果消息消费失败,使用basicNAck拒绝消息
PS:这个机制是确认消息被消费者消费完成后,通知队列删除对应的消息副本。
消费者消费失败使用NACK,消息放回服务端,消费者重试一次
其他:
实现队列的高可用性,有钱直接上云,出问题了直接找服务商兜底,并用死信队列来保证消息的处理
重复消费
消息内部会生成一个唯一id(deliverTag 64位),消费者在处理消息时会检查是否已经处理过该标识的消息。可以使用数据库或分布式缓存(如Redis)来记录已处理的消息标识
业务上确保消息的处理逻辑是幂等的,即同一消息即使被处理多次,结果也是相同的。例如,在数据库操作中,可以使用唯一约束来防止重复插入。
使用数据库的去重表来记录已经处理的消息,处理新消息前先检查去重表,防止重复处理
消息积压
可以做好监控报警,提前发现问题进行扩容
增加消费者数量
优化消费者处理能力
顺序消息
①消费者使用独占队列
exclusive 属性控制是否让当前消费者独占队列。当设置为 true 时,该消费者会成为该队列的唯一消费者,并且其他消费者无法同时消费这个队列的消息。该功能通常用于需要确保某个队列只能由一个消费者实例来处理的场景。
@RabbitListener(queues = "myQueue", exclusive= true)
public void receive(Message message) {
// 处理顺序消息
}
public void receive(Message message) {
// 处理顺序消息
}
②消息分区
可以根据消息的某个属性(如用户ID、订单ID)将消息路由到特定的队列,从而确保特定分区的消息顺序被保证,且可以扩展到多个队列进行并行处理
③限制预取值,但是严格说来说有局限性
其他
并发控制
concurrency 属性用于控制消费者实例的并发数量。也就是说,允许多个消费者并发地处理来自同一个队列的消息,从而提高处理效率。适用于高吞吐量的场景,例如当你希望使用多个线程来并发处理消息时。通过并发消费者的数量,可以平衡消息处理速度和系统资源的利用率
@RabbitListener(queues = "myQueue", concurrency = "1")
public void receive(Message message) {
// 处理顺序消息
}
public void receive(Message message) {
// 处理顺序消息
}
流量控制
basic_qos 是 RabbitMQ 中用来设置消费者每次从队列中获取消息的数量的参数,主要用于流量控制,防止消费者积压过多未处理的消息。
设置预取值basic_qos(prefetch count)为1实现顺序消费
配置文件里的prefetch(RabbitMQ原始配置文件里的默认值好像是250)
策略
预取值为 1:消费者每次只能获取 1 条消息,必须处理完该消息并确认(ack)后,才能获取下一条消息
这种机制可以防止消费者积压大量消息未处理,有助于在单个消费者场景下保持顺序消费。
但是,这并不能保证在多个消费者并发消费时的全局顺序。如果你有多个消费者,消息仍然可能被不同的消费者按顺序消费,导致顺序被打乱。
预取值为 N:消费者一次可以获取最多 N 条消息,不需要等到一条消息处理完就可以获取更多消息
SpringBoot整合
概要:
我们日常使用的基本都是这种方式,所以我们可以了解它给我们自动装配了什么默认配置
①默认是没有开启连接重试的,需要手动开启
②默认交换机、队列持久化
③默认ACK
依赖
spring-boot-starter-amqp
基础配置
连接信息
PS:关于virtual-host的配置一般规模比较小的话都用默认的是没问题的,我们就是直接用默认的没区分,没遇到啥问题
声明
方式一:通过声明Queue、Exchange、Binding类的方式(√),在生产端建个配置类,在配置类里声明
方式二:通过@RabbitListener里的参数声明(我个人不太喜欢这种方式)
PS:个人在使用上更加推荐第一种方式,我认为相比第二种,更加的直观实用。第二种看上去带来了一种注解一把梭的便捷感,但是对于消息的发送端并不是很友好。所以我的拙见是消费端只要专注于从某个队列里拿消息消费就好了,至于队列、交换机、绑定的声明,交给发送端去做会更合适,由消息生产端去掌握声明的主动权,可以避免遇到一些声明的沟通、配置错误。
发送
RabbitMQTemplate
接收
@RabbitMQListener
拓展:
关于Spring AMQP的自动声明机制
①Spring AMQP核心包里,提供了一个Declarable顶层接口:该顶层接口的作用是,实现该接口的类,在Spring上下文初始化阶段会使用AMQPAdmin自动声明。
②Spring AMQP核心包里提供的Binding、Queue、Exchange相关顶层的接口或者类,都声明Declarable接口,这也就是为什么我们能看到当我们启动程序的时候,MQ服务端会自动创建相应的Binding、Queue、Exchange
关于交换机核心包里的CustomExchange类
RabbitMQ提供了插件的拓展机制,而这个类就是为了这个机制提供的,例如我们使用的延迟队列插件,对相关队列的声明也是要基于这个类去声明对应的交换机,这个类没有提供某种额外的机制或者限制,仅仅是希望拓展要符合相应的规范,这由插件的开发者自行决定是否遵守相关的实现规范,并不是强制性的。
生产端消息异步发布确认机制(生产者 to RabbitMQ Server)
①加配置
spring.rabbitmq.publisher-confirm-type=correlated# 生产者发布确认模式
spring.rabbitmq.publisher-returns=true# 当前消息失败时,开启消息返回到生产者
②给RabbitTemplate设置回调函数
rabbitTemplate.setConfirmCallback()
rabbitTemplate.setReturnsCallback()
消费端消息手动确认机制(RabbitMQServer to 消费者 )
①加配置
spring.rabbitmq.listener.direct.acknowledge-mode=manual(这是全局的)
细粒度的话,@RabbitListener注解里有个ackMode属性。
②消费者手动确认消息或者拒绝消息
确认
channel.basicAck
拒绝
channel.basicNack(√),它是对channel.basicReject这个方法的拓展,支持一次性拒绝多条消息
channel.basicReject,这个不能批量拒绝
这里再拓展
对于拒绝的处理意见
不重新消费的,也就不回队列了,打个日志记录一下,然后拒绝,后续也有人工介入的余地
要重新消费的,设置个回队列,但是!消费端要注意判断消息的重新投递状态,不然有可能陷入某种死循环:消费失败→回队列→重新消费→消费失败→重新回队。。。。。。
具体操作:
聊一些拓展性的东西
关于消费者确认和发布者确认
第一:两个功能都建立在相同的理念之上,并受到 TCP 的启发。不同的AMQP协议的实现思路大致一致。
第二:发布者确认和消费者交付确认是非常相似的功能
消费者确认涵盖了 RabbitMQ 与消费者的通信,并不感知发布者
发布者确认涵盖发布者与 RabbitMQ 的通信,并不感知消费者
第三:消息具备一个delivery tag,它在整个Channel内是唯一的单调增长的正整数,因此必须在当前的Channel内进行消息确认,否则会导致 “unknown delivery tag” 协议异常并关闭通道
第四:默认的消费者确认的判定是消息是否送达到消费者,而不是消息是否被消费者消费,这也是为什么默认的自动确认机制在消息到达消费者的瞬间就进行确认,而不是等待消息被消费。这种自动确认虽然带来了高吞吐量的优势,但是并不是一直数据安全的工作模式,在要求较高的场景下,手动确认依旧是更有效的选择。
第五:关于预取机制。它通常是与手动确认机制一起工作的,在自动确认机制下,它是无效的,即没有限制,这容易造成消费者的高负载。应谨慎使用自动确认模式或具有无限预取的手动确认模式。如果使用者使用大量消息而未确认,则会导致他们所连接的节点上的内存消耗增长。找到合适的预取值是一个反复试验的问题,并且会因工作负载而异。100 到 300 范围内的值通常提供最佳吞吐量,并且不会冒着让使用者不知所措的重大风险。较高的值通常会遇到收益递减定律。
第六:确认是否必要使用消费者确认的重新入队投放,如果只有单个消费者订阅,那么重新投放一般是没有意义的。
0 条评论
下一页