Seata源码
2021-06-19 14:21:37 94 举报
分布式事务框架Seata源码流程图
作者其他创作
大纲/内容
如果结果集rs不为空
添加根事务管理器
遍历全局事务中所有的分支事务
retryRollbacking.scheduleAtFixedRate()
globalSession.queueToRetryRollback()
BranchUndoLog = parser.decode(rollbackInfo)
BranchStatus = AbstractCore.branchCommit()
ResultSet rs = selectPST.executeQuery()
ExecuteTemplate.execute()
default 回滚失败且重试
lockKeysBuffer.add(lockKey)
ATCore.branchSessionLock()
switch (branchStatus)
rollbackTransaction()
statementCallback.execute()
globalSession.changeStatus(Rollbacking)
if (!branchSession.lock())
spring-cloud-alibaba-seata.jar包中span style=\"font-size: inherit;\
business.execute()
globalSession.canBeCommittedAsync()
context.appendLockKey(lockKey)
遍历所有分支事务,如果当前分支的状态是PhaseOne_Failed,意味着本地事务执行失败时,上报的commitDone为false,则直接跳过continue
logStore.deleteBranchTransactionDO
UndoExecutorFactory.getUndoExecutor(DbType,sqlUndoLog)
开启全局事物
wrapIfNecessary()
if(isRetryTimeout())
DB类型
客户端上报本地事务处理
AbstractRMHandler.doBranchRollback()
数据库处理
DefaultCoordinator.doGlobalBegin()
DefaultCoordinator.init()
注册分支事务
DataSourceManager.branchRollback()
globalSession.removeBranch
根据RowKeys拼接锁表检查sql
执行具体回滚业务
上方法的方法体与上面的一样.区别在于 callable对象
SessionHelper.endCommitFailed(globalSession)
DefaultCore.rollback()
remove(branchSession)
session.addSessionLifecycleListener(SessionManager)
globalSession.end()
AbstractUndoLogManager.flushUndoLogs()
commitDone ? PhaseOne_Done : PhaseOne_Failed
exists = true
postProcessAfterInitialization()后置处理器
继承
clean()
branchSession.setStatus(status)
while (true) { try { return callable.call(); } catch (LockConflictException) { lockRetryController.sleep(lockConflict); } }
true异步提交
globalSession.removeBranch()
修改状态值
DataBaseSessionManager.removeGlobalSession()
BRANCH_UPDATE
删除global_table数据
执行回滚sql
DefaultCore.commit()
PhaseTwo_Rollbacked回滚成功
changeStatus(CommitRetrying)
globalSession.addSessionLifecycleListener(ROOT_SESSION_MANAGER)
doAcquireLocks(unrepeatedLockDOs)
continue;
global.changeStatus(RollbackRetrying)
LockStoreSqls.getCheckLockableSql()
修改状态值status,并更新DB
生成undoLog对象,未保存DB
false
原生JDBC
Before和After是模版方法
commitTransaction
doRetryOnLockConflict(callable)注意⚠️: 该方法实体仅位于ConnectionProxy.LockRetryPolicy类中
try: span style=\"font-size: inherit;\
rs = ps.executeQuery()
更新状态值(DB)
globalSession.queueToRetryCommit()
DefaultCore.globalReport()
DefaultCoordinator.doGlobalCommit()
创建分支事务对象
DataBaseSessionManager.addGlobalSession()
行锁收集
GlobalTransactionalInterceptor.invoke()
afterPropertiesSet()
return false
实现
executeAutoCommitFalse(callable)
AbstractCore.branchReport()
active属性值置为false
syncCall(GlobalRollbackRequest)
执行业务sql
SessionHelper.newBranchByGlobal()
AbstractAutoProxyCreator事务和AOP的顶层抽象父类
handleGlobalTransaction()
Find UNDO LOG
DefaultCoordinator.doBranchRegister()
processGlobalTransactionCommit()
completeTransactionAfterThrowing
report(Boolean)
PreparedStatement原生JDBC
extendsConnectionProxy.LockRetryPolicy@OverrideLockRetryPolicy.execute()
DefaultResourceManager.branchReport()
提交事务
DefaultCoordinator.doGlobalReport()
执行本地事务
提交sql
doRetryOnLockConflict()
失效全局事务,并释放lock
initClient()
if (context.hasUndoLog())DB保存undo_log
生成后置镜像
@Bean
while (rs.next()) { dbXID = rs.getString(\"xid\
connectionProxy.appendLockKey()
globalSession.changeStatus(Committing)
transactionalTemplate.execute()
true
handleAsyncCommitting()
执行
PhaseTwo_CommitFailed_Unretryable提交失败且不重试
初始化客户端
LockStoreDataBaseDAO.acquireLock()
OrderServiceImpl(Proxy)
handleRetryCommitting()
SPI加入事务处理器
回滚客户端undoLog
core.begin()
创建全局事务session对象
继续执行循环
connectionProxy.commit()
connectionProxy.setAutoCommit(false)
Executor.execute()
释放全局事物锁:删除global内存对象中所有分支事务的lock_table表数据
globalSession.changeStatus(CommitFailed)
移除GlobalSession内存对象维护的分支列表中的当前分支对象
AbstractSessionManager.onBegin()
false同步提交
globalSession.closeAndClean()
beforeImage()
transactionManager.rollback(xid)
GlobalTransaction.commit()
注入事务管理器(SPI)
GlobalSession.createGlobalSession()
if (!retrying)
default提交失败且重试
collectRowLocks(branchSession)
客户端注册分支事务
如果bean被子类标识为代理,则使用配置的拦截器创建代理
getUndoRows()
二阶段
globalSession.addBranch(branchSession)
初始化异步提交定时任务
构建回滚sql模板
AbstractCore.branchRegister()
AbstractDMLBaseExecutor.doExecute()
解析,获取分支undoLog
session.begin()
connectionProxy.getAutoCommit()默认自动提交 true
undoPST.executeUpdate()
AbstractLockManager.acquireLock()
构建undo执行器(策略 + 模板)
lifecycleListener.onBranchStatusChange(branchSession)
PhaseTwo_RollbackFailed_Unretryable回滚失败且不重试
一阶段
deleteUndoLog()
服务端全局回滚
globalSession.close()
insertUndoLogWithGlobalFinished()
异常处理
switch (SQLType)策略+模版 -- Executor
buildUndoSQL()
@GlobalTransactionalpublic void create(Order order){ .....}
源码精髓:如何通过一张表lock_table给另外一张表的行记录加锁
PreparedStatementProxy.execute()
tx.rollback()
asyncCommitting.scheduleAtFixedRate()
删除branch_table数据
服务端全局提交
DefaultCoordinator.doGlobalRollback()
初始化资源管理者
如果taskName为空,执行插入,否则,更新
executeAutoCommitTrue()
handleRetryRollbacking()
获取待回滚的行记录数据: update或delete -- Beforeinsert -- After
客服端源码入口
syncCall(GlobalBeginRequest)
赋值
添加根事务处理器
初始化事务管理者
afterImage()
undoExecutor.executeOn(conn)
生成SQL解析器
branchSession.unlock()
changeStatus(AsyncCommitting)
transactionManager.commit()
SessionHelper.endRollbackFailed()
PhaseTwo_Committed提交成功
retryCommitting.scheduleAtFixedRate()
DefaultCore.branchRegister()
DB插入分支事务
globalSession.asyncCommit()
AbstractUndoLogManager.undo()
业务方法调用
遍历所有待重试的全局事务
DataBaseLocker.acquireLock(List<RowLock>)
BranchStatus == PhaseOne_Failed
call() -> doCommit()
branchSessionUnlock(branchSession)
transactionStoreManager.writeSession()
catch:recognizeLockKeyConflictException()
targetConnection.commit()
ConnectionProxy子类:LockRetryPolicy.execute()
默认情况下:以异步定时任务的方式处理重试回滚.这里只是修改特定状态值(DB)
关闭自动提交
删除当前分支事务的lock_table数据
TMClient.init()
GlobalTransactionScanner全局事物扫描器
InitializingBeanSpring生命周期接口
请求分支锁资源
RMClient.init()
transactionManager.begin()
0 条评论
回复 删除
下一页