02_事务
2024-05-23 21:44:45 0 举报
从零开始剖析分布式事务
作者其他创作
大纲/内容
在response中设置header事务信息
#prepare
5.2、发送到MQ
Consistency一致性
4、返回失败,或调用失败
JTA事务管理器JtaTransactionManager#doGetTransaction()
TransactionInfo事务状态 和 属性
触发所有Participant(子事务)
StackCompositeTransaction1CompositeTransaction2CompositeTransaction3
失败
addParticipant ( XAResourceTransaction: 3139322E3136382E312E3130362E746D30303030313030303033:3139322E3136382E312E3130362E746D31 ) for transaction 192.168.1.106.tm0000100003
监听消息
获取连接AbstractDataSourceBean#getConnection()
重写负载均衡算法同一个IP只能打到同一台远程服务器上ILoadBanlancer(ribbon)
执行业务代码之前 织入开启事务的逻辑
外层方法A的事务报错,方法B的事务也会回滚。方法B报错,只回滚自己的,不会影响外层的。
设置Druid分布式数据源数据源atomikosDataSource.setXaDataSource(DruidXADataSource)
调用父类
事务属性TransactionAttribute
定时清理,保持连接池健康ConnectionPool#launchMaintenanceTimer
1、创建pre消息
通过我们的@Compensable注解定义的cancelKey获取到cancel的bean,并执行cancel方法
PROPAGATION_MANDATORY
Atomikos封装数据源AbstractDataSourceBean
Mybatis执行prepareSimpleExecutor#prepareStatement
SpringCloudCoordinator#invoke#invokePostCoordinator获取RestTemplate,拼装远程地址并调用
通过上面创建的DruidPooledXAConnection#getConnection
父类
XAResourceTransaction
维护分布式事务对象
PROPAGATION_NEVER
TransactionInterceptor事务拦截器
执行真正的2PC事务提交AbstractPlatformTransactionManager#processCommit
待发送定时器1、定时扫描消息DB中,状态为待发送的消息,2、如果消息超过5分钟状态没变,就去上游服务查询相关业务,3、决定此消息是重新投递还是删除
MQ
commit()
成功
请求最终发送至标记了@feignClient的远程接口
PROPAGATION_REQUIRED默认
创建事务栈TransactionServiceImp#createCompositeTransaction()
rollback()
要么全成功,要么全失败
初始化ConnectionPool#addConnectionsIfMinPoolSizeNotReached
真正的Atomikos数据源AtomikosDataSourceBean
从headers中获取事务信息,解析出TransactionContext事务上下文
提交mysql事务
每隔1分钟不断执行TransactionRecovery#timingRecover
创建JtaTransactionObject
返回
事务管理器PlatformTransactionManager
N
事务之间不能互相影响(MVCC)
事务管理器父类AbstractPlatformTransactionManager#getTransaction()
一个事务执行完再执行另一个事务,不能并行执行事务。解决了【幻读】的问题,但是效率大大降低。一般不采用
将事务ID发送至事务参与者
Read Uncommitted读未提交
方法业务代码@transaction{ insert(100); }
7.1 通知可靠消息服务,消息已消费消息状态:已完成
Atomic原子性
true
事务的基本要素ACID
StackCompositeTransaction1CompositeTransaction2
10、根据业务表示,查询这条消息对应的业务对象是否执行成功
HTTP请求 服务B
执行业务代码过程中抛异常 织入回滚rollback事务的逻辑
构建XAResourceTransaction
3 返回消息唯一编码
获取连接DruidXADataSource#getXAConnection()
4 抛出异常
一启动就执行恢复TransactionRecovery#startRecovery
我们的类FinanceService.class
将所有prepareStatement放入集合List<Statement> statements
注入
你管好你自己反正我是什么都不管
ResourceAdapterImpl启动定时任务(在配置文件中实例化)
3、返回成功
commitTransactionAfterReturning
保护性和不变性,多账户之间转账,最后不能多出50或者少了100
获取当前这个数据源的事务,加入分布式事务中SiblingMapper#findOrCreateBranchForTransaction
AbstractPlatformTransactionManager#commit
数据源AtomikosDataSourceBean
3、处理本地事务
CompensableTransactionImpl#fireNativeParticipantCancel执行cancel
JDBC事务回滚java.sql.Connection#rollback()
获取JDBC连接java.sql.DataSource#getConnection()
如果某个参与者中try阶段失败,就执行所有参与者的cancel。如果所有参与者try阶段都成功了,但是在执行confirm的时候失败了,就执行所有参与者的confirm
AtomikosDataSourceBean
库存DB无货
执行本地和远程的事务CompensableTransactionImpl#fireNativeParticipantConfirm和#fireRemoteParticipantConfirm
TransactionManager.begin()启动tcc事务
上游服务A
AtomikosConnectionProxy#newInstance实现InvocationHandler的invoke
方法A开启事务,调用方法B,方法B加入事务,如果A没开事务,B也不开事务
切点(就是切到@transaction注解上)joinpointIdentification
B举报A开外挂(事务)
PREPARE成功
插入
初始化过程和参与者一样,唯一不一样是发起者调用远程服务会加@feignClient注解,所以会让CompensableFeignBeanPostProcessor扫描到进行处理
正常/异常
ConnectionPool#findExistingOpenConnectionForCallingThread
开始资源XAResource.end发送XA_END
AtomikosConnectionProxy#enlist
PROPAGATION_NOT_SUPPORTED
执行mapper对应的SQL
CompensableHandlerInterceptor检测request headers中是否包含TCC事务的context,如果有,则让当前事务加入到TCC事务中
MySQL 实现Read Repeatable(MVCC)Multi-Version Concurrency Control
后台线程CleanupWork清理事务日志及表临时数据
原创出品,禁止商用
创建atomikos connection proxy for com.alibaba.druid.proxy.jdbc.ConnectionProxyImpl@694fc02c
DataSourceTransactionObject
远程cancel
CompensableTransactionImpl#fireRemoteParticipantCancel循环调用resourceList的cancel接口resourceList是分布式事务的参与者
后台线程SampleTransactionLogger日志服务
2、调用
初始化事务
11.1、修改状态:已发送
CompensableCoordinatorControllerbytetcc提供的try,confirm,cancel接口
Read Repeatable可重复读(默认)
atomikos分布式事务全流程精简
PROPAGATION_SUPPORTS
设置手动提交事务setAutoCommit(false)
校验@Compensable注解类是否都有@Transaction注解,是否都有confirm和cancel接口方法
结束
7.2、手动ACK
LocalXADataSource 通过封装过的数据源获取connection 执行SQL
问题:只针对其他事务update的数据,保证当前事务不受影响。但是会受其他事务insert的影响。也就是会发生【幻读】
回滚本地事务,并抛出异常,调用者的TransactionInterceptor此时会发现分布式事务的其中一个参与者发生异常
下游业务
开始资源XAResource.start发送XA_START
CompensableFeignHandler封装了feign的相关功能
获取事务ID192.168.1.103.tm0000100001
清理事务资源cleanupTransactionInfo
tcc事务中的一个参与者启动
执行是否报错
MySQL
5 回滚本地事务
订单DB下单成功
调用业务方法mybatis的sqlsessionfactory通过connect执行SQL
TransactionManagerImp#commit
8、定时扫描状态为:待发送修改时间5分钟前
org.apache.ibatis.executor.statement.StatementHandler#prepare()来执行prepareStatement()
构建AtomikosXAConnectionFactory
XAResourceTransaction#suspend
创建事务ID <= 当前事务ID < 删除事务ID
Connection.getAutoCommit()
切点joinpointIdentification
CompensableFeignBeanPostProcessor扫描到feign的反射类,feign.ReflectiveFeign$FeignInvocationHandler,然后将其set到CompensableFeignHandler(自己封装的invokehandler)中
最大努力通知服务
6 回滚远程事务
Isolation隔离性
Atomikos数据源HeuristicDataSource
事务拦截器TransactionInterceptor#invoke
一个事务内对一个数据两次读,可能会读到不一样的值.因为这个数据被另一个事务修改并提交了。
ManagedConnectionFactoryPostProcessor构造数据源动态代理对象代理 LocalXADataSource
Atomikos想要实现分布式事务,必须在执行SQL之前通过XA_START指令告诉Resouce开启事务,然后定义好MyBatis中定义SQL,然后执行XA_END指令。这个过程必然需要Atomikos接管Connection,对其进行封装。因为在mybatis执行sql的时候需要获取sqlSessionFactory,sqlSessionFactory会去找connectionPool中的DataSource,在然后通过DataSource拿到Connection,基于Connection通过preparedStatment执行SQL。最后在提交事务的时候,JtaTransactionManager先去发送XA_PREPARE指令,然后再执行XA_COMMIT或XA_ROLLBACK指令
byteTCC初始化
尝试执行prepare()
你是你的我是我的
看你眼色你有事务我就加入你没事务我也没有
服务B
通知DB消息内容,最大重试次数,最后重试时间,重试方案,重试间隔,重试次数,状态
XAResource.commit发送XA_ROLLBACK
DataSourceTransactionManager#doCommit
CompensableAnnotationValidator扫描@Compensable注解的bean,并放入内存map中
上游业务DB
CompensableMethodInterceptor解析目标controller的@Compensable接口和其他信息
init
构建JdbcTransactionalResource
动态代理的类FinanceService$proxy1.class
获取连接ConnectionPool#borrowConnection
TransactionAspectSupport#invokeWithinTransaction
初始化AtomikosDataSourceBean#doInit
消息DB
我要存钱100块
CompensableContextPostProcessor对实现了CompensableContextAware的类进行处理
JtaTransactionManager
Durability持久性
放入MapBaseTransactionManager#setThreadMappings()
TransactionRepository内存中保持一些事务相关对象
atomikos分布式事务初始(1)
CompositeTransaction
创建CompositeTransaction分布式事务
TransactionManagerImp获取当前线程的事务栈BaseTransactionManager#getCurrentTx()
AtomikosConnectionProxy创建connection代理对象(jdk动态代理)
执行END
createTransactionIfNecessary
Spring事务拦截器TransactionInterceptor#invoke
创建子事务(atomikos提供)XAResourceTransaction
继承
创建Tcc事务对象CompensableTransaction
2PC第一阶段
执行原生数据库方法StatementImpl#execute()
将子事务加入CompositeTransaction分布式事务addParticipant
12、重投消息
byteTCC原理
JtaTransactionManager#doBegin()
7、(可选)通知服务也可提供反查接口,下游业务可以根据业务凭证号,去查询未能处理的业务
打印日志:createCompositeTransaction ( 10000 ): created new ROOT transaction with id 192.168.1.103.tm0000100001
1 开启事务已经开启了tcc事务
CompensableHandlerInterceptor发起者这个类因为没有发现header中的事务信息,就什么都不做
2 消息持久化状态:待发送
id name 创建事务id 删除事务id--------------------------------1 张三 118 空2 李四 117 1222 小李四 122 空
Read Committed读已提交(不可重复读)
MysqlXAConnection#start#dispatchCommand();#stmt.execute(command)
JtaTransactionManager#doJtaBegin()
CompensableRequestInterceptorbyteTcc的feign拦截器,将事务 Context 加入请求头中
事务状态TransactionStatus
prepareTransactionInfo()
AtomikosConnectionProxy#invoke实际close()
Atomikos执行分布式事务(3)
执行提交TransactionAspectSupport#commitTransactionAfterReturning
开始资源XAResource.prepare发送XA_PREPARE
byteTCC事务恢复
3 提交/回滚事务
MySQL隔离级别
HTTP请求 服务A
commit
钱包扣款 √
11.2、重新投递到MQ
Atomikos接管Connection(2)
completeTransactionAfterThrowing
本地事务流程
我很矜持你没事务我就报警
创建连接池代理AtomikosXAPooledConnection#doCreateConnectionProxy
Tcc事务发起者
执行是否正常
PROPAGATION_REQUIRES_NEW
Serializable串行化
AbstractPlatformTransactionManager#getTransaction
创建事务createTransactionIfNecessary
getAutoCommit()
createCompositeTransaction ( 10000 ): created new ROOT transaction with id 192.168.1.106.tm0000100003
每行数据的最后加两个隐藏列创建事务ID、删除事务ID。mysql自己维护的自增的,全局唯一
下游业务DB
执行commit
执行我们的业务方法InvocationCallback#proceedWithInvocation
触发所有Participant(子事务)的prepare
发送XA_PREPARE指令MysqlXAConnection
否
业务执行成功
买一台P30
注入DruidXADataSource
执行业务代码之后 织入提交事务的逻辑
AbstractDataSourceBean#getConnection()
创建连接池XPooledConnection#createConnectionProxy
场景:1、事务119 start,执行业务操作的时候2、事务122把李四删除了,并修改id=2的name为 小李四。3、事务119查询所有记录的时候,可以查到张三和李四,但是查不到小李四4、因为 (创建事务id 118和117) <=119 < 删除事务id 122
是
4、确认发送/删除消息
AbstractPlatformTransactionManager#processRollback
监听
5、将消息和规则持久化状态:未成功
定时器
发送XA_END指令
Thread
1、开启事务
事务A修改了数据,但没提交,但是事务B已经可以读取到被修改的数据了
钱包DB扣款成功
后台线程CompensableWork 1) 系统启动恢复未完成事务2) 定时不断推进未完成事务
Spring事务传播机制
服务A
AbstractPlatformTransactionManager#rollback
数据源事务管理器打开事务DataSourceTransactionManager#doGetTransaction
方法A不能开事务,否则调用方法B,B直接报错
Mysql MVCC
开启本地事务
开启mysql事务
XAResource
加入分布式事务CompositeTransactionImp#addParticipant
方法A开启事务,方法B开启嵌套事务
DataSourceTransactionObject#getConnectionHolder
仓储服务扣减库存 ×
调用使用过的数据库连接的close()AbstractPlatformTransactionManager#triggerBeforeCompletion
可靠消息最终一致性方案
调用远程cancel接口
1、执行connect的close方法
张三
执行目标业务方法(调用远程接口)
方法A开事务,调用方法B,B会自己开启自己新的事务
AbstractPlatformTransactionManager#processCommit
2、AOP环绕方式织入事务
订单下单 √
方法A必须开事务,否则调方法B就报错
方法A开启事务,调用方法B,方法B有事务,就加入到方法A 的事务中,共用这个事务
5.1、更新消息状态:已发送
SpringManagedTransaction#openConnection
正常
创建分布式事务BaseTransactionManager#createCompositeTransaction()
最大努力通知方案
AtomikosConnectionProxy#newInstance实现InvocationHandler的invoke这样所有的方法调用都走invoke方法
数据库连接池DataSource
将当前线程和创建出来的CompositeTransaction放入一个map中,之后同一个线程横跨多个数据源执行事务的时候,就可以通过这个map获取到对应的分布式事务对象了。
封装后的连接池AtomikosXAPooledConnection
DataSourceTransactionManager#doRollback
autoCommit(false)
1、发送消息,附带重试规则
3、监控整个过程并处理
PROPAGATION_NESTED
一个事务从start开始到commit结束,这个过程中,读同一个数据是一样的。
开启事务DataSourceTransactionManager#doBegin
6.1 获取消息6.2 幂等处理6.3 执行业务及事务
远程confirm
放入连接池集合List<XPooledConnection> connections
提交分布式事务JtaTransactionManager#doCommit
创建连接池AtomikosXAConnectionFactory#createPooledConnection
方法A开事务,调用方法B,B会用非事务方式执行。
bytecc提供的统一处理try,confirm,cancel请求的controllerCompensableCoordinatorController#rollback()
我们定义的数据源spring与Atomikos整合的数据源beanAtomikosDataSourceBean
TransactionManagertcc自己的事务管理器
CompensableManagerImpl#compensableBegin()开启tcc分布式事务创建分布式事务所需关键对象CompensableTransactionImpl、TransactionXid、TransactionContext
XAResource.commit发送XA_COMMIT
本地事务回滚
*** 终止分布式事务CoordinatorImp#terminate
6、定时扫描状态为“未成功”的消息,根据重试方案和规则,重新调用下游业务。超过规则限定后,将状态改为:Dead
X
下游服务B
创建分布式事务TransactionServiceImp#createCT()
问题:发生【脏读】
数据库连接池初始化AtomikosXAPooledConnection
注入Connection
JDBC事务提交java.sql.Connection#commit()
11.3、修改状态:移除。或直接删除消息
执行prepareStatment的SQL
合二为一
可靠消息服务
保存到数据库中,不会被回滚掉
spring事务7个传播机制
很明显是个AOP的组件TransactionAspectSupport#invokeWithinTransaction
已发送定时器1、定时扫描消息DB中,状态为已发送的消息,2、如果消息超过5分钟状态没变,就重新投递到MQ
2、开始doCommit阶段
9、找到满足条件的记录
2 执行业务方法
开启JTA事务
开始资源XAResource.startXAResourceTransaction#resume
0 条评论
回复 删除
下一页