RabbitMQ
2024-08-01 18:38:16 26 举报
AI智能生成
RabbitMQ是一个可靠的、可扩展的消息代理中间件,用于处理高吞吐量的消息传递。它支持多种编程语言和协议,如AMQP、MQTT、STOMP等。企业级特性包括高级消息队列、集群、数据完整性和弹性。 RabbitMQ主要用于处理分布式系统中的异步通信和并发,可以应用于异构系统间的消息传递和流处理场景。
作者其他创作
大纲/内容
概述
基于AMQP协议Erlang语言开发的一款消息中间件,客户端语言支持比较多,
比如Python,Java,Ruby,PHP,JS,Swift.运维简单,灵活路由,但是性能不高,
可以满足一般场景下的业务需要,三高场景下吞吐量不高,消息持久化没有采取
零拷贝技术,消息堆积时,性能会下降
比如Python,Java,Ruby,PHP,JS,Swift.运维简单,灵活路由,但是性能不高,
可以满足一般场景下的业务需要,三高场景下吞吐量不高,消息持久化没有采取
零拷贝技术,消息堆积时,性能会下降
消息吞吐量在1w~10w级
没有消费者组的概念,需要依赖Exchange和队列之间的绑定关系
模型设计图
特点
保证可靠性。RabbitMQ使用一些机制来保证可靠性。如持久化、传输确认、发布确认等
具有灵活的路由功能。在消息进入队列之前,是通过Exchange(交换器)来路由消息的。
对于典型的路由功能,针对更复杂的路由功能,可以将多个Exchange绑定在一起,也可以通过插件机制
来实现自己的Exchange
对于典型的路由功能,针对更复杂的路由功能,可以将多个Exchange绑定在一起,也可以通过插件机制
来实现自己的Exchange
支持多种协议.RabbitMQ除了支持AMQP协议之外,还通过插件的方式支持其他消息队列协议,比如STOMP,MQTT等
支持多语言客户端.RabbitMQ几乎支持所有常用的语言,比如Java、.NET、Ruby等
提供管理界面.RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息Broker的许多方面
提供跟踪机制.RabbitMQ提供了消息跟踪机制。如果消息异常,使用者可以查出发生了什么情况
提供插件机制.RabbitMQ提供了许多插件,从多方面进行扩展,也可以编写自己的插件
核心组件
VirtualHost(虚拟主机)
可以在一个RabbitMQ集群中划分出多个虚拟主机,每个虚拟主机都有AMQP的全套基础组件,
并且可以针对每个虚拟主机进行权限以及数据分配,并且不同虚拟主机之间是完全隔离的
并且可以针对每个虚拟主机进行权限以及数据分配,并且不同虚拟主机之间是完全隔离的
Connection(连接)
客户端与RabbitMQ进行交互,首先就需要建立一个TCP连接
Channel(信道)
一旦客户端与RabbitMQ建立了连接,就会分配一个AMQP信道Channel.每个信道都会被分配一个唯一的ID
也可以理解为是客户端与RabbitMQ实际进行数据交互的通道
RabbitMQ为了减少性能开销,也会在一个Connection中建立多个Channel,这样便于客户端进行多线程连接
这些连接会复用同一个Connection的TCP通道
也可以理解为是客户端与RabbitMQ实际进行数据交互的通道
RabbitMQ为了减少性能开销,也会在一个Connection中建立多个Channel,这样便于客户端进行多线程连接
这些连接会复用同一个Connection的TCP通道
Exchange(交换机)
这是RabbitMQ中进行数据路由的重要组件。消息发送到RabbitMQ中后,会首先进入一个交换机
然后由交换机负责将数据转发到不同的队列中。RabbitMQ中有多种不同类型的交换机来支持不同的路由策略
Exchange可以持久化
然后由交换机负责将数据转发到不同的队列中。RabbitMQ中有多种不同类型的交换机来支持不同的路由策略
Exchange可以持久化
交换器类型
Direct
如果消息中的路由键(RoutingKey)和Binding中的绑定键(binding key)一致,
交换器就将消息发送到对应的队列中。路由键与队列名称要完全匹配,
如果将一个队列绑定到交换机要求路由键(RoutingKey)为dog,则只转发RoutingKey标记为dog
的消息,不会转发dog.puppy消息,也不会转发dog.guard消息等
Direct交换器是完全匹配、单播的模式
交换器就将消息发送到对应的队列中。路由键与队列名称要完全匹配,
如果将一个队列绑定到交换机要求路由键(RoutingKey)为dog,则只转发RoutingKey标记为dog
的消息,不会转发dog.puppy消息,也不会转发dog.guard消息等
Direct交换器是完全匹配、单播的模式
Fanout
Fanout交换器不处理路由键,只是简单地将队列绑定到交换器,发送到交换器地每条消息都会被
转发到与该交换器绑定的所有队列中,这很像子网广播,子网内的每个主机都获得了一份赋值的消息
转发到与该交换器绑定的所有队列中,这很像子网广播,子网内的每个主机都获得了一份赋值的消息
Topic
Topic交换器通过模式匹配分配消息的路由键属性,将路由键和某种模式进行匹配。
此时队列需要绑定一种模式,Topic交换器将路由键和绑定键的字符串切分成单词,这些单词可以用"."隔开
该交换器会识别两个通配符"#"和"*",其中"#"匹配0个或者多个单词"*"匹配不多不少一个单词
此时队列需要绑定一种模式,Topic交换器将路由键和绑定键的字符串切分成单词,这些单词可以用"."隔开
该交换器会识别两个通配符"#"和"*",其中"#"匹配0个或者多个单词"*"匹配不多不少一个单词
Headers
Headers交换器匹配AMPQ消息的Header而不是路由键,此外Header交换器和Direct交换器完全一致
但是性能相差很多,目前几乎不用了
但是性能相差很多,目前几乎不用了
Bingding(绑定)
用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则
可以将交换器理解成一个由绑定构成的路由表
可以将交换器理解成一个由绑定构成的路由表
支持的消息类型
普通消息
无序消息,效率最高
顺序消息
利用队列的FIFO属性,保证一个Exchange只能路由到一个队列上,可以实现顺序消费,但是性能不会很高
延时消息
设置消息的ttl,当这个消息死亡时,进入到死信队列,可以实现,原生不支持
死信队列
第一种,消息无法消费成功
第二种,消息已经过期
Producer生产消息
消息确认机制
单向发送
不需要确认,效率最高,也最容易丢消息
同步确认
单条消息确认
Producer每发送一条信息,等待收到确认之后再发送下一条,消息最可靠,性能不高
批量消息确认
当批量消息中有一条消息出错时,整批消息都需要重传,将会增大重复消费的可能
异步确认
通过回调接口来判断哪些消息被确认收到,消息最高,实现最复杂
Consumer消费消息
Pull拉取消息
由Consumer主动向RabbitMQ拉取消息,请求由consumer发起,broker的压力相对来说会比较小
1.如果只想从队列中获取单条消息而不是持续订阅,则可以使用channel.basicGet方法来进行消费消息
2.拉模式在消费者需要时采取消息中间件拉取消息,这段网络开销会明显增加消息延迟,
降低系统吞吐量
降低系统吞吐量
3.由于拉模式需要消费者手动去RabbitMQ中拉取消息,所以实时性较差,消费者难以获取实时消息
具体什么时候能拿到新消息完全取决于消费者什么时候去拉取消息
具体什么时候能拿到新消息完全取决于消费者什么时候去拉取消息
Push推送消息
由RabbitMQ Server主动向Consumer推送消息,请求由broker发起,broker压力会比较大
1.推模式接收消息是最有效的一种消息处理方式,channel.basicConsume(queueName,consumer)
方法将信道(channel)设置成投递模式,知道取消队列的订阅为止,在投递模式期间,当消息到达
RabbitMQ时,RabbitMQ会自动地、不断地投递消息给匹配地消费者,而不需要消费端手动来拉取,
当然投递消息地个数还是会收到channel.basicQos地限制
方法将信道(channel)设置成投递模式,知道取消队列的订阅为止,在投递模式期间,当消息到达
RabbitMQ时,RabbitMQ会自动地、不断地投递消息给匹配地消费者,而不需要消费端手动来拉取,
当然投递消息地个数还是会收到channel.basicQos地限制
2.推模式将消息提前推送给消费者,消费者必须设置一个缓冲区缓存这些消息,优点是消费者总是有一堆在
内存中待处理地消息,所以当真正去消费消息时效率很高,缺点就是缓冲区可能会移除
内存中待处理地消息,所以当真正去消费消息时效率很高,缺点就是缓冲区可能会移除
3.由于推模式时消息到达RabbitMQ后,就会立即被投递给匹配地消费者,所以实施性非常好,
消费者能即时得到最新的消息
消费者能即时得到最新的消息
消费消息确认机制
自动提交
消费者接收到消息时,自动地向broker进行提交,容易造成消息丢失
手动提交
由消费者手动在合适的场景下进行提交,更灵活,不容易造成消息没有消费到而丢失
拒绝消息
当消费者处理消息失败或者当前不能处理该消息时,可以给Broker发送一个拒绝消息的指令.
并且可要求Broker将该消息丢弃或者重新放入队列中
需要注意的是,当队列中只有一个消费者时,需要确认不会因为拒绝消息并选择重新放入队列中而导致
消息在同一个消费者上发生死循环
并且可要求Broker将该消息丢弃或者重新放入队列中
需要注意的是,当队列中只有一个消费者时,需要确认不会因为拒绝消息并选择重新放入队列中而导致
消息在同一个消费者上发生死循环
拒绝一条消息/拒绝多条消息
消息预取
在实际场景中,如果对每条消息的处理时间不同,可能导致有些消费者一直很忙,而有些消费者
处理很快并一直空间,这时可以通过设置预取数量(PrefetchCount)限制每个消费者在收到下一个确认回直前
一次最多可以接收到多少条消息
处理很快并一直空间,这时可以通过设置预取数量(PrefetchCount)限制每个消费者在收到下一个确认回直前
一次最多可以接收到多少条消息
流控机制
当生产消息速度大于消费速度时,会造成队列中堆积大量消息,
服务端默认配置是当内存使用达到40%,磁盘空间小于50M时,会触发
服务端默认配置是当内存使用达到40%,磁盘空间小于50M时,会触发
RabbitMQ可以对内存的使用量设置阈值,当达到阈值后生产者将被阻塞,,直到对相应资源的使用回复正常
除了这两个阈值外,RabbitMQ还用流控(FlowControl)机制来确保稳定性。由于Erlang进程之间并不共享内存(binaries类型)
而是通过传递消息来通信的,所以每个进程都有自己的进程邮箱(mailbox),因为Erlang默认不会对mailbox的大小进行设限
所以如果由大量消息持续发往某个进程,将会导致该mailbox过大,最终内存溢出,进程崩溃
在RabbitMQ中如果生产者持续高速发送消息,而消费者消费的速度又低于生产者发送的速度,若没有流控很快就会使mailbox
达到阈值限制,从而阻塞生产者的操作(因为有Block机制,所以进程不会崩溃),然后RabbitMQ会进行换页操作,把内存中的数据持久化到磁盘上
触发流控机制后RabbitMQ服务端接收消息的速度就会变慢,从而使进入队列的消息减少,同时RabbitMQ服务端的消息推送也会收到极大
的影响,服务器端推送的频率会大幅下降
除了这两个阈值外,RabbitMQ还用流控(FlowControl)机制来确保稳定性。由于Erlang进程之间并不共享内存(binaries类型)
而是通过传递消息来通信的,所以每个进程都有自己的进程邮箱(mailbox),因为Erlang默认不会对mailbox的大小进行设限
所以如果由大量消息持续发往某个进程,将会导致该mailbox过大,最终内存溢出,进程崩溃
在RabbitMQ中如果生产者持续高速发送消息,而消费者消费的速度又低于生产者发送的速度,若没有流控很快就会使mailbox
达到阈值限制,从而阻塞生产者的操作(因为有Block机制,所以进程不会崩溃),然后RabbitMQ会进行换页操作,把内存中的数据持久化到磁盘上
触发流控机制后RabbitMQ服务端接收消息的速度就会变慢,从而使进入队列的消息减少,同时RabbitMQ服务端的消息推送也会收到极大
的影响,服务器端推送的频率会大幅下降
常见问题
如何保证顺序消费
单队列+单消息。
RabbitMQ当中,针对消息顺序的设计是比较弱的。唯一比较好的策略就是单队列+单消息推送。
即一组有序消息,只发到一个队列中,利用队列的FIFO特性保证消息在队列内顺序不会乱.
但是这是以极度消耗性能作为代价的。在业务场景中,应该尽量避免这种场景。
然后在消费者进行消费时,保证只有一个消费者。同时指定prefetch属性为1
RabbitMQ当中,针对消息顺序的设计是比较弱的。唯一比较好的策略就是单队列+单消息推送。
即一组有序消息,只发到一个队列中,利用队列的FIFO特性保证消息在队列内顺序不会乱.
但是这是以极度消耗性能作为代价的。在业务场景中,应该尽量避免这种场景。
然后在消费者进行消费时,保证只有一个消费者。同时指定prefetch属性为1
问题:如果生产者发送的消息123,结果1消息发送失败了,进行重试发送,再进行到broker中接收时则会变成231,尽可能不要异步发送,改成同步发送
如何避免重复消费
无法做到绝对的消息不重复
Producer
如果采用同步发送确认机制。生产者发送消息之后,要确保一条消息已经发送Broker
如果采用的是异步发送确认机制。需要保证每个消息具有唯一的id,可以供Producer查询到这条消息是否发送到了Broker
尽量不要采用批量消息发送,如果其中某一条消息存在问题,则会导致整批消息都需要重传
消息去重,增加唯一id
给消息增加过期时间,如果在消息存活期内没有被消费,则进入死信队列
问题:有可能Broker已经收到了消息,由于网络抖动,网络向Producer返回的确认ACK,Producer暂未收到,Producer先于Broker重发了消息
Consumer
尽可能地做到幂等消费
逐条消费消息,手动确认
首先要确保消息是可以正常被消费的,不然会进入重试队列,一直被消费,直到进入死信队列中
给消息增加一个唯一标识,消费消息时,在业务当中判断一下改消息是否被消费过
问题:Consumer已经消费完了消息,但是在手动提交位点时,发生了宕机,这时Broker将消息转发到了其他的Consumer上,造成了消息的重复消费
Broker
队列设置为持久化,避免消费过的消息位点没有持久化到磁盘上而重复消费
如何避免消息丢失
步骤(1,2,3,4)都有可能造成消息的丢失,
无法做到绝对的消息不丢失
无法做到绝对的消息不丢失
Producer
消息确认模式调整为同步模式
Consumer
消费消息时逐条消息消费,并且在业务执行完毕时进行手动提交
Broker
将队列声明为持久化
主从集群,每条消息等待其他所有的slave节点复制完成,可以等到大多数节点
调整Broker刷盘的缓冲区大小,缩小刷盘间隔,争取让更多的消息落在磁盘上,防止内存中的数据丢失
操作系统
尽可能地进行同步刷盘,来一条消息,刷到磁盘上
刷盘分为同步刷盘和异步刷盘
消息堆积如何解决
Producer
如果业务允许生产速度降低,则可以进行使用,大多数场景下,不会降低生产者生产消息的速度
消息确认机制,尽可能地使用批量消息确认或者异步确认,增大吞吐量,让消费者做到幂等
Consumer
增加消费者数量,可以提升消息消费的速度
1.增加单台消费者拉取消息的数量,尽可能地一次性拉取多条消息
2.增加单台消费者地线程数,并发消费
2.增加单台消费者地线程数,并发消费
单条消息调整为手动批量提交,相比逐条消息提交性能会比较好
Broker
可以使用懒加载队列的方式,先存入到磁盘,使用到的时候再加载到内存中
要付出一定的磁盘IO性能
要付出一定的磁盘IO性能
消息推送模式由Push模式调整为Poll模式,降低Broker的压力
0 条评论
下一页