RabbitMQ
2020-09-07 10:52:52 2 举报
AI智能生成
RabbitMQ
作者其他创作
大纲/内容
工作模式
点对点的队列
简单队列
架构图
最简单的工作队列,其中一个消息生产者,一个消息消费者,一个队列。也称为点对点模式
生产者
消费者
工作队列模式Work Queue
工作队列
架构图
一个消息生产者,一个交换器,一个消息队列,多个消费者。同样也称为点对点模式
生产者
消费者
发布/订阅模式Publish/Subscribe
发布订阅模式
架构图
无选择接收消息,一个消息生产者,一个交换器,多个消息队列,多个消费者。称为发布/订阅模式
生产者
消费者
路由模式Routing
路由模式
架构图
在发布/订阅模式的基础上,有选择的接收消息,也就是通过 routing 路由进行匹配条件是否满足接收消息。
生产者
消费者
通配符模式Topics
通配符模式
架构图
同样是在发布/订阅模式的基础上,根据主题匹配进行筛选是否接收消息,比第四类更灵活。
生产者
消费者
消息确认
消息发送
事务模式
消息异步确认模式
confirm确认模式
消息确认机制
* Confirms给客户端一种轻量级的方式,能够跟踪哪些消息被broker处理,
* 哪些可能因为broker宕掉或者网络失败的情况而重新发布。
* 确认并且保证消息被送达,提供了两种方式:发布确认和事务。(两者不可同时使用)
* 在channel为事务时,不可引入确认模式;同样channel为确认模式下,不可使用事务。
* Confirms给客户端一种轻量级的方式,能够跟踪哪些消息被broker处理,
* 哪些可能因为broker宕掉或者网络失败的情况而重新发布。
* 确认并且保证消息被送达,提供了两种方式:发布确认和事务。(两者不可同时使用)
* 在channel为事务时,不可引入确认模式;同样channel为确认模式下,不可使用事务。
publisher-confirms: true
retrun模式
publisher-returns: true
消息接收
自动应答
消费者在接收到消息后会自动确认,同时会删除Broker的消息,这里产生了一个问题如果消息没处理成功就删除了呢。
不推荐这种自动应答方式
不推荐这种自动应答方式
手动ACK确认
生产者端代码不变,消费者端代码这部分就是用于开启手动应答模式的。
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
注:第二个参数值为false代表关闭RabbitMQ的自动应答机制,改为手动应答。
在处理完消息时,返回应答状态,true表示为自动应答模式。
channel.basicAck(envelope.getDeliveryTag(), false);
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
注:第二个参数值为false代表关闭RabbitMQ的自动应答机制,改为手动应答。
在处理完消息时,返回应答状态,true表示为自动应答模式。
channel.basicAck(envelope.getDeliveryTag(), false);
消息端ACK与重回队列
basicAck
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
因为我在属性配置文件里面开启了ACK确认 所以如果代码没有执行ACK确认 你在RabbitMQ的后台会看到消息会一直留在队列里面未消费掉 只要程序一启动开始接受该队列消息的时候 又会收到
第一个参数id
第二个参数是否批量
basicNack
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
第一个参数消息id
第二个参数是否批量
第三个参数: true 表示消息重新投递,消费者还会重新接收到该消息
false表示消息在rabbitmq服务器中删除,消费者不会接收到该消息
false表示消息在rabbitmq服务器中删除,消费者不会接收到该消息
SpringBoot整合
添加依赖
application.properties配置类
定义RabbitmqConfig配置类
ExchangeConfig
QueueConfig
MsgSendConfirmCallBack
生产者
消费者
消息端限流
可靠消息投递解决方案
简介
介绍
RabbitMQ是一款开源的、Erlang语言编写的、基于AMQP协议的消息中间件。
解耦:实现消费者和生产者之间的解耦
异步:将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度
削峰:将高并发时的同步访问变为串行访问达到一定量的限流,利于数据库的操作
使用场景
服务间异步通信
顺序消费
定时任务
请求削峰
架构图
架构图
基本结构
Broker(Server):接受客户端连接,实现AMQP消息队列和路由功能的进程,我们可以把Broker叫做RabbitMQ服务器。
Virtual Host:一个虚拟概念,一个Virtual Host里面可以有若干个Exchange和Queue,主要用于权限控制,隔离应用。如应用程序A使用VhostA,应用程序B使用VhostB,那么我们在VhostA中只存放应用程序A的exchange,queue和消息,应用程序A的用户只能访问VhostA,不能访问VhostB中的数据。
Exchange:消息队列交换机,接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为,例如,在RabbitMQ中,ExchangeType有Direct、Fanout、Topic和Header四种,不同类型的Exchange路由规则是不一样的
Queue:消息队列,用于存储还未被消费者消费的消息,队列是先进先出的,默认情况下先存储的消息先被处理。
Message:就是消息,由Header和Body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、由哪个Message Queue接受、优先级是多少等,Body是真正传输的数据,内容格式为byte[]。
Connection:连接,对于RabbitMQ而言,其实就是一个位于客户端和Broker之间的TCP连接。
Channel:信道,仅仅创建了客户端到Broker之间的连接Connection后,客户端还是不能发送消息的。需要在Connection的基础上创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令,一个Connection可以包含多个Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的。
Producer:生产者,即生产客户端把消息发送给BrokerMQ服务器
Consumer:消费者,消费客户端接收服务器的消息。
安装
安装
安装Erlang
yum -y install erlang socat
安装RabbitMQ
wget https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.10/rabbitmq-server-3.6.10-1.el7.noarch.rpm
使用
运行
systemctl start rabbitmq-server
开机自启
systemctl enable rabbitmq-server
检查
systemctl status rabbitmq-server
关闭
systemctl stop rabbitmq-server
访问
启动web控制台
rabbitmq-plugins enable rabbitmq_management
chown -R rabbitmq:rabbitmq /var/lib/rabbitmq/
创建用户
rabbitmqctl add_user admin root
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin “.*” “.*” “.*”
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin “.*” “.*” “.*”
消息发布接收流程
生产者
1.生产者和Broker建立TCP连接
2.生产者和Broker建立通道
3.生产者通过通道将消息发送给Broker,由Exchange交换器将消息转发
4.Exchange将消息发送到Queue队列
消费者
1.消费者和Broker建立TCP通道
2.消费者和Broker建立通道
3.消费者监听指定Queue队列
4.当消息到达Queue队列时,Broker自动将消息推给消费者
5.消费者接受到消息
流程图
死信队列
死信队列介绍
死信队列:DLX,dead-letter-exchange
利用DLX,当消息在一个队列中变成死信 (dead message) 之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX
消息变成死信有以下几种情况
消息被拒绝(basic.reject / basic.nack),并且requeue = false
消息TTL过期
队列达到最大长度
死信处理过程
DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性
当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。
可以监听这个队列中的消息做相应的处理。
实现死信队列
1.首先需要设置死信队列的exchange和queue,然后进行绑定
Exchange: dlx.exchange
Queue: dlx.queue
Queue: dlx.queue
2.然后需要有一个监听,去监听这个队列进行处理
3.然后我们进行正常声明交换机、队列、绑定,只不过我们需要在队列加上一个参数即可:arguments.put(" x-dead-letter-exchange","dlx.exchang e");,这样消息在过期、requeue、 队列在达到最大长度时,消息就可以直接路由到死信队列!
TTL消息/队列
集群架构
MQ组件
0 条评论
下一页