RocketMQ事务消息流程
2025-02-12 20:01:10 0 举报
RocketMQ事务消息流程
作者其他创作
大纲/内容
opTopic
TRANSACTION_ROLLBACK_TYPE
DefaultMQProducerImpl
fromTransactionCheck为true?
TransactionMQProducer
④开启事务消息生产者
AbstractTransactionalMessageCheckListener
删除半事务消息deletePrepareMessage
解析消息
localTransactionState
获取系统半事务消息topicRMQ_SYS_TRANS_HALF_TOPIC的所有消息队列
ClientRemotingProcessor
broker
前期准备
消息保存到RMQ_SYS_TRANS_OP_HALF_TOPIC,这个opTopic中
processRequest
③定义一个checkExecutor线程池,表示生产者客户端自身的本地事务检查线程
消息发送流程和发送同步消息流程一样
EndTransactionProcessor
TRANSACTION_NOT_TYPE
发送同步消息
sendResult
while循环
TransactionListenerImpl
根据本地事务的执行结果,向broker发送本地事务确认结果
根据生产者组select一个生产者
发送事务消息
消息属性:消息类型为TRAN_MSG生产者组名称为PGROUP,值为我们自定义的生产者组
endTransaction方法
使用Transaction-msg-check-thread线程来发送检查事务消息,线程池执行该线程
commitOrRollback存在
invokeOneway
是否是broker事务回查后生产者重新检查本地事务后的二次确认结果发送?
①创建一个事务监听器TransactionListener
broke启动时的定时事务消息检查服务
直接return null说明本地事务的状态仍然是未知状态,需要再次等待下一次事务消息的回查
checkTransactionState
send方法
返回本地事务执行状态LocalTransactionStateCOMMIT_MESSAGEROLLBACK_MESSAGEUNKNOW
否
事务消息检查间隔:默认30秒
生产者发送半事务消息的首次本地事务确认结果请求
producerTable这个map中获取
通过偏移量获取半事务消息
MessageStore将消息异步存储到brokerputMessage
发送CheckTransactionStateRequest请求
TRANSACTION_COMMIT_TYPE
RequestCode.CHECK_TRANSACTION_STATE
检查消息
MQClientInstance
处理本地事务状态processTransactionState
RMQ_SYS_TRANS_OP_HALF_TOPIC
发送成功
生产者发送半事务消息流程
run方法
sendCheckMessage(msgExt);
判断是否需要检查半事务消息如果是,则通过监听器解析半事务消息
设置事务ID(一个UNIQ_KEY)
使用Netty的异步通道发送RequestCode.END_TRANSACTION
实际上是创建一个线程request,然后提交到生产者端创建的线程池checkExecutor中
TransactionalMessageCheckService
30秒后获取事务超时时间:默认6秒事务被检查的最大次数:15次
执行本地事务executeLocalTransaction
获取消息中的生产者组
从本地事务缓存localTrans中根据事务Id获取事务的状态
producer
构建EndTransactionRequest请求
实际上是再次向broker发送二次确认结果,请求仍然是下面的请求RequestCode.END_TRANSACTIONcommitOrRollback的值可能会变化同时将fromTransactionCheck设置为true
resolveHalfMsg
check
遍历消息队列集合
请求参数为:topiccommitLogOffsetmsgId(UNIQ_KEY)transactionId(msgId)等
invokeOneway(不考虑消息发送结果,如果消息发送失败,可能会消息丢失)
调用该生产者的检查事务状态方法
sendMessageInTransaction
TransactionalMessageServiceImpl
通过生产者创建的事务监听器来检查本地事务transactionListener.checkLocalTransaction(message);
请求参数为:topic事务idcommitLog偏移量brokerName生产者组commitOrRollback属性:TRANSACTION_COMMIT_TYPETRANSACTION_ROLLBACK_TYPETRANSACTION_NOT_TYPE
broker事务回查后,生产者检查本地事务后的二次确认结果请求
回滚消息这个步骤没有
getOpQueue
获取之前发送的半事务消息
switch
0 条评论
下一页