RabbitMQ知识整理
2021-04-09 11:08:01 1 举报
AI智能生成
点赞收藏~~
作者其他创作
大纲/内容
简单来一下
安装和使用
官网地址
http://www.rabbitmq.com/
前提准备
安装 Linux 相关依赖包
下载 RabbitMQ 安装包
建议使用稳定版本
很多人都还在用 3.6.x
使用最简洁的方式
需要先安装 Erlang 环境
注意 Erlang 和 RabbitMQ 的版本要对应
wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
再安装 socat
wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
最后安装 rabbitmq-server
wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
配置文件
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
端口号:5672
修改{ loopback_users, [guest] }
启动&访问
启动服务
rabbitmq-server start &
& 表示后台启动
ps -ef | grep rabbit
如果有占用的线程,使用 kill 端口杀掉
验证启动
lsof -i:5672
停止服务
rabbitmqctl stop_app
管理插件
rabbitmq-plugins enable rabbitmq_management
查看所有插件
rabbitmq-plugins list
访问
http://192.168.207.133:15672/
用户名&密码
guest
命令行&工作台
命令行基础操作
启动应用
rabbitmqctl start_app
停止应用
rabbitmqctl stop_app
查看状态
rabbitmqctl status
添加用户
rabbitmqctl add_user username password
用户列表
rabbitmqctl list_users
移除用户
rabbitmqctl delete_user username
设置用户权限
rabbitmqctl set_permissions -p vhostpath username
移除用户权限
rabbitmqctl clear_permissions -p vhostpath username
查看用户权限
rabbitmqctl list_user_permissions username
重置密码
rabbitmqctl change_password username newpassword
创建虚拟主机
rabbitmqctl add_vhost vhostpath
查看所有虚拟主机
rabbitmqctl list_vhosts
列出虚拟主机上所有权限
rabbitmqctl list_permissions -p vhostpath
删除虚拟主机
rabbitmqctl delete_vhost vhostpath
查看所有队列
rabbitmqctl list_queues
清除队列里的消息
rabbitmqctl -p vhostpath purge_queue blue
命令行进阶操作
移除所有数据
要在 rabbitmqctl stop_app 之后使用
rabbitmqctl reset
组成集群命令
rabbitmqctl join_cluster <clusternode> [--ram]
查看集群状态
rabbitmqctl cluster_status
修改集群节点的存储形式
rabbitmqctl change_cluster_node_type disc | ram
忘记(摘除)节点
rabbitmqctl forget_cluster_node [--offline]
修改节点名称
rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2...]
工作台操作
Hello World
思路
获取连接工厂 ConnectionFactory
获取一个连接 Connection
获取数信信道,可发送和接收消息 Channel
具体的消息存储队列 Queue
生产和消费者 Producer & Consumer
实现
新建 Spring boot 项目
图示
分支主题
编辑 pom.xml 文件
新建 Producer 生产者类
新建 Consumer 消费者类
先要运行 消费者,才有队列
核心概念
Exchange 交换机
作用
接收消息,并根据路由键转发消息所绑定的队列
图示
分支主题
蓝色框:生产消息,经过交换机,到达队列
绿色框:消费者,从队列中,获取并消费消息
红色框:RabbitMQ Server
黄色框:交换机绑定队列
交换机属性
Type 类型
direct
topic
fanout
headers(不常用)
常见属性
Durability
是否持久化:true / false
Auto Delete
当 Exchange 上所有队列都删除后,它也将自动被删除
拓展:在队列上,找不到关联的交换机,队列也要被清除
Internal(较少使用)
当前 Exchange 是否只在 RabbitMQ 内部使用
一般保持默认 false
除非熟悉 Erlang 语言,可自定义扩展插件
Arguments
扩展参数,用于扩展 AMQP 协议定制使用
Direct Exchange 直连交换机
作用
发送到 Direct Exchange 的消息,都会被转发到 RouteKey 中指定的 Queue 中
就是一对一的作用
注意
Direct 模式可以使用 RabbitMQ 自带的 Exchange:default Exchange,所以不需要将 Exchange 进行任何绑定(binding)操作,消息传递时,RouteKey 必须完全匹配才会被队列接收,否则该消息会被抛弃
图示
分支主题
案例 Demo
3.6.* 版本
Producer
Consumer
3.7.15
Direct_producer
Driect_consumer
Topic Exchange 主题交换机
作用
发送到 Topic Exchange(主题交换机) 上的消息,会被指定给主题相关的 Queue(队列)上
主要是将 RouteKey 和设置的 Topic 进行模糊匹配
注意
可以使用通配符进行模糊匹配
符号 # 匹配一个或多个词
hello.# → hello.girl.cuihua
符号 * 匹配一个词
hello.* → hello.cuihua
图示
分支主题
案例 Demo
3.6.* 版本
Producer
Consumer
3.7.15
Topic_consumer
Topic_producer
Fanout Exchange 广播交换机
作用
直接广播,不走路由键,直接将队列绑定到交换机上
发送到交换机的消息,全都会被转发到与该交换机绑定的队列上
转发消息是最快的
图示
分支主题
案例 Demo
3.6.* 版本
Producer
Consumer
3.7.15
Fanout_producer
Fanout_consumer
Binding 绑定
Exchange 和 Exchange、Queue 之间的连接关系
Binding 中可以包含 RoutingKey 或者参数
Queue 消息队列
消息队列,存储消息数据
Durability 是否持久化
Durable 是
Transient 否
Auto delete
如果选 yes,代表当最后一个监听被移除之后,该 Queue 会自动被删除
Message 消息
服务器和应用程序之间,进行传送的数据
就是一段数据,由 Properties 和 Payload(Body)组成
常用属性
delivery mode 消息送达模式
持久化
非内存级别的非持久化
headers(自定义属性)
其他属性
content_type、content_encoding、priority
correlation_id、reply_to、expiration、message_id
timestamp、type、user_id、app_id、cluster_id
案例 Demo
3.6.* 版本
Procuder
Consumer
3.7.15
Producer
Consumer
Virtual Host 虚拟主机
虚拟地址,用于进行逻辑隔离,最上层的消息路由
一个 Virtual Host 里面可以有若干个 Exchange 和 Queue
同一个 Virtual Host 里面不能有相同名称的 Exchange 或 Queue
加快心跳
概述
如何保障消息的成功投递?
幂等性概念
在海量订单产生的业务高峰期,如何避免消息的重复消费
Confirm 确认消息 & Return 返回消息
自定义消费者
消息的 ACK 与重回队列
消息的限流
TTL 消息
死信队列
如何保障消息的成功投递?
什么是生产端的可靠性投递?
保证消息的成功发出
保障 MQ 节点的成功接收
发送端收到 MQ 节点确认应答
完善的消息进行补偿机制
生产端-可靠性投递
常见解决方案
方案 1
消息信心落库,对消息转改进行打标
图示
分支主题
疑问
那这种方式在高并发的场景下是否合适?
隐含问题
有两次数据持久化的操作,第一次要保存业务消息,第二次对数据进行记录。
数据 IO 磁盘,每次都需要读两次,数据库容易遇到瓶颈
解决办法:其实我们只需要对业务进行入库即可
方案 2
消息的延迟投递,做二次确认,回调检查
互联网大公司常用的方式
但也不一定能 100% 保证可靠性投递
极端情况,需要人工进行补偿
主要目的:减少数据库的操作
在项目核心链路中,每一次持久化操作,都要精心考量
花费时间太多,可能会造成核心链路中最大瓶颈
图示
分支主题
很复杂,但能够最大限度节省我们数据信息落库的操作
蓝色:上游服务
生产端
红色:下游服务
消费端
Callback:回调服务
第一步&第二步
把消息落库完了之后,才能 step 1 进行发送消息
还要记住,互联网大公司不会加任何事务,事务的性能会造成很严重的性能瓶颈
注意:这一次,在生产端它会一次生成两条消息
也就是执行完了 step 1 发送消息后
还会执行 step 2 做消息延迟检查,可以 2~5 分钟之后
第三步
监听&接收消息之后,就进行处理
第四步
当消费端中消息处理成功之后,还需生成一个 确认 消息
第五步
Callback 服务,通过监听器,监听 确认 消息
当确认了之后,就对消息做最终的存储
第六步
假设 5 分钟后,延迟投递检查消息,发送过来了
callback 服务,监听这个 检查细节
有单独的监听器监听
然后就去检查 MSG DB 数据库
发现刚刚下游已经把数据处理好了
就没问题了
如果刚刚没有返回,或者返回失败,出现异常了
这时 callback 需要做补偿
因为 callback 在监听延迟消息
当 callback 发现 message 并不存在,则会主动发起 RPC 通信,给上游反馈延迟检查的内容,并没有找到
然后,再次发送一次数据
这么做的目的
少做一次 DB 存储
能节省一步,就节省一步
可以进行异步去补偿
在高并发场景中,最应该关注并保证性能,保证能扛得住庞大的订单量
幂等性
幂等性是什么?
我们可以借鉴数据库的乐观锁机制
比如我们执行一条更新库存的 SQL 语句
update tb_pro set count = count - 1, version = version + 1 where version = 1
比如有很多商品,卖了一件,就减 1
如果只剩下一件了,减了 1,就卖完了,没办法继续卖了
如果此时碰上并发,两个请求同时过来,有可能 count 就变成 -1,这肯定是不行的
解决方式,就是加上 version 版本号,还可带上商品 id
像 elsaticsearch 中也是使用了这种严格的 幂等性
总结
可能你对一件事情进行操作,这个操作你可能执行非常多次,操作的结果也是相同的,这个就是幂等性保障
消费端-幂等性保障
在海量订单产生的业务高峰期,如何避免消息的重复消费问题?
在高并发的情况下,会有很多信息到达 MQ,消费端可能要监听大量的消息,难免会出现消息的重复投递,或者网络闪断,导致 Broker 端重发消息
消费端实现幂等性,就意味着,我们的消息永远不会消费多次,即使我们收到了多条一样的消息
有可能代码会执行多次,但数据库只会执行这一步操作
金融公司,最重视
业界主流的幂等性操作
唯一 ID + 指纹码机制,利用数据库主键去重
唯一 ID + 指纹码机制,利用数据库主键去重
有些用户就有可能在某一瞬间进行多次消费
比如刚刚转了一笔钱,接着又马上又转了一笔
指纹码
某些业务规则或者生成的信息拼接而成
select count(1) from tb_order where id = 唯一 ID + 指纹码
如果已经有记录,代表已经被操作了
好处:实现简单
坏处:高并发下有数据库写入的性能瓶颈
解决方案:根据 ID 进行分库分表进行算法路由
实现分压分流的机制
利用 redis 的原子性去实现
使用 redis 进行幂等,需要考虑的问题
比如我们 set 一个key,如果第二次还 set,就会更新为最新值
也可以做一个预先判断,exsit() 操作,存在就不更新了
最简单的自增,也是可以保障的
1)我们是否要进行数据落库,如果落库的话,关键解决的问题是数据库和缓存如何做到原子性?
2)如果不进行落库,那么都存储到缓存中,如何设置定时同步的策略?
Confirm 确认消息
理解 Confirm 消息确认机制
消息的确认,是指生产者投递消息后,如果 Broker 收到消息,则会给我们生产者一个应答
生产者进行接收应答,用来确定这条消息是否正常的发送到 Broker,这种方式也是消息的可靠性投递的核心保障
确认机制流程图
如何实现 Confirm 确认消息?
1)在 channel 上开启确认模式:channel.confirmSelect()
2)在 channel 上添加监听:addConfirmListener
监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理
案例 Demo
Producer
Consumer
Return 消息机制
Return Listener 用于处理一些不可路由的消息
我们的消息生产者,通过指定一个 Exchange 和 Routing Key,把消息送达某一个队列中去,然后我们的消费者监听队列,进行消费处理操作
但是在某些情况下,如果我们在发送消息的时候,当前的 Exchange 不存在或者指定的路由 Key 路由不到,这个时候如果我们需要监听这种不可达的消息,就要使用 Return Listener
在基础 API 中有一个关键的配置项
Mandatory
如果为 true,则监听器会接收到路由不可达的消息,然后进行后续处理。
如果为 false,那么 broker 端自动删除该消息。
图示
案例 Demo
Product
Consumer
消费端自定义监听
我们一般就是在代码中编写 while 循环,进行 consumer.nextDelivery() 方法进行获取下一条消息,然后进行消费处理
但是我们使用自定义的 Cusumer 更加方便,解耦性更加强,也是在实际工作中最常使用的方式
案例 Demo
Producer
Consumer
MyConsumer
消费端限流
什么是消费端的限流?
假设一个场景,首先,我们RabbitMQ 服务器上有上万条未处理的消息,我们随便打开一个消费者客户端,会出现下面的情况:
巨大量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据
RabbitMQ 提供了一种 qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于 consumer 或者 channel 设置 Qos 的值)未被确认钱,不进行消费新的消息
void BasicQos(uint prefetchSize, ushort prefetchCount, bool global);
prefetchSize:0
prefetchCount
会告诉 RabbitMQ 不要同时给一个消费者推送多于 N 个消息,即一旦有 N 个消息还没有 ack,则该 consumer 将 block 掉,直到有消息 ack
global
true / false 是否将上面设置应用于 channel
简单说,就是上面限制是 channel 级别的还是 consumer 级别
prefetchSize 和 global 这两项,rabbimq 没有实现,暂且不研究。prefetch_count 在 no_ask = false 的情况下生效,即在自动应答的情况下这两个值是不生效的
案例 Demo
Producer
Consumer
MyConsumer
消费端 ack 与重回队列
消费端的手工 ack 和 nack
消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿
如果由于服务器宕机等严重问题,那我们就需要手工进行 ack 保障消费端消费成功
消费端的重回队列
消费端重回队列是为了对没有处理成功的消息,把消息重新传回给 Broker
一般我们在实际应用中,都会关闭重回队列,也饿就是设置为 False
案例 Demo
Producer
Consumer
MyConsumer
TTL 队列 / 消息
TTL 是 Time To Live 的所写,也就是生存时间
RabbitMQ 支持消息的过期时间,在消息发送时可以进行指定
RabbitMQ 支持队列的过期时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动地清除
死信队列
死信队列 DLX, Dead-Letter-Exchange
利用 DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新 pulish 到另一个 Exchange,这个 Exchange 就是 DLX
消息变成死信队列的几种情况
消息被拒绝(basic.reject / basic.nack)并且 requeue = false
消息 TTL 过期
队列达到最大长度
DLX 也是一个正常的 Exchange,和一般的 Exchange 没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
当这个队列中有死信时,RabbitMQ 就会自动的将这个消息重新发布到设置的 Exchange 上去,进而被路由到另一个队列。
可以监听这个队列中消息做响应的处理,这个特性可以弥补 RabbitMQ 3.0 以前支持的 imediate 参数的功能
死信队列设置
首先需要设置死信队列的 exchange 和 queue,然后进行绑定
Exchange
dlx.exchange
Queue
dlx.queue
RoutingKey
#
然后进行正常声明交换机
队列、绑定,只不过需要在队列上加一个参数
arguments.put("x-dead-letter-exchange", "dlx.exchange");
这样消息在过期、requeue、队列在达到最大长度时,消息就可以直接路由到死信队列
案例 Demo
Producer
Consumer
MyConsumer
Spring 整合 RabbitMQ
添加依赖
配置文件
发送消息
消费消息
收藏
收藏
0 条评论
下一页