Seata源码分析
2025-02-26 14:12:50 1 举报
Seata是一款开源的分布式事务解决方案,提供高性能和易用性的分布式事务服务。核心在于全局事务管理、分支事务处理和数据一致性保障。
作者其他创作
大纲/内容
response.setGlobalStatus(core.commit(request.getXid()))
每隔1秒执行rollback
设置全局事务为begin状态将xid保存到线程上下文
2.扣减库存
AT模式默认异步提交,正常情况下最终返回GlobalStatus.Committed
ConnectionProxy
DataBaseLocker#acquireLock
AbstractAutoProxyCreator
LogOperation.GLOBAL_UPDATE
GlobalTransactionScanner
AbstractUndoLogManager#undo
LogStore#updateGlobalTransactionDO
根据xid查询分支事务信息
轮询处理AsyncCommitting任务
SessionHolder.findGlobalSession(xid)
status = GlobalStatus.Begin; RootContext.bind(xid);
getLocker(branchSession).releaseLock(locks)
返回BranchStatus
DB模式,删除lock_table表锁记录
注册分支事务
AbstractSessionManager#onStatusChange
orderDao.saveOrder(order)
TransactionalTemplate#execute
targetConnection.commit()
globalSession.removeBranch(branchSession)
SessionManager#allSessions
DefaultTransactionManager#commit(xid)
设置分支session状态是已注册
AbstractRMHandler#doBranchRollback
AbstractResourceManager#branchRegister
PreparedStatementProxy
DataBaseTransactionStoreManager#writeSession
session.begin()
删除分支事务信息释放全局锁
syncCall(request)
Async
查询branch_table
数据库本地事务提交
AbstractLockManager#releaseLock
前置镜像和后置镜像生成undolog
发起GlobalRollbackRequest,调用seata-server服务
GlobalStatus返回TimeoutRollbackFailed或RollbackFailed
业务方法调用先进入invoke
ConnectionProxy#register
PhaseTwo_Committed
删除undo_log表中记录
1.遍历BranchSession列表
1.分支事务回滚逻辑,遍历分支事务
false
TM
获取RPC调用返回的全局事务idxid结构: ipAddress + \":\" + port + \":\" + tranId192.168.50.80:8091:91933590230990848
DefaultCoordinator#doGlobalBegin
globalSession == null
globalSession.changeStatus(GlobalStatus.Rollbacking)
返回的状态
AbstractSessionManager#onAddBranch
globalSession == null
删除global_table记录
SessionHelper.newBranchByGlobal
PhaseTwo_RollbackFailed_Unretryable
globalSession.isSaga()
AbstractUndoLogManager#deleteUndoLog
删除branch_table的信息
seata-server服务TC全局事务回滚
status = transactionManager.rollback(xid)
LockManager
LogStoreDataBaseDAO#deleteGlobalTransactionDO
LogStoreDataBaseDAO#queryGlobalTransactionDO
PhaseTwo_CommitFailed_Unretryable支持异步提交continue,交给定时线程池处理
commit后返回GlobalStatus
extends
AbstractCore#branchCommit
删除分支事务信息
@Bean
syncCall(globalRollback)
初始化RMClient
Connection
GlobalStatus.Finished
AsyncWorker#doBranchCommits
seata-server服务TC注册分支事务
向RM发起BranchCommitResponse请求,返回BranchStatus
targetConnection.setAutoCommit(false)
implements
seata-server服务端TC提交全局事务
AOP逻辑,初始化bean的后置处理中调用此方法,添加interceptor
@GlobalLock+select for update 可以实现全局事务的读隔离
seata-server服务端TC开始全局事务
DB模式,从数据库获取GlobalSession
获取RPC调用返回的全局事务状态
executeAutoCommitFalse(args)
tx.commit()
response.setXid(DefaultCore#begin)
Core
根据xid查询全局事务信息
DB模式
默认限制每次取100条
branchSession.unlock()
插入到undo_log表
本地事务提交逻辑
@GlobalLock
释放锁
globalSession.getStatus() == GlobalStatus.Begin
DataSourceManager#branchRollback
分支事务注册,发起BranchRegisterRequest请求,返回分支BranchId
源码入口seata-spring-boot-starter-1.4.0.jar!/META-INF/spring.factories
branchSession.setStatus(BranchStatus.Registered)
使用@GlobalTransactional 或者@GlobalLock
收集行锁
DefaultCoordinator#doGlobalRollback
AbstractUndoLogManager#flushUndoLogs
DELETE FROM undo_log WHERE branch_id = ? AND xid = ?
发起GlobalBeginRequest,调用seata-server服务获取全局事务id
Locker
MySQLUndoLogManager#insertUndoLog
查询global_table
finally
获取GlobalSession
PhaseTwo_Rollbacked
DefaultCore#rollback
提交分支事务
RPC调用
GlobalTransaction
lockStore.acquireLock(convertToLockDO(locks))
GlobalTransactionalInterceptor#invoke
源码入口spring-cloud-alibaba-seata-2.1.0.RELEASE.jar!\\META-INF\\spring.factories
SeataAutoConfiguration
保存到lock_table表
LogOperation.BRANCH_REMOVE
collectRowLocks(branchSession)
执行Saga模式的回滚逻辑
processLocalCommitWithGlobalLocks()
AbstractCore#branchCommitSend
前置镜像
保存undolog
RM定时监听每隔1s执行
LockStore
RM
2.删除全局事务
LockStoreDataBaseDAO#unLock
DataSource
DefaultCoordinator#doBranchRegister
LogStoreDataBaseDAO#insertGlobalTransactionDO
获取全局事务id
DefaultTransactionManager#begin
tx.rollback()
存储GlobalSession
LogStoreDataBaseDAO#deleteBranchTransactionDO
分支事务回滚,返回BranchStatus
GlobalTransactionScanner#wrapIfNecessary
Bean初始化逻辑
2.删除全局事务信息
DataSourceProxy
删除undo_log记录
异常重试,默认5次
LogOperation.BRANCH_ADD
PreparedStatement
释放全局锁
response.setGlobalStatus(core.rollback(request.getXid()))
生成前置后置镜像,执行业务sql
removeGlobalSession(globalSession)
DefaultTransactionManager#rollback
logStore.queryGlobalTransactionDO(xid)
TransactionStoreManager
beforeImage()
branch_table表中插入分支事务信息
获取回滚状态,出现异常会进行重试,默认5次
AbstractUndoLogManager#batchDeleteUndoLog
SessionHelper.endRollbacked(globalSession)
statementCallback.execute
AbstractDMLBaseExecutor#executeAutoCommitTrue
DefaultCoordinator#handleAsyncCommitting
AT模式
设置本地事务为手动提交
targetConnection.setAutoCommit(true)
DefaultCoordinator#init
LogOperation.GLOBAL_ADD
用户下单
开启全局事务
发送BranchRollbackRequest请求
发生异常
角色:GlobalTransactionRole.Launcher
context.inGlobalTransaction()
DefaultCore#commit
DefaultCoordinator#doGlobalCommit
txInfo != null && txInfo.rollbackOn(originalException)
4.更新订单状态
branchSessionUnlock(branchSession)
TransactionManager
SQL识别器会根据DML的类型选择executor
事务存储管理器接口
每隔一秒执行异步提交
PreparedStatementProxy#execute
response.getGlobalStatus()
GlobalTransactionAutoConfiguration
根据xid查global_table,获取全局事务信息
logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session))
DataBaseTransactionStoreManager#readSession
获取GlobalSession列表
context.isGlobalLockRequire()
handleRetryRollbacking()
response.setXid 设置全局事务id
生成全局session
DefaultSqlSession#insert
GlobalSession globalSession = SessionHolder.findGlobalSession(xid)
RPC请求
处理RetryRollbacking
注册分支事务,返回BranchId
processGlobalTransactionCommit()
后置镜像
implements
DataBaseSessionManager#findGlobalSessions
执行业务逻辑
更新状态为GlobalStatus.AsyncCommitting
如果GlobalStatus为AsyncCommitting
ResourceManager
response.getXid()
响应事务状态
创建BranchSession
deletePST.executeUpdate()
globalSession.asyncCommit()
GlobalTransactionalInterceptor
MethodInterceptor
当前分支事务的状态为BranchStatus.PhaseOne_Failed
prepareStatement(sql)
ConnectionProxy#doCommit
提交全局事务
缓存undolog
LogStoreDataBaseDAO#queryBranchTransactionDO
UndoLogManager
execute()
LockStoreDataBaseDAO#doAcquireLocks
异常回滚
初始化retryRollbacking定时任务线程池,默认间隔1s
InitializingBean
db模式从数据库获取GlobalSession信息
获取全局锁
SagaCore#doGlobalRollback
LogStoreDataBaseDAO#insertBranchTransactionDO
afterImage(beforeImage)
AbstractLockManager#acquireLock
分支事务回滚
connectionProxy.commit()
向全局session中添加分支session
Resource
BranchStatus.PhaseTwo_Rollbacked
3.扣减余额
this.changeStatus(GlobalStatus.AsyncCommitting)
SessionHelper.endCommitted(globalSession)
全局事务提交,发起GlobalCommitRequest请求返回GlobalStatus
GlobalStatus.Committed
分支事务的注册逻辑
删除全局事务信息
AbstractCore#branchRollback
core.branchRegister
TccActionInterceptor
GlobalTransactionScanner#afterPropertiesSet
执行业务sql
获取锁
回滚
全局事务管理器接口
logStore.queryBranchTransactionDO(globalTransactionDO.getXid())
DataBaseSessionManager#addGlobalSession
connectionProxy.setAutoCommit(true)
http://localhost:8081/order/placeOrder
span style=\"font-size: inherit;\
AsyncWorker#init
删除global_table的信息
OrderServiceImpl#placeOrder
删除undo_log
初始化TMClient
@GlobalTransactional
初始化asyncCommitting定时任务线程池,默认间隔1s
true
SessionHelper.endRollbackFailed(globalSession)
globalSession.addBranch(branchSession)
MySQLInsertExecutor#execute
1.保存订单
Tcc模式
sqlUndoItemsBuffer.add(sqlUndoLog)
connectionProxy.setAutoCommit(false)
getConnection()
syncCall(globalCommit)
branchSession.lock()

收藏

收藏
0 条评论
下一页