rabbitMQ
2022-08-30 16:41:00 0 举报
AI智能生成
rabbitMQ知识点总结
作者其他创作
大纲/内容
消息总线、消息队列、消息服务器(Message Queue),是一种跨进程、异步的通信机制,用于上下游传递消息。由消息系统来确保消息的可靠传递。
什么是 MQ?
RabbitMQ 是一种消息中间件,用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。接收端可以根据 RabbitMQ 配置的转发机制接收服务端发来的消息。RabbitMQ 依据指定的转发规则进行消息的转发、缓冲和持久化操作,主要用在多服务器间或单服务器的子系统间进行通信,是分布式系统标准的配置。
概述
场景
传统处理方式:系统与系统之间直接调用
使用 rabbitmq 解耦
图示
服务解耦
正常访问量
高峰期访问量(集群负载)
rabbitmq 削峰
流量削峰
同步调用
rabbitmq 异步
异步调用
应用场景
架构
基本概念
架构与基本概念
处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “abc”,则只有被标记为“abc”的消息才被转发,不会转发abc.def,也不会转发dog.ghi,只会转发abc。
Direct Exchange
# :匹配一个或多个词* :匹配不多不少 1个词
将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。
Topic Exchange
不处理路由键。只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
Fanout Exchange
不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。在绑定Queue与Exchange时指定一组键值对;当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers属性是一个键值对,可以是Hashtable,键值对的值可以是任何类型。而fanout,direct,topic 的路由键都需要要字符串形式的。
Headers Exchanges
类型
交换机的名字(自定义)
queueName
是否持久化,RabbitMQ关闭后,没有持久化的Exchange将被清除
durable
交换机的类型,可以用 BuiltinExchangeType 枚举类,也可以直接用\"direct\
type
是否自动删除,如果没有与之绑定的Queue,直接删除
autoDelete
是否内置的,如果为true,只能通过Exchange到Exchange
internal
结构化参数,可指定备份交换机,备份交换器是为了实现没有路由到队列的消息,声明交换机的时候添加属性alternate-exchange,声明一个备用交换机,一般声明为fanout类型,这样交换机收到路由不到队列的消息就会发送到备用交换机绑定的队列中。
span data-shimo-docs=\
参数说明
声明
Exchange 交换机
队列名
queue
设置队列为排他的。如果一个队列被声明为排他队列,该队列 仅对首次声明它的连接(Connection)可见,是该Connection私有的,类似于加锁,并在连接断开connection.close()时自动删除 ;
exclusive
是否自动删除 ;如果autoDelete = true,声明队列后当所有消费者都与这个队列断开连接时,这个队列会自动删除。
1、x-message-ttl:Number 此队列中消息存放时间2、x-expires:Number当Queue(队列)在指定的时间未被访问,则队列将被自动删除。3、x-max-length:Number 队列所能容下消息的最大长度。当超出长度后,新消息将会覆盖最前面的消息,类似于Redis的LRU算法。4、x-max-length-bytes:Number 限定队列的最大占用空间,当超出后也使用类似于Redis的LRU算法。5、x-overflow:String 设置队列溢出行为。这决定了当达到队列的最大长度时,消息会发生什么。有效值为Drop Head或Reject Publish。6、x-dead-letter-exchange:String死信交换机 有时候我们希望当队列的消息达到上限后溢出的消息不会被删除掉,而是走到另一个队列中保存起来。7、x-dead-letter-routing-key:String 如果不定义,则默认为溢出队列的routing-key,因此,一般和 x-dead-letter-exchange一起定义。8、x-max-priority:Number 如果将一个队列加上优先级参数,那么该队列为优先级队列。9、x-queue-mode:String 队列类型 x-queue-mode=lazy 懒队列,在磁盘上尽可能多地保留消息以减少RAM使用;如果未设置,则队列将保留内存缓存以尽可能快地传递消息。10、x-queue-master-locator:String 将队列设置为主位置模式,确定在节点集群上声明时队列主位置所依据的规则。注意:队列一旦声明,代码重新运行更改属性,属性发生变化也不会覆盖
arguments
声明队列
利用DLX,当消息在一个队列中变成死信(dead message,就是没有任何消费者消费)之后,他能被重新publish到另一个Exchange,这个Exchange 就是DLX(死信交换机)。DLX也是一个正常的Exchange,和一般的Exchange没有任何的区别,他能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列出现死信的时候,RabbitMQ就会自动将这条消息重新发布到Exchange上去,进而被路由到另一个队列。可以监听这个队列中的消息作相应的处理,这个特性可以弥补rabbitMQ以前支持的immediate参数的功能。
概念
消息被拒绝(basic.reject/basic.nack)同时requeue=false(不重回队列)
队列设置的消息过期时间
消息自身设置的过期时间
当 消息 和 队列 都存在超时时间时,那么就以最短的 TTL 为准用死信交换机实现延迟队列,应该是在队列上设置过期时间,因为 rabbitmq 是懒检查机制,假设给消息设置过期时间, 队列里存了 3条消息,第一条是 5分钟过期,第二条是1分钟过期,第3条是1秒钟过期,那么根据队列先进先出的特点,服务器会先拿第一条消息,如果第一条消息未被取走,或者为过期,后边的消息都会排队等候,也就是说,第二条消息要等 5 分钟后才会过期(第一条消息取走后第二条立即过期),第3条消息要等前两条消息被取走或者过期,才能被取走或过期如果是给队列设置过期时间,会直接批量过期
TTL 消息过期
x-max-length
队列达到最大长度
消息进入死信队列的几种情况
死信队列
Queue 队列
交换机名字
exchangeName
路由键,用于匹配交换机与队列绑定的 Bind key
routingkey
设置消息属性(BasicProperties 成员变量)
properties
发送的消息
message
发送消息
消费消息
注意,发送方和接收方都需要定义这个 messageConvert,否则会导致序列化 or 反序列化失败的问题
消息序列化
rabbitmq 默认会以轮询的机制最快的把队列所有的消息发送给所有客户端 (如果消息没确认的话 他会添加一个 Unacked的标识) 对于Rabbitmq来讲 这样子能最快速的使自己不会囤积消息而对性能造成影响, 但是 对于我们整个系统来讲, 这种机制会带来很多问题。
第一个就是如果不设置消息预取,当生产者与消费者能力不匹配,在高并发的情况下生产端产生大量消息,消费端无法消费那么多消息。假设我们有个场景,首先,我们有个 RabbitMQ 服务器上有上万条消息未消费,然后我们随便打开一个消费者客户端,会出现:巨量的消息瞬间推送过来,但是我们的消费端无法同时处理这么多数据。这时就会导致你的服务崩溃。设置消息预取后,在一定数量的消息未确认的情况下,不会向消费者继续发送消息,消费者也不再接受新的消息,保证消费者不会因为大量的消息涌入无法处理导致服务器崩溃
可能造成服务崩溃
比如说 我一个队列有2个消费者同时在消费,而且他们处理 能力不同, 有100个订单消息需要处理(消费) 现在有消费者A 和消费者B , 消费者A消费一条消息的速度是 10ms 消费者B 消费一条消息的速度是15ms , 那么 rabbitmq 会默认给 消费者A B 一人50条消息让他们消费 ,但是 消费者A 他500ms 就可以消费完所有的消息 并且处于空闲状态 ,而消费者B需要750ms 才能消费完 ,如果从性能上来考虑的话 ,这100条消息消费完的时间一共是750ms(因为2个人同时在 消费) 但是如果 在消费者A消费完的时候 能把这个空闲的性能用来和B一起消费剩下的信息的话, 那么这处理速 度就会快非常多。
负载均衡(能者多劳)
为什么要设置消息预取
配置消息预取
那么设置完之后是什么效果呢? 还是刚刚那个例子 还是2个消费者 font color=\"#f44336\
关于这个预取的数量如何设置呢? 我们发现 如果设置为 1, 能极大的利用客户端的性能(我消费完了就可以赶紧消 费下一条 不会导致忙的很忙 闲的很闲) 但是, 我们每消费一条消息 就要通知一次rabbitmq 然后再取出新的消 息, 这样对于rabbitmq的性能来讲 是非常不合理的 所以这个参数要根据业务情况设置 我根据我查阅到的资料然后加以测试, 这个数值的大小与性能成正比 但是有上限,与数据可靠性以及客户端的利用率成反比
预取数量的设置
消息预取
消息
消息传递各个节点
可能因为网络或者Broker的问题导致 流程① 失败,而生产者是无法知道消息是否正确发送到Broker的。 有两种解决方案,
失败可能性分析
在通过channel.txSelect方法开启事务之后,我们便可以发布消息给RabbitMQ了,如果事务提交成功,则消息一定 到达了RabbitMQ中,如果在事务提交执行之前由于RabbitMQ异常崩溃或者其他原因抛出异常,这个时候我们便 可以将其捕获,进而通过执行channel.txRollback方法来实现事务回滚。使用事务机制的话会“吸干”RabbitMQ的性 能,一般不建议使用。
Transaction(事务)模式
在配置文件中开启发送方确认模式
实现 RabbitTemplate.ConfirmCallback接口 confirm 方法,通过 ack 来判断消息是否发送成功并作出处理
将实现的 callbackAndReturn 注入 rabbitmq bean 对象中
springboot 开启消息确认
Confirm(确认)模式
解决方案
消息从生产者发送到 Broker 服务器
可能因为路由关键字错误,或者队列不存在,或者队列名称错误导致消息路由失败。
失败原因分析
该机制用于处理一些不可路由的消息。如果生产在发送消息时,发现当前的exchange不存在或者指定的路由key找不到时,生产者可以开启该模式来监听这种不可达的消息,以进行后续。(如果不开启的话,broker会自动删除该消息)如果是集群模式,必须所有broker的 队列都接受到消息才算成功,有一个 broker 的队列没有接受到都会触发 returnCallback
开启消息返回机制
实现 ReturnCallback 接口的 returnedMessage 方法
将实现类对象 callbackndReturn 设置到 rabbitTemplate 中
消息从 exchange–>queue 路由失败则会返回一个 returnCallback 。如果有备用交换机不管有没有成功路由,不会回调,除非备用交换机也失败了才会 returnCallback
PS
springboot 开启消息返回
Return消息返回机制
消息从 Exchange 路由到 Queue
可能因为系统宕机、重启、关闭等等情况导致存储在队列的消息丢失
原因分析
持久化。设置消息持久化必须先设置队列持久化,否则队列不持久化,消息持久化,队列都不存在了,消息存在还有什么意义。消息持久化需要将交换机持久化、队列持久化、消息持久化,才能最终达到持久化的目的
队列持久化
交换机持久化
消息持久化
消息在Queue中存储
消费者收到消息后未来得及处理即发生异常
默认情况下 消费者在拿到 rabbitmq 的消息时 已经自动确认这条消息已经消费了, 讲白话就 是 rabbitmq 的队列里就会删除这条消息了, 但是 我们实际开发中 难免会遇到这种情况, 比如说 拿到这条消息 发 现我处理不了 比如说 参数不对, 又比如说 我当前这个系统出问题了, 暂时不能处理这个消息, 但是 这个消息已 经被你消费掉了 rabbitmq的队列里也删除掉了, 你自己这边又处理不了, 那么 ,这个消息就被遗弃了。 这种情 况在实际开发中是不合理的, rabbitmq提供了解决这个问题的方案, 也就是我们上面所说的confirm模式 只针对发送方的 这次我们来讲消费方的。
为什么要确认消费?
发送给消费者以后服务端就删除该消息,而不是等待消费者业务处理完后才删除
自动 ACK 情况下(autoAck = true),消息什么时候被删除?
自动 ACK 容易造成消息丢失,如果消息还未被消费成功消费者宕机,rabbitmq 又删除了消息,这条消息就丢失了
缺点
自动确认
手动确认
单条拒绝
与 basicReject 不同之处在于 basicReject 不能批量拒绝
批量拒绝
开启手动确认
手动ACK 情况下(autoAck = true),消息什么时候被删除?
有时候会有特殊的情况,比如预先拉取了50条消息,批量确认50条,结果消费了40条mq宕机,由于mq的宕机,导致前40条消息消费了但还处于unacked状态,此时会重新置为 ready 状态,交由其它消费者消费,也就是重回队列了,这就会导致重复消费的问题,这就需要消息具备幂等性。
手动 ACK 可能会造成消息重复消费,如果消息消费成功还未手动确认,那么消息会在 mq 中被标识为 unacked,消费端宕机,此时消息会重新置为 ready状态,给其他消费者消费rabbitmq 没有提供重复消费的解决方案,可以给消息加一个字段标识,加一个业务ID,
消费者确认消费
消费者订阅Queue并消费消息
消息可靠性投递
rabbitMQ
0 条评论
回复 删除
下一页