rabbit
2024-05-06 16:54:21 49 举报
AI智能生成
rabbitMQ 基础
作者其他创作
大纲/内容
作用
解耦
冗余
扩展性
削峰
可恢复性
顺序性
缓冲
异步通信
优劣
优点:
- 延迟性低
- 后台管理友好
- 社区比较活跃
缺点:
- 吞吐量低
- erlang语言开发,深入源码不太方便
具体特点
可靠性:RabbitMQ 使用 些机制来保证可靠性 如持久化、传输确认及发布确认等
灵活的路由:在消息进入队列之前,通过交换器来路由消息 对于典型的路由功能,
RabbitMQ 己经提供了 些内置的交换器来实现 针对更复杂的路由功能,可以将多个
交换器绑定在 起, 可以通过插件机制来实现自己的交换器
RabbitMQ 己经提供了 些内置的交换器来实现 针对更复杂的路由功能,可以将多个
交换器绑定在 起, 可以通过插件机制来实现自己的交换器
扩展性:多个 MQ 节点可以组成 个集群,也可以根据实际业务情况动态地扩展
集群中节点。
集群中节点。
高可用:队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队
列仍然可用
列仍然可用
多种协议(默认AMQP)
AMQP
- Module Laye 位于协议最高层,主要定义了一些供客户端调用的命令,客户端可以利用这些命令实现自己的业务逻辑。
例如 客户端可以使用 Queue Declare 命令声明个队列或者使用 Basic Consume订阅消费一个队列中的消息。 - Session Layer:位于中间层,主要负责将客户端的命令发送给服务器 再将服务端的应答返回给客户端,
主要为客户端与服务器之间的通信提供可靠性同步机制和错误处理。 - Tranport Laye 位于最底层,主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示等。
多语言客户端
管理界面
插件机制
架构
Producer:生产者
- 生产者连接到 RabbitMQ Broker 建立一个连接( onnection ),开启一个信道(参考3.1)
- 生产者声明一个交换器,并设置相关属性,比如交换机类型、是否持久化等参考3.2)
- 生产者声明 个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等(参考3.2)
- 生产者通过路由键将交换器和队列绑定起来(参考3.2)
- 生产者发送消息至 RabbitMQ Broker,其中包含路由键、交换器等信息(参考3.3)
- 相应的交换器根据接收到的路由键查找相匹配的队列
- 如果找到,则将从生产者发送过来的消息存入相应的队列中。
- 如果没有找到 ,则根据生产者配置的属性选择丢弃还是回退给生产者(参考4.1)
- 关闭信道。
- 关闭连接。
Consumer:消费者
- 消费者连接到 RabbitMQ Broker ,建立一个连接( Connection ),开启一个信道( Channel)
- 消费者向 RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数,以及做 些准备工作(详细内容请参考 3.4 节〉。
- 等待 RabbitMQ Broker 回应并投递相应队列中的消息,消费者接收消息。
- 消费者确认( ack )接收到的消息
- RabbitMQ 从队列中删除相应己经被确认的消息
- 关闭信道。
Broker:服务节点(一个Broker可以看做是一个服务节点或者服务实例)
具体组件
组件介绍
Connection:连接
Channel:信道
连接tcp连接复用,不仅可以减少性能开销,同时也便于管理。
每个线程都把持一个信道,所以信道复用了TCP连接。同时rabbitmq可以确保每个线程的私密性,就像拥有独立的连接一样。当每个信道的流量不是很大时,复用单一的connection可以再产生性能瓶颈的情况下有效地节省tcp连接资源,但是当信道本身的流量很大时,这时候多个信道复用一个connection就会产生性能瓶颈,进而是整体的流量被限制了。此时就需要开辟多个connection,将这些信道均摊到这些connection中
每个线程都把持一个信道,所以信道复用了TCP连接。同时rabbitmq可以确保每个线程的私密性,就像拥有独立的连接一样。当每个信道的流量不是很大时,复用单一的connection可以再产生性能瓶颈的情况下有效地节省tcp连接资源,但是当信道本身的流量很大时,这时候多个信道复用一个connection就会产生性能瓶颈,进而是整体的流量被限制了。此时就需要开辟多个connection,将这些信道均摊到这些connection中
Queue:存储消息
不支持队列层面广播消费
Exchange:交换器
交换器类型
fanout:把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中
direct:把消息路由到那些 BindingKey = RoutingKey完全匹配的队列中
topic:通过*和#进行模糊匹配,没有direct那么绝对相等
headers:不依赖路由键的匹配规则,依赖依赖消息内容中的headers属性。性能差,很少使用
RoutingKey:路由键
需要与交换器类型和绑定键(BindingKey)联合使用生效
Binding:绑定键(BindingKey)
大多数情况下习惯性地将 BindingKey写成 RoutingKey,如何区分
在发送消息的时候,其中需要的路由键是 RoutingKey
在使用绑定的时候,其中需要的路由键是 BindingKey
当BindingKey和RoutingKey相匹配时,消息会路由到对应队列中
vhost(虚拟主机)
开箱即用的默认vhost:"/"
权限隔离
组件基础
addReturnListener:添加ReturnListener监听器
监听Basic.Return
exchangeDeclare:声明交换机
exchange:交换机名称
type:交换机类型
durable:设置是否持久化
autoDelete:设置是否自动删除
自动删除的前提是至少有 个队列或者交换器与这个交换器绑定 之后所有与这个交换器绑
定的队列或者交换器都与 解绑。注意不能错误地 这个参数理解为 “当与此交换器
连接的客户端都断开时 RabbitMQ 会自动 除本交换器
定的队列或者交换器都与 解绑。注意不能错误地 这个参数理解为 “当与此交换器
连接的客户端都断开时 RabbitMQ 会自动 除本交换器
internal:设置是否内置
如果设置为 true ,则表示是内置的交换器,
客户端程序无法直接发送消息到这个交换器中,只能通过交换机路由到交换机这种方式。
客户端程序无法直接发送消息到这个交换器中,只能通过交换机路由到交换机这种方式。
argument:其他一些结构化参数
alternate-exchange:绑定备份交换机
exchangeDeclareNoWait方法:不常用,不建议,比exchangeDeclare多一个NoWait属性
exchangeDelete:删除交换机
queueDeclare:声明队列
无参声明队列:由 RabbitMQ 命名的(类似这种amq.gen-LhQzlgv3GhDOv8PIDabOXA 名称,这种队列也称之为匿名队列〉、排他的、自动删除
的、非持久化的队列。
的、非持久化的队列。
有参声明队列
queue:队列名称
durable:设置是否持久化
exclusive:设置是否排他
如果一个队列被声明为排
他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意
三点:排他队列是基于连接( Connection )可见的,同 个连接的不同信道( Channel)
是可以同时访问同一连接创建的排他队列:“首次”是指如果 个连接己经声明了
排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同:即使该队
列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列
适用于一个客户端同时发送和读取消息的应用场景。
他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意
三点:排他队列是基于连接( Connection )可见的,同 个连接的不同信道( Channel)
是可以同时访问同一连接创建的排他队列:“首次”是指如果 个连接己经声明了
排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同:即使该队
列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列
适用于一个客户端同时发送和读取消息的应用场景。
autoDelete:设置是否自动删除
为 true 则设置队列为自动删除。自动删除的前提是:
至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会
自动删除。不能把这个参数错误地理解为:“当连接到此队列的所有客户端断开时,这
个队列自动删除”,因为生产者客户端创建这个队列,或者没有消费者客户端与这个队
列连接时,都不会自动删除这个队列。
至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会
自动删除。不能把这个参数错误地理解为:“当连接到此队列的所有客户端断开时,这
个队列自动删除”,因为生产者客户端创建这个队列,或者没有消费者客户端与这个队
列连接时,都不会自动删除这个队列。
argument:其他一些结构化参数
注意:生产者和消费者都能够使用 queueDeclare 来声明一个队列,但是如果消费者在同一个
信道上订阅了另一个队列,就无法再声明队列了。必须先取消订阅,然后将信道直为“传输”
模式,之后才能声明队列。
信道上订阅了另一个队列,就无法再声明队列了。必须先取消订阅,然后将信道直为“传输”
模式,之后才能声明队列。
queueDeclarePassive 的方法,比较常用
这个方法用来检测相应的队列是否存在。 如果存在 正常返回 ,如果不存在则抛出异常: 404 channel exception ,同时Channel 也会被关闭
queueDeclareNoWait方法:不常用,不建议,比queueDeclare多了一个NoWiat属性
queueDelete:删除队列
queuePurge:清空队列中内容,不删除队列
queueBind:队列和交换机绑定
queue:队列名称
exchange:交换机名称
routingKey:用来绑定队列和交换机的路由键
argument:其他一些结构化参数
queueUnBind:解绑队列和交换机
exchangeBind:交换机和交换机绑定
basicPublish:发送消息
exchange:交换机
routingKey:路由键
props:消息的基本属性集
其包含 14 个属性成员
contentType
contentEnconding
headers(Map<String,Object>)
deliveryMode:投递模式
priority:优先级
correlationid:用来关联请求( request )和其调用 RPC 之后的回复( response)
replyTo:设置回调队列
expiration:过期时间
messageId
timestamp
type
userId
appId
cluster
byte[] body: 消息体
mandatory
true(交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么 RabbitM 会调用 Basic.Return 命令将消息返回给生产者)
false(上述情况,直接丢弃)
false(上述情况,直接丢弃)
immediate
RabbitMQ 3.0开始去掉这个参数的支持,建议采用TTL、DLX
消费模式
推(Push)模式
basicConsumer
queue:队列名称
autoAck:自动确认true/手动确认false
consumerTag :消费者标签,用来区分多个消费者
noLocal:设置为True,同一个connection的生产者发送的消息,消费者不能接收
exclusive :设置是否排他
arguments 设置消费者的其他参数
callback 设置消费者的回调函数
拉(Pull)模式
basicGet
queue:队列名称
autoAck:自动确认true/手动确认false
推拉模式的场景
Basic Consume 将信道( Channel )直为接收模式,直到取消队列的订阅为止。
在接收模式期间, RabbitMQ 会不断地推送消息给消费者,当然推送消息的个数还是会受到 Basic.Qos
的限制.
如果只想从队列获得单条消息而不是持续订阅,建议还是使用 Basic.Get 进行消费.但
是不能将 Basic.Get 放在一个循环里来代替 Basic.Consume ,这样做会严重影响 RabbitMQ
的性能.
如果要实现高吞吐量,消费者理应使用 Basic.Consume 方法。
在接收模式期间, RabbitMQ 会不断地推送消息给消费者,当然推送消息的个数还是会受到 Basic.Qos
的限制.
如果只想从队列获得单条消息而不是持续订阅,建议还是使用 Basic.Get 进行消费.但
是不能将 Basic.Get 放在一个循环里来代替 Basic.Consume ,这样做会严重影响 RabbitMQ
的性能.
如果要实现高吞吐量,消费者理应使用 Basic.Consume 方法。
channel.basicQos(4);
当consumer接收到4条消息后,不再会接收其它消息,只有当这些消息被消费时,才会接收新的消息。
当consumer接收到4条消息后,不再会接收其它消息,只有当这些消息被消费时,才会接收新的消息。
拒绝
basicReject(单个)
deliveryTag:消息的编号
requeue:true(重新存入队列)/false(队列中移除)
basicNack(批量)
deliveryTag:消息的编号
requeue:true(重新存入队列)/false(队列中移除)
multiple:true(拒绝消息编号前的所有未被当前消费者确认的消息)
重推
basicRecover
请求 RabbitMQ 重新发送还未被确认的消息
requeue 参数设置为 true (默认),则未被确认的消息会被重新加入到队列中,
这样对于同一条消息来说,可能会被分配给与之前不同的消费者。
这样对于同一条消息来说,可能会被分配给与之前不同的消费者。
关闭(connection和channel)
getCloseReason 方法可以让你知道对象关闭的原因
isOpen 方法检测对象当前是否处于开启状态
close (int closeCode , String closeMessage) 方法显式地通知当前对象执行关闭操作
ShutdownListener
ShutdownSignalException
isHardError 方法可以知道是 Co 且口 ectio 丑的还是 Channel 的错误
getReason 方法可以获取 cause 相关的信息
组件进阶
备份交换机
解决什么?
生产者在发送消息的时候如果不设置 mandatory 参数,那么消息在未被路由的情况下将会丢失:
如果设置了 mandatory 参数,那么需要添加 ReturnListener 的编程逻辑,生产者的代码将
变得复杂
如果设置了 mandatory 参数,那么需要添加 ReturnListener 的编程逻辑,生产者的代码将
变得复杂
exchangeDeclare中参数argument设置alternate-exchange 绑定交换机
建议设置为 fanout 类型,否则可能会导致消息丢失
备份交换机出现(不存在、未绑定队列、未匹配队列)这些情况,不会有任何异常
如果备份交换器和 mandatory 参数一起使用,那么 mandatory 参数无效
TTL:过期时间
默认无过期
RabbitMQ 不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的
唯一依据是消费该消息的消费连接是否己经断开,这么设计的原因是 RabbitMQ允许消费者消费一条消息的时间可以很久很久。
唯一依据是消费该消息的消费连接是否己经断开,这么设计的原因是 RabbitMQ允许消费者消费一条消息的时间可以很久很久。
设置消息的TTL
通过队列属性设置,队列中所有消息都有相同的过期时间
queueDeclare 方法中加入x-message -ttl 参数实现的,单位毫秒
TTL 设置为 表示除非此时可以直接将消息投递到消费者,
否则该消息会被立即丢弃
否则该消息会被立即丢弃
对消息本身进行单独设置,每条消息的 TTL 可以不同
basicPublish 方法中加入 expiration的属性参数,单位毫秒
因为每条消息的过期时间不同,消息过期,不会马上从队列中抹去,删除所有过期消息势必要扫描整个队列,
所以每条消息是否过期是在即将投递到消费者之前判定的
所以每条消息是否过期是在即将投递到消费者之前判定的
注意
则消息的 TTL 以两者之间较小的那个数值为准
消息在队列中的生存时间一旦超过设置 TTL 值时,就会变成“死信”
设置队列的TTL
通过 channel queueDeclare 方法中的 x-expires 参数
可以控制队列被自动删除前处于未使用状态的时间
可以控制队列被自动删除前处于未使用状态的时间
未使用的意思是队列上没有任何的消费者,队列也没有被重新声明,
并且在过期时间段内也未调用过 Basic Get 命令。
并且在过期时间段内也未调用过 Basic Get 命令。
RabbitMQ 会确保在过期时间到达后将队列删除,但是不保障删除的动作有多及时 。
在RabbitMQ重启后,持久化的队列的过期时间会被重新计算。
在RabbitMQ重启后,持久化的队列的过期时间会被重新计算。
DLX:死信队列
当消息在一个队列中变成死信( dead message )之后,它能被重新被发送到另一个交换器中,
这个交换器就是 DLX ,绑定 DLX 的队列就称之为死信队列。
这个交换器就是 DLX ,绑定 DLX 的队列就称之为死信队列。
变成死信的情况
消息被拒绝( Basic.Reject/Basic.Nack ),井且设置 requeue 参数为 false;
消息过期
队列达到最大长度
queueDeclare 方法中设置 x-dead-letter-exchange 参数来为这个队列添加 DLX
可以为这个 DLX 指定路由键,如果没有特殊指定,则使用原队列的路由键:
args.put (”x-dead-letter-routing-key” ,”dlx-routing-key”);
args.put (”x-dead-letter-routing-key” ,”dlx-routing-key”);
延迟队列
通过DLX 和TTL 实现延迟队列
假设一个应用中需要将每条消息都设置为 10 秒的延迟,
生产者通过 exchange.normal 这个交换器将发送的消息存储在 queue.normal 这个队列中。消费者
订阅的并非是 queue.normal 这个队列,而是 queue.dlx 这个队列 。当消息从 queue.normal 这个队
列中过期之后被存入 queue.dlx 这个队列中,消费者就恰巧消费到了延迟 10 秒的这条消息
生产者通过 exchange.normal 这个交换器将发送的消息存储在 queue.normal 这个队列中。消费者
订阅的并非是 queue.normal 这个队列,而是 queue.dlx 这个队列 。当消息从 queue.normal 这个队
列中过期之后被存入 queue.dlx 这个队列中,消费者就恰巧消费到了延迟 10 秒的这条消息
顺序消息
channel.basicQos(1) 设置每次只处理一条消息
通过手动ACK手动确认
通过手动ACK手动确认
channel.basicQos(1)指该消费者在接收到队列里的消息但没有返回确认结果之前,队列不会将新的消息分发给该消费者。
队列中没有被消费的消息不会被删除,还是存在于队列中。
队列中没有被消费的消息不会被删除,还是存在于队列中。
RabbitMq 通过交换机发送到指定的队列,队列保证了顺序,但是多个消费者的话,需要消费者自己去保证,比如先通过分布式锁锁定,然后存储到表中或者缓存中。
优先级队列
设置队列的 x-max-priority 参数来实现
优先级高的消息可以被优先消费,这个也是有前提的
如果在消费者的消费速度大于生产者的速度Broker 中没有消息堆积的情况下,对发送的消息设置优先级也就没有什么实际意义
RPC实现
1. 客户端启动,创建一个匿名回调队列(由RabbitMq自动创建)
2. 客户端为RPC请求设置两个属性:replyTo用来告知RPC服务端回复请求时的目的队列,即回调队列;correlationId用来标记一个请求
3. 请求被发送到rpc_queque队列中
4. RPC服务端监听rpc_queue 队列中的请求,当请求到来时,服务当会处理并把带有结果的消息发送到客户端。接收的对列就是replyTo设定的回调队列
持久化
交换器的持久化
队列的持久化
消息的持久化
RabbitMQ消息追踪之Firehose
0 条评论
下一页