分布式事务
2021-03-04 23:20:03 9 举报
AI智能生成
分布式事务
作者其他创作
大纲/内容
spring事务的源码初探
bytetcc框架的源码架构
bytetcc框架的源码架构
事务的ACID
Atomic:原子性,就是一堆SQL,要么一起成功,要么都别执行,不允许某个SQL成功了,某个SQL失败了,这就是扯淡,不是原子性
Consistency:一致性,这个是针对数据一致性来说的,就是一组SQL执行之前,数据必须是准确的,执行之后,数据也必须是准确的。别搞了半天,执行完了SQL,结果SQL对应的数据修改没给你执行,那不是坑爹么
隔离性,这个就是说多个事务在跑的时候不能互相干扰,别事务A操作个数据,弄到一半儿还没弄好呢,结果事务B来改了这个数据,导致事务A的操作出错了,那不就搞笑了
Durability:持久性,事务成功了,就必须永久对数据的修改是有效的,别过了一会儿数据自己没了,不见了,那就好玩儿了
事务隔离级别
读未提交,Read Uncommitted:
某个事务还没提交的时候,修改的数据,就让别的事务给读到了
读已提交,Read Committed(不可重复读)
事务A在跑的时候, 先查询了一个数据是值1,然后过了段时间,事务B把那个数据给修改了一下还提交了,此时事务A再次查询这个数据就成了值2了
可重复读,Read Repeatable
事务A在执行过程中,对某个数据的值,无论读多少次都是值1,哪怕这个过程中事务B修改了数据的值还提交了
串行化:幻读
事务把所有行的某个字段都修改为了2,结果另外一个事务插入了一条数据,那个字段的值是1,然后就尴尬了。第一个事务会突然发现多出来一条数据,那个数据的字段是1。
MySQL的默认隔离级别是Read Repeatable
MySQL是通过MVCC机制来实现的,就是多版本并发控制,multi-version concurrency control。
MVCC:
innodb存储引擎,会在每行数据的最后加两个隐藏列,一个保存行的创建时间,一个保存行的删除时间,但是这儿存放的不是时间,而是事务id,事务id是mysql自己维护的自增的,全局唯一
在一个事务内查询的时候,mysql只会查询创建时间的事务id小于等于当前事务id的行
子主题
MVCC机制:多版本并发控制机制
undo log多版本链条+ReadView机制
ReadView:基于undo log版本链条实现的一套试图机制,ReadView一旦生成就不会改变
RC隔离级别:的关键点在于,事务里每次查询都生成新的ReadView
RR隔离级别:的关键点在于,事务里每次查询都是同一个ReadView
spring的事务
spring支持编程式事务,和声明式事务。编程式事务就是用个事务类TransactionTemplate来管理事务,这个一般现在没人傻到干这个事儿了;声明式事务分成在xml里配置个AOP来声明个切面加事务,一般现在也没人傻到干这个了;大部分情况下,都是用@Transactional注解。
阿里编码规范,一般建议加在方法级别,就是要事务的方法就加事务,不要事务的方法就别加事务
事务的传播行为propagation
这个事务的传播机制,就是说,一个加了@Transactional的事务方法,和嵌套了另外一个@Transactional的事务方法的时候,包括再次嵌入@Transactional事务方法的时候,这个事务怎么玩儿
public class ServiceA {
@Autowired
private ServiceB b;
@Transactional
public void methodA() {
// 一坨数据库操作
for(int i = 0; i < 51; i++) {
try {
b.methodB();
} catch(Exception e) {
// 打印异常日志
}
}
// 一坨数据库操作
}
}
@Autowired
private ServiceB b;
@Transactional
public void methodA() {
// 一坨数据库操作
for(int i = 0; i < 51; i++) {
try {
b.methodB();
} catch(Exception e) {
// 打印异常日志
}
}
// 一坨数据库操作
}
}
public class ServiceB {
@Transactional(propagation = PROPAGATION_REQUIRES_NEW)
public void methodB() throws Exception {
// 一坨数据库操作
}
}
@Transactional(propagation = PROPAGATION_REQUIRES_NEW)
public void methodB() throws Exception {
// 一坨数据库操作
}
}
(1)PROPAGATION_REQUIRED:这个是最常见的,就是说,如果ServiceA.method调用了ServiceB.method,如果ServiceA.method开启了事务,然后ServiceB.method也声明了事务,那么ServiceB.method不会开启独立事务,而是将自己的操作放在ServiceA.method的事务中来执行,ServiceA和ServiceB任何一个报错都会导致整个事务回滚。这就是默认的行为,其实一般我们都是需要这样子的。
(2)PROPAGATION_SUPPORTS:如果ServiceA.method开了事务,那么ServiceB就将自己加入ServiceA中来运行,如果ServiceA.method没有开事务,那么ServiceB自己也不开事务
(3)PROPAGATION_MANDATORY:必须被一个开启了事务的方法来调用自己,否则报错
(4)PROPAGATION_REQUIRES_NEW:ServiceB.method强制性自己开启一个新的事务,然后ServiceA.method的事务会卡住,等ServiceB事务完了自己再继续。这就是影响的回滚了,如果ServiceA报错了,ServiceB是不会受到影响的,ServiceB报错了,ServiceA也可以选择性的回滚或者是提交。
(5)PROPAGATION_NOT_SUPPORTED:就是ServiceB.method不支持事务,ServiceA的事务执行到ServiceB那儿,就挂起,ServiceB用非事务方式运行结束,ServiceA事务再继续运行。这个好处就是ServiceB代码报错不会让ServiceA回滚
(6)PROPAGATION_NEVER:不能被一个事务来调用,ServiceA.method开事务了,但是调用了ServiceB会报错
(7)PROPAGATION_NESTED:开启嵌套事务,ServiceB开启一个子事务,如果回滚的话,那么ServiceB就回滚到开启子事务的这个save point
一般在单块系统开发,多人协作的时候比较常见,就是小A调用小B的模块,小A不管小B是成功还是不成功,自己都要提交,这个时候可以这么弄,就是说小B的操作不是构成小A的事务的重要组成部分,就是个分支
spring事务相关源码
spring-tx-4.3.13.RELEASE.jar,spring-tx这个项目是spring对事务的支持核心的逻辑
org.springframework.transaction.interceptor,直接看这个包
TransactioanIntercepor,这个类里是事务执行的核心的入口
TransactionIntercepor其实就是如果我们给我们的service组件加了@Transactional注解之后,就会对我们的这个service组件里的方法在调用的时候,就会先走这个TransactionoIntercepor的拦截
TransactionoIntercepor事务拦截器
拦截器里,就可以给我们先打开事务,然后再执行我们的finishRefillData()方法,接着根据方法的执行结果,报错就回滚事务,没报错就提交事务
TransactionIntercepor.invoke()方法
调用了invokeWithinTransaction()方法,核心的事务控制的逻辑都在invokeWithinTransaction()方法中,invokeWithinTransaction()方法其实是父类TransactionAspectSupport(Aspect这个名词就可以看出来了,这个东西一定是跟Spring AOP机制是有关系的,Aspect切面的意思,就是spring AOP的核心概念)
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);这是核心中的核心,如果你使用了@Transactional注解之后,意思就是要给这个方法开启事务,此时就会给你开启一个事务,创建事务
retVal = invocation.proceedWithInvocation();
这行代码,其实相当于是去调用你的那个RefillDataCenterService.finishRefillData()方法
这行代码,其实相当于是去调用你的那个RefillDataCenterService.finishRefillData()方法
invocation.proceedWithInvocation的catch里面completeTransactionAfterThrowing(txInfo, ex);
如果你的RefillDataCenterService.finishRefillData()方法报错了,抛了个异常出来,此时就会回滚事务
commitTransactionAfterReturning(txInfo);
如果没有报错的话,就会提交你的事务,完成全部逻辑的执行
流量充值场景
(1)完成支付:将用户的支付宝账号里的金额,转账到中国移动支付宝账号里去
(2)创建流量充值订单:状态为支付成功
(3)完成流量充值:找第三方的运营商的BOSS系统
(4)通知用户:找消息服务,发送短信通知用户
(5)增加抽奖机会:找抽奖服务,给用户增加一次抽奖的机会
(6)增加积分:找积分服务,给用户增加5%,24元 * 5% = 1.2积分
(7)修改流量券的状态:找流量券服务,去将使用的流量券的状态修改为已使用
(8)赠送流量券:找流量券服务,给这个用户赠送一张跟流量券活动匹配的流量券,
如果说这个流量套餐是有赠送流量券的活动的话 为什么要分库
3000万注册用户,每年几亿订单的规模之下,一年以后这个mysql数据库服务器上的磁盘空间都不够了
一般来说数据库部署在一台服务器上面,磁盘空间是1TB
数据库的拆分,刚开始不会做复杂的分库分表,就会按照业务来拆分库
具体拆分
7台数据库服务器,在各个服务器上安装mysql实例,在mysql中创建对应的库
(1)资金:data-refill-center-finance
(2)流量券:data-refill-center-coupon
(3)活动:data-refill-center-activity
(4)积分:data-refill-center-credit
(5)流量套餐:data-refill-center-package
(6)抽奖:data-refill-center-lottery
(7)充值订单:data-refill-center-order
(2)流量券:data-refill-center-coupon
(3)活动:data-refill-center-activity
(4)积分:data-refill-center-credit
(5)流量套餐:data-refill-center-package
(6)抽奖:data-refill-center-lottery
(7)充值订单:data-refill-center-order
分布式事务里面,最难的是TCC事务、其次是XA事务、再其次才是可靠消息最终一致性方案
分布式事务的模型
AP(Application,应用程序)
TM(Transaction Manager,事务管理器)
TM的话就是一个在系统里嵌入的一个专门管理横跨多个数据库的事务的一个组件
RM(Resource Manager,资源管理器)
RM的话说白了就是数据库(比如MySQL)
CRM(Communication Resource Manager,通信资源管理器)
CRM可以是消息中间件(但是也可以不用这个东西)
XA分布式规范
就是定义好的那个TM与RM之间的接口规范,就是管理分布式事务的那个组件跟各个数据库之间通信的一个接口
管理分布式事务的组件,TM就会根据XA定义的接口规范,刷刷刷跟各个数据库通信和交互,告诉大家说,各位数据库同学一起来回滚一下,或者是一起来提交个事务
XA仅仅是个规范,具体的实现是数据库产商来提供的,比如说MySQL就会提供XA规范的接口函数和类库实现
2PC分布式规范
XA接口规范也是一个比较务虚的一个东西,还是没法落地的
2PC,其实就是基于XA规范,来让分布式事务可以落地,定义了很多实现分布式事务过程中的一些细节
(1)准备阶段
TM先发送个prepare消息给各个数据库,让各个库先把分布式事务里要执行的各种操作
各个数据库会准备好随时可以提交或者是回滚
然后各个数据库都返回一个响应消息给事务管理器,如果成功了就发送一个成功的消息,如果失败了就发送一个失败的消息
(2)提交阶段
第一种情况,要是TM哥儿们发现某个数据库告诉他说,不好意思啊,我这儿失败了
把自己本地的那个事务回滚不就得了,然后各个库都回滚好了以后就通知TM,TM就认为整个分布式事务都回滚了
第二种情况,TM接收到所有的数据库返回的消息都是成功
提交好了通知下TM,TM要是发现所有数据库的事务都提交成功了
缺陷:
1、同步阻塞:在阶段一里执行prepare操作会占用资源,一直到整个分布式事务完成,才会释放资源
2、单点故障:TM是个单点,一旦挂掉就完蛋了
3、事务状态丢失:即使把TM做成一个双机热备的,一个TM挂了自动选举其他的TM出来,但是如果TM挂掉的同时,接收到commit消息的某个库也挂了,此时即使重新选举了其他的TM,压根儿不知道这个分布式事务当前的状态,因为不知道哪个库接收过commit消息,那个接收过commit消息的库也挂了
4、脑裂问题:在阶段二中,如果发生了脑裂问题,那么就会导致某些数据库没有接收到commit消息,有些库收到了commit消息,结果有些库没有收到
3PC分布式规划
针对2PC做的一个改进,主要就是为了解决2PC协议的一些问题
(1)CanCommit阶段
TM发送一个CanCommit消息给各个数据库,然后各个库返回个结果,不会执行实际的SQL语句的
就是各个库看看自己网络环境啊,各方面是否ready
(2)PreCommit阶段
会执行各个SQL语句,只是不提交
如果有个库对CanCommit消息返回了失败,TM发送abort消息给各个库,取消分布式事务
(3)DoCommit阶段
PreCommit阶段都返回了成功,那么发送DoCommit消息给各个库,提交事务,各个库如果都返回提交成功给TM,那么分布式事务成功
如果有个库对PreCommit返回的是失败,或者超时一直没返回,那么TM认为分布式事务失败,直接发abort消息给各个库,回滚,各个库回滚成功之后通知TM,分布式事务回滚成功
改进点:
(1)引入了CanCommit阶段
(2)在DoCommit阶段,各个库自己也有超时机制,如果超时时间到了,还没收到TM发送的oCommit消息或者是abort消息,直接判定为TM可能出故障了,各个库就执行DoCommit操作,提交事务,解决了TM挂掉的单点问题
另外资源阻塞问题也能减轻一下,因为一个库如果一直接收不到DoCommit消息,不会一直锁着资源,人家自己会提交释放资源的,所有能减轻资源阻塞问题
3PC的缺陷:
也不是完美的,如果人家TM在DoCommit阶段发送了abort消息给各个库,结果因为脑裂问题,某个库没接收到abort消息,自己还因为超时机制的执行了commit操作
JTA事务,全局事务
全局事务,Global Transaction,是DTP模型中的一个概念,全局事务,指的其实就是说跨多个数据库的这么一个分布式事务
JTA事务(Java Transaction API),站在另外一个角度,其实是J2EE中的一个概念,JTA一套分布式事务的编程API,他呢是按照XA、DTP那套模型和规范来搞的
在J2EE中,单库的事务是通过JDBC事务来支持的
跨多个库的事务,是通过JTA API来支持的,通过JTA API可以协调和管理横跨多个数据库的分布式事务
引入XA分布式事务
单块多库的系统的分布式事务采用的是经典的XA分布式事务的方案,比如Atomikos
管理横跨多个数据库的分布式事务
要引入客户端的TM第三方库,也就是常用的Atomikos类库
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
type: com.alibaba.druid.pool.xa.DruidXADataSource
Atomikos源码分析
原先tm其实PlatformTransactionManager,也就是单库的最基础的事务管理器,现在的话就变成了我们之前配置的那个JtaTransactionManager了,代表事务的一个对象,是JtaTransactionObject
JtaTransactionManager.doBegin()方法:TransactionManagerImpl中的一个方法,这个方法中主要就是获取一个CompositeTransaction的一个对象,根据当前线程,从一个map中获取出来一个Stack数据结构,从栈中弹出来一个对象
每个线程过来,都可以获取到属于这个线程他自己的代表着分布式事务的一个CompositeTransaction的对象
创建好了一个CompositeTransaction对象之后,会放入map中thread对应的一个Stack,将这个对象压入栈中
这个CompositeTransaction事务对象,代表了一个当前线程要执行的多个数据库的操作关联起来的一个分布式事务,所以讲这个事务对象跟当前线程通过一个map关联起来了,后面这个线程再执行的时候,就可以获取到这个关联多个数据库操作的分布式事务的CompositeTransaction对象了
AtomikosDataSourceBean,作为数据库连接池
数据库连接池最最核心的是啥?
就是从ConnectionFactory里面去获取一个Connection,放到一个ConnectionPool里面去,这个池子里面放了数据库连接,在AtomikosDataSourceBean中,我们其实设置了DruidXADataSource
AtomikosDataSourceBean底层还是依赖于人家DruidXADataSource数据库连接池的
AtomikosConnectionProxy,这个东西里面是封装了一个DruidXADataSource实际返回给他的一个数据库连接,创建了一个动态代理
子事务,XAResourceTransaction,加入分布式事务CompositeTransaction中,同时基于XAResource,执行那个库上的XA START指令
每个库都开启了XA START指令,都在依次通过prepareStatement()定义好每个库要执行的SQL,到这一步为止,其实所有的SQL都执行完了
同时对每个库都使用prepareStatement()方法定义好了每个库要执行哪些SQL语句,下一步就会去执行整个事务的这么一个提交,其实就会走2PC的一套逻辑了
如果说有人的prepare返回消息不是ok,那么就全量回滚,就是发送XA ROLLBACK指令,如果说所有人的返回消息都是ok,那么就对每个库发送XA COMMIT指令
对每个库执行XA ROLLBACK指令时
这个rollback指令,其实同样会记录rollback指令的结果,如果rollback指令发送出去之后居然有某个库连rollback指令都没执行完成,那么就一旦走了XA分布式事务之后,性能极差
用了XA分布式事务之后,增加了100ms,耗时增加了15%,性能降低了15%,我可以大家说的一点是,如果在更加复杂的场景里面,XA分布式事务,性能最极端的情况下,会降低10倍
比如说你要是不用XA分布式事务,可能就是50ms,用了就是500ms
总结
JTA + Atomikos实现XA分布式事务框架的架构设计的一个特点,基于Spring原生transaction事务控制的一个过程
JTA,其实是负责了Atomikos框架的一个指挥和调度,让他按照事务的基本步骤来走;Atomikos框架相当于是DTP模型里的TM,跟各个MySQL(RM)通信,XA START、XA END、XA COMMIT、XA ROLLBACK等符合XA规范的一套接口,调用
实现一套Atomikos框架,就是第一步,你需要跟JtaTransactionManager整合起来,需要JTA结合起来
创建分布式事务的时候,创建一个代表了分布式事务的对象;在各个SQL执行的时候,必须从你的DataSource里面获取Connection,对Connection的prepareStatement()方法的调用,你需要进行拦截,去对各个库执行XA START指令,以及定义好SQL;在提交事务的时候,你就需要去对各个库执行XA PREPARE指令,如果都成功,就执行XA COMMIT指令,如果失败,就执行XA ROLLBACK指令
比较有技术含量的一些点,还是动态代理的灵活的使用,自己去实现DataSource,Connection变成了你的动态代理
为什么要拆分为多个服务
超过10个人维护一个单块系统,协调的成本就很高了
(1)开发的时候可能会互相修改代码,涉及到大量的代码冲突的合并
(2)测试的时候,比如说,多个人同时开发多个版本,测试环境有冲突,报错或者是异常,需要所有人配合,测试人员、前端人员、后端人员,全量把这个系统几十万行代码的功能给回归测试一遍
(3)大单块系统,涉及很多东西,依赖很多,可能你修改一些代码,导致别人的代码报错
好处:
(1)一两个人维护一个程序,代码冲突的概率大大降低
(2)每个服务都有自己的测试环境
(3)部署上线,其实也是同理,你都是自己的代码部署上线,跟别人是没关系的
TCC方案
1)Try阶段:这个阶段说的是对各个服务的资源做检测以及对资源进行锁定或者预留
有些接口,没有资源锁定的操作,try接口就留空
2)Confirm阶段:这个阶段说的是在各个服务中执行实际的操作
3)Cancel阶段:如果任何一个服务的业务方法执行出错,那么这里就需要进行补偿,就是执行已经执行成功的业务逻辑的回滚操作
就是把try或者confirm里的操作给他回滚了
例子
比如说跨银行转账的时候,要涉及到两个银行的分布式事务
1)Try阶段:先把两个银行账户中的资金给它冻结住就不让操作了,比如A账户总金额减去扣款。所减的扣款放在另外一个字段里
2)Confirm阶段:执行实际的转账操作,A银行账户的资金扣减,B银行账户的资金增加
3)Cancel阶段:如果任何一个银行的操作执行失败,那么就需要回滚进行补偿,就是比如A银行账户如果已经扣减了,但是B银行账户资金增加失败了,那么就得把A银行账户资金给加回去
适合的场景:这个就是除非你是真的一致性要求太高,是你系统中核心之核心的场景,比如常见的就是资金类的场景,各个业务执行的时间都比较短,确保一个TCC分布式事务的执行,大概需要总共1秒以内的时间
比如:充值系统里,资金转账、创建订单、抽奖机会、积分、流量券相关的服务调用的逻辑,包裹在一个分布式事务内,用TCC来控制这个分布式事务,因为这里的一些操作基本都是在流量充值中心内部的一些服务,都比较快
细节问题
(1)空回滚:那要是try阶段,比如网络问题,人家压根儿没调通你的try接口,结果就认定失败,直接调用你的cancel接口,咋办?所以你这个时候啥都不能干
(2)try回滚以及confirm回滚:try阶段如果执行了,但是其他服务try失败了,那么会调用cancel来回滚,你要可以回滚掉try阶段的操作;confirm阶段要是你执行了,但是有别的服务失败了,此时你就要回滚掉confirm阶段的操作
(3)倒置请求:比如说人家调用try接口,中间网络超时了,结果认定失败,直接调用cancel空回滚了;结果过了几秒钟try接口请求到来,此时咋整呢?尴尬了吧,你要在这个时候不允许执行try接口操作;同理啊,confirm请求超时了,结果都cancel掉了,但是过了几秒
接口的幂等性保证
就是try、confirm和cancel都可能被多次调用
本地消息表
国外的ebay搞出来的这么一套思想
1)A系统在自己本地一个事务里操作同时,插入一条数据到消息表
2)接着A系统将这个消息发送到MQ中去
3)B系统接收到消息之后,在一个事务里,往自己本地消息表里也插入一条数据,同时执行其他的业务操作,如果这个消息已经被处理过了,那么此时这个事务会回滚,这样保证不会重复处理消息
4)B系统执行成功之后,就会更新自己本地消息表的状态以及A系统消息表的状态
可以通过zk通知A,A注册一个orderId=?监听,B一旦处理完成,修改node值
5)如果B系统处理失败了,那么就不会更新消息表状态,那么此时A系统会定时扫描自己的消息表,如果有没处理的消息,会再次发送到MQ中去,让B再次重试处理
6)这个方案保证了最终一致性,哪怕B事务失败了,但是A会不断重发消息,直到B那边成功为止
最大的问题就在于严重依赖于数据库的消息表来管理事务啥的???这个会导致如果是高并发场景咋办呢?咋扩展呢?所以一般确实很少用
最大努力通知方案
1)系统A本地事务执行完之后,发送个消息到MQ,假如发送mq失败了,那么A事务也回滚
2)这里会有个专门消费MQ的最大努力通知服务,这个服务会消费MQ然后写入数据库中记录下来,或者是放入个内存队列也可以,接着调用系统B的接口
3)要是系统B执行成功就ok了;要是系统B执行失败了,那么最大努力通知服务就定时尝试重新调用系统B,反复N次,最后还是不行就放弃
跟可靠消息最终一致性方案是类似的,可靠消息最终一致性方案,会保证最终必须要让那个执行成功的,但是最大努力通知方案,不一定保证最终一定会成功,可能会失败
适用场景
那种不太核心一些服务调用的操作,比如说发送短信或者站内信等消息服务,充值好了以后发送短信,一般来说肯定是要发出去短信的,但是如果真的不小心发送失败了,发送短信失败了也无所谓的。。。
常用框架
tcc-transaction框架
国内开源的tcc框架里算是最热的了
跟spring cloud整合的不是太好,但是提供了跟dubbo的整合
himly框架
官方文档里大量用的是spring xml来进行配置
ByteTCC框架
对注解的支持比较彻底一些,所以我们考虑使用ByteTCC框架来结合spring cloud进行tcc事务的开发和控制,还支持saga事务,长事务
ByteTCC框架
场景
转账,账户1,10000余额,往账户2转账1000,数据库表中字段,账户余额amount,冻结资金frozen
try阶段
try是按照顺序来的
try和cancel是配对的,try阶段成功了以后如果要回滚,可以通过cancel来进行回滚
(1)账号1的try操作失败了
正常来说,账号1,amount = 9000,frozen = 1000;账号2,amount = 10000,frozen = 1000
账号1的try失败了,账号1的try操作的本地事务,回滚,导致账号1,amount = 10000,frozen = 0
账号2的try操作,压根儿就没机会去执行
(2)账号2的try操作失败了
正常来说,账号1,amount = 9000,frozen = 1000;账号2,amount = 10000,frozen = 1000
账号1的try操作先成功了,账号1,amount = 9000,frozen = 1000;账号2的try操作失败了,所以此时将frozen = 1000的操作就会回滚,然后amount = 10000,frozen = 0
bytetcc框架要自动感知到这个问题,然后将账号1执行cancel的逻辑,amount = 10000,frozen = 0
confirm阶段
confirm也是按照顺序来的
confirm,如果有失败的,那不好意思,人家默认的一个策略,就是让别人confirm成功好了,你的confirm会被bytetcc后台框架不停的重试和调用
try、confirm是不一样的,如果你用一个cancel来回滚try和confirm的话,那是不行的
confirm只要不停的重试一定是可以成功的,就可以保证数据的一致性,作者肯定是默认confirm阶段如果失败可能一般都是网络调用超时、或者数据库不小心压力过大拒绝反问
就是让失败的confirm操作不断的重试,直到成功为止
场景演示:正常来说,执行confirm阶段后,账号1,amount = 9000,frozen = 0;账号2,amount = 11000,frozen = 0
(1)如果账号1的confirm失败了
账号1,停留在try的那个阶段的数据,amount = 9000,frozen = 1000;账号2的confirm成功了,此时账号2变成amount = 11000,frozen = 0
明显的发现,bytetcc框架,并没有根据识别到的账号1的confirm的失败,来执行对应的cancel回滚的操作
bytetcc框架,就会不停的重试调用那个服务的confirm操作,直到他执行成功为止
(2)如果账号2的confirm操作失败了
账号1的confirm操作是成功的,amount = 9000,frozen = 0;账号2,amount = 10000,frozen = 1000
按照bytetcc框架的一个逻辑,肯定也是会账号1就那样放着吧,账号2会被不断的调用和重试,直到确保账号2的confirm操作成功
cancel
ByteTCC框架的策略哲学:try-cancel配对,他是绝对可以做到的;然后如果try阶段都成功了,confirm有服务失败了,那么confirm只要不停的重试一定是可以成功的,就可以保证数据的一致性
cancel接口的回滚逻辑都是针对的是try阶段的
全局事务决定提交时,try接口全部成功了,进入confirm阶段的时候,会保证所有服务的confirm接口必须成功调用
专门在数据库里,都要建立一张表,专门给bytetcc框架自己来用的
对每个他操作的数据库都需建立bytejta表
bytetcc框架还要负责管理活动日志,这个活动日志其实就是可以记录在本地的数据库中
代码实现:
provider
对外提供TCC服务的Controller必须加@Compensable注解(若没有实质业务, 也可以不必指定confirmableKey和cancellableKey)
若不加@Compensable注解, 则ByteTCC将其当成普通服务对待, 不接收Consumer端传播的事务上下文. 若它后续调用TCC服务, 则将开启新的TCC全局事务
@Compensable(interfaceClass = IAccountService.class, confirmableKey = "accountServiceConfirm", cancellableKey = "accountServiceCancel")
@RestController
public class AccountController implements IAccountService {
@RestController
public class AccountController implements IAccountService {
增加金额increaseAmount
"update tb_account_one set frozen = frozen + ? where acct_id = ?", amount, acctId
减少金额decreaseAmount
"update tb_account_one set amount = amount - ?, frozen = frozen + ? where acct_id = ?", amount, amount, acctId);
provider执行的都是try操作
consumer
@Compensable(interfaceClass = ITransferService.class, confirmableKey = "transferServiceConfirm", cancellableKey = "transferServiceCancel")
@RestController
public class TransferController implements ITransferService {
@RestController
public class TransferController implements ITransferService {
@ResponseBody
@RequestMapping(value = "/transfer", method = RequestMethod.POST)
@Transactional
public void transfer(@RequestParam String sourceAcctId, @RequestParam String targetAcctId, @RequestParam double amount) {
this.acctService.decreaseAmount(sourceAcctId, amount);
this.increaseAmount(targetAcctId, amount);
}
@RequestMapping(value = "/transfer", method = RequestMethod.POST)
@Transactional
public void transfer(@RequestParam String sourceAcctId, @RequestParam String targetAcctId, @RequestParam double amount) {
this.acctService.decreaseAmount(sourceAcctId, amount);
this.increaseAmount(targetAcctId, amount);
}
consumer端try阶段代码
this.increaseAmount(targetAcctId, amount);
update tb_account_two set frozen = frozen + #{amount} where acct_id = #{acctId}
@Service("transferServiceCancel")
public class TransferServiceCancel implements ITransferService {
@Autowired
private TransferDao transferDao;
@Transactional
public void transfer(String sourceAcctId, String targetAcctId, double amount) {
int value = this.transferDao.cancelIncrease(targetAcctId, amount);
if (value != 1) {
throw new IllegalStateException("ERROR!");
}
System.out.printf("exec decrease: acct= %s, amount= %7.2f%n", targetAcctId, amount);
}
}
public class TransferServiceCancel implements ITransferService {
@Autowired
private TransferDao transferDao;
@Transactional
public void transfer(String sourceAcctId, String targetAcctId, double amount) {
int value = this.transferDao.cancelIncrease(targetAcctId, amount);
if (value != 1) {
throw new IllegalStateException("ERROR!");
}
System.out.printf("exec decrease: acct= %s, amount= %7.2f%n", targetAcctId, amount);
}
}
update tb_account_two set frozen = frozen - #{amount} where acct_id = #{acctId}
@Service("transferServiceConfirm")
public class TransferServiceConfirm implements ITransferService {
@Autowired
private TransferDao transferDao;
@Transactional
public void transfer(String sourceAcctId, String targetAcctId, double amount) {
int value = this.transferDao.confirmIncrease(targetAcctId, amount);
if (value != 1) {
throw new IllegalStateException("ERROR!");
}
System.out.printf("done increase: acct= %s, amount= %7.2f%n", targetAcctId, amount);
}
}
public class TransferServiceConfirm implements ITransferService {
@Autowired
private TransferDao transferDao;
@Transactional
public void transfer(String sourceAcctId, String targetAcctId, double amount) {
int value = this.transferDao.confirmIncrease(targetAcctId, amount);
if (value != 1) {
throw new IllegalStateException("ERROR!");
}
System.out.printf("done increase: acct= %s, amount= %7.2f%n", targetAcctId, amount);
}
}
update tb_account_two set amount = amount + #{amount}, frozen = frozen - #{amount} where acct_id = #{acctId}
@FeignClient(value = "SPRINGCLOUD-SAMPLE-PROVIDER")//调用PROVIDER接口
public interface IAccountService {
@RequestMapping(method = RequestMethod.POST, value = "/increase")
public void increaseAmount(@RequestParam("acctId") String accountId, @RequestParam("amount") double amount);
@RequestMapping(method = RequestMethod.POST, value = "/decrease")
public void decreaseAmount(@RequestParam("acctId") String accountId, @RequestParam("amount") double amount);
}
public interface IAccountService {
@RequestMapping(method = RequestMethod.POST, value = "/increase")
public void increaseAmount(@RequestParam("acctId") String accountId, @RequestParam("amount") double amount);
@RequestMapping(method = RequestMethod.POST, value = "/decrease")
public void decreaseAmount(@RequestParam("acctId") String accountId, @RequestParam("amount") double amount);
}
日志文件
通过文件和数据库,都会记录分布式事务执行过程中的一些日志或者记录,状态,进度,这些东西其实是用来在分布式事务执行到一半,服务挂了,服务重启,需要根据之前记录的日志和数据库里的记录,恢复这个分布式事务,继续执行
TransactionInterceptor
你只要加了这个@Transactional注解,就可以看到TransactionInterceptor的东西就会运行,他会拦截掉这个请求
(1)begin,启动一个事务 => TransactionManager
(2)执行目标方法内部的业务逻辑
(3)根据目标方法执行的结果,是否成功,或者是报错,来选择commit / rollback => TransactionManager
CompensableHandlerInterceptor
他是迎接一个http请求的第一道防线,他是一个interceptor,拦截器,他的本质跟TransactionInterceptor是一样的,就是在一个请求过来的时候,一定会先拦截掉
CompensableMethodInterceptor
获取到了目标controller方法的@Transactional注解,还获取到了目标contorller的@Compensable注解,simplified(简化)模式,在当前的类中,就会有@CompensableConfirm和@CompensableCancel
resourceList
按照调用顺序,如果你调用了一个服务,bytetcc分布式事务框架,会将你分布式事务中调用其他的服务作为一个resource放到一个resourceList中去,这样的话,分布式事务中调用了哪些其他服务,具体的状态是成功还是失败,都可以感知到
如果所有服务都try成功了,只要依次对这些服务调用confirm接口
如果一个分布式事务中要调用5个服务,结果只有2个服务调用成功了,第3个服务调用失败了,在进行cancel的时候,只要对try成功的服务执行cancel就可以了
基于RestTemplate组件调用每个已经成功try的服务接口,进行cancel
confirm阶段
运行过程:也是基于resourceList,通过tcc内置的一个controller,基于restTemplate去调用各个服务的内置controller接着调用confirm接口
失败如何进行重试
CompensableWork后台线程
第一件事情,就是在系统每次刚启动的时候,对执行到一半儿的事务,还没执行结束的事务,进行恢复,继续执行这个分布式事务
第二件事情,如果就是分布式事务中所有服务的try都成功了,然后执行confirm,其他服务的confirm都成功了,可能就1个服务的confirm失败了,此时CompensableWork会每隔一段时间,定时不断的去重试那个服务的confirm接口
如果有那种confirm或者cancel没成功的服务,CompensableWork会每隔60s去调用一次你的/commit接口或者是/rollback接口,尝试去完成你的分布式事务中的confirm操作或者是cancel操作
timingRecover
进行定时不断执行的恢复
事务恢复
ResourceAdapterImpl
TransactionRecovery compensableRecovery = this.beanFactory.getCompensableRecovery();
TransactionRecovery compensableRecovery = this.beanFactory.getCompensableRecovery();
在服务启动的时候,bytetcc框架会通过后台线程启动一个task,入口是CompensableWork,这个里面会启动一个事务恢复的这么一个过程,如果一个事务执行到了一半儿,然后这个系统就崩溃了
系统重启的时候,一定会有一个事务恢复的过程,也就是说从数据库、日志文件里加载出来没完成的事务的状态,然后继续去执行这个事务
1,有的服务这个try都没执行完毕,然后直接挂掉了,此时会怎么样呢?其实此时可以让已经try成功的服务直接全部cancel掉
2,try都成功了,confirm没有都成功,服务重启后会调用服务的confirm接口
链式调用
场景:流量充值服务
-> 资金服务 -> 账单服务
-> 订单服务
-> 抽奖服务
-> 积分服务
-> 流量券服务
-> 资金服务 -> 账单服务
-> 订单服务
-> 抽奖服务
-> 积分服务
-> 流量券服务
服务链式调用的分布式事务解决方案,其实就是跟bytetcc设计的方案是一模一样的
链条里面的所有服务都执行了try,try都成功了,然后就执行了confirm
confirm的时候,流量充值服务会去触发资金服务的confirm,资金服务内部的bytetcc框架会自动触发账单服务的confirm
bytetcc框架包含了一个事务管理器,资金服务本地的事务管理器,会记录下来之前调用过哪些服务的try接口,resouceList,就会自动去调用远程的账单服务的confirm接口
源码:CompensableTransactionImpl.fireCommit()
this.fireNativeParticipantConfirm()
触发自己本地的confirm逻辑的执行,但是还没到触发账单服务的confirm
this.fireRemoteParticipantConfirm()
其实是在触发远程的账单服务的confirm逻辑的执行
因为之前资金服务调用过账单服务的try接口,只要成功调用了其他服务的try接口,就会将这个服务记录到一个本地的resourceList里面去
记录了bill账单服务福,然后就可以对这个resourceList里的服务,通过RestTemplate,执行http请求,访问远程的账单服务的bytetcc controller,执行confirm逻辑
源码:CompensableTransactionImpl.rollback()
this.fireNativeParticipantCancel();
this.fireRemoteParticipantCancel()
最终,一定会通过SpringCloudCoordinator来发送http请求给账单服务的bytetcc controller,来执行/rollback接口,以执行账单服务自己本地的那个cancel逻辑
bytetcc分布式事务的confirm逻辑里面调用了一个非tcc的一个接口,会报错
分布式事务方案
(1)XA(两阶段提交):适用于单系统多数据库的分布式事务
(2)TCC(补偿型的分布式事务):特别适用于微服务场景,尤其是dubbo技术栈,spring cloud技术栈,跟这些微服务技术栈整合,在服务链式调用的场景里,给服务调用链条加上TCC型的分布式事务
场景:流量充值服务
-> 资金服务 -> 账单服务
-> 订单服务
-> 抽奖服务
-> 积分服务
-> 流量券服务
-> 资金服务 -> 账单服务
-> 订单服务
-> 抽奖服务
-> 积分服务
-> 流量券服务
(3)可靠消息最终一致性方案:比较适用于不是微服务复杂调用链条的场景,适用于的是调用之间可以通过MQ异步解耦,走异步化的方式,在不太复杂的一个服务调用场景里,可以使用,主要是将比较耗时的一些操作放在MQ异步化的方式来实现,同时还能实现最终一致性的一个事务的效果
调用起来很耗时的操作,比如说流量充值内,调用第三方运营商的系统接口完成流量充值
(4)最大努力通知方案:分布式数据一致性,比较适用于,哪怕是数据不一致也可以的非关键性的操作,走MQ异步通知,重试多次,实在执行不成功就算了
发送短信、站内信之类
(5)saga事务:他的地位是类似于TCC的,他跟TCC你可以从中间选择一种方案出来使用,就可以了,saga事务的本质也可以用来解决微服务里,复杂的服务调用链条下,如何加上分布式事务
saga事务
saga是将每个接口,拆分为2个接口,一个是业务接口,另外一个是补偿接口
相当于就是说将tcc里面的try和confirm合并为一个接口,就是先执行业务接口,直接就尝试完成整个业务逻辑的操作
然后如果在服务调用链条里面,某个服务的业务接口执行失败了,那么直接对已经执行成功的所有服务都调用其补偿接口,将之前执行成功的业务逻辑给回滚
核心和本质,就是把每个操作分为实际的业务逻辑以及补偿业务逻辑,正常情况下,就依次执行各个服务的业务逻辑就好了,如果某个服务调用失败的话,直接对之前执行成功的那些服务都依次执行补偿逻辑
消息驱动的,是异步的,不是同步的
tcc可不是,服务调用链的场景,但是他是同步调用,特别适合于我们之前的编程思维和模型,因为我们正常写微服务的系统里面,都是各个服务同步调用的
saga就俩接口,业务接口,补偿接口;tcc是try/confirm/cancel,三个接口,他们的事务模型是不一样的
第一种编排模式
编排模式,优点在于去中心化;缺点在于太麻烦了,如果你有连续几十个服务连续调用,会导致服务对消息的监听非常复杂,而且不好调试和定位问题,服务调用异常,服务调用链的问题排查特别麻烦,引入一个MQ,特别不好,你的服务执行依赖于MQ完成各个服务的调用,引入了复杂性,如果mq挂掉后,排查麻烦
由每个服务失败后,发送MQ事件,之前的服务需要监听mq,比如服务4失败发送mq事件,服务3消费,服务3执行回滚之后,再发送一个mq事件,服务2监听服务3发布的mq事件
第二种命令模式
某个服务如果要开启一个saga分布式事务,那么就得自己搞一个类似saga流程管理器的东西,然后来负责依次调用和执行各个服务(服务1->服务2->服务3),如果某个服务调用失败,就对之前调用成功的服务依次执行补偿接口
saga分布式事务管理器,来管理整个saga事务的调用和执行的流程,如果某个节点失败了,那么就会回滚,重新执行补偿接口
优点在于系统比较简单,定位和处理问题都比较简单,不涉及复杂的消息监听什么的;缺点的话,主要就是万一saga事务流程管理器挂了呢?那不就惨了
DDD
(1)概念一
aggregate,聚合,说白了,就是一个组合
aggregate指的就是汽车 + 轮胎 + 引擎,三个东西聚合起来的一个整体
如果有个人要开车,那么只能调用汽车,因为汽车是aggregate root,他是aggregate对外提供的一个门面,不能有人直接调用轮胎或者是引擎吧
(2)概念二
event sourcing的概念,其实说白了啊,就是事件回溯的意思
event sourcing模型里,很多时候也是会系统启动提前完成事件回溯,然后算出最新数据,放内存里缓存的,这样可以节约回溯开销,然后也可以多搞几个中间快照,每次从某个快照开始回溯
id name updated_age
1 张三 20
1 张三 1
1 张三 1
1 张三 1
1 张三 20
1 张三 1
1 张三 1
1 张三 1
如果要知道现在张三是几岁
张三的age变更的所有event,sourcing,回溯,20 + 1 + 1 + 1 = 23岁
(3)概念三
Actor模型,这个模型的意思,就是说多线程在进行交互的时候,你假设一个Actor是一个线程,互相之间是基于消息进行通信的,每个Actor有个mailbox就是信箱,拿到了消息,然后内部单线程处理的
(4)概念四
命令与查询职责的分离模型,大概意思,就是说,Command指的是类似增删改之类的处理命令,Query就是查询数据,然后让你在设计系统的时候,把增删改服务和查服务分离开来,就是在服务设计的层面,实现读写分离
C,command,指的就是增删改
Q,query,指的就是查询
你得用两个独立的服务来提供增删改操作和查询操作
Q,query,指的就是查询
你得用两个独立的服务来提供增删改操作和查询操作
写服务,负责接收增删改的请求,将数据更新到存储里面去,比如说可以使用mysql
就是会有专门的数据同步的组件,会将写服务的数据存储里面的数据,更新到读服务的数据存储里面去,比如说elasticsearch
就是会有专门的数据同步的组件,会将写服务的数据存储里面的数据,更新到读服务的数据存储里面去,比如说elasticsearch
读写分离的架构,最经典的,就是电商里的读写分离的场景,就是你的各种数据的变更统一都是走各种后台服务,都是mysql的
但是比如商品详情页,读流量那么的大,肯定不能直接从各个服务的mysql里查啊,那就私人了要,可以分离出来一个专门的读服务,每次数据变更就获取事件,同步到比如redis里,作为查数据的存储,然后别人就可以从读服务里查redis的数据
但是比如商品详情页,读流量那么的大,肯定不能直接从各个服务的mysql里查啊,那就私人了要,可以分离出来一个专门的读服务,每次数据变更就获取事件,同步到比如redis里,作为查数据的存储,然后别人就可以从读服务里查redis的数据
(5)概念五
写服务在更新完自己的mysql之后,有一个专门同步组件会将数据同步到另外一个存储里去,比如es,或者是hbase,此时可以按照Event sourcing格式来同步数据,在es或者hbase里存放的是数据变更的快照,一个一个事件
读服务在读的时候,可以基于一个一个事件来聚合数据提供查询
(6)概念六
专门帮助你实现CQRS模式,他里面的核心概念,都是DDD,领域驱动模型设计里的一些概念,包括了aggregate、event sourcing、actor、CQRS,那些概念就组成了一个CQRS框架,AxonFramework
在DDD里面提出了一个事务的模型,就是saga事务模型,AxonFramework里面实现的就是saga事务模型
CQRS
读写分离
常用框架
Eventuate Tram Saga
国外几乎都没什么人用的,不是很热门的框架,这个框架作者的更新速度很缓慢,也不是特别的好
AxonFramework
其实是个CQRS框架,是基于DDD理论来的,你可以认为说,后面我们如果学习了DDD之后来建模和解决业务极度复杂的系统,那么就可以使用类似Axon这种框架了
好处就是他里面也提供了saga事务的支持和实现,而且这个框架总体来说名气大一些,活跃度高一些,github的star也多一些,1000多。。这还凑合是吧
BAT等大公司,其实主要都是tcc事务用来解决核心业务逻辑复杂调用链路里面的分布式事务问题
他不是一个saga事务框架,他其实是一个CQRS框架,只不过顺带支持了saga事务而已
没几个公司是用saga事务的,一般tcc + MQ最终一致性
华为的ServiceComb
saga事务在国内公司实践和运用是极其少的,而且也没有成熟的技术方案
就算用saga也应该学习saga事务框架,也不应该用AxonFramework,他其实是一个CQRS框架
可靠消息最终一致性方案
不要用本地的消息表了,直接基于MQ来实现事务。比如阿里的RocketMQ就支持消息事务。
1)A系统先发送一个prepared消息到mq,这个时候B系统还收不到mq消息,如果这个prepared消息发送失败那么就直接取消操作别执行了
2)如果这个消息发送成功过了,那么接着执行A的本地事务,如果成功就告诉mq发送确认消息,如果失败就告诉mq回滚消息
3)如果发送了确认消息,相当于消息提交了,那么此时B系统会接收到确认消息,然后执行本地的事务
4)mq会自动定时轮询所有prepared消息回调A的接口,问你,这个消息是不是本地事务处理失败了,所有没发送确认消息?那是继续重试还是回滚?一般来说这里你就可以查下数据库看之前本地事务是否执行,如果回滚了,那么这里也回滚吧。这个轮询机制就是避免可能本地事务执行成功了,别确认消息发送失败了。确保本地事务成功后,发送确认消息出去
5)这个方案里,要是系统B的事务失败了咋办?
1,重试,自动不断重试直到成功,
2,假如一直不成功,要么就是针对重要的资金类业务进行回滚,比如B系统本地回滚后,想办法通知系统A也回滚;
3,或者是发送报警由人工来手工回滚和补偿
1,重试,自动不断重试直到成功,
2,假如一直不成功,要么就是针对重要的资金类业务进行回滚,比如B系统本地回滚后,想办法通知系统A也回滚;
3,或者是发送报警由人工来手工回滚和补偿
目前国内互联网公司大都是这么玩儿的,要不你举用RocketMQ支持的,要不你就自己基于类似ActiveMQ?RabbitMQ?自己封装一套类似的逻辑出来,总之思路就是这样子的
适用场景
调用起来很耗时的操作,比如说流量充值内,调用第三方运营商的系统接口完成流量充值
可靠消息最终一致性方案
适用场景
比如说,电商系统下订单之后要走调度系统去进行调度出库,那块特别耗时,可以做成异步化;流量充值中心,调用运营商BOSS系统完成流量充值,特别耗时,第三方的系统调用比较慢,可以做成异步化;针对有异步化的场景,同时还要将这个异步化的操作包裹到一个事务中去,此时就可以使用可靠消息最终一致性的方
涉及到4个组件
(1)上游服务:发送MQ消息通知下游服务执行某个操作
(2)可靠消息服务:协调上下游服务的消息传递,确保数据一致性,可以认为这个所谓的可靠消息服务是我们自己开发的,也是一个spring cloud的服务,只不过这个服务是通用的,是所有服务所有系统都基于这个可靠消息服务来实现可靠消息最终一致性的方案。“可靠消息”四个字,这一切都是基于可靠消息服务来做的,方案设计,消息如何保持可靠性
(3)MQ消息中间件:这个一般是RocketMQ或者是RabbitMQ
(4)下游服务:就是那个要被调用的服务
执行流程
(1)上游服务发送一个待确认消息给可靠消息服务
(2)可靠消息服务将这个待确认的消息保存到自己本地数据库里,保存起来,但是不发给MQ,这个时候消息的状态是“待确认”
(3)上游服务操作本地数据库
(4)上游服务根据自己操作本地数据库的结果,来通知可靠消息服务,可以确认发送消息了,或者是删除消息
操作完本地数据库之后,会有两个结果,第一个结果是操作失败了,第二个结果是操作成功了,如果本地数据库操作失败了,本地操作会回滚,回滚之后,上游服务就要通知可靠消息服务删除消息;如果本地数据库操作成功了,那么此时本地事务就提交了,接着就可以通知可靠消息服务发送消息
操作完本地数据库之后,会有两个结果,第一个结果是操作失败了,第二个结果是操作成功了,如果本地数据库操作失败了,本地操作会回滚,回滚之后,上游服务就要通知可靠消息服务删除消息;如果本地数据库操作成功了,那么此时本地事务就提交了,接着就可以通知可靠消息服务发送消息
(5)可靠消息服务将这个消息的状态修改为“已发送”,并且将消息发送到MQ中间件里去
这个环节是必须包裹在一个事务里的,如果发送MQ失败报错,那么可靠消息服务更新本地数据库里的消息状态为“已发送”的操作也必须回滚,反之如果本地数据库里的消息状态为“已发送”,那么必须成功投递消息到MQ里去
@Transactional
public void confirmMessage(Long messageId) {
messageDAO.updateStatus(messageId, MessageStatus.SENT);
rabbitmqProducer.send(message);
}
如果更新数据库里的消息状态报错了,那么消息根本不会投递到MQ里去;如果更新数据库里的消息状态成功了,但是事务还没提交,然后将消息投递到MQ里去报错了,此时事务管理器会感知到这个异常,然后会直接回滚掉整个事务,更新数据库里消息状态的操作也会回滚掉的
就可以保证说,更新数据库里的消息状态和投递消息到MQ,要么一起成功,要么一起失败,这里这第5个步骤,必须是一起成功或者是一起失败
MQ,rabbitmq,都有事务消息的一个实现,你可以先去投递一个prepare的消息,接着如果说数据库操作成功过了,那么就commit那个消息发送给rabbitmq,然后如果数据库操作失败了,就通知mq去rollback一条消息
但是MQ的事务消息最好别轻易用,因为那个性能实在是太低了,吞吐量太差
所以说我这里给大家介绍的是上面的那种方案
这个环节是必须包裹在一个事务里的,如果发送MQ失败报错,那么可靠消息服务更新本地数据库里的消息状态为“已发送”的操作也必须回滚,反之如果本地数据库里的消息状态为“已发送”,那么必须成功投递消息到MQ里去
@Transactional
public void confirmMessage(Long messageId) {
messageDAO.updateStatus(messageId, MessageStatus.SENT);
rabbitmqProducer.send(message);
}
如果更新数据库里的消息状态报错了,那么消息根本不会投递到MQ里去;如果更新数据库里的消息状态成功了,但是事务还没提交,然后将消息投递到MQ里去报错了,此时事务管理器会感知到这个异常,然后会直接回滚掉整个事务,更新数据库里消息状态的操作也会回滚掉的
就可以保证说,更新数据库里的消息状态和投递消息到MQ,要么一起成功,要么一起失败,这里这第5个步骤,必须是一起成功或者是一起失败
MQ,rabbitmq,都有事务消息的一个实现,你可以先去投递一个prepare的消息,接着如果说数据库操作成功过了,那么就commit那个消息发送给rabbitmq,然后如果数据库操作失败了,就通知mq去rollback一条消息
但是MQ的事务消息最好别轻易用,因为那个性能实在是太低了,吞吐量太差
所以说我这里给大家介绍的是上面的那种方案
(6)下游服务从MQ里监听到一条消息
(7)下游服务根据消息,在自己本地操作数据库
(8)下游服务对本地数据库操作完成之后,对MQ进行ack操作,确认这个消息处理成功
现在的MQ中间件,无论是rabbitmq、rocketmq、kafka,都是支持手动ack。如果你是使用的默认自动ack的模式,那么就会导致消息的丢失;现在一般都会用手动ack,当本地操作执行成功之后,再对MQ执行手动的ack确认
只有当我手动ack确认之后,mq才会删除消息
如果我还没ack,本地数据库比如操作失败报错了,此时MQ一直没收到ack消息,会怎么样呢?此时MQ会保证重新投递一次消息,可以给其他的消费者实例去消费
现在的MQ中间件,无论是rabbitmq、rocketmq、kafka,都是支持手动ack。如果你是使用的默认自动ack的模式,那么就会导致消息的丢失;现在一般都会用手动ack,当本地操作执行成功之后,再对MQ执行手动的ack确认
只有当我手动ack确认之后,mq才会删除消息
如果我还没ack,本地数据库比如操作失败报错了,此时MQ一直没收到ack消息,会怎么样呢?此时MQ会保证重新投递一次消息,可以给其他的消费者实例去消费
(9)下游服务对MQ进行ack之后,再给可靠消息服务发送个请求,通知该服务说,ok,我这里处理完毕了,可靠消息服务收到通知之后,将消息的状态修改为“已完成”
可靠消息方案,方案思想,大概就是这样子,分为这么几个组件,整个流程基本上就是这样子,当然还得加一些额外机制进去
可靠消息方案,方案思想,大概就是这样子,分为这么几个组件,整个流程基本上就是这样子,当然还得加一些额外机制进去
RocketMQ实现可靠消息一致性
可靠消息服务干的活儿被RocketMQ给干了
实现原理
(1)上游服务发送一个prepare消息,可以认为是一个待确认消息到RocketMQ
(2)RocketMQ会在自己内部保存这条消息,然后返回一个状态给上游服务
(3)接着上游服务执行本地事务
(4)如果失败了就发送rollback给RocketMQ,RocketMQ会删除掉那条消息,如果成功就发送commit给RocketMQ
(5)rocketmq会根据状态来处理消息,如果是rollback就删除消息,如果是commit就将消息标识为可以被下游服务消费
(6)下游服务消费消息
(7)接着下游服务会消费这个消息,执行本地事务
(8)下游服务然后返回ack给rocketmq,如果消费失败,或者是本地事务执行失败,或者ack发送失败,那么rocketmq都会有自己的重试策略,重发消息
(9)rocketmq如果能够成功的收到ack消息就会将消息删除
步骤失败
如果上游服务本地事务执行失败,也没发送rollback或者commit给rocketmq
一个检查消息状态超时的机制,发现消息超时了,就回调上游服务的一个接口,上游服务自己负责检查这个操作是否执行
如果没有就要执行,然后发送commit或者rollback给rocketmq
如果执行过了,那就执行发送commit给rocketmq
下游服务只要自己保证了幂等性就可以了
RabbitMQ实现可靠消息一致性
主要就是用一个ack机制,ack之后,就可以保证只有你处理完之后才会ack掉这条消息,然后的话呢,如果你没ack,rabbitmq会给你重新投递消息
如果是自动ack,那么rabbitmq只要一旦投递出去这个消息给消费者,立即就删除这条消息了,主要是针对的那种对消息少量丢失不太敏感的,然后要求高吞吐量的场景
手动ack的方式只有当你手动ack的时候,rabbitmq才会认为说你已经消费成功了,就会删除那条消息
channel.basicAck(deliveryTag, false);
channel.basicReject,是说,消息没处理成功,但是还是给删除掉吧
channel.basicNack,是说,消息没处理成功,但是你给我让这个消息重新排队来进行投递,可以投递给其他消费者实例来处理
1,一种做法:如果下游服务的本地数据库操作成功了,那就ack,否则处理失败了就nack,就可以了,这样的话就会不断的将消息重新入队让其他消费者实例来消费和处理
2,另一种做法:因为考虑到了可靠消息自己就会进行重新投递,所以下游服务如果操作数据库成功了就ack,否则就reject,都会删除掉消息的,但是如果操作数据库失败了,那么就不会去通知可靠消息服务了,可靠消息服务过一段时间会感知到,然后会再次重新投递消息
上游服务的confirm
有一种可能,在5这个步骤里,发送rabbitmq消息,看起来是成功了,其实消息并没有成功的投递到rabbitmq里去
本地的消息状态也变更为“已发送”,结果消息并没有投递到mq里去,下游的6789四个步骤没有执行
解决方案:可靠消息服务,他会发现已发送的但是超时的消息,重新再次投递一次消息到rabbitmq
可靠消息服务的机制设计的越是健壮,越是依赖你的可靠消息服务,而不是依赖MQ,就越好,最好对MQ的依赖是非常的轻量级的
如果你太过于依赖MQ了,可能导致MQ里的而一些bug,会导致你束手无策,无法解决
整个这个技术架构,最好是不要跟某种MQ的高阶特性重耦合,因为那样的话会导致你以后如果要切换MQ的话就会很麻烦,有大量的重构要做
spring cloud代码
定义接口prepare,confirm,remove,finish
缺点
大量的基于mysql来存储了message消息,第一个问题是并发的问题
改善:使用NoSQL类的存储
举个例子,比如可以使用redis、zookeeper、hbase
举个例子,比如可以使用redis、zookeeper、hbase
比如说电商里面,审核出库单,商品出库,这些复杂又非核心的业务逻辑,99.99%的场景下不会报错就可以了,没必要大面积的引入分布式事务,非核心的场景,只不过是业务复杂,出错了,手工修复数据
资金、交易、订单、支付,绝对不能出错的一些,必须有什么高大上的技术都要投入去使用
0 条评论
下一页