Seata源码分析
2023-12-26 18:54:13 25 举报
Seata源码分析
作者其他创作
大纲/内容
添加根事务管理器
GlobalTransaction.commit()
PreparedStatementProxy.execute()
UndoExecutorFactory.getUndoExecutor(DbType,sqlUndoLog)
DefaultCoordinator.doGlobalCommit()
删除当前分支事务的lock_table数据
false
DefaultCore.rollback()
更新状态值(DB)
postProcessAfterInitialization()后置处理器
AbstractSessionManager.onBegin()
globalSession.changeStatus(Committing)
globalSession.canBeCommittedAsync()
AbstractLockManager.acquireLock()
exists = true
connectionProxy.commit()
logStore.deleteBranchTransactionDO
connectionProxy.getAutoCommit()默认自动提交 true
构建undo执行器(策略 + 模板)
beforeImage()
executeAutoCommitTrue()
DefaultCoordinator.doGlobalBegin()
true
客户端上报本地事务处理
GlobalTransactionScanner全局事物扫描器
initClient()
初始化资源管理者
数据库处理
AbstractDMLBaseExecutor.doExecute()
syncCall(GlobalBeginRequest)
PreparedStatement原生JDBC
赋值
如果taskName为空,执行插入,否则,更新
retryRollbacking.scheduleAtFixedRate()
DataBaseSessionManager.removeGlobalSession()
业务方法调用
原生JDBC
AbstractCore.branchRegister()
branchSession.setStatus(status)
return false
遍历全局事务中所有的分支事务
实现
客户端注册分支事务
RMClient.init()
rs = ps.executeQuery()
transactionalTemplate.execute()
processGlobalTransactionCommit()
lifecycleListener.onBranchStatusChange(branchSession)
switch (SQLType)策略+模版 -- Executor
DefaultResourceManager.branchReport()
执行
如果结果集rs不为空
wrapIfNecessary()
PhaseTwo_CommitFailed_Unretryable提交失败且不重试
branchSessionUnlock(branchSession)
行锁收集
globalSession.queueToRetryRollback()
默认情况下:以异步定时任务的方式处理重试回滚.这里只是修改特定状态值(DB)
提交sql
PhaseTwo_RollbackFailed_Unretryable回滚失败且不重试
DefaultCoordinator.doGlobalReport()
core.begin()
globalSession.addSessionLifecycleListener(ROOT_SESSION_MANAGER)
释放全局事物锁:删除global内存对象中所有分支事务的lock_table表数据
if(isRetryTimeout())
添加根事务处理器
GlobalTransactionalInterceptor.invoke()
开启全局事物
transactionManager.rollback(xid)
globalSession.end()
report(Boolean)
构建回滚sql模板
修改状态值status,并更新DB
true异步提交
ResultSet rs = selectPST.executeQuery()
connectionProxy.appendLockKey()
二阶段
AbstractRMHandler.doBranchRollback()
客服端源码入口
修改状态值
GlobalSession.createGlobalSession()
关闭自动提交
extendsConnectionProxy.LockRetryPolicy@OverrideLockRetryPolicy.execute()
rollbackTransaction()
服务端全局提交
while (true) { try { return callable.call(); } catch (LockConflictException) { lockRetryController.sleep(lockConflict); } }
AbstractUndoLogManager.flushUndoLogs()
上方法的方法体与上面的一样.区别在于 callable对象
business.execute()
@GlobalTransactionalpublic void create(Order order){ .....}
异常处理
afterPropertiesSet()
DefaultCoordinator.doGlobalRollback()
DefaultCoordinator.init()
继续执行循环
执行回滚sql
transactionManager.commit()
ConnectionProxy子类:LockRetryPolicy.execute()
handleRetryCommitting()
false同步提交
doAcquireLocks(unrepeatedLockDOs)
移除GlobalSession内存对象维护的分支列表中的当前分支对象
根据RowKeys拼接锁表检查sql
afterImage()
DataBaseSessionManager.addGlobalSession()
continue;
globalSession.removeBranch()
completeTransactionAfterThrowing
AbstractAutoProxyCreator事务和AOP的顶层抽象父类
handleGlobalTransaction()
default提交失败且重试
globalSession.asyncCommit()
connectionProxy.setAutoCommit(false)
if (!retrying)
session.addSessionLifecycleListener(SessionManager)
LockStoreSqls.getCheckLockableSql()
handleAsyncCommitting()
default 回滚失败且重试
while (rs.next()) { dbXID = rs.getString(\"xid\
BranchStatus == PhaseOne_Failed
源码精髓:如何通过一张表lock_table给另外一张表的行记录加锁
删除global_table数据
BranchStatus = AbstractCore.branchCommit()
globalSession.closeAndClean()
初始化事务管理者
解析,获取分支undoLog
try: span style=\"font-size: inherit;\
回滚客户端undoLog
生成后置镜像
执行具体回滚业务
if (context.hasUndoLog())DB保存undo_log
DataSourceManager.branchRollback()
lockKeysBuffer.add(lockKey)
retryCommitting.scheduleAtFixedRate()
获取待回滚的行记录数据: update或delete -- Beforeinsert -- After
请求分支锁资源
asyncCommitting.scheduleAtFixedRate()
doRetryOnLockConflict(callable)注意⚠️: 该方法实体仅位于ConnectionProxy.LockRetryPolicy类中
DB类型
session.begin()
InitializingBeanSpring生命周期接口
OrderServiceImpl(Proxy)
undoPST.executeUpdate()
一阶段
初始化异步提交定时任务
switch (branchStatus)
SessionHelper.endRollbackFailed()
DefaultCore.commit()
getUndoRows()
注入事务管理器(SPI)
AbstractCore.branchReport()
deleteUndoLog()
commitTransaction
继承
DB插入分支事务
globalSession.queueToRetryCommit()
遍历所有待重试的全局事务
ExecuteTemplate.execute()
transactionStoreManager.writeSession()
SessionHelper.newBranchByGlobal()
BranchUndoLog = parser.decode(rollbackInfo)
初始化客户端
undoExecutor.executeOn(conn)
执行业务sql
doRetryOnLockConflict()
DataBaseLocker.acquireLock(List<RowLock>)
@Bean
globalSession.close()
失效全局事务,并释放lock
Find UNDO LOG
statementCallback.execute()
if (!branchSession.lock())
tx.rollback()
active属性值置为false
spring-cloud-alibaba-seata.jar包中span style=\"font-size: inherit;\
服务端全局回滚
targetConnection.commit()
提交事务
globalSession.changeStatus(Rollbacking)
执行本地事务
context.appendLockKey(lockKey)
DefaultCore.globalReport()
BRANCH_UPDATE
创建全局事务session对象
remove(branchSession)
insertUndoLogWithGlobalFinished()
PhaseTwo_Committed提交成功
LockStoreDataBaseDAO.acquireLock()
PhaseTwo_Rollbacked回滚成功
TMClient.init()
SPI加入事务处理器
ATCore.branchSessionLock()
collectRowLocks(branchSession)
handleRetryRollbacking()
SessionHelper.endCommitFailed(globalSession)
创建分支事务对象
AbstractUndoLogManager.undo()
globalSession.changeStatus(CommitFailed)
catch:recognizeLockKeyConflictException()
changeStatus(AsyncCommitting)
executeAutoCommitFalse(callable)
生成SQL解析器
DefaultCoordinator.doBranchRegister()
changeStatus(CommitRetrying)
branchSession.unlock()
globalSession.removeBranch
如果bean被子类标识为代理,则使用配置的拦截器创建代理
Before和After是模版方法
生成undoLog对象,未保存DB
clean()
commitDone ? PhaseOne_Done : PhaseOne_Failed
globalSession.addBranch(branchSession)
transactionManager.begin()
global.changeStatus(RollbackRetrying)
Executor.execute()
注册分支事务
删除branch_table数据
DefaultCore.branchRegister()
遍历所有分支事务,如果当前分支的状态是PhaseOne_Failed,意味着本地事务执行失败时,上报的commitDone为false,则直接跳过continue
call() -> doCommit()
buildUndoSQL()
syncCall(GlobalRollbackRequest)
0 条评论
下一页