基于消息的分布式事务
2021-01-30 10:15:08 0 举报
分布式事务 消息
作者其他创作
大纲/内容
虚拟节点
transaction
H1
XMLMapperBuilder解析mq-tx的XML注册到mybatis的Configuration
P4
节点编号:1
业务处理
阶段四:创建返回阿里云Producer代理对象
H4
2
P
:
常态部署节点
P1/P2/P3/P4
消息落库
P2
K2
【0 ~ uint32】
H2
H
消息落库TOPIC_TABLE
1
阶段一:扫描mq-tx注册beanDefinition交由spring管理
consumer
处理BeanProcess实现一
节点编号:2
本地业务处理。。。。。
3
P3
index=(uint32)hash(key)
节点编号:3
投递/可重试
H3
处理BeanProcess实现二
调用
消息落库询标1:通过hash算法对key进行hash,得到uint32的index2:寻找hash环中最近的一个虚拟节点3:寻找虚拟节点映射的后端真实节点节点编号
K4
P1
mq-tx中间插件
TransactionSynchronizationAdapterafterCommit
K3
spring
节点编号
启动类加入@EnableLefitMqTransaction实现了ImportBeanDefinitionRegistrar
producer
H1/H2/H3/H4
异步任务待处理
消费成功
K
扩容增加部署节点
K1
index
减少部署节点
K1/K2/K3/K4
调用阿里云producer真实对象send更新本地库msgId
扫描被spring管理的bean 即被定义为@Compten@Configuration等
mybatis
调用producer的send
消息落库由一致性hash得到最近虚拟index节点对应真实节点打标
Rocket MQ
代理ONS的Producerbean instanceof Producerorbean field instanceof Producer
真实节点
重写mybatis @MapperScan
扫描配置原业务mapper
启动
阶段二:扫描mq-tx的mapper,目的统一交由spring transaction维护
自定义@MapperSacn
消费回执
处理实现ImportBeanDefinitionRegistrar
@Transaction
基于spring无侵入的分布式消息落地织入流程
扫描mq-tx中的mapper
阶段三:扫描mq-tx的mybatis的xml创建阶段二中mapper的代理实现
保证幂等
重写postProcessAfterInitialization方法处理SqlSessionFactoryBean创建阶段二mq-tc中mapper代理实现
服务下线:1:K服务下线选定第一个(策略待定)虚拟节点;延时触发(时间待定)2:寻找hash环中最近一个虚拟节点;3:获取映射虚拟节点对应的真实节点;例如:P节点服务4:P节点服务查询编号3对应的DB数据,创建新队列,启用线程池消费;服务上线:1:ZK通知在线节点服务,处理是否存在非自己节点以外的队列数据,存在即立即停止;2:上线服务获取自己节点对应的节点编号查询DB数据,加入队列线程池消费;
0 条评论
下一页