RabbitMQ
2024-06-18 00:36:32 2 举报
AI智能生成
登录查看完整内容
RabbitMQ梳理(基于RabbitMQ官方文档的Java 入门、Spring AMQP入门 + AMQP091概念文章 + Spring BootAMQP官方文档)
作者其他创作
大纲/内容
解耦
削峰
异步
应用场景
AMQP 091(当前用的)
AMQP 1.0
MQTT
STOMP
AMQP协议模型
事务(代码端,没用过)
生产者
消费者
Channel(AMQP),就是个连接池,共享单个 TCP 连接的轻量级连接
connection(http)
name
Durability持久性,MQ重启后,交换机还在
当最后一个队列和交换机解绑时,交换机自动删除
Auto-delete
可选的各种参数,根据插件定义
参数
交换机的属性
一对一定点发送
一对多组播
Routing Key==Binding Key
direct(默认的也是这个类型,比较特殊的是它的交换机名的空字符串,所有队列以队列名作为路由键绑定到这个交换机上)
所用于广播
无视routing key,路由到所有与它绑定的Queue中
fanout
多用于组播
Routing Key模糊匹配
topic
不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的header属性进行匹配。
headers
交换机的类型
exchange交换机,将消息路由到匹配的队列中
MQ重启后队列还在
Durable 持久化
只有一个连接可以使用这个队列,当连接关闭后,队列删除
Exclusive 独占
最后一个消费者取消订阅时,删除队列
自动删除
各种,如消息的TTL、队列长度
队列的属性
①声明队列时,通过x-dead-letter-exchange去指明这个队列的死信交换机,队列中出现死信后,死信会重新投递到死信交换机中
②队列不指定消费者,设置过期时间,到期后消息变死信
②开一个队列绑到死信交换机上,消费者监听这个队列去处理死信,达到一个延迟消费的效果
①基于死信队列实现延迟队列(TTL+Dead Letter Exchanges)
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
使用x-delayed-message声明交换机
消息带x-delay头
延迟时间内,消息不会进入队列,时间到了才会进入队列
②安装插件,集成新的交换机类型
延迟队列(delayed)
声明队列时,加上x-queue-mode=lazy参数
通过设置队列参数实现懒惰队列,消息会先落盘
ps:这个和延迟队列的使用场景不同,它就要是为了减轻内存的压力,不过这个模式的磁盘IO会比较多
懒惰队列lazy queue
死信的来源?
会回到交换机
丢弃
或者持久化处理
死信怎么处理?
声明队列时,指定可选 x-dead-letter-exchange 参数
死信队列?
死信交换Dead Letter Exchanges
queue队列
拒绝,直接丢弃
溢出,先进先出
超时
死信的产生
入库
监听,进入死信队列,消费端监听死信队列
死信的处理
死信消息
message消息
用于环境隔离
virtual host
核心概念(基于AMQP091)
①声明ConfirmCallBack接口,重写confirm方法,绑定到RabbitMQTemplate
配置文件publisher-confirm-type: correlated,交换机确认
①声明ReturnsCallBack接口,重写returnMessage方法,绑定到RabbitMQTemplate
publisher-returns: true,队列确认
生产者异步确认机制
使用备用交换机,消息投递失败,转投递到备用交换机
Java这边的Server端有一个自动恢复机制
生产者生产消息到RabbitMQ Server消息丢失
配置队列的持久化
配置Message的持久化
持久化
Server存储的消息丢失
自动ACK
basicAck方法
basicNAck
acknowledge-mode: manual
手动ACK(√)
消费者使用消费成功使用ACK确认消息
消费者消费失败使用NACK,消息放回服务端,消费者重试一次
RabbitMQ Server到消费者消息丢失
使用扩展的发布确认(Publisher Confirms)默认解决,不过Spring AMQP好像没做这方面的适配,需要自己用原生的Java API
实现队列的高可用性,并用死信队列来保证消息的处理
其他:
消息丢失
消息内部会生成一个唯一id(deliverTag 64位),消费者在处理消息时检查是否已经处理过该标识的消息。可以使用数据库或分布式缓存(如Redis)来记录已处理的消息标识
确保消息的处理逻辑是幂等的,即同一消息即使被处理多次,结果也是相同的。例如,在数据库操作中,可以使用唯一约束来防止重复插入。
使用数据库的去重表来记录已经处理的消息,处理新消息前先检查去重表,防止重复处理
重复消费
扩容
监控报警
增加消费者数量
优化消费者处理能力
消息积压
①单一消费者
②使用独占队列
③设置预取值basic_qos为1实现顺序消费
顺序消息
常见场景
①默认是没有开启连接重试的,,需要手动开启
我们日常使用的基本都是这种方式,所以我们可以了解它给我们自动装配了什么默认配置
概要:
starter-amqp
依赖
连接信息
基础配置
交换机、队列、绑定
声明
RabbitMQTemplate
发送
@RabbitMQListener
接收
SpringBoot整合
RabbitMQ
0 条评论
回复 删除
下一页