RabbitMQ
2022-10-19 11:49:12 15 举报
AI智能生成
自己整理的RabbitMQ,希望大家喜欢
作者其他创作
大纲/内容
<!--2. rabbitmq--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
pom
spring: rabbitmq: host: 192.168.88.131 port: 5672 username: guest password: guest virtual-host: /
application.yml
配置类
生产者
<!--RabbitMQ 启动依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
spring: rabbitmq: host: 192.168.88.131 port: 5672 username: guest password:guest virtual-host: /
@Componentpublic class RabbimtMQListener { @RabbitListener(queues = "boot_queue") public void listenerQueue(Message message){ System.out.println(new String(message.getBody())); }}
消息监听器
消费者
5. RabbitMQ集成微服务
分支主题
Step 1: 首先把消息信息(业务数据)存储到数据库中,紧接着,我们再把这个消息记录也存储到一张消息记录表里(或者另外一个同源数据库的消息记录表)
Step 2:发送消息到MQ Broker节点(采用confirm方式发送,会有异步的返回结果)
Step 3、4:生产者端接受MQ Broker节点返回的Confirm确认消息结果,然后进行更新消息记录表里的消息状态。比如默认Status = 0 当收到消息确认成功后,更新为1即可!
Step 5:但是在消息确认这个过程中可能由于网络闪断、MQ Broker端异常等原因导致 回送消息失败或者异常。这个时候就需要发送方(生产者)对消息进行可靠性投递了,保障消息不丢失,100%的投递成功!(有一种极限情况是闪断,Broker返回的成功确认消息,但是生产端由于网络闪断没收到,这个时候重新投递可能会造成消息重复,需要消费端去做幂等处理)所以我们需要有一个定时任务,(比如每5分钟拉取一下处于中间状态的消息,当然这个消息可以设置一个超时时间,比如超过1分钟 Status = 0 ,也就说明了1分钟这个时间窗口内,我们的消息没有被确认,那么会被定时任务拉取出来)
Step 6:接下来我们把中间状态的消息进行重新投递 retry send,继续发送消息到MQ ,当然也可能有多种原因导致发送失败
Step 7:我们可以采用设置最大努力尝试次数,比如投递了3次,还是失败,那么我们可以将最终状态设置为Status = 2 ,最后 交由人工解决处理此类问题(或者把消息转储到失败表中)。
步骤
6. 消息百分百成功投递
消息幂等性保障 乐观锁机制
生产者发送消息
消费者接收消息
消费者需要保证幂等性:第一次执行SQL语句
第二次执行SQL语句
7. 消息幂等性保障
配置主从
集群搭建(省略)
完成RabbitMQ默认集群模式,但并不保证队列的高可用性,尽管交换机、绑定这些可以复制到集群里的任何一个节点,但是队列内容不会复制。虽然该模式解决一项目组节点压力,但队列节点宕机直接导致该队列无法应用,只能等待重启,所以要想在队列节点宕机或故障也能正常应用,就要复制队列内容到集群里的每个节点,必须要创建镜像队列。
存在问题
8. RabbitMQ集群
在不使用消息队列的情况下,用户的请求数据直接写入数据库,在高并发的情况下,会对数据库造成巨大的压力,同时也使得系统响应延迟加剧。
为什么学习消息队列?
消息队列(MQ)是一种应用程序对应用程序的通信方法
消息中间件的消息传递指的是程序之间通过消息发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程调用的技术。
什么是消息中间件?
若添加新的系统接收系统A发送的消息时,需要和系统A进行交互,频繁更改系统A的代码
系统间耦合性太强
传统模式的缺点
将消息写入消息队列,需要消息的系统自己从消息队列中订阅,从而系统A不需要做任何修改。
中间件模式的的优点
解耦服务
串行的方式
并行的方式
传统方式
优点:用户只需要将消息写入消息队列,以异步的方式进行,即可返回,无需等待。
消息队列模式
用户注册后,需要发注册邮件和注册短信
应用场景
异步处理
秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。
秒杀
例如在下单时就会往数据库写数据。但是数据库只能支撑每秒1000左右的并发写入,并发量再高就容易宕机。低峰期的时候并发也就100多个,但是在高峰期时候,并发量会突然激增到5000以上,这个时候数据库肯定卡死了。
并发量大的时候,所有的请求直接怼到数据库,造成数据库连接异常
传统模式
消息被MQ保存起来了,然后系统就可以按照自己的消费能力来消费,比如每秒1000个数据,这样慢慢写入数据库,这样就不会卡死数据库了。
系统可以根据数据库能处理的并发量,从消息队列中慢慢拉取消息。在生产中,这个短暂的高峰期积压是允许的。
中间件模式
使用了MQ之后,限制消费消息的速度为1000,但是这样一来,高峰期产生的数据势必会被积压在MQ中,就是削峰。在高峰期过后的一段时间内,还是会维持在消费积压的消息的速度,这就叫做“填谷”
流量削峰也叫做削峰填谷
削峰填谷
流量削峰
消息中间件的主要的作用
QPS即每秒查询率,是对一个特定的查询服务器在规定时间内所处理流量多少的衡量标准。
每秒的响应请求数,也即是最大吞吐能力。
每秒查询率
QPS = 并发量 / 平均响应时间
并发量 = QPS * 平均响应时间
原理:每天80%的访问集中在20%的时间里,这20%时间叫做峰值时间。
公式:( 总PV数 * 80% ) / ( 每天秒数 * 20% ) = 峰值时间每秒请求数(QPS) 。
机器:峰值时间每秒QPS / 单台机器的QPS = 需要的机器 。
计算关系
QPS
PV,即页面浏览量,或点击量;通常是衡量一个网络新闻或网站主要指标
uv,指访问某个站点或点击某条新闻的不同IP地址的人数。
PR值,网页的级别技术,用来标识网页的等级/重要性。PR值越高说明该网页越受欢迎
PV ,UV ,PR
AMQP是一种高级消息队列协议。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。
AMQP
JMS即Java消息服务应用程序接口,是一个Java平台中关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
JMS
JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
JMS规定了两种消息模式;而AMQP的消息模式更加丰富
AMQP 与 JMS 区别
Rabbitmq:基于AMQP协议,erlang语言开发,稳定性好
MQ的实现方式
四个消息中间件的区别
1. 消息中间件的概述
RabbitMQ是由erlang语言开发,基于AMQP协议实现的消息队列,它是一种应用程序之间的通信方法
① 创建工程(生产者、消费者)
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version> </dependency>
② 分别添加依赖
/创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //主机地址 connectionFactory.setHost("192.168.6.100"); //连接端口;默认为 5672 connectionFactory.setPort(5672); //虚拟主机名称;默认为 / connectionFactory.setVirtualHost("/"); //连接用户名;默认为guest connectionFactory.setUsername("admin"); //连接密码;默认为guest connectionFactory.setPassword("123456");
创建连接工厂
Connection connection = connectionFactory.newConnection();
通过连接工厂创建连接
如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
Channel channel = connection.createChannel();
创建频道
创建队列
发送消息
channel.close();connection.close();
关闭资源
③ 编写生产者发送消息
创建队列Queue
设置回调方法
监听程序接收消息
④ 编写消费者接收消息
简单模式
多个消费端共同消费同一个队列中的消息。
应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
消费者1
消费者2
1.\t在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是**竞争**的关系。默认轮询获取
2.\tWork Queues 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
小结
工作队列模式
在订阅模型中,多了一个exchange角色
Exchange:交换机。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型。
Fanout:广播,将消息交给所有绑定到交换机的队列Direct:定向,把消息交给符合指定routing key 的队列Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
类型
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
交换机
每个消费者监听自己的队列
生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息
简介
创建连接
创建交换机
绑定队列和交换机
//9. 释放资源channel.close(); connection.close();
释放资源
小结:交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
1、 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。
2、发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。
3、发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机 。
发布订阅模式与工作队列模式的区别
发布订阅模式
队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
路由模式特点
队列绑定交换机
小结:Routing模式要求队列在绑定交换机时要指定routing key,消息会转发到符合routing key的队列。
路由模式
Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候**使用通配符**!
item.#:能够匹配item.insert.abc 或者 item.insert
#:匹配零个或多个词
item.*:只能匹配item.insert
*:匹配不多不少恰好1个词
通配符规则
定义队列和创建交换机
绑定队列
发送消息&释放资源
Topic主题模式可以实现 Publish/Subscribe发布与订阅模式 和 Routing路由模式 的功能;只是Topic在配置routing key 的时候可以使用通配符,显得更加灵活。
通配符模式
远程调用模式(远程调用,不太算MQ;暂不作介绍)
6种模式
2. RabbitMQ简介
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.1.8.RELEASE</version> </dependency>
rabbitmq.host=192.168.88.131rabbitmq.port=5672rabbitmq.username=guestrabbitmq.password=guestrabbitmq.virtual-host=/
properties
xml
搭建生产者工程
rabbitmq.host=192.168.6.100rabbitmq.port=5672rabbitmq.username=adminrabbitmq.password=123456rabbitmq.virtual-host=/
rabbitmq.properties
<!--加载配置文件--> <context:property-placeholder location="classpath:rabbitmq.properties"/> <!-- 定义rabbitmq connectionFactory --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}"/> <bean id="springQueueListener" class="com.atguigu.rabbitmq.listener.SpringQueueListener"/> <bean id="fanoutListener1" class="com.atguigu.rabbitmq.listener.FanoutListener1"/> <bean id="fanoutListener2" class="com.atguigu.rabbitmq.listener.FanoutListener2"/> <bean id="topicListenerStar" class="com.atguigu.rabbitmq.listener.TopicListenerStar"/> <bean id="topicListenerWell" class="com.atguigu.rabbitmq.listener.TopicListenerWell"/> <bean id="topicListenerWell2" class="com.atguigu.rabbitmq.listener.TopicListenerWell2"/> . <rabbit:listener ref="springQueueListener" queue-names="spring_queue"/> <rabbit:listener ref="fanoutListener1" queue-names="spring_fanout_queue_1"/> <rabbit:listener ref="fanoutListener2" queue-names="spring_fanout_queue_2"/> <rabbit:listener ref="topicListenerStar" queue-names="spring_topic_queue_star"/> <rabbit:listener ref="topicListenerWell" queue-names="spring_topic_queue_well"/> <rabbit:listener ref="topicListenerWell2" queue-names="spring_topic_queue_well2"/> </rabbit:listener-container>
spring-rabbitmq.xml
队列监听器
广播监听器1
广播监听器2
星号通配符监听器
井号通配符监听器
井号通配符监听器2
监听器
搭建消费者工程
3. spring整合RabbitMQ
在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。
原因
confirm 确认模式
return 退回模式
两种方式
消息的可靠投递
producer—>rabbitmq broker—>exchange—>queue—>consumer
消息从 producer 到 exchange 则会返回一个 confirmCallback 。
消息从 exchange–>queue 投递失败则会返回一个 returnCallback 。
消息传递路径
rabbitmq.xml
!--加载配置文件--> <context:property-placeholder location="classpath:rabbitmq.properties"/> <!-- 定义rabbitmq connectionFactory 确认模式开启:publisher-confirms="true" --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}" publisher-confirms="true" publisher-returns="true" /> <!--定义管理交换机、队列--> <rabbit:admin connection-factory="connectionFactory"/> <!--定义rabbitTemplate对象操作可以在代码中方便发送消息--> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/> <!--消息可靠性投递(生产端)--> <rabbit:queue id="test_queue_confirm" name="test_queue_confirm"></rabbit:queue> <rabbit:direct-exchange name="test_exchange_confirm"> <rabbit:bindings> <rabbit:binding queue="test_queue_confirm" key="confirm"></rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange>
spring-rabbitmq-producer.xml
添加确认模式
添加回退模式
确认和回退模式
设置 ConnectionFactory的publisher-confirms="true" 开启 确认模式。
使用 rabbitTemplate.setConfirmCallback 设置回调函数。当消息发送到 exchange 后回调 confirm 方法。在方法中判断 ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。
设置 ConnectionFactory 的 publisher-returns="true" 开启 退回模式。
使用 rabbitTemplate.setReturnCallback 设置退回函数,当消息从exchange 路由到 queue 失败后,如果设置了 rabbitTemplate.setMandatory(true) 参数,则会将消息退回给 producer并执行回调函数returnedMessage
消息的可靠投递小结
ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。
其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。
自动确认:acknowledge=“none” 默认
如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。
手动确认:acknowledge=“manual”
二种确认方式
ack
<!--加载配置文件--> <context:property-placeholder location="classpath:rabbitmq.properties"/> <!-- 定义rabbitmq connectionFactory --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}"/> <context:component-scan base-package="com.atguigu.listener" /> <!--定义监听器容器 acknowledge="manual":手动签收 --> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual"> <rabbit:listener ref="ackListener" queue-names="test_queue_confirm"></rabbit:listener> </rabbit:listener-container>
spring-rabbitmq-consumer.xml
@Componentpublic class AckListener implements MessageListener { @Override public void onMessage(Message message) { System.out.println(new String(message.getBody())); }}
自动确认
手动确认
添加监听器
在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认
如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,true);方法确认签收消息
如果出现异常,则在catch中调用 basicNack,拒绝消息,让MQ重新发送消息。
Ack 小结
消费者ack
有 9 条消息待消费 ,有 1 条消息未确认
后台
修改 QosListener , 添加手动签收方法 ,这样就可以确认消费限流
在 <rabbit:listener-container>中配置 prefetch 属性设置消费端一次拉取多少条消息
消费端的必须确认才会继续处理其他消息。
消费端限流小结
消费端限流
RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。
队列统一过期
消息过期
TTL
无法被消费的消息,某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信
1.\t队列消息数量到达限制;比如队列最大只能存储10条消息,而发了11条消息,根据先进先出,最先发的消息会进入死信队列。
2.\t消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
3.\t原队列存在消息过期设置,消息到达超时时间未被消费;
消息成为死信的三种情况
① 丢弃,如果不是很重要,可以选择丢弃
② 记录死信入库,然后做后续的业务分析或处理
给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key
<!-- 死信队列: 1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx) 2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx) 3. 正常队列绑定死信交换机 设置两个参数: * x-dead-letter-exchange:死信交换机名称 * x-dead-letter-routing-key:发送给死信交换机的routingkey --> <!-- 1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx) --> <rabbit:queue name="test_queue_dlx" id="test_queue_dlx"> <!--3. 正常队列绑定死信交换机--> <rabbit:queue-arguments> <!--3.1 x-dead-letter-exchange:死信交换机名称--> <entry key="x-dead-letter-exchange" value="exchange_dlx"/> <!--3.2 x-dead-letter-routing-key:发送给死信交换机的routingkey--> <entry key="x-dead-letter-routing-key" value="dlx.hehe"></entry> <!--4.1 设置队列的过期时间 ttl--> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/> <!--4.2 设置队列的长度限制 max-length --> <entry key="x-max-length" value="10" value-type="java.lang.Integer"/> </rabbit:queue-arguments> </rabbit:queue> <!--正常交换机--> <rabbit:topic-exchange name="test_exchange_dlx"> <rabbit:bindings> <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange> <!-- 2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx) --> <rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue> <rabbit:topic-exchange name="exchange_dlx"> <rabbit:bindings> <rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange>
spring-rabbitmq-producer.xml
队列绑定死信交换机
③ 通过死信队列,由负责监听死信的应用程序进行处理
1.\t死信交换机和死信队列和普通的没有区别
2.\t当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列
队列消息长度(数量)到达限制;
消费者拒接消费消息,并且不重回队列;
原队列存在消息过期设置,消息到达超时时间未被消费;
3.\t消息成为死信的三种情况:
死信队列小结
死信的处理方式
死信队列
延时消息是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。
在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行取消处理。这时就可以使用延时队列将订单信息发送到延时队列。
场景
1.\t下单后,30分钟未支付,取消订单,回滚库存。
2.\t新用户注册成功30分钟后,发送短信问候。
需求
但在RabbitMQ中没有提供延迟队列的功能,但可以通过TTL+死信队列 组合实现延迟队列的效果。
<!--定义监听器容器 acknowledge="manual":手动签收 acknowledge="auto" 自动签收 --> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual"> <!--延迟队列效果实现: 一定要监听的是 死信队列!!!--> <rabbit:listener ref="orderListener" queue-names="order_queue_dlx"></rabbit:listener> </rabbit:listener-container>
延迟队列
4. RabbitMQ高级特性
RabbitMQ
0 条评论
回复 删除
下一页