RabbitMQ
2022-04-17 15:37:12 46 举报
AI智能生成
RabbitMQ
作者其他创作
大纲/内容
特点
流量消峰
消息模块解耦
异步
常见的MQ
ActiveMQ
高吞吐, 低延迟, 高可用, 低丢失
维护少
Kafka(大型公司)
超高吞吐, 低延迟, 分布式可用性; 有第三方web管理页面
RocketMQ(双11等高并发场景)
阿里, Java. 参照了Kafka
高吞吐, 高可用, 可分布式. 0丢失
RabbitMQ(中小公司)
基于AMQP协议的消息中间件. 最主流. 高性能, 稳定, 健壮,跨平台, 支持多语言, 有第三方web管理页面, 社区活跃, 更新频繁;
商业版付费
四大核心概念
交换机 Exchange
概述
对生产者推送的消息进行分发到队列的模型, 分发至队列是, 使用 RoutingKey 进行与队列的绑定
类型
默认交换机(AMQP default)
当不指定交换机, 信道直接创建队列时, 使用的是默认交换机
通常将交换机名设为空串
简单队列
扇出 (fanout)
发布订阅
对多个queue使用相同routingKey绑定
直接 direct
路由模式
对多queue进行routingKey 分组, 进行根据路由的分发
主体 topic
对routingKey进行判定而确定是否发送到对应队列
routingKey格式:
以点联系的单词列表
*(星号)可以代替一个单词
#(井号)可以替代零个或多个单词
#(井号)可以替代零个或多个单词
标题 headers
队列
生产者
消费者
六种模式
简单队列
默认交换机
工作队列
单生产者, 多工作消费者, 轮询接收
一个消息被处理一次
消息的确认 -- 消息应答
消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了(肯定/否定),rabbitmq 可以把该消息删除了
自动应答
消费者接收消息时, 立刻应答
手动应答
特点
Multiple
批量应答处理, 减少网络拥堵
性能提升, 一致性下降
类型
肯定确认
Channel.basicAck
否定确认
Channel.basicNAck()
Channel.basicReject()
与 Channel.basicNack 相比少一个参数
不处理该消息了直接拒绝,可以将其丢弃了
不处理该消息了直接拒绝,可以将其丢弃了
消息的防丢失
消费者断连 -- 消息自动重新入列
消费者失去连接, 但其已接收的消息并未处理完(未发送ACK确认). 消息队列会将该消息自动入列并重新排队以被消费
开启手动应答, 在消费者回调中, 对消息进行消费后, 进行手动肯定应答. 对于未肯定应答的消息, 会自动进行重新入列
服务器宕机 -- 消息和队列的持久化
队列持久化
(生产者) 初始化队列时, 持久化参数 durable: true
消息持久化
生产者发送消息时, 消息属性设置 BasicProperties: MessageProperties.PERSISTENT_TEXT_PLAIN
将消息标记为持久化并不能完全保证不会丢失消息,消息还在缓存的一个间隔点, 此时并没
有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余
有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余
持久化间隔点时的服务器宕机 -- 发布确认
前提: 队列、消息的持久化开启
发布信道开启 confirm 模式, 消息到达信道后回调
开启发布确认
开启生产者信道的发布确认
channel.confirmSelect()
(生产者)获取发布确认的结果
channel.waitForConfirms() : boolean
三种模式
同步
原理: 调用waitForConfirms() 返回 自上次调用以来消息发布结果
具体子模式
普通确认 单个 (简单,但吞吐量非常有限)
批量确认 (简单,合理的吞吐量, 排错难)
异步确认 (综合性能和排错性最佳)
原理: 对信道添加发布确认监听器, 其中包含发布正确和错误的回调, 可对正确发布和错误发布分别处理.
实现
Channel.addConfirmListener(ConfirmCallback ackCallback, ConfirmCallback nackCallback);
异常消息的处理 -- 死信队列
异常情况
消息被拒
消息TTL过期
队列达到最大长度
死信队列模型
子主题
基于死信的延迟消息
延迟死信
消息分发策略
设置
preFetchCount预取值
一个未确认的消息缓冲区,该值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数量,
RabbitMQ 将停止在通道上传递更多消息
RabbitMQ 将停止在通道上传递更多消息
对消费者信道进行预取值设定 prefetchCount
channel.BasicQos( prefetchCount )
轮询(默认)
0
根据机器性能 -- 不公平分发
1
其它自定义的 >1 的预取值
延时队列
利用死信队列中 TTL 过期的死信特性, 直接不配置前置消费者, 来达到延时通知的效果
问题: 对于消息粒度的过期时长, 死信判断只针对队列的最出口一个消息. 所以即使中间某个消息已超过ttl, 但因为消息堆积也可能还存在队列中. 故不能达成延时队列的顺序性
解决方法: 使用 延时队列交换机插件
rabbitmq_delayed_message_exchange-3.8.0.ez
模型
子主题
注意: 对插件延时, 需要对延时交换机设置参数: x-delayed-type=direct
发布确认高级与队列回退
前提: 开启发布确认模式/队列回退
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publish-return: true
在broker块对消息是否接收进行回调反馈
交换机(发布确认)
RabbitTemplate.ConfirmCallBack
队列(队列回退)
RabbitTemplate.ReturnCallBack
rabbitTemplate.mandatory = true
将回退的消息返回给生产者, 否则丢弃
实现如上接口, 并注入到 使用的rabbitTemplate的对应内部属性中, (该注入操作需要前置到调用rabbitTemplate)
架构
子主题
备份交换机
功能
可以替代队列回退 进行 队列的路由异常的处理. 优先级比队列回退高
当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备
份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定
的队列中
份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定
的队列中
架构
子主题
使用
使用参数 "alternate-exchange=备份交换机名" 对主机进行绑定设置
开发中其它场景
幂等性
唯一ID
Redis Setnx
优先级队列
优先级范围 [0,255]
对入队列的消息, 进行排队
使用
队列开启优先级设置 "x-max-priority=最大优先级", 通常最大设为10, 消息添加优先级参数
惰性队列
队列的模式设置 "x-queue-mode=指定模式"
default正常队列(内存); lazy惰性队列(磁盘)
在慢消费-消息大量堆积而长时间不能消费的情况下节省内存
高可用负载均衡
HaProxy + KeepAlive
RabbitMQ的集群与镜像队列
异常报错
https://blog.csdn.net/Epoch_Elysian/article/details/94075456
集群状态下(网络延迟导致的)数据一致性问题
联邦交换机 Federation Exchange
联邦队列 Federation Queue
架构
子主题
0 条评论
下一页