消息队列进阶
2023-05-20 16:30:31 1 举报
AI智能生成
消息队列进阶
作者其他创作
大纲/内容
分布式事务
事务
对若干数据进行更新操作,为了保证这些数据的完整性和一致性,希望这些更新操作要么都成功,要么都失败。
ACID特性:原子性、一致性、隔离性、持久性
概念
在分布式系统中的实现事务
常见的分布式事务实现
2PC(Two-phase Commit,也叫二阶段提交)
TCC(Try-Confirm-Cancel)
事务消息
适用的场景主要是那些需要异步更新数据,并且对数据实时性要求不太高的场景
消息队列中的分布式
事务消息需要消息队列提供相应的功能才能实现,Kafka和RocketMQ都提供了事务相关功能
如何用消息队列实现分布式事务
例子:订单系统创建订单后,发消息给购物车系统,将已下单的商品从购物车中删除
1.订单系统在消息队列上开启事务
2.订单系统给消息队列发送半消息
半消息:包含的内容就是完整的消息内容,在事务提交之前,对于消费者来说,这个消息是不可见的。
3.半消息发送成功后,订单系统就可以执行本地事务了,在订单库中创建一条订单记录,并提交订单库的数据库事务。
4.根据本地事务的执行结果决定提交或者回滚事务消息
5.提交事务,则消息对消费者可见
问题:第4步提交事务失败该如何?
Kafka
简单粗暴:直接抛异常,让用户自行处理。
我们可在业务代码中反复重试提交,直到提交成功,或者删除之前创建的订单进行补偿。
我们可在业务代码中反复重试提交,直到提交成功,或者删除之前创建的订单进行补偿。
RocketMQ
事务反查机制:RocketMQ的Broker没有收到提交或者回滚的请求,
Broker会定期去Producer上反查这个事务对应的本地事务的状态,
然后根据反查结果决定提交或者回滚这个事务。
Broker会定期去Producer上反查这个事务对应的本地事务的状态,
然后根据反查结果决定提交或者回滚这个事务。
为支撑事务反查机制,业务代码需要实现一个反查本地事务状态的接口,
告知RocketMQ本地事务是成功还是失败。
告知RocketMQ本地事务是成功还是失败。
如何保证消息不会丢失
检测消息丢失的方法
利用消息队列的有序性
Producer端对发出的消息附加一个连续递增的序号
Consumer端来检查这个序号的连续性
Consumer端来检查这个序号的连续性
分布式系统要注意:Kafka和RocketMQ不保证Topic上严格顺序,只能保证分区上的消息是有序的
在发消息的时候必须要指定分区
在每个分区单独检测消息序号的连续性
各阶段如何确保
生产阶段
做了什么
消息在Producer创建出来,经过网络传输发送到Broker端
如何确保
消息队列通过最常用的请求确认机制,来保证消息的可靠传递。
客户端==>Broker
Broker==>客户端
只要Producer收到了Broker的确认响应,就可以保证消息在生产阶段不会丢失
客户端==>Broker
Broker==>客户端
只要Producer收到了Broker的确认响应,就可以保证消息在生产阶段不会丢失
实践
在编写发送消息代码时,需要注意,正确处理返回值或者捕获异常,就可以保证这个阶段的消息不会丢失
同步发送
try {
RecordMetadata metadata = producer.send(record).get();
System.out.println("消息发送成功。");
} catch (Throwable e) {
System.out.println("消息发送失败!");
System.out.println(e);
}
RecordMetadata metadata = producer.send(record).get();
System.out.println("消息发送成功。");
} catch (Throwable e) {
System.out.println("消息发送失败!");
System.out.println(e);
}
异步发送
在回调方法里进行检查
producer.send(record, (metadata, exception) -> {
if (metadata != null) {
System.out.println("消息发送成功。");
} else {
System.out.println("消息发送失败!");
System.out.println(exception);
}
});
producer.send(record, (metadata, exception) -> {
if (metadata != null) {
System.out.println("消息发送成功。");
} else {
System.out.println("消息发送失败!");
System.out.println(exception);
}
});
存储阶段
做了什么
消息在Broker端存储,如果是集群,消息会在这个阶段被复制到其他的副本上
(只要Broker在正常运行,就不会出现丢失消息的问题。
但是如果Broker出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的。)
(只要Broker在正常运行,就不会出现丢失消息的问题。
但是如果Broker出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的。)
如何确保
如果对消息的可靠性要求非常高,可以通过配置Broker参数来避免因为宕机丢消息。
实践
单节点Broker
配置Broker参数,在收到消息后,将消息写入磁盘后再给Producer返回确认响应
Broker集群
至少将消息发送到2个以上的节点,再给客户端回复发送确认响应
(当某个Broker宕机时,其他的Broker可以替代宕机的Broker,也不会发生消息丢失)
(当某个Broker宕机时,其他的Broker可以替代宕机的Broker,也不会发生消息丢失)
消费阶段
做了什么
Consumer从Broker上拉取消息,经过网络传输发送到Consumer上
如何确保
采用确认机制来保证消息的可靠传递
实践
不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。
def callback(ch, method, properties, body):
print(" [x] 收到消息 %r" % body)
# 在这儿处理收到的消息
database.save(body)
print(" [x] 消费完成")
# 完成消费业务逻辑后发送消费确认响应
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(queue='hello', on_message_callback=callback)
print(" [x] 收到消息 %r" % body)
# 在这儿处理收到的消息
database.save(body)
print(" [x] 消费完成")
# 完成消费业务逻辑后发送消费确认响应
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(queue='hello', on_message_callback=callback)
如何处理消费过程中的重复消息
重复消息的产生原因
传递消息的服务质量标准
At most once
消息在传递时,最多会被送达一次。
没什么消息可靠性保证,允许丢消息
对消息可靠性要求不太高的监控场景使用
At least once
消息在传递时,至少会被送达一次
不允许丢消息,但是允许有少量重复消息出现
Exactly once
消息在传递时,只会被送达一次,不允许丢失也不允许重复
✅扩展思考
为什么大部分消息队列都选择只提供At least once的服务质量,而不是级别更高的Exactly once呢?
常用的绝大部分消息队列提供的服务质量都是At least once,包括RocketMQ、RabbitMQ和Kafka 都是这样。
也就是说,消息队列很难保证消息不重复
也就是说,消息队列很难保证消息不重复
重复消息产生的场景
网络抖动,生产者未收到Broker的ACK,重复发送消息
Broker未收到消费者的消费ACK
如何解决
消费端:让消费消息的操作具备幂等性
利用数据库的唯一约束实现幂等
为更新的数据设置前置条件
记录并检查操作
思路
在执行数据更新操作之前,先检查一下是否执行过这个更新操作
实现
在发送消息时,给每条消息指定一个全局唯一的ID,消费时,先根据这个ID检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。
消息积压了如何处理
如何规避消息积压
消息队列的性能优化
不需要关注消息队列本身的性能
(对于绝大多数使用消息队列的业务来说,消息队列本身的处理能力要远大于业务系统的处理能力。)
(对于绝大多数使用消息队列的业务来说,消息队列本身的处理能力要远大于业务系统的处理能力。)
关键在于业务代码如何与消息队列配合
生产端
发送端性能上不去原因
先检查下是不是发消息之前的业务逻辑耗时太多导致
如何优化
注意设置合适的并发和批量大小
消费端
原因
使用消息队列的时候,大部分的性能问题都出现在消费端,
如果消费的速度跟不上发送端生产消息的速度,就会造成消息积压
如果消费的速度跟不上发送端生产消息的速度,就会造成消息积压
如何优化
思路
一定要保证消费端的消费性能要高于生产端的发送性能,这样的系统才能健康的持续运行
落地
优化消费业务逻辑
水平扩容,增加消费端的并发数来提升总体的消费性能
(在扩容Consumer的实例数量的同时,必须同步扩容主题中的分区(也叫队列)数量,
确保Consumer的实例数和分区数量是相等的。如果Consumer的实例数量超过分区数量,
这样的扩容实际上是没有效果的。原因我们之前讲过,因为对于消费者来说,在每个分区
上实际上只能支持单线程消费。)
(在扩容Consumer的实例数量的同时,必须同步扩容主题中的分区(也叫队列)数量,
确保Consumer的实例数和分区数量是相等的。如果Consumer的实例数量超过分区数量,
这样的扩容实际上是没有效果的。原因我们之前讲过,因为对于消费者来说,在每个分区
上实际上只能支持单线程消费。)
错误解决方案
在收到消息的OnMessage方法中,不处理任何业务逻辑,把这个消息放到一个内存队列里面就返回了
会丢消息!!!
如果收消息的节点发生宕机,在内存队列中还没来及处理的这些消息就会丢失
如果收消息的节点发生宕机,在内存队列中还没来及处理的这些消息就会丢失
消息积压发生了该如何处理
粗粒度原因分析
发送变快了
通过扩容消费端的实例数来提升总体的消费能力(要同步扩容分区数量)
(无法扩容)将系统降级,通过关闭一些不重要的业务,减少发送方发送的数据量
消费变慢了
优先检查一下日志是否有大量的消费错误
消费线程是不是阻塞住了或者发生了死锁等
检查消费端,是不是消费失败导致的一条消息反复消费这种情况比较多,
这种情况也会拖慢整个系统的消费速度
这种情况也会拖慢整个系统的消费速度
0 条评论
下一页
为你推荐
查看更多