rabbitMQ
2022-08-30 16:41:00 0 举报
AI智能生成
rabbitMQ知识点总结
作者其他创作
大纲/内容
什么是 MQ?
消息总线、消息队列、消息服务器(Message Queue),是一种跨进程、异步的通信机制,用于上下游传递消息。由消息系统来确保消息的可靠传递。
概述
RabbitMQ 是一种消息中间件,用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。接收端可以根据 RabbitMQ 配置的转发机制接收服务端发来的消息。RabbitMQ 依据指定的转发规则进行消息的转发、缓冲和持久化操作,主要用在多服务器间或单服务器的子系统间进行通信,是分布式系统标准的配置。
应用场景
服务解耦
场景
假设有这样一个场景, 服务A产生数据, 而服务B,C,D需要这些数据, 那么我们可以在A服务中直接调用B,C,D服务,把数据传递到下游服务即可。但是,随着我们的应用规模不断扩大,会有更多的服务需要A的数据,如果有几十甚至几百个下游服务,而且会不断变更,再加上还要考虑下游服务出错的情况,那么A服务中调用代码的维护会极为困难
这是由于服务之间耦合度过于紧密
这是由于服务之间耦合度过于紧密
图示
传统处理方式:系统与系统之间直接调用
使用 rabbitmq 解耦
流量削峰
场景
假设我们有一个应用,平时访问量是每秒300请求,我们用一台服务器即可轻松应对,而在高峰期,访问量瞬间翻了十倍,达到每秒3000次请求,那么单台服务器肯定无法应对,这时我们可以考虑增加到10台服务器,来分散访问压力。但如果这种瞬时高峰的情况每天只出现一次,每次只有半小时,那么我们10台服务器在多数时间都只分担每秒几十次请求,这样就有点浪费资源了
图示
正常访问量
平时访问量是每秒300请求,一台服务器即可
高峰期访问量(集群负载)
高峰期,访问量瞬间翻了十倍,考虑增加到10台服务器,来分散访问压力
rabbitmq 削峰
使用RabbitMQ来进行流量削峰,高峰情况下,瞬间出现的大量请求数据,先发送到消息队列服务器,排队等待被处理,而我们的应用,可以慢慢的从消息队列接收请求数据进行处理,这样把数据处理时间拉长,以减轻瞬时压力
异步调用
场景
考虑定外卖支付成功的情况
支付后要发送支付成功的通知,再寻找外卖小哥来进行配送,而寻找外卖小哥的过程非常耗时,尤其是高峰期,可能要等待几十秒甚至更长
这样就造成整条调用链路响应非常缓慢。
支付后要发送支付成功的通知,再寻找外卖小哥来进行配送,而寻找外卖小哥的过程非常耗时,尤其是高峰期,可能要等待几十秒甚至更长
这样就造成整条调用链路响应非常缓慢。
图示
同步调用
rabbitmq 异步
引入RabbitMQ消息队列,订单数据可以发送到消息队列服务器,那么调用链路也就可以到此结束,订单系统则可以立即得到响应,整条链路的响应时间只有200毫秒左右
寻找外卖小哥的应用可以以异步的方式从消息队列接收订单消息,再执行耗时的寻找操作
寻找外卖小哥的应用可以以异步的方式从消息队列接收订单消息,再执行耗时的寻找操作
架构与基本概念
架构
基本概念
Exchange 交换机
类型
Direct Exchange
处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “abc”,则只有被标记为“abc”的消息才被转发,不会转发abc.def,也不会转发dog.ghi,只会转发abc。
Topic Exchange
将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。
# :匹配一个或多个词
* :匹配不多不少 1个词
* :匹配不多不少 1个词
Fanout Exchange
不处理路由键。只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
Headers Exchanges
不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。在绑定Queue与Exchange时指定一组键值对;当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers属性是一个键值对,可以是Hashtable,键值对的值可以是任何类型。而fanout,direct,topic 的路由键都需要要字符串形式的。
声明
exchangeDeclare(String exchange,
BuiltinExchangeType type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments)
BuiltinExchangeType type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments)
参数说明
queueName
交换机的名字(自定义)
durable
是否持久化,RabbitMQ关闭后,没有持久化的Exchange将被清除
type
交换机的类型,可以用 BuiltinExchangeType 枚举类,也可以直接用
"direct", "topic","fanout" 等
"direct", "topic","fanout" 等
autoDelete
是否自动删除,如果没有与之绑定的Queue,直接删除
internal
是否内置的,如果为true,只能通过Exchange到Exchange
结构化参数,可指定备份交换机,备份交换器是为了实现没有路由到队列的消息,声明交换机的时候添加属性alternate-exchange,声明一个备用交换机,一般声明为fanout类型,这样交换机收到路由不到队列的消息就会发送到备用交换机绑定的队列中。
Queue 队列
声明队列
channel.queueDeclare(String queue,
boolean durable,
boolean exclusive,
boolean autoDelete,
Map<String, Object> arguments)
boolean durable,
boolean exclusive,
boolean autoDelete,
Map<String, Object> arguments)
参数说明
queue
队列名
durable
是否持久化, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失,如果想重启之后还存在就要使队列持久化,保存到Erlang自带的Mnesia数据库中,当rabbitmq重启之后会读取该数据库
exclusive
设置队列为排他的。如果一个队列被声明为排他队列,该队列 仅对首次声明它的连接(Connection)可见,是该Connection私有的,类似于加锁,并在连接断开connection.close()时自动删除 ;
autoDelete
是否自动删除 ;如果autoDelete = true,声明队列后当所有消费者都与这个队列断开连接时,这个队列会自动删除。
arguments
1、x-message-ttl:Number
此队列中消息存放时间
2、x-expires:Number
当Queue(队列)在指定的时间未被访问,则队列将被自动删除。
3、x-max-length:Number
队列所能容下消息的最大长度。当超出长度后,新消息将会覆盖最前面的消息,类似于Redis的LRU算法。
4、x-max-length-bytes:Number
限定队列的最大占用空间,当超出后也使用类似于Redis的LRU算法。
5、x-overflow:String
设置队列溢出行为。这决定了当达到队列的最大长度时,消息会发生什么。有效值为Drop Head或Reject Publish。
6、x-dead-letter-exchange:String
死信交换机
有时候我们希望当队列的消息达到上限后溢出的消息不会被删除掉,而是走到另一个队列中保存起来。
7、x-dead-letter-routing-key:String
如果不定义,则默认为溢出队列的routing-key,因此,一般和 x-dead-letter-exchange一起定义。
8、x-max-priority:Number
如果将一个队列加上优先级参数,那么该队列为优先级队列。
9、x-queue-mode:String
队列类型 x-queue-mode=lazy 懒队列,在磁盘上尽可能多地保留消息以减少RAM使用;如果未设置,则队列将保留内存缓存以尽可能快地传递消息。
10、x-queue-master-locator:String
将队列设置为主位置模式,确定在节点集群上声明时队列主位置所依据的规则。
注意:队列一旦声明,代码重新运行更改属性,属性发生变化也不会覆盖
此队列中消息存放时间
2、x-expires:Number
当Queue(队列)在指定的时间未被访问,则队列将被自动删除。
3、x-max-length:Number
队列所能容下消息的最大长度。当超出长度后,新消息将会覆盖最前面的消息,类似于Redis的LRU算法。
4、x-max-length-bytes:Number
限定队列的最大占用空间,当超出后也使用类似于Redis的LRU算法。
5、x-overflow:String
设置队列溢出行为。这决定了当达到队列的最大长度时,消息会发生什么。有效值为Drop Head或Reject Publish。
6、x-dead-letter-exchange:String
死信交换机
有时候我们希望当队列的消息达到上限后溢出的消息不会被删除掉,而是走到另一个队列中保存起来。
7、x-dead-letter-routing-key:String
如果不定义,则默认为溢出队列的routing-key,因此,一般和 x-dead-letter-exchange一起定义。
8、x-max-priority:Number
如果将一个队列加上优先级参数,那么该队列为优先级队列。
9、x-queue-mode:String
队列类型 x-queue-mode=lazy 懒队列,在磁盘上尽可能多地保留消息以减少RAM使用;如果未设置,则队列将保留内存缓存以尽可能快地传递消息。
10、x-queue-master-locator:String
将队列设置为主位置模式,确定在节点集群上声明时队列主位置所依据的规则。
注意:队列一旦声明,代码重新运行更改属性,属性发生变化也不会覆盖
死信队列
概念
利用DLX,当消息在一个队列中变成死信(dead message,就是没有任何消费者消费)之后,他能被重新publish到另一个Exchange,这个Exchange 就是DLX(死信交换机)。
DLX也是一个正常的Exchange,和一般的Exchange没有任何的区别,他能在任何的队列上被指定,实际上就是设置某个队列的属性。
当这个队列出现死信的时候,RabbitMQ就会自动将这条消息重新发布到Exchange上去,进而被路由到另一个队列。可以监听这个队列中的消息作相应的处理,这个特性可以弥补rabbitMQ以前支持的immediate参数的功能。
DLX也是一个正常的Exchange,和一般的Exchange没有任何的区别,他能在任何的队列上被指定,实际上就是设置某个队列的属性。
当这个队列出现死信的时候,RabbitMQ就会自动将这条消息重新发布到Exchange上去,进而被路由到另一个队列。可以监听这个队列中的消息作相应的处理,这个特性可以弥补rabbitMQ以前支持的immediate参数的功能。
消息进入死信队列的几种情况
消息被拒绝(basic.reject/basic.nack)同时requeue=false(不重回队列)
TTL 消息过期
队列设置的消息过期时间
消息自身设置的过期时间
当 消息 和 队列 都存在超时时间时,那么就以最短的 TTL 为准
用死信交换机实现延迟队列,应该是在队列上设置过期时间,因为 rabbitmq 是懒检查机制,假设给消息设置过期时间, 队列里存了 3条消息,第一条是 5分钟过期,第二条是1分钟过期,第3条是1秒钟过期,那么根据队列先进先出的特点,服务器会先拿第一条消息,如果第一条消息未被取走,或者为过期,后边的消息都会排队等候,也就是说,第二条消息要等 5 分钟后才会过期(第一条消息取走后第二条立即过期),第3条消息要等前两条消息被取走或者过期,才能被取走或过期
如果是给队列设置过期时间,会直接批量过期
用死信交换机实现延迟队列,应该是在队列上设置过期时间,因为 rabbitmq 是懒检查机制,假设给消息设置过期时间, 队列里存了 3条消息,第一条是 5分钟过期,第二条是1分钟过期,第3条是1秒钟过期,那么根据队列先进先出的特点,服务器会先拿第一条消息,如果第一条消息未被取走,或者为过期,后边的消息都会排队等候,也就是说,第二条消息要等 5 分钟后才会过期(第一条消息取走后第二条立即过期),第3条消息要等前两条消息被取走或者过期,才能被取走或过期
如果是给队列设置过期时间,会直接批量过期
队列达到最大长度
x-max-length
消息
发送消息
channel.basicPublish(String exchangeName,
String routingkey,
BasicProperties properties,
byte[] message);
例:
channel.basicPublish("directExchange",
"qmy.direct",
null,"hello".getBytes());
String routingkey,
BasicProperties properties,
byte[] message);
例:
channel.basicPublish("directExchange",
"qmy.direct",
null,"hello".getBytes());
参数说明
exchangeName
交换机名字
routingkey
路由键,用于匹配交换机与队列绑定的 Bind key
properties
设置消息属性(BasicProperties 成员变量)
message
发送的消息
消费消息
//收到消息后用来处理消息的回调对象
DeliverCallback callback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
String msg = new String(message.getBody(), "UTF-8");
System.out.println("收到: "+msg);
}
};
//消费者取消时的回调对象
CancelCallback cancel = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
channel.basicConsume(String queue,
boolean autoAck,
DeliverCallback deliverCallback,
CancelCallback cancelCallback)
DeliverCallback callback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
String msg = new String(message.getBody(), "UTF-8");
System.out.println("收到: "+msg);
}
};
//消费者取消时的回调对象
CancelCallback cancel = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
channel.basicConsume(String queue,
boolean autoAck,
DeliverCallback deliverCallback,
CancelCallback cancelCallback)
消息序列化
注意,发送方和接收方都需要定义这个 messageConvert,否则会导致序列化 or 反序列化失败的问题
消息预取
为什么要设置消息预取
rabbitmq 默认会以轮询的机制最快的把队列所有的消息发送给所有客户端 (如果消息没确认的话 他会添加一个 Unacked的标识) 对于Rabbitmq来讲 这样子能最快速的使自己不会囤积消息而对性能造成影响, 但是 对于我们整个系统来讲, 这种机制会带来很多问题。
可能造成服务崩溃
第一个就是如果不设置消息预取,当生产者与消费者能力不匹配,在高并发的情况下生产端产生大量消息,消费端无法消费那么多消息。
假设我们有个场景,首先,我们有个 RabbitMQ 服务器上有上万条消息未消费,然后我们随便打开一个消费者客户端,会出现:巨量的消息瞬间推送过来,但是我们的消费端无法同时处理这么多数据。
这时就会导致你的服务崩溃。
设置消息预取后,在一定数量的消息未确认的情况下,不会向消费者继续发送消息,消费者也不再接受新的消息,保证消费者不会因为大量的消息涌入无法处理导致服务器崩溃
假设我们有个场景,首先,我们有个 RabbitMQ 服务器上有上万条消息未消费,然后我们随便打开一个消费者客户端,会出现:巨量的消息瞬间推送过来,但是我们的消费端无法同时处理这么多数据。
这时就会导致你的服务崩溃。
设置消息预取后,在一定数量的消息未确认的情况下,不会向消费者继续发送消息,消费者也不再接受新的消息,保证消费者不会因为大量的消息涌入无法处理导致服务器崩溃
负载均衡(能者多劳)
比如说 我一个队列有2个消费者同时在消费,而且他们处理 能力不同, 有100个订单消息需要处理(消费) 现在有消费者A 和消费者B , 消费者A消费一条消息的速度是 10ms 消费者B 消费一条消息的速度是15ms , 那么 rabbitmq 会默认给 消费者A B 一人50条消息让他们消费 ,但是 消费者A 他500ms 就可以消费完所有的消息 并且处于空闲状态 ,而消费者B需要750ms 才能消费完 ,如果从性能上来考虑的话 ,这100条消息消费完的时间一共是750ms(因为2个人同时在 消费) 但是如果 在消费者A消费完的时候 能把这个空闲的性能用来和B一起消费剩下的信息的话, 那么这处理速 度就会快非常多。
配置消息预取
那么设置完之后是什么效果呢? 还是刚刚那个例子 还是2个消费者 因为会在消费者 basicACK 确认消息后, rabbitmq 才会继续发送消息给客户端 ,而且客户端的消息累计量不会超过我们刚刚设置预取的数量,
所以我们再运行同样的 例子的话 会发现 A消费者消费完99条消息了 B消费者才消费1条 (因为B消费者休眠了0.5秒才消费完{返回消息确 认} 但是0.5秒之内A消费者就已经把所有消息消费完毕了 当然 如果计算机处理速度较慢这个结果可能会有差异,效 果大概就是A消费者会处理大量消息)
所以我们再运行同样的 例子的话 会发现 A消费者消费完99条消息了 B消费者才消费1条 (因为B消费者休眠了0.5秒才消费完{返回消息确 认} 但是0.5秒之内A消费者就已经把所有消息消费完毕了 当然 如果计算机处理速度较慢这个结果可能会有差异,效 果大概就是A消费者会处理大量消息)
预取数量的设置
关于这个预取的数量如何设置呢? 我们发现 如果设置为 1, 能极大的利用客户端的性能(我消费完了就可以赶紧消 费下一条 不会导致忙的很忙 闲的很闲) 但是, 我们每消费一条消息 就要通知一次rabbitmq 然后再取出新的消 息, 这样对于rabbitmq的性能来讲 是非常不合理的 所以这个参数要根据业务情况设置 我根据我查阅到的资料然后加以测试, 这个数值的大小与性能成正比 但是有上限,与数据可靠性以及客户端的利用率成反比
消息可靠性投递
消息传递各个节点
消息从生产者发送到 Broker 服务器
失败可能性分析
可能因为网络或者Broker的问题导致 流程① 失败,而生产者是无法知道消息是否正确发送到Broker的。 有两种解决方案,
解决方案
Transaction(事务)模式
在通过channel.txSelect方法开启事务之后,我们便可以发布消息给RabbitMQ了,如果事务提交成功,则消息一定 到达了RabbitMQ中,如果在事务提交执行之前由于RabbitMQ异常崩溃或者其他原因抛出异常,这个时候我们便 可以将其捕获,进而通过执行channel.txRollback方法来实现事务回滚。使用事务机制的话会“吸干”RabbitMQ的性 能,一般不建议使用。
Confirm(确认)模式
Confirm消息确认机制: 生产者向MQ投递完消息后,要求MQ返回一个应答,生产者异步接收该应答,用来确定该消息是否正常的发送到了Broker, 从而保障消息的可靠性投递,如果是集群模式,必须所有的 Broker 都接受到消息才会调用 confirmCallback
springboot 开启消息确认
在配置文件中开启发送方确认模式
实现 RabbitTemplate.ConfirmCallback接口 confirm 方法,通过 ack 来判断消息是否发送成功并作出处理
将实现的 callbackAndReturn 注入 rabbitmq bean 对象中
消息从 Exchange 路由到 Queue
失败原因分析
可能因为路由关键字错误,或者队列不存在,或者队列名称错误导致消息路由失败。
解决方案
Return消息返回机制
概述
该机制用于处理一些不可路由的消息。如果生产在发送消息时,发现当前的exchange不存在或者指定的路由key找不到时,生产者可以开启该模式来监听这种不可达的消息,以进行后续。(如果不开启的话,broker会自动删除该消息)如果是集群模式,必须所有broker的 队列都接受到消息才算成功,有一个 broker 的队列没有接受到都会触发 returnCallback
springboot 开启消息返回
开启消息返回机制
实现 ReturnCallback 接口的 returnedMessage 方法
将实现类对象 callbackndReturn 设置到 rabbitTemplate 中
PS
消息从 exchange–>queue 路由失败则会返回一个 returnCallback 。如果有备用交换机不管有没有成功路由,不会回调,除非备用交换机也失败了才会 returnCallback
消息在Queue中存储
原因分析
可能因为系统宕机、重启、关闭等等情况导致存储在队列的消息丢失
解决方案
持久化。设置消息持久化必须先设置队列持久化,否则队列不持久化,消息持久化,队列都不存在了,消息存在还有什么意义。消息持久化需要将交换机持久化、队列持久化、消息持久化,才能最终达到持久化的目的
队列持久化
交换机持久化
消息持久化
消费者订阅Queue并消费消息
原因分析
消费者收到消息后未来得及处理即发生异常
解决方案
消费者确认消费
为什么要确认消费?
默认情况下 消费者在拿到 rabbitmq 的消息时 已经自动确认这条消息已经消费了, 讲白话就 是 rabbitmq 的队列里就会删除这条消息了, 但是 我们实际开发中 难免会遇到这种情况, 比如说 拿到这条消息 发 现我处理不了 比如说 参数不对, 又比如说 我当前这个系统出问题了, 暂时不能处理这个消息, 但是 这个消息已 经被你消费掉了 rabbitmq的队列里也删除掉了, 你自己这边又处理不了, 那么 ,这个消息就被遗弃了。 这种情 况在实际开发中是不合理的, rabbitmq提供了解决这个问题的方案, 也就是我们上面所说的confirm模式 只针对发送方的 这次我们来讲消费方的。
自动确认
自动 ACK 情况下(autoAck = true),消息什么时候被删除?
发送给消费者以后服务端就删除该消息,而不是等待消费者业务处理完后才删除
缺点
自动 ACK 容易造成消息丢失,如果消息还未被消费成功消费者宕机,rabbitmq 又删除了消息,这条消息就丢失了
手动确认
开启手动确认
手动确认
channel.basicAck(String queue, boolean autoAck)
单条拒绝
channel.basicReject(long deliveryTag, boolean requeue)
void basicReject(long deliveryTag, boolean requeue) throws IOException;
批量拒绝
channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
与 basicReject 不同之处在于 basicReject 不能批量拒绝
手动ACK 情况下(autoAck = true),消息什么时候被删除?
消息处理成功,手动 ack,消息从 Broker 中移除
消息处理失败,nack/reject,如果重回队列就发送给其他消费者处理,如果不重回队列且有死信交换机,就交给死信交换机处理,如果都没有,丢弃消息
消息如果一直没有调用 ack/nack,broker会一直等待确认,不会投递给其他consumer,也不会删除消息。消息状态为 unack,如果此时接收消息未发送回执的客户端断开,则消息被重新置为 ready 状态,发送给其他consumer
消息处理失败,nack/reject,如果重回队列就发送给其他消费者处理,如果不重回队列且有死信交换机,就交给死信交换机处理,如果都没有,丢弃消息
消息如果一直没有调用 ack/nack,broker会一直等待确认,不会投递给其他consumer,也不会删除消息。消息状态为 unack,如果此时接收消息未发送回执的客户端断开,则消息被重新置为 ready 状态,发送给其他consumer
缺点
手动 ACK 可能会造成消息重复消费,如果消息消费成功还未手动确认,那么消息会在 mq 中被标识为 unacked,消费端宕机,此时消息会重新置为 ready状态,给其他消费者消费
rabbitmq 没有提供重复消费的解决方案,可以给消息加一个字段标识,加一个业务ID,
rabbitmq 没有提供重复消费的解决方案,可以给消息加一个字段标识,加一个业务ID,
有时候会有特殊的情况,比如预先拉取了50条消息,批量确认50条,结果消费了40条mq宕机,由于mq的宕机,导致前40条消息消费了但还处于unacked状态,此时会重新置为 ready 状态,交由其它消费者消费,也就是重回队列了,这就会导致重复消费的问题,这就需要消息具备幂等性。
0 条评论
下一页