RabbitMQ
2019-08-13 10:39:09 17 举报
AI智能生成
RabbitMQ
作者其他创作
大纲/内容
工作模式
Work queues(工作队列)
1、一条消息只会被一个消费者接收;
2、rabbit采用轮询的方式将消息是平均发送给消费者的;
3、消费者在处理完某条消息后,才会收到下一条消息。
2、rabbit采用轮询的方式将消息是平均发送给消费者的;
3、消费者在处理完某条消息后,才会收到下一条消息。
Publish/Subscribe(发布订阅)
1)work queues不用定义交换机,而publish/subscribe需要定义交换机。
2)publish/subscribe的生产方是面向交换机发送消息,work queues的生产方是面向队列发送消息(底层使用默认
交换机)。
3)publish/subscribe需要设置队列和交换机的绑定,work queues不需要设置,实质上work queues会将队列绑
定到默认的交换机 。
相同点:
所以两者实现的发布/订阅的效果是一样的,多个消费端监听同一个队列不会重复消费消息。
2)publish/subscribe的生产方是面向交换机发送消息,work queues的生产方是面向队列发送消息(底层使用默认
交换机)。
3)publish/subscribe需要设置队列和交换机的绑定,work queues不需要设置,实质上work queues会将队列绑
定到默认的交换机 。
相同点:
所以两者实现的发布/订阅的效果是一样的,多个消费端监听同一个队列不会重复消费消息。
Routing(路由模式)
Routing模式和Publish/subscibe有啥区别?
Routing模式要求队列在绑定交换机时要指定routingkey,消息会转发到符合routingkey的队列。
Routing模式要求队列在绑定交换机时要指定routingkey,消息会转发到符合routingkey的队列。
Topics(通配符模式)
队列绑定交换机指定通配符:
统配符规则:
中间以“.”分隔。
符号#可以匹配多个词,符号*可以匹配一个词语。
统配符规则:
中间以“.”分隔。
符号#可以匹配多个词,符号*可以匹配一个词语。
与SpringBoot整合
生产者
添加依赖
application配置
定义RabbitConfig类,配置Exchange、Queue、及绑定交换机
定义消息发送方法(开启confirm、return)
消费者
添加依赖
application配置
定义监听方法(开启手工签收)
消费端限流
解释
RabbitMQ提供了qos(服务质量保证)功能:在非自动消息确认的前提下,如果一定数目的消息未被消费前,不会进行消费新的消息
方法
方法:void basicQos(int prefetchSize, int prefetchCount, boolean global)
方法参数
1、prefetchSize: 单个消息大小限制,一般为0
2、prefetchCount:告诉rabbitmq不要给一个消费之一次推送超过N条消息,直到消费者主动ack
3、global: true\false是否将上述设置应用于channel,即上面的设置是通道级别还是消费之级别,一般为false
TTL消息\队列
消息过期 & 队列过期
消息过期时间:在发送消息的时候可以指定,只对当前消息有效
队列过期时间:发送到该队列的消息都有过期时间
可靠消息投递解决方案
消息落库,对消息进行打标
架构图
步骤
1、业务数据、消息数据分别入库,标记消息发送中、记录消息落库时间
2、发送消息,等待rabbitmq应答
3、异步监听rabbitmq应答,如果返回true,即消息投递成功,将消息标记为发送成功
如果返回false,即消息投递失败,将消息标记为发送失败
如果返回false,即消息投递失败,将消息标记为发送失败
4、定时任务定时捞取消息发送失败和发送超时的消息,重新发送(阀值为3),超过三次还是发送失败,将该消息标记为失败消息,并且通知人工介入
缺点
需要对消息进行持久化,有两次数据库写操作,高并发情况下,影响性能
消息延迟发送,做二次确认,回调检查
架构图
步骤
常见问题
消息队列积压大量消息未消费
1、检查消费端代码是否存在缺陷,提升消费端性能
2、增加队列绑定的服务器,利用rabbit采用轮询的方法将消息平均发送给消费者
消息保障不丢失
生产端:可靠消息投递
broker:exchange、queue、消息都需要持久化
消费端:手动签收
简介
介绍
MQ全称为Message Queue,即消息队列, RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message
Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开
发中应用非常广泛。RabbitMQ官方地址:http://www.rabbitmq.com/
Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开
发中应用非常广泛。RabbitMQ官方地址:http://www.rabbitmq.com/
MQ优点
1、任务异步处理。
将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。
2、应用程序解耦合
MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。
将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。
2、应用程序解耦合
MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。
RabbitMQ优点
1、使得简单,功能强大。
2、基于AMQP协议。
3、社区活跃,文档完善。
4、高并发性能好,这主要得益于Erlang语言。
5、Spring Boot默认已集成RabbitMQ
2、基于AMQP协议。
3、社区活跃,文档完善。
4、高并发性能好,这主要得益于Erlang语言。
5、Spring Boot默认已集成RabbitMQ
基本结构
Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue
Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。
Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。
Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。
消息发布接收流程
发送消息
1、生产者和Broker建立TCP连接。
2、生产者和Broker建立通道。
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue(队列)
接收消息
1、消费者和Broker建立TCP连接
2、消费者和Broker建立通道
3、消费者监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、消费者接收到消息。
消息确认
消息发送
事物模式
消息异步确认模式
confirm消息确认
消息确认是指:生产者投递消息后,如果broker接收到消息后,会给生产者一个应答
接收消息成功,返回true
接收消息失败,返回false
如果无应答,可能与broker服务器tcp通信故障
接收消息成功,返回true
接收消息失败,返回false
如果无应答,可能与broker服务器tcp通信故障
application添加配置:rabbitemq.publisher-confirms: true #开启消息发布确认(消息成功发送到broker)
代码实现见springboot整合章节
return消息机制
application配置:rabbitmq.publisher-returns: true #开启消息回调监听器
rabbitmq.template.mandatory: true
rabbitmq.template.mandatory: true
return listener: 用于处理那些无法路由的消息
生产者通过exchange和routingKey,将消息路由到指定的队列中去;消费者监听队列,进行消息消费;在某些情况下,我们指定的exchange不存在,或者指定的key路由不到队列,这些消息就是不可达的消息,这个时候就需要用到return listener监听这些不可达的消息;
关键配置项:Mandatory 如果为true:监听会接收到路由不可达的消息,进行后续处理; 如果为false:broker端会自动删除该消息
代码实现见springboot整合章节
消息接收
自动确认
消费者接收到消息之后自动确认,同时broker会删除消息,这里有一个问题,如果消费者接收到消息处理业务失败怎么办?
手动ack确认
application配置:rabbitmq.listener.simple.acknowledge-mode: manual
代码实现见与springboot整合章节
消费端ack与重回队列(配合手动确认消息使用)
basicAck
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
此方法:告诉服务器收到这条消息,已经被我消费了,可以在队列删掉这样以后就不会再发了,否则消息服务器以为这条消息没处理掉,后续还会重发
第二个参数:是否批量
第一个参数:消息id
basicNack
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
第一个参数:消息id
第二个参数:是否批量
第三个参数:true 表示消息重新投递,消费者还会重新接收到该消息
false 表示消息在rabbitmq服务器中删除,消费者不会接收到该消息
false 表示消息在rabbitmq服务器中删除,消费者不会接收到该消息
死信队列(dlx)
解释
定义:当一个消息在一个队列中变成死信(dead message)后,它会重新publish到一个exchange上,这个exchange就是dlx
说明:dlx就是一个普通的exchange,他能在任何queue上被指定,实际就是在这个queue上设置属性
当这个queue上有死信是,rabbitmq会自动将这个死信重新发布到定义的exchange上(dlx),进而被路由到一个队列上
业务可以监听这个队列的消息,做一些处理
当这个queue上有死信是,rabbitmq会自动将这个死信重新发布到定义的exchange上(dlx),进而被路由到一个队列上
业务可以监听这个队列的消息,做一些处理
消息变成死信的几种情况
1、被拒绝的消息(basic.reject\basic.nack),并且requeue = false
2、消息TTL过期
3、队列达到最大长度
实现死信队列
设置一个死信队列:Exchange:dlx.exchange
Queue:dlx.queue
RoutingKey: #
Queue:dlx.queue
RoutingKey: #
设置正常队列添加参数属性:arguments.put("x-dead-letter-exchange","dlx.exchange")
这时这个正常队列里的死信将会被重新publish到dlx.exchange交换机上
这时这个正常队列里的死信将会被重新publish到dlx.exchange交换机上
集群架构
主备模式
所谓主备模式:如果主节点挂了,从节点提供服务
架构图
镜像模式
镜像队列
架构图
构建集群视频
多活模式
架构图
MQ组件&架构设计
架构图
组件实现功能点
支持消息高性能的序列化转换,异步化发送消息
支持消息生产实例消费实例的连接处化、缓存化,提高性能
支持可靠性投递消息,保证100%不丢失
支持消费端的幂等操作,保证消费端不会重复消费
支持迅速消息发送模式,比如一些日志收集,统计分析,需要提高吞吐量,保证高性能
非核心消息,如日志收集、性能统计
消息不需要落库
发送消息不需要confirm确认
支持消息延迟发送模式,消息可以延迟发送,指定延迟时间,比如用于延迟检查,服务限流场景
业务场景
商品签收,七天之后自动付款
优惠券指定日期作废
实现
封装Message的时候指定delayTime属性
支持事物消息
支持顺序消息,保证消息送达消费端的前后顺序
前提
同一个队列
只有一个消费者(独占型)
思路
架构图
支持消息补偿、重试,以及快速定位失败消息\异常
支持消息集群的负载均衡
支持消息路由策略,保证正确路由到指定机器
支持批量发送消息
把批量消息放进集合里统一提交,对于channl来说还是发送一个消息,对于消费方,可以批量的消费
0 条评论
下一页