RabbitMQ整理
2020-12-01 14:52:49 8 举报
AI智能生成
rabbitMQ整理
作者其他创作
大纲/内容
定义
消息中间件属于分布式系统中一个子系统,关注于数据的发送和接收,利用高效可靠的异步消息传递机制对分布式系统中的其余各个子系统进行集成。
特性
高效
消息处理速度快
可靠
需要有消息持久化机制和其他的机制确保消息不丢失
异步
消息发送完,不需要等待返回,随时可以发送下一个消息
使用文件系统存储
从存储方式和效率来看:文件系统 > KV存储 > 关系型数据库
意义
主要解决分布式系统之间消息的传递,同时为分布式系统中其他子系统提供了松耦合的架构
优势
低耦合
异步通信
缓冲
削峰填谷,就像一个巨大的蓄水池
伸缩性
通过向集群中增加服务器来提高系统个面对高并发访问的压力和数据不断增长的存储需求
扩展性
与RPC区别
不同点
依赖性
强依赖关系需要RPC,非强依赖使用消息中间件
同步性
RPC是典型的同步方式,让远程调用向本地调用一样。MQ是异步方式
相同点
都是分布式系统间的通信方式
场景
异步处理
用户注册后,发送邮件和短信
方式
串行
并行
比串行提高了响应速度
异步处理(MQ)
响应速度进一步提高
应用解耦
用户下单,订单系统通知库存系统
方式
传统模式
串行调用
缺点
如果库存系统无法访问,则订单减库存失败,从而导致下订单失败
订单系统和库存系统耦合
引入MQ
订单系统完成持久化,然后将消息写入MQ,然后对客户端返回。库存系统从MQ中消费队列
优点
不会因为库存系统对问题影响订单系统
两系统解耦
流量削峰
秒杀和团购活动中,流量暴增,会导致应用挂掉。需要在应用前端接入消息队列,控制活动对人数;可以缓解短时间内高流量压垮应用
步骤
服务端接收到请求后,先写入消息队列。如果消息队列长度超过哦最大数量,则抛弃请求或跳转到错误页面
秒杀业务先使用MQ缓存请求消息,然后根据消息队列中到请求信息,再做后续处理
日志处理
解决大量日志传输的问题
步骤
日志采集客户端收集日志,定时写入MQ
MQ复制日志的接收,存储和转发
日志处理应用订阅并消费MQ队列中的日志
消息通讯
消息队列都内置了高效的通信机制,可以用在纯的消息通信
应用
点对点通讯
聊天室通讯
AMQP
定义
为MQ设计的应用层协议,基于此协议,客户端和MQ间可以传递消息
通信
连接
客户端与MQ直接建立一条TCP连接
信道
生产者/消费者与MQ通信的渠道。信道是建立在TCP连接上的虚拟连接。
rabbitmq 在一条 TCP 上建立成百 上千个信道来达到多个线程处理,这个 TCP 被多个线程共享,每个线程对应一个信道,信道在 RabbitMQ 都有唯一的 ID ,保证了信道私有性,对应上唯一 的线程使用。
为什么不建立多个TCP连接?
rabbit 保证性能,系统为每个线程开辟一个 TCP 是非常消耗性能,每秒成百上千的建立销毁 TCP 会严 重消耗系统。所以 rabbitmq 选择建立多个信道(建立在 tcp 的虚拟连接)连接到 rabbit 上。
技术上叫做“多路复用”,对于执行多个任务的多线程或者异步应用程序很有用。
要素
消费者
复制消息的消费
生产者
复制消息的生成
消息
包含有效载荷和标签
有效载荷
传输的数据
标签
描述有效载荷,rabbitmq使用它来决定谁获得消息
交换器
Direct
路由键完全匹配,消息被投递到对应的队列, direct 交换器是默认交换器。声明一个队列时,会自动绑定到默认交换器,并且以队列名称作为路由 键:channel->basic_public($msg,’’,’queue-name’)
Fanout
消息广播到绑定的队列,不管队列绑定了什么路由键,消息经过交换器,每个队列都有一份。
Topic
通过使用“*”和“#”通配符进行处理,使来自不同源头的消息到达同一个队列,”.”将路由键分为了几个标识符,“*”匹配 1 个,“#”匹配一个 或多个。
Headers
几乎和Direct一样,不实用,可以忽略
队列
消费队列
临时队列
没有持久化的队列,如果RabbitMQ服务器重启,那么这些队列将不会存在
类型
自动删除队列
自动删除队列和普通队列在使用上没有什么区别,唯一的区别是,当消费者断开连接时,队列将会被删除。自动删除队列允许的消费者没有限制, 也就是说当这个队列上最后一个消费者断开连接才会执行删除。
自动删除队列只需要在声明队列时,设置属性 auto-delete 标识为 true 即可。系统声明的随机队列,缺省就是自动删除的。
自动删除队列只需要在声明队列时,设置属性 auto-delete 标识为 true 即可。系统声明的随机队列,缺省就是自动删除的。
单消费者队列
普通队列允许的消费者没有限制,多个消费者绑定到多个队列时,RabbitMQ 会采用轮询进行投递。如果需要消费者独占队列,在队列创建的时候, 设定属性 exclusive 为 true。
自动过期队列
指队列在超过一定时间没使用,会被从 RabbitMQ 中被删除。
如何定义未使用
一定时间内没有Get操作发生
没有Consumer连接到队列上
注意⚠️
就算一致有消息进入队列,也不算队列在被使用
使用
通过声明队列时,设定 x-expires 参数即可,单位毫秒。
永久队列
持久化队列会被保存在磁盘中,固定并持久的存储,当 Rabbit 服务重启后,该队列会保持原来的状态在 RabbitMQ 中被管理
优势
队列一直存在,不会随服务器重启消失
劣势
速度没有非持久化队列快,性能差一些
使用
在声明队列时,将属性 durable 设置为“false”,则该队列为非持久化队列,设置成“true”时,该队列就为持久化队列
队列级别消息过期
为每个队列设置消息的超时时间。只要给队列设置 x-message-ttl 参数,就设定了该队列所有消息的存活时间,时间单位是毫秒。如果声明队列 时指定了死信交换器,则过期消息会成为死信消息。
使用
队列保留参数
x-dead-letter-exchange 死信交换器
x-dead-letter-routing-key 死信消息的可选路由键
x-expires 队列在指定毫秒数后被删除
x-ha-policy 创建HA队列
x-ha-nodes HA 队列的分布节点
x-max-length 队列的最大消息数
x-message-ttl 毫秒为单位的消息过期时间,队列级别
x-max-priority 最大优先值为 255 的队列优先排序功能
x-dead-letter-routing-key 死信消息的可选路由键
x-expires 队列在指定毫秒数后被删除
x-ha-policy 创建HA队列
x-ha-nodes HA 队列的分布节点
x-max-length 队列的最大消息数
x-message-ttl 毫秒为单位的消息过期时间,队列级别
x-max-priority 最大优先值为 255 的队列优先排序功能
绑定
路由键
虚拟主机
虚拟消息服务器,vhost,本质是mini版的mq服务器,有自己的队列,交换器和绑定,最重要的是有自己的权限机制。
Vhost 提供了逻辑上的 分离,可以将众多客户端进行区分,又可以避免队列和交换器的命名冲突。Vhost 必须在连接时指定,rabbitmq 包含缺省 vhost:“/”,通过缺省用户和 口令 guest 进行访问。
rabbitmq 里创建用户,必须要被指派给至少一个 vhost,并且只能访问被指派内的队列、交换器和绑定。Vhost 必须通过 rabbitmq 的管理控制工具创 建。
消息确认
消费者需要对收到对每一条消息都进行确认
自动确认
autoAck=true
自行确认
autoAck=false
消费者在声明队列时,可以指定 autoAck 参数,当 autoAck=false 时,RabbitMQ 会等待消费者显式发回 ack 信号后才从内存(和磁盘,如果是持久化消
息的话)中移去消息。否则,RabbitMQ 会在队列中消息被消费后立即删除它。
息的话)中移去消息。否则,RabbitMQ 会在队列中消息被消费后立即删除它。
采用消息确认机制后,只要令 autoAck=false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题, 因为 RabbitMQ 会一直持有消息直到消费者显式调用 basicAck 为止。
当 autoAck=false 时,对于 RabbitMQ 服务器端而言,队列中的消息分成了两部分:一部分是等待投递给消费者的消息;一部分是已经投递给消费者, 但是还没有收到消费者 ack 信号的消息。如果服务器端一直没有收到消费者的 ack 信号,并且消费此消息的消费者已经断开连接,则服务器端会安排该消 息重新进入队列,等待投递给下一个消费者(也可能还是原来的那个消费者)。
RabbitMQ 不会为未 ack 的消息设置超时时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开。这么 设计的原因是 RabbitMQ 允许消费者消费一条消息的时间可以很久很久
消息发布的权衡
在 RabbitMQ 中,有不同的投递机制(生产者),但是每一种机制都对性能有一定的影响。一般来讲速度快的可靠性低,可靠性好的性能差,具体怎 么使用需要根据你的应用程序来定
在 RabbitMQ 中实际项目中,生产者和消费者都是客户端,它们都可以完成申明交换器、申明队列和绑定关系,但是在我们的实战过程中,我们在生产者代码中申明交换器,在消费者代码中申明队列和绑定关系。
无保障
通过 basicPublish 发布你的消息并使用正确的交换器和路由信息,你的消息会被接收并发送到合适的 队列中。但是如果有网络问题,或者消息不可路由,或者 RabbitMQ 自身有问题的话,这种方式就有风险。所以无保证的消息发送一般情况下不推荐。
失败确认
在发送消息时设置 mandatory 标志,告诉 RabbitMQ,如果消息不可路由,应该将消息返回给发送者,并通知失败。可以这样认为,开启 mandatory 是开启故障检测模式。
注意⚠️
它只会让 RabbitMQ 向你通知失败,而不会通知成功。如果消息正确路由到队列,则发布者不会受到任何通知。带来的问题是无法确保发布消 息一定是成功的,因为通知失败的消息可能会丢失
代码
channel.addConfirmListener 监听MQ发回的消息
监听器
信道关闭
chanel.addShutdownListener
连接关闭
connection.addShhuedownListener
事务
事务的实现是通过对信道(Channel)的设置
方法
开启事务
channel.txSelect()
提交事务
channel.txCommit()
回滚事务
channel.txRollback()
开启事务后,客户端和MQ的交互流程
流程
客户端给MQ发送Tx.Select
MQ返回Tx.Select-Ok
推送消息
客户端发送给MQ事务提交Tx.Commit
MQ返回Tx.Commit-Ok
如果任意一个环节出现问题,就会抛出IoException,这样客户端可以拦截异常进行回滚或者进行重发等操作
发送方确认
基于事务的性能问题,更好的方式是发送方确认模式,比事务模式更轻量,性能影响几乎忽略不计
原理
生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),由这
个 id 在生产者和 RabbitMQ 之间进行消息的确认。
个 id 在生产者和 RabbitMQ 之间进行消息的确认。
场景
不可路由消息
当交换器发现,消息不能路由到任何队列,会进行确认操作,表示收到了消息。如果发送方设置了 mandatory 模式,则会先调用
addReturnListener 监听器。
addReturnListener 监听器。
可路由消息
要等到消息被投递到所有匹配的队列之后,broker 会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确 到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了 确认消息的序列号。
注意⚠️
confirm 模式最大的好处在于他可以是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最 终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产 者应用程序同样可以在回调方法中处理该 nack 消息决定下一步的处理。
实现方式
方式一:channel.waitForConfirms()普通发送方确认模式;消息到达交换器,就会返回 true。
方式二:channel.waitForConfirmsOrDie()批量确认模式;使用同步方式等所有的消息发送之后才会执行后面代码,只要有一个消息未到达交换器就会 抛出 IOException 异常。
方式三:channel.addConfirmListener()异步监听发送方确认模式;
备用交换器
在第一次声明交换器时被指定,用来提供一种预先存在的交换器,如果主交换器无法路由消息,那么消息将被路由到这个新的备用交换器。
问题
如果发布消息的同时设置了mandatory会发送什么?
如果主交换器无法路由消息,RabbitMQ 并不会通知发布者,因为,向备用交换器发送消息,表 示消息已经被路由了。注意,新的备用交换器就是普通的交换器,没有任何特殊的地方。
使用备用交换器,向往常一样,声明 Queue 和备用交换器,把 Queue 绑定到备用交换器上。然后在声明主交换器时,通过交换器的参数, alternate-exchange,,将备用交换器设置给主交换器。
建议备用交换器设置为 faout 类型,Queue 绑定时的路由键设置为“#”
建议备用交换器设置为 faout 类型,Queue 绑定时的路由键设置为“#”
消息消费
消息的获得方式
拉取Get
属于一种轮询模型,发送一次 get 请求,获得一个消息。如果此时 RabbitMQ 中没有消息,会获得一个表示空的回复。总的来说,这种方式性能比较 差,很明显,每获得一条消息,都要和 RabbitMQ 进行网络通信发出请求
推送Consume
属于一种推送模型。注册一个消费者后,RabbitMQ 会在消息可用时,自动将消息进行推送给消费者
消息应答
自动确认
消费者在声明队列时,可以指定 autoAck 参数,当 autoAck=true 时,一旦消费者接收到了消息,就视为自动确认了消息。如果消费者在处理消息的过 程中,出了错,就没有什么办法重新处理这条消息,所以我们很多时候,需要在消息处理成功后,再确认消息,这就需要手动确认。
手动确认
消费者在声明队列时,可以指定 autoAck 参数,当 autoAck=false 时,RabbitMQ 会等待消费者显式发回 ack 信号后才从内存(和磁盘,如果是持久化消
息的话)中移去消息。否则,RabbitMQ 会在队列中消息被消费后立即删除它。
息的话)中移去消息。否则,RabbitMQ 会在队列中消息被消费后立即删除它。
采用消息确认机制后,只要令 autoAck=false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题, 因为 RabbitMQ 会一直持有消息直到消费者显式调用 basicAck 为止。
当 autoAck=false 时,对于 RabbitMQ 服务器端而言,队列中的消息分成了两部分:一部分是等待投递给消费者的消息;一部分是已经投递给消费者, 但是还没有收到消费者 ack 信号的消息。如果服务器端一直没有收到消费者的 ack 信号,并且消费此消息的消费者已经断开连接,则服务器端会安排该消 息重新进入队列,等待投递给下一个消费者(也可能还是原来的那个消费者)。
Qos预取模式
在确认消息被接收之前,消费者可以预先要求接收一定数量的消息,在处理完一定数量的消息后,批量进行确认。如果消费者应用程序在确认消息
之前崩溃,则所有未确认的消息将被重新发送给其他消费者。所以这里存在着一定程度上的可靠性风险。
之前崩溃,则所有未确认的消息将被重新发送给其他消费者。所以这里存在着一定程度上的可靠性风险。
这种机制一方面可以实现限速(将消息暂存到 RabbitMQ 内存中)的作用,一方面可以保证消息确认质量(比如确认了但是处理有异常的情况)。
注意:消费确认模式必须是非自动 ACK 机制(这个是使用 baseQos 的前提条件,否则会 Qos 不生效),然后设置 basicQos 的值;另外,还可以基于 consume 和 channel 的粒度进行设置(global)
事务
使用方法和生产者一致
两种情况
autoAck=false 手动确认的时候是支持事务的,也就是说即使你已经手动确认了消息已经收到了,但 RabbitMQ 对消息的确认会等事务的 返回结果,再做最终决定是确认消息还是重新放回队列,如果你手动确认之后,又回滚了事务,那么以事务回滚为准,此条消息会重新放回队列
autoAck=true 如果自动确认为 true 的情况是不支持事务的,也就是说你即使在收到消息之后在回滚事务也是于事无补的,队列已经把 消息移除了。
消息拒绝
拒绝机制
Reject
可以使用 requeue 标识,告诉 RabbitMQ 是否需要重新发送给别的消费者。如果是 false 则不重新发送,一般这个消息就会被
RabbitMQ 丢弃。Reject 一次只能拒绝一条消息。如果是 true 则消息发生了重新投递。
RabbitMQ 丢弃。Reject 一次只能拒绝一条消息。如果是 true 则消息发生了重新投递。
Nack
可以一次性拒绝多个消息。也可以使用 requeue 标识,这是 RabbitMQ 对 AMQP 规范的一个扩展。
死信交换器DLX
死信:死了的信息。
死信交换器专门处理死了的信息(被拒绝可以重新投递的信息不算是死的)
消息变成死信的方式
消息被拒绝,且requeue被设置为false,不重新投递
消息过期(rabbitmq中消息默认是不过期的,但是可以设置队列的过期事件和消息的过期时间来达到过期的效果)
队列达到最大长度
死信交换器仍然只是一个普通的交换器,创建时并没有特别要求和操作。在创建队列的时候,声明该交换器将用作保存被拒绝的消息即可,相关的 参数是 x-dead-letter-exchange。
消息属性
属性
大小
按照 AMQP 的协议单个最大的消息大小为 16EB(2 的 64 次方),但是 RabbitMQ 将消息大小限定为 2GB(2 的 31 次方)。
存活时间
当队列消息的 TTL 和消息 TTL 都被设置,时间短的 TTL 设置生效。
如果将一个过期消息发送给 RabbitMQ,该消息不会路由到任何队列,而是直接丢弃。
问题
RabbitMQ 只对处于队头的消息判断是否过期(即不会扫描队列),所以,很可能队列中已存在死消息,但是队列并不 知情。这会影响队列统计数据的正确性,妨碍队列及时释放资源。
持久化
要求
持久化的队列和交换器
消息的投递模式为2
应用程序的类型和版本号
在生产者进行升级后,可以水用app-id属性进行区分
Request-Response模式
实际中的很多应用相当于一种一应一答的过程,需要双方都能给对方发送消息
使用
常见面试问题
如果消息达到无人订阅的队列会怎么办
消息一直在队列中等待,rabbitmq的默认队列长度是无限的
多个消费者订阅同一队列怎么办
消息以循环的方式发送给消费者,每个消息只发送给一个消费者
消息路由到不存在的队列怎么办
一般情况下,rabbitmq会忽略这个消息,也就是这个消息被丢弃了
为什么有事务了,还需要发送方确认模式
因为事务的性能非常差,事务会降低2-10被的性能
假设消费者模式中使用了事务,并且在消息确认之后进行了事务回滚,会是什么样的结果?
两种情况
autoAck=false 手动确认的时候是支持事务的,也就是说即使你已经手动确认了消息已经收到了,但 RabbitMQ 对消息的确认会等事务的 返回结果,再做最终决定是确认消息还是重新放回队列,如果你手动确认之后,又回滚了事务,那么以事务回滚为准,此条消息会重新放回队列
autoAck=true 如果自动确认为 true 的情况是不支持事务的,也就是说你即使在收到消息之后在回滚事务也是于事无补的,队列已经把 消息移除了。
死信交换器和备用交换器的区别
备用交换器是主交换器无法路由消息,那么消息将被路由到这个新的备用交换器,而死信交换器则是接收过期或者被拒绝的消息。
备用交换器是在声明主交换器时发生联系,而死信交换器则声明队列时发生联系。
备用交换器一般是用于生产者生产消息时,确保消息可以尽量进入 RabbitMQ, 而死信交换器主要是用于消费者消费消息的失效场景(比如消息过期,队列满了,消息拒绝且不重新投递)
0 条评论
下一页