rocketMQ-6-4-broker处理半消息事务
2023-08-03 17:52:22 0 举报
broker处理半消息事务的详细步骤,源码级剖析实现步骤
作者其他创作
大纲/内容
实现NettyRequestProcessor接口
DefaultMQProducerImpl生产者
asyncPrepareMessage()1.异步发送prepare消息
processRequest()
TransactionalMessageBridge
SendMessageProcessor发送消息处理器
getHalfMessageByOffset()根据offset去commitLog查询半消息
CommitLog消息落盘
asyncPutMessage()异步存储消息
commit
MQClientAPIImpl.endTransactionOneway()://【命令:结束事务】 RemotingCommand.createRequestCommand(RequestCode.font color=\"ff0000\
broker处理半消息事务
CompletableFuture<PutMessageResult> asyncPutMessage()异步写入消息(Half)
PutMessageResult putMessage()同步写入消息(OP消息)
2.rollbackMessage()
ConsumerOffsetManager消费者偏移量管理器
updateConsumeOffset()更新messageQueue、opQueue的offset
commitOffset()提交消费偏移量
桥接层(技术实现)
仓储层
putMessage()同步存储消息
netty 远程通信END_TRANSACTION终结事务
rollback
业务服务层(事务消息实现采用2PC思想)
startProcessorByHa()->TransactionalMessageCheckService.start():
broker启动
deletePrepareMessage()删除prepare消息(写OP)
checkTransactionState()反查事务状态
check()校验half消息1.校验事务状态2.更新消费偏移量
2.commitMessage()
2.processTransactionState()处理事务状态
netty 远程通信CHECK_TRANSACTION_STATE校验事务状态
asyncPutHalfMessage()异步发送半消息
EndTransactionProcessor终结事务处理器
parseHalfMessageInner()备份真实topic,重置topic为RMQ_SYS_TRANS_HALF_TOPIC,使得无法消费返回MessageExtBrokerInner
TransactionalMessageServiceImpl事务管理服务实现类
putOpMessage()往topic=RMQ_SYS_TRANS_OP_HALF_TOPIC 中写入OP消息
DefaultMessageStore存储层
1.checkTransactionState()获取事务状态(RocketMQLocalTransactionListener)
还原PrepareMessage:1)原始topic等信息2)消息写入commitLog,供消费
ClientRemotingProcessor客户端远程处理器
每60s执行一次 check
0 条评论
下一页