消息队列MQ,rabbitmq,kafka
2024-03-24 17:36:14 11 举报
AI智能生成
消息队列 - activemq, rabbitmq, kafka, redis 知识图谱,架构设计,中间件,主从,分布式,消息队列设计,
作者其他创作
大纲/内容
MQ基本知识
MQ优点
1.解偶
本质把问题拆分为生产(1-n个生产者),消费(1-n个消费者)的概念
2.异步
3.削峰
4.解决高并发
MQ缺点
1.系统可用性降低
MQ一旦故障,系统A就没办法发送消息到MQ了,系统BCD也没法接收到消息。下游流程没有办法运转了。
2.导致系统致系统复杂性变高
系统A本来给系统B发送一条就可以,但是由于系统A和MQ协调问题,系统A重复发送了两条数据给MQ。
3.一致性问题
有人给A发送请求,本来这个请求ABCD都执行成功才能返回,结果D执行失败了,就导致整个请求给用户返回失败。后台逻辑实际上差了一点需要解决。
MQ调研对比
ActiveMQ
定位小规模吞吐量
成熟,功能强大
偶尔丢失数据,维护越来越少
RocketMQ
吞吐量万级,消息队列
分布式扩展起来比较方便
java开发,源码可以研究点
如果阿里不支持了就黄了
RabbitMQ
万级吞吐量,MQ功能比较完备
用erlang开发,性能极好,延迟最低。
可靠性比较高,有队列模式,发布订阅模式,路由模式等方便业务。
社区相对比较活跃,开源提供管理界面很棒。
Redis
十万级别吞吐量,定位灵活的队列消息
多路复用 IO 模型,提升队列效率,使用简单。
基于内存,高效的数据结构。
场景的限制,如果需要更复杂的数据流处理,建议选择更适合的框架或者二次开发。
Kafka
百万万级吞吐量
多broke的高吞吐量性能,多分区的高并发,多副本的高可用。
天然适合大数据的实时计算,数据流式处理。作为消息队列,结合流处理引擎可以实现高性能、可伸缩、可靠且低延迟的数据流处理框架。
数据功能比较少。
如何保证消息不被重复消费(幂等性)?
怎么保证幂等性
幂等性:一个数据或者一个请求,给你重复来多次,你得保证对应的数据是不会变的
根据业务来解决
基于Redis,生产者发送每条数据时,里面加一个全局唯一的id,类似订单id之类的东西。消费者先根据id去Redis里查一之前消费过么?
如果没有消费过,就正常处理,然后将id写到redis。
如果有消费过,那就不处理了,保证别重复处理相同的消息即可。
如果没有消费过,就正常处理,然后将id写到redis。
如果有消费过,那就不处理了,保证别重复处理相同的消息即可。
kafka消费端出现非幂等
每个消息都有一个offset,代表这个消息的顺序的序号
消费者从kafka消费的时候,是按照这个offset顺序去消费的
消费者会去提交offset,告诉kafka已经消费到offset=153的这条数据了,offset是存储在zookeeper里的
消费者系统被重启,重启以后,去kafka继续上次接着消费,
但是offset是定时定期提交一次,可能有几条数据消费了,但是没有告诉kafka就重启了
导致这几条数据在重启完后重新消费了一次
消费者从kafka消费的时候,是按照这个offset顺序去消费的
消费者会去提交offset,告诉kafka已经消费到offset=153的这条数据了,offset是存储在zookeeper里的
消费者系统被重启,重启以后,去kafka继续上次接着消费,
但是offset是定时定期提交一次,可能有几条数据消费了,但是没有告诉kafka就重启了
导致这几条数据在重启完后重新消费了一次
如何保证从mq拿出来的数据按照顺序执行?
场景:通过mysql binlog进行数据复制,新增,修改,删除操作时,必须保证顺序一致。
RabbitMQ
原因:rabbitMQ顺序不一致情况:一个queue多个消费者consumer
解决:设置多个queue,每个queue对应一个消费者,这种顺序操作的数据写入到一个queue里面去
kafka
kafka能做到的保障:
1.写入partition中的数据一定是有顺序的。
1.写入partition中的数据一定是有顺序的。
解决:
1.生产者对需要保证顺序的数据指定一个key,保证这些数据都写入同一个partition中。
2.消费者来说:一个partition只能被一个消费者消费。
1.生产者对需要保证顺序的数据指定一个key,保证这些数据都写入同一个partition中。
2.消费者来说:一个partition只能被一个消费者消费。
如何使用延迟队列?
生成订单之后,放入TTL队列设置超时时间。
如果信息超过设置时间没处理,就进入处理死信的交换机。
然后消费者处理交换机绑定的队列消息,判断是否支付等操作,进行解锁库存取消订单等操作。
如果信息超过设置时间没处理,就进入处理死信的交换机。
然后消费者处理交换机绑定的队列消息,判断是否支付等操作,进行解锁库存取消订单等操作。
消息超过了有效期,没有人处理,成为死信
消息队列满了以后该怎么处理
场景:消费者故障 1个消费者1s 1000条,一秒三个消费者是3000条,一分钟是18万条,1000多万条积压数据。
所以如果你积压了几百万到上千万的数据,即使消费者恢复了,也需要大概1小时的时间才能恢复过来。
所以如果你积压了几百万到上千万的数据,即使消费者恢复了,也需要大概1小时的时间才能恢复过来。
kafka
1.先修复consumer消费者的问题,确保其恢复消费速度
2.新建一个topic,临时建立好partition原来10倍或者20倍的数量
3.然后写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮训写入到临时建立好的topic里的10倍数量的partition里
4.接着临时征用10倍的机器来部署consumer,每一批consumer消费一个临时的partition数据。
rabbitMQ
如上面所诉,新建一个MQ,之前的消费者拿到数据什么都不处理,写到新建的MQ中进行10倍消费
如果消费者故障问题没解决,可以先改下消费者代码,拿到消费者的数据直接扔掉,先保证MQ中的磁盘空间
等晚上再临时写程序找到丢失的数据重新补回来。
等晚上再临时写程序找到丢失的数据重新补回来。
批量重导:假设1万个订单积压在MQ里面,没有处理,其中1000个订单都丢了,
你只能手动写程序把1000个订单查出来,手动发到mq里再去补一次。
你只能手动写程序把1000个订单查出来,手动发到mq里再去补一次。
如果让你来开发一个消息队列中间件,如何设计架构?
1.首先MQ得支持扩容吧,所以可以设计一个分布式的MQ,参考kafka的设计理念,broker->topic->partition,每个partition放一个机器,就存一部分数据。等资源不够了,直接给topic增加partition,然后做数据迁移,增加机器,就可以存放更多数据,增加吞吐量。
2.考虑MQ的数据要持久化,落磁盘的数据要顺序写,这样就没有了磁盘随机读写的寻址开销,性能更高。kafka思路
3.mq可用性。参考kafka,多个副本 leader&follower->broker 挂了重新选举leader即可对外服务
上面整体的总结
RabbitMQ篇
基本结构
Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。
Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue
Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。
Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。
消息发布接收
发送消息
1、生产者和Broker建立TCP连接。
2、生产者和Broker建立通道Channel。
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange将消息根据规则转发到指定的Queue。
接收消息
1、消费者和Broker建立TCP连接
2、消费者和Broker建立Channel
3、消费者监听指定的Queue
4、当有消息到达Queue时,Broker默认将消息推送给消费者。Comsumer收到消息。
工作模式
simplest(简单模式)
一个生产者发送消息到一个队列,一个消费者从队列中接收消息进行处理。
work queues(工作队列模式)
生产者发送消息到一个工作队列,多个消费者同时从队列中接收消息并进行处理。
一条消息只会被一个消费者接收;可以有多个消费者同时处理队列中的消息。
可以设置消息的确认机制和fair dispatch(公平调度)来保证消息处理的可靠性和均衡性。
特点:一个生产者,多个消费者中每个消费者获取到的消息唯一
Publish/Subscribe(发布订阅模式)
生产者发送消息到一个被称为"交换器(Exchange)"的组件,然后交换器将消息广播到绑定在它上面的所有队列。
publish/subscribe的生产者是面向交换机发送消息;
通过交换器(exchange)实现消息分发到queue,每个queue都有自己的消费者进行处理;
特点:一个生产者,发送的消息会被多个消费者获取。使用了交换机的概念,去分发消息到不同队列。
Routing(路由模式)
Routing模式要求队列在绑定交换机时要指定routingkey,消息会转发到符合routingkey的队列
特点:在发布/订阅基础上,增加消息过滤功能,更精确地控制消息流向。
Topics(通配符模式)
队列绑定交换机指定通配符,和路由模式类似,但支持模式匹配(wildcards),如 *(匹配一个词)和 #(匹配多个词)。
统配符规则:中间以“.”分隔。
RPC(客户端远程调用服务端)
Kafka篇
基本结构
组件架构
服务协调
使用 Zookeeper
Zookeeper是Kafka集群的协调者,负责管理Broker的状态、Partition的分配和偏移量的管理。
Kafka依赖Zookeeper来实现集群的协调和一致性。
生产者Producer
中间人Broker
broker就是kafka server,每一台kafka服务器都是一个brocker
broker内部有物理的分区partiton,用作高并发(提高吞吐量)
broker之间有数据副本follower,用作高可用
消费者Consumer
核心设计
topic
topic是kafka的基础单元,是一个逻辑概念。数据实际写入的是物理的partition,一个topic包含多个partition。
每条消息属于且仅属于一个Topic
发送和订阅消息都必须指定topic
broker
Kafka集群中的每个服务器节点都是一个Broker,负责存储和管理消息数据。
每个Broker可以容纳多个Partition。
每个Partition又可以分为多个Segment。
partition
partition中消息持久化时,每条消息都是根据一定的分区规则路由到对应的partition中,并append到log文件的尾部,提高并发。
partition的副本可以被分布在不同的broker上,某个broker的故障不会影响到整个主题的可用性。
partition中消息是顺序写入磁盘且有序的,但不同partiton之间不能保证消息的有序性,提高效率。
partition个数最好与服务器个数相当
副本 replication
partition可以有指定数据的副本。主从模式producer和consumer只与leader交互。
replication follower从leader复制数据
Kafka会在Zookeeper上针对每个Topic维护一个in-sync replica(ISR)已同步的副本。
如果某个分区的Leader不可用,Kafka就会从ISR集合中选择一个副本作为新的Leader。
如果某个分区的Leader不可用,Kafka就会从ISR集合中选择一个副本作为新的Leader。
setment file
segment file (多个大小相等的段) 组成了一个partition,每个partition 就相当于一个巨型的文件
segment file 的消息数量并不一定相等
组成
.index 索引文件
包含若干索引条目,每个条目表示数据文件中一条message的索引
.log 数据文件
offset
位移
partition中的每个消息都有一个连续的序号,用于partition标识唯一的ID序号消息。
Offset记录着下一条将要发送给Consumer的消息的ID序号。
Offset从语义上来看拥有两种:Current Offset和Committed Offset。
Offset记录着下一条将要发送给Consumer的消息的ID序号。
Offset从语义上来看拥有两种:Current Offset和Committed Offset。
Current Offset
Current Offset保存在Consumer中,它表示Consumer希望收到的下一条消息的序号。它仅仅在poll()方法中使用。
例如,Consumer第一次调用poll()方法后收到了20条消息,那么Current Offset就被设置为20。
这样Consumer下一次调用poll()方法时,Kafka就知道应该从序号为21的消息开始读取。这样能够保证每次Consumer poll消息时,收到不重复的消息。
例如,Consumer第一次调用poll()方法后收到了20条消息,那么Current Offset就被设置为20。
这样Consumer下一次调用poll()方法时,Kafka就知道应该从序号为21的消息开始读取。这样能够保证每次Consumer poll消息时,收到不重复的消息。
Committed Offset
已提交位移,保存在Broker上,表示Consumer已经确认消费过的消息的序号。
举个例子,Consumer通过poll() 方法收到20条消息后,此时Current Offset就是20,经过一系列的逻辑处理后,并没有调用consumer.commitAsync()或consumer.commitSync()来提交Committed Offset,那么此时Committed Offset依旧是0。
举个例子,Consumer通过poll() 方法收到20条消息后,此时Current Offset就是20,经过一系列的逻辑处理后,并没有调用consumer.commitAsync()或consumer.commitSync()来提交Committed Offset,那么此时Committed Offset依旧是0。
总结
Current Offset是针对Consumer的poll过程的,它可以保证每次poll都返回不重复的消息;
Committed Offset能够保证新的Consumer能够从正确的位置开始消费一个partition,从而避免重复消费。
Committed Offset能够保证新的Consumer能够从正确的位置开始消费一个partition,从而避免重复消费。
生产者Producer
self.producer.send(topic=kafka_topic_name, value=message.encode('utf-8'), key=key, partition=partition)
消费者Consumer
self.consumer.poll(self, timeout_ms=0, max_records=None, update_offsets=True)
保证mq高可用
RabbitMQ集群模式
主备集群模式
每个节点上都有queue的完整镜像,包含了queue的全部数据
优点
能够从多个机器上去消费提高消费的吞吐量而已
缺点
如果queue所在的节点宕机了,可用性几乎没有什么保障,就导致该queue的数据丢失了;数据量大的时候,高并发问题
镜像集群模式
每个节点上都有queue的完整镜像,包含了queue的全部数据
优点:任何一个节点宕机了,高可用。其他节点上还包含了这个queue的完整数据,别的consumer都可以到其他活着的节点上去消费
缺点:不是分布式,如果queue的数据量很大,大到机器上的容量无法容纳了,此时该怎么办呢
如何开启镜像集群模式:在管理控制台,后台新增一个策略。这个策略就是镜像集群模式,指定的时候可以要求数据同步到所有节点的,也可以要求同步到指定数量节点,然后再次创建queue的时候应用这个策略,就会自动将数据同步到其他节点上去了
分布式集群
集群分布式无主架构:弹性扩缩容,容量无上限。
Kafka集群高可用架构
强大天生的分布式系统:每台机器上的broker进程,就可以认为是kafka集群中的一个节点
每个节点存储一部分topic的partition
每个节点可以设置多个副本,选举一个为leader,其他副本为follower
生产者只能往leader里写数据,写入数据到leader的时候,leader就会将数据同步到follower上去
生产者只能往leader里写数据,写入数据到leader的时候,leader就会将数据同步到follower上去
保证mq的可靠性
RabbitMQ
1.生产者丢失数据
写消息过程中,消息都没到rabbitmq在网络传输过程中就丢了
解决
方案1:commit事务
使用rabbitMQ提供的事务功能,发送数据前开启rabbitmq事务(channel.txSelect),然后发送消息。
如果没有被rabbitMQ接收到消息,生产者会收到异常报错,此时就可以回滚事务(channel.txRollback),然后重新发送消息;
如果收到了消息,那么可以channel.txCommit 用于提交事务;
如果没有被rabbitMQ接收到消息,生产者会收到异常报错,此时就可以回滚事务(channel.txRollback),然后重新发送消息;
如果收到了消息,那么可以channel.txCommit 用于提交事务;
缺点:这个事务机制是同步的,生产者发送消息会阻塞卡住等待成功,会导致生产者发送消息的吞吐量降下来
方案2:confirm确认
先把channel设置成confirm模式,发送一个消息,发送完消息后就不用管了。
rabbitmq如果接收到这条消息,就会回调你生产者本地的一个接口,通知说这条消息已经收到了;
rabbitmq如果在接收消息的时候报错了,就会回调你的接口告诉这个消息接收失败了,你可以再次重发;
rabbitmq如果接收到这条消息,就会回调你生产者本地的一个接口,通知说这条消息已经收到了;
rabbitmq如果在接收消息的时候报错了,就会回调你的接口告诉这个消息接收失败了,你可以再次重发;
生产者这块如果要保证消息不丢,一般是用confirm机制,异步的模式,你发送消息之后不会阻塞
2.rabbitMQ丢失数据
rabbitMQ将数据暂存在自己的内存里,结果消费者还没来得及消费,rabbitmq挂掉了,就导致暂存在内存里的数据丢失
解决
rabbitMQ开启持久化
1.创建queue的时候设置queue为持久化,但是queue里的消息不会持久化
2.发送消息msg的时候将msg设置为持久化的,deliveMode设置为2
必须要同时设置这两个持久化才行,rabbitmq哪怕是挂了,再次重启也会从磁盘上重启恢复queue,恢复这个queue里的数据。
3.消费者丢失数据
打开了autoAck机制,消费者消费到了这个消息,但是没处理就挂掉了,rabbitmq以为这个消费者已经处理完自动ack了。
解决
在消息消费完后,手动ack通知数据成功消费。
在应用端,用逻辑或者约束实现幂等来保证一致性。
Kafka
生产者会不会丢数据?
如果按照上面的配置设置acks=all,一定不会丢
kafka丢失数据
生产者将数据传给了主节点 partition1 leader,但是leader还没有把数据同步到自己的从节点partition1 follower就挂掉了。
经过重新指定leader后,新leader里没有这个未同步的数据。
经过重新指定leader后,新leader里没有这个未同步的数据。
解决
1.给topic设置replication.factor参数:这个值必须大于1,要求每个partition必须有至少2个副本
2.在kafka服务器端设置min.insync.replicas参数:这个值必须大于1,这是要求一个leader至少感知到至少有一个follower还跟自己保持联系,没掉队
3.在producer(生产者)端设置 acks=all:这个是要求每条数据,必须写入所有replica之后,才能认为是写成功了。
acks=0:发送完数据就不管了。
acks=1:发送完数据只要leader接收到就算成功,默认设置这个比较好
acks=all:必须所有分片都同步完数据才算成功
消费者丢失数据
消费者消费到消息,自动提交offset,让kafka以为你已经消费好了消息。但是实际上你刚拿到这个消息还没处理就挂了,导致数据丢失
解决
取消自动offset,进行手动offset,彻底处理完数据再进行offset传输
但是容易出现消费完消息还没提交offset就挂了的情况,导致重复消费
根据实际情况保证幂等性就行
0 条评论
下一页