rocketmq源码(九)——事务消息
2024-09-19 16:33:17 2 举报
在RocketMQ中,事务消息是保证分布式事务一致性的一种机制。事务消息通过提供一组API,使生产者能够在发送消息时将其与本地事务关联。如果本地事务成功提交,消息将被发送到Broker;如果本地事务回滚,消息将被丢弃。同样,消费者在消费消息时,也可以将消息与本地事务关联。如果本地事务成功提交,消息将被确认;如果本地事务回滚,消息将被重发或转移至死信队列。RocketMQ的事务消息实现了Producer-Consumer之间的分布式事务,确保了消息发送与本地事务执行的一致性。
作者其他创作
大纲/内容
msg
同步发送的事务消息消息
异步执行request任务this.checkExecutor.submit(request);
事务提交
拿到生产者设置的事务监听器TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
回查事务消息listener.resolveHalfMsg(msgExt);
初始化Boolean initResult = controller.initialize();
转换后的普通消息写到commitlog日志RemotingCommand sendResult = sendFinalMessage(msgInner);
向生产者设置事务监听器producer.setTransactionListener(transactionListener);
拿到事务监听器TransactionListener transactionListener = getCheckListener();
标记消息为事务消息
设置回滚或者提交状态
循环回查事务状态,最大间隔checkInterval,默认60*1000
消息转换:将事务消息转换为普通消息,设置真正的topic,queueIdMessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
MessageSysFlag
同步发送消息
run()
处理事务消息this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
最低检查次数int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
启动Broker服务端controller.start()
将事务消息设置为half消息parseHalfMessageInner(messageInner)
线程池异步执行sendCheckMessage(msgExt);
事务消息回滚,删除事务消息topic队列中的half消息
创建事务监听器TransactionListener transactionListener = new TransactionListenerImpl();
return null
返回成功return response;
设置消息code = CHECK_TRANSACTION_STATE
创城Brokercontroller = createBrokerController()
TRANSACTION_NOT_TYPE
判断是否是事务消息,这里是trueBoolean.parseBoolean(transFlag)
创建生产者TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);
startProcessorByHa(messageStoreConfig.getBrokerRole());
从commitlog中读取half消息result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
this.onWaitEnd();
事务消息提交成功,删除事务消息topic队列中的half消息
获取事务消息标志String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
send msg
BrokerController
发送事务消息main
this.transactionalMessageCheckService.start();
检查超时时间long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
transactionalMessageCheckService.run()
启动生产者super.start();
TRANSACTION_ROLLBACK_TYPE
TRANSACTION_COMMIT_TYPE
将消息持久化到RMQ_SYS_TRANS_HALF_TOPIC对应的日志中,对消费者不可见store.asyncPutMessage(parseHalfMessageInner(messageInner));
创建检查事务状态线程池this.defaultMQProducerImpl.initTransactionEnv();
初始化: new transactionalMessageCheckListener()new transactionalMessageCheckService()
producer 生产者端
事务消息发送half消息,默认使用的同步发送sendResult = this.send(msg);
Broker 服务端
启动生产者producer.start();
发送成功后,执行本地事务
执行回查操作localTransactionState = transactionListener.checkLocalTransaction(message);
启动线程run()
拿到连接channelChannel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
0 条评论
下一页