RocketMQ事务消息机制流程简述
2025-02-11 23:31:41 0 举报
一文彻底读懂RocketMQ事务消息机制
作者其他创作
大纲/内容
事务消息原理图
底层原理如下所示:
一:生产者准备发送事务消息前期准备
1:实例化事务监听器TransactionListener
2:初始化一个broker回查本地事务状态的线程池,用来执行回查请求线程
3:设置生产者组
4:实例化TransactionMQProducer事务消息生产者
5:启动事务消息生产者
6:开始发送事务消息
二:发送事务消息sendMessageInTransaction
1::设置消息的属性为事务消息类型TRAN_MSG
2:将生产者组设置为事务生产者组PGROUP的值
3:发送普通同步消息到broker
4:broker接收到发送消息请求后,根据消息中类型为事务消息类型,则将消息存放到一个RMQ_SYS_TRANS_HALF_TOPIC这个半事务消息的topic中,返回生产者客户端相应
5:然后回到生产者客户端,接收到broker发送成功的响应sendResult
6:通过事务监听器执行本地事务,执行完本地事务后,将本地事务提交状态放入一个localTrans缓存中,key是事务id,value为本地事务状态,然后返回一个本地事务提交状态LocalTransactionState
7:生产者端再根据本地事务提交状态,是提交、回滚还是未知,都将结果设置到消息请求头的commitOrRollback属性,同时RequestCode为END_TRANSACTION
8:调用EndTrasactionInvokeOneway方法,通过NettyRemotingClient异步发送到broker
9:broker接收到END_TRANSACTION请求后,根据请求头中的commitOrRollback属性值来判断是提交、回滚还是未知,做不同的业务处理,如果是未知状态,直接返回一个null,如果是提交状态,则将消息存储到真正的topic下面的commitLog中,然后删除半事务消息,如果是回滚,则直接删除半事务消息
10:如果broker接收到的本地事务提交状态为未知状态,那么可以由broker启动时的检察事务定时任务进行回查本地事务提交状态,定时任务执行时间每隔30秒检查一次,事务检查最大次数为15次
11:首先根据生产者组来选择一个生产者,然后调用该生产者对应的事务监听器的检查本地事务状态方法,实际上是将一个检查请求线程提交到生产者启动前设置的线程池中,当该线程运行后,执行事务监听器的检查本地事务状态方法,最终会返回一个本地事务检查状态,提交、回滚、未知
12:回查结束后,生产者将本地事务提交确认结果(提交、回滚、未知)再次通过NettyRemotingClient发送到broker,称之为二次确认结果提交到broker,这次请求头多了一个属性fromTransactionCheck,表示来自事务检查的请求
13:同样的,broker接收到请求后,依然会判断commitOrRollback的值是提交、回滚、还是未知,重新回到9
0 条评论
下一页