Seata----源码剖析
2022-03-02 22:29:50 1 举报
seata 二阶段提交,客户端与服务端的交互,以及提交/回滚,获取事务,生成undolog日志等等步骤
作者其他创作
大纲/内容
logStore.queryGlobalTransactionDO(xid);
PhaseTwo_Committed删除事务释放全局锁
@GlobalTransactional拿到注解的值
GlobalSession#changeStatus
提交事务
transactionStoreManager
statementCallback.execute()
获得GlobalSession列表
事务提交
生成前置/后置镜像生成undolog日志
SessionHolder.findGlobalSession(xid);
DataBaseLocker#acquireLock
5.清除
与fengin如何整合xid
DefaultCoordinator
DefaultGlobalTransaction#begin()
生成后置镜像
TccActionInterceptor
GlobalTransactionScanner
AbstractSessionManager#writeSession
connectionProxy.setAutoCommit(true);
核心处理逻辑
每隔一秒就异步提交
ipAddress + \":\" + port + \":\" + tranId;
SeataFeignClient#execute
执行目标方法methodInvocation.proceed();
DataBaseSessionManager#addGlobalSession
RootContext.bind(xid);
session.begin();
DefaultCoordinator#doGlobalBegin
BaseTransactionalExecutor#execute
DefaultTransactionManager#begin
获得XID
分支事务注册发起BranchRegisterRequest 请求 返回BranchId
TmNettyRemotingClient.getInstance().sendSyncRequest(request)
context.isGlobalLockRequire()
finally
executeAutoCommitFalse(args)
GlobalTransactionScanner#wrapIfNecessary
rs = business.execute();
AbstractCore#branchRegister
拿出所有事务,循环
extends
开启/获得全局事务
DefaultCoordinator#handleAsyncCommitting
beforeImage();
DataBaseTransactionStoreManager#readSession()
注册分支事务获取全局锁
globalSession.removeBranch(branchSession);
TCC
ConnectionProxy#doCommit
DefaultTransactionManager#commit
交由事务模板方法执行
DefaultGlobalTransaction();
发起RPC调用
DefaultSqlSession#insert()
ConnectionProxy#processGlobalTransactionCommit
BranchSession#lock
getModifyRequest(request);
RootContext.getXID();
拿不到锁且不是AT模式
1.1 拿到当前的全局事务
DefaultResourceManager#branchRegister
statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
GlobalStatus.Finished
设置自动提交
iDefaultCoordinator#doBranchRegister
如果状态=GlobalStatus.AsyncCommitting
DefaultTransactionManager#syncCall
true
globalSession.getSortedBranches()
session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
connectionProxy.getContext().reset();
ConnectionProxy#processLocalCommitWithGlobalLocks
调用拦截器的invoke方法
数据库模型
2.设置全局锁
AbstractAutoProxyCreator
orderMapper.insert(order);
status = GlobalStatus.Begin;
globalSession.removeBranch(branchSession);
SessionHolder.getAsyncCommittingSessionManager() .allSessions();
初始化 transactionStoreManager 对象
绑定xid到上下文,全文传递rpc调用需要传xid
AT
finally清除上下文
创建个分支事务
AbstractSessionManager#onBegin
TransactionalExecutor#execute
PhaseOne_Failed则删除内存的事务
AbstractDMLBaseExecutor#doExecute
DefaultCore#branchRegister
AbstractNettyRemotingClient#sendSyncRequest()
new
NettyRemotingServer
MySQLUndoLogManager#insertUndoLogWithNormal
1、拿到事务信息
lifecycleListener.onStatusChange
logStore.updateGlobalTransactionDO
GlobalTransactionContext.getCurrent();
本地事务提交
DefaultGlobalTransaction#commit
commitTransaction(tx);
logStore.queryBranchTransactionDO(globalTransactionDO.getXid())
PhaseTwo_CommitFailed_Unretryable
把前置镜像和后置镜像封装成SQLUndoLog绑定在上下文
设置全局锁
context.reset();
globalSession.asyncCommit();
返回
//5. clear resumeGlobalLockConfig(previousConfig); triggerAfterCompletion(); cleanUp();
AT 才获取锁
transactionStoreManager = DataBaseTransactionStoreManager.getInstance();
SQL识别器根据DML的类型选择executor
加载持久化方式模型
false
设置为手动提交
context.inGlobalTransaction()
3.执行业务出现异常/回滚
重置上下文
BranchId
初始化协调者
再执行目标client的execute方法
UUIDGenerator.generateUUID()
轮循处理任务
afterImage(beforeImage);
MySQLInsertExecutor
SessionHolder.getRootSessionManager()
初始化任务,开启定时,时间间隔1s
获取数据库类型插入undolog日志
交给定时任务处理
查询全局事务
this.getModifyRequest(request)
全局事务扫描器@Bean
拿到xid,并放在request上传递
targetConnection.commit();
DefaultCoordinator#doGlobalCommit
globalSession == null
DefaultCore#commit
connectionProxy.setAutoCommit(false);
implements
try
开启全局事务/获取XID
LogOperation.GLOBAL_UPDATE
this.status = AsyncCommitting
LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable)
执行业务代码
寻找全局事务
UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this)
设置状态为开启
ExecuteTemplate#execute(.)
如果不是发起者,不允许开启全局事务
支持mysqlorcalpostgresql
根据xid查询全局事务
GlobalTransactionalInterceptor#invoke
tx = GlobalTransactionContext.createNew();
AbstractDMLBaseExecutor#executeAutoCommitTrue
GlobalStatus.Committing
try执行自己的业务
true走普通的执行方法
business.getTransactionInfo();
4.提交
AbstractResourceManager#branchRegister
connectionProxy.commit();
rpc调用
获取全局事务Session
ConnectionProxy#register
RootContext.requireGlobalLock()
异常重试 5次
本地事务提交逻辑
事务持久化核心管理器接口
AbstractSessionManager
获取全局锁
获取数据库方式插入数据库
logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session)
PreparedStatementProxy#execute
AbstractNettyRemotingServer#sendSyncRequest()
2.开启全局事务
ioAbstractCore#branchCommit
GlobalTransactionalInterceptor
mysql
.DefaultCoordinator#init
SeataLoadBalancerFeignClient#execute
全局锁,保证唯一性
进行增强PreparedStatement.execute(sql)
TransactionStoreManager
查询分支事务
branchStatus
添加数据库执行器
AbstractUndoLogManager#flushUndoLogs
SessionHelper.newBranchByGlobal();
new StringBuilder().append(resourceId).append(LOCK_SPLIT).append(tableName).append(LOCK_SPLIT).append(pk) .toString();
netty发起调用初始化TM客户端
MethodInterceptor
分支事务的注册
ATCore#branchSessionLock
AT模式下默认是异步提交,正常情况返回commiting
AbstractLocker#convertToLockDO
TransactionalTemplate#execute
一次100条
如果全局事务为空,则创建一个新的
targetConnection.commit();
statementProxy.getConnectionProxy().bind(xid);
获取xid 实际上发起远程调用
生成前置镜像
AbstractLockManager#acquireLock
DataBaseSessionManager#init
支持的数据库类型
0 条评论
回复 删除
下一页