RabbitMQ
2023-03-07 15:39:50 0 举报
AI智能生成
RabbitMQ全体系详细介绍以及工作模式特点,核心问题剖析。
作者其他创作
大纲/内容
名词概念
broker
代理:发送消息与接收消息的中间层
exchange
交换机:不同的交换机类型将消息传输到不同的队列
消息进入broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中
常见类型:direct(ptp)、topic(publish-subscribe)、fanout(multicast)
queue
队列:消息通过队列被接收
消息最终被送到这里等待consumer取走
binding
绑定:绑定交换机与队列
交换机先通过binding才会到queue
routing key
路由关键字:消息根据它来确定投递的交换机
virtual host
虚拟主机:一个broker可以通过设置vhost来区分不同用户权限
出于多组户和安全因素设计,类似于网络中的namespace概念
用户可以在自己的vhost下创建并管理自己的exchange/queue
channel
通道:客户端与服务端的连接可以建立多个channel,每一个代表一个会话任务
如果每次访问RabbitMQ都建立一个Connection,消息量大的时候建立TCP Connection的开销也将是巨大的,效率也较低
Channel作为轻量级的Connection极大减少了操作系统建立TCP Connection的开销
producer
生产者:提供消息
consumer
消费者:接收消息
AMQP
Advanced Message Queuing Protocol
高级消息队列协议
网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计
基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件产品不同、开发语言不同等条件限制
兼容JMS(Java Message Service)协议
工作模式:P2P、PubSub
ActiveMQ就是基于此协议实现
菜单内容
Overview:提供服务端概览信息
Totals
Queued messaages:查询队列消息的阻塞情况(可设置查询时间范围)
ready:待消费的消息总数
unacked:待应答的消息总数
Total:ready+unacked
Messages rates:查询队列消息的消费情况(可设置查询时间范围)
单位:速率,计算公式:(num1-num0)/(s1-s0) num1:s1时刻的个数。num0:s0时刻的个数。
Publish:生产者发布消息的速率
Publisher confirm:生产者confirm的速率
Deliver(manual ack):消费者手动 ack 速率
Deliver(auto ack):消费者自动 ack 速率
Consumer ack:消费者正在确定 ack 速率
Redelivered:redelivered标签消息传递的速率
Get(manual ack):basic.get 手动 ack 速率
Get(auto ack):basic.get 自动 ack 速率
Get(empty):basic.get 空队列速率
Return:生产者退回消息的速率
drop:被丢弃的消息速率
Disk read:queue从磁盘读取消息的速率
Disk write:queue从磁盘写入消息的速率
Global counts
Connections:连接数
Channels:管道数
Exchanges:交换机数
Queues:队列数
Consumers:消费者数
Nodes:每一个Node都是一个Broker,如果是集群模式会有多个Node
File descriptors:打开的文件描述符和限制
Socket descriptors:Scoket数量和限制。当限制被耗尽时,RabbitMQ将停止接受新的网络连接。
Erlang descriptors:erlang 运行中的进程数
Memory:占用的内存,剩余可用内存
Disk space:占用的硬盘,剩余可用磁盘
Uptiome:节点up的时间
Churn statistics:连接、管道、队列创建和关闭的统计
Ports and contexts:端口和上下文
Export definitions:导出某一个节点的用户,虚拟主机,权限,参数,交换,队列和绑定组成,可以用于其他节点的导入
Import definitions:导入definitions
Connections
All connections
Overview
Channels
Exchanges
All exchanges
Add a new exchange
Exchange type
Fanout
广播,将消息交给所有绑定到交换机的队列
无需指定routing key也可以
Topic
通配符,把消息交给符合routing pattem(路由模式)的队列
通配符规则:#匹配一个或多个词,*匹配不多不少恰好一个词
例如:item.#能够匹配item.insert.abc
例如:item.*只能匹配到item.insert
Direct
定向,把消息交给符合指定routing key的队列
Headers
根据arguements来匹配消费者
Queues
All queues
Add a new queue
Admin
All users
Add ad user
User details
Overview
Permissions
Set permission
Topic permissions
Update this user
Delete this user
工作模式
point to point
一个生产者,一个消费者,不需要设置交换机(使用默认交换机)
one to many
一个生产者,多个消费者(竞争关系),不需要设置交换机(使用默认交换机)
publish-subscribe
交换机类型为fanout,需要交换机与队列进行绑定。当消息发送到交换机后,交换机会将消息发送到绑定的队列
routing
交换机类型为direct,交换机与队列进行绑定。当消息发送到交换机后,会根据rounting key来判断哪个队列接收此消息
topics
交换机类型为topics,交换机与队列进行绑定。当消息发送到交换机,会根据rounting key + 通配符的形式来判断哪个队列接收此消息
优劣势
优势
应用解耦,提高容错
异步提速
削峰,提高并发量
开源成熟度高,社区相对活跃
劣势
系统可用性降低
系统引入的外部依赖越多,系统稳定性就越差。如果保证宕机不对业务造成影响
系统复杂度提高
同比其他消息中间件(kafka、rocketmq)性能以及处理消息积压并非最优
功能扩展二次开发代价很高,因为开发语言是erlang
端口说明
15671
WEB访问管理界面使用的端口
15672
管理监听端口
5671、5672
AMQP 0-9-1 without and with TLSclient端通信口
4369
RabbitMQ节点和CLI工具使用的对等发现服务
erlang守护线程开放端口
25672
( Erlang distribution) server间内部通信口
核心问题
消息投递可靠性
提供两种方式来控制消息可靠性的投递
事务
开启降低发送效率(影响巨大)
channel
txSelect开始事务
txCommit提交事务
txRollback事务回滚
Producer与Broker之间的处理
confirm确认模式
ack接受消息
nack拒绝消息
重新回到队列
丢弃消息
重试机制
return退回模式
接收被退回的消息
springboot如何解决
生产者
异步
定义callback来判断消费者反馈的confirm状态
消息发送到exchange的时候就会返回confirmCallback
注意这里的回调仅仅是确认消息到达broker,而不是消费者返回的信息
定义returnCallback来获取投递失败的信息
生产者、消费者都必须设置rabbitmqTemplate.setMandatory(true);才能接收退回消息
不能成功到达队列会被退回消息
如果使用Springboot仅支持一个Return Callback
同步
channel.waitForConfirms()
消费者
定义监听器
签收消息basicAck
批量丢弃消息basicNack(multiple)
重新回到队列basicNack(requeue)
单次拒绝消息basicReject
springboot confrim属性
acknowledge
none
自动签收
manual
手动签收
auto
根据异常情况确认
总结
持久化
交换机持久化
队列持久化
生产方确认Confirm
消费者确认Ack
Broker高可用
消息丢失
ack为自动签收有可能造成消息丢失,例如:一次性接收5条消息后第一条出现系统异常,后面4条消息在rabbitmq眼中认为已接收,而在系统中则丢失
禁止自动接收
TTL:Time to Live(存活时间、过期时间)
当消息到达存活时间,还没有被消费,会被自动清除
RabbitMQ可以对消息设置过期时间,也可以对整个队列设置过期时间
arguments:设置x-message-ttl
死信队列:Dead Letter Exchange
如何产生死信
basicNack or basicReject 且 requeue 为 false
TTL
消息队列的长度已经到达上限
延迟队列
消息进入队列后不会立即被消息,只有到达指定时间后,才会被消费
TTL+死信队列实现延迟队列
队列设置好消息过期时间,到达过期时间没有被消费则转换到死信交换机
应用场景
例如下单30分钟后还未被支付则取消,延迟队列要优于定时任务
消息积压
描述:如果有大量消息堆积在队列中,性能会急剧下降
解决方案
增加消费者节点,提高消费能力
预判(兜底):提供批量接收并将消息记录到数据库的队列,来作为缓冲
哪些情况会产生消息挤压
消费能力不足
消费者宕机
发送者流量超出预期
消息幂等性(重复消费)
乐观锁、悲观锁
消息全局唯一ID
消费完成后ID存放到redis/db,如果发现id已存在则认为是重复消息
按顺序消费
消息投递的路径
设置队列优先级
拒绝接收消息并重新投放到队列分发到其他消费者
分布式锁
集群
单一节点
普通集群
默认的集群模式
仅提高吞吐量,并未实现高可用
镜像集群
如何搭建
1、先搭建rabbitmq普通集群
2、在普通集群模式的基础上配置HAProxy来实现高可用
原理
缺点
1、网络带宽压力很大
2、提供了数据的冗余备份,会导致存储压力变大,可能会出现IO瓶颈
流量削峰
异步解耦
分发机制
轮询分发
公平分发
参数
交换机
队列
持久化原理
消息持久化
持久化的消息到达队列会被写入磁盘,也可以在内存中备份,提高性能,如果内存吃紧会从内存中清除,重启后消息不会丢失
消息非持久化
非持久化的消息到达队列会被写入内存,同时也可以写入磁盘,如果内存吃紧会从磁盘读取消息,以节省内存空间,重启后消息会丢失
持久层
rabbitmq_queue_index 队列索引
消息的存储地点、是否已被交付给消费者、是否已被消费者ack
每一个队列都有与之对应的一个队列索引
rabbitmq_msg_store 消息存储
persistent:持久化消息
transient:非持久化
负责消息的存储,它被所有的队列共享,在每个节点中有且只有一个
工作流程
发送消息
非事务
客户端:Connection.Start
服务端:Connection.start-OK
客户端:Conncetion.Tune
服务端:Connection.Tune-Ok
客户端:Connection.Open vhost=xxx
服务端:Connection.Open vhost-OK
客户端:Channel.Open
服务端:Channel.Open-OK
客户端:Basic.Publish x=... rk=... Content-... type=...
事务
省略以上内容
客户端:Tx.Select
服务端:Tx.Select-OK
客户端:Basic.Publish x=... rk=... Content-... type=...
提交
客户端:Tx.Commit
服务端:Tx.Commit-OK
回滚
客户端:Tx.Rollback
服务端:Tx.Rollback-OK
接收消息
basic.consume与basic.get的区别
consume是一直从队列中取出消息
get只取队列中的第一条消息
根据实际场景来选择消费方式,切记get不可替代consume循环取出消息,开销太大
0 条评论
下一页