3.MQ避免重复消费、保证可靠性传输、保证消息的顺序性
2021-06-01 16:34:21 19 举报
MQ避免重复消费、保证可靠性传输、保证消息的顺序性
作者其他创作
大纲/内容
rabbitmq生产者消息不丢方案
kafka消息不丢失方案
消费者
生产者
1.将channel设置为confirm的模式2. 发送一个消息后不做任何处理3.rabbitmq如果接受这条消息的话,会回调本地的ack(Message msg) 接口通知这条消息已经收到4.rabbitmq如果没接收到这条消息的话,会回调本地的nack(Message msg)接口通知这条消息接收失败了,需要重新发送
data1、data2、data3
数据3, offset=102
消费者宕机若未提交offset=102,导致重启后又从offset=100开始消费
3.由于消费者是并行执行,且业务逻辑执行时间不一致,可能导致数据落库乱序
提交offset
rabbitmq消息丢失问题
thread
解决方案:写一个内存队列queue,相同key的写入到同一个queue中。N个线程各对应一个queue去消费处理。
对消费的消息做落库
kafka
案例: 用消费者的多线程并发处理方式,经过压测,消费者4核8G的机器,单击开启32条线程,最高每秒可以处理上千条消息
channel.txSelecttry { // 发送消息}catch(Exception e){ channel.txRollback // 再次重试发送这条消息}channel.txCommit
rabbitmq消息重复消费问题
data1 data2 data3
2.每个消费者都从唯一一个queue里拿到一个消息消费
内存队列1
数据库
事务机制
rabbitmq消息乱序问题
2. rabbitmq将接收到的消息写入内存,在消费者未消费之前宕机导致内存数据丢失
rabbitmq
数据1, offset=100
数据2, offset=101
kafka消费端重复消费问题
partition1
2.消费者消费时也是有序消费。
这种方式为异步非阻塞的,生产者可以发挥最好的性能,吞吐量不受影响
按照数据进入kafka的顺序给每条数据分配一个offset
1.producer开启acks=all,保证所有follower都同步数据成功2.producer开启retreis=MAX,失败无限次重试
data1data2data3
data3
data2
uniqe解决kafka消费端重复消费问题
data2data1data3
queue3
磁盘
消费一定数量的消息提交一次offset
返回offset给kafka
生产者
3.消费者消费到这个消息当没来得及处理就宕机了。重启后以为已经处理过没有再去消费导致丢失数据。
queue2
partition3
内存队列2
同步
1.写消息过程中,消息没有被网络传输到rabbitmq中
每次消费往DB或内存map写入一个数据,用来保证唯一值。如:订单回退,每个消息都将orderId写入DB,如果重复就提交失败并回滚事务
confirm机制
1. 一个生产者生产后的消息被多个消费者消费
broker 1partion1leader
queuerabbitmq
kafka消息不丢方案
kafka消息消费最终无序问题
queue1
zookeeper
关闭消费者自动提交offset机制,在消费消息并且正常执行完业务逻辑后在调用API提交offset。
DB
partition2
data1
1.生产者在写消息的时候根据指定key来让消息一定写入到一个partition中,这一过程写入的数据是有序的
kafka如何保证消息顺序性
kafka消息重复消费问题
rabbitmq保证消息有序
broker 3partion1follower
2.每个消费者都从各自的queue里拿到消息消费
rabbitmq如何保证消息顺序性
说明: 消费者从partition中取出是一定有序的,但是消费者使用并发多线程执行的方式去处理消息的消费,导致partition取出的消息不能保证有序的执行。这里将从partition取出的消息消费后写入内存queue,然后再从内存queue中取出进行处理,可以保证线程执行的有序性。
1.replication.factor>1,至少两个副本2.min.insycn.replicas >1,至少一个follower可用
1. 需要保持顺序的消息有序写入同一个queue
持久化
以订单业务为例,该消息写入到partition1中是有序的
开启rabbitmq的持久化,将消息写入到磁盘中。当rabbitmq宕机后重启,会自动读取持久化的数据,来保证消息不丢失
关闭消费者的autoAck机制,在消费消息并且正常执行完业务逻辑后在调用API提交ack。
3.消费者对消息的处理不可能采用单线程,每个消息的消费都是以单个线程并发执行。这样的话,一条消息处理要几十ms,每秒只能处理十几条数据,太慢了。这就导致线程执行的快慢会影响向DB写入数据的顺序不同。导致最终结果的数据是乱序的
内存队列3
2.消费者从对应的queue取出消息进行消费,保证有序性
broker 2partion1follower
rabbitmq和消费者端消息不丢方案
事务机制处理消息丢失,该机制为同步的,生产者发送消息后会同步阻塞等待rabbitmq是成功或失败这种同步阻塞的方式会导致生产者发送消息的吞吐量下降
0 条评论
回复 删除
下一页