RocketMQ-2
2022-05-28 12:34:21 1 举报
RocketMQ机制,功能
作者其他创作
大纲/内容
6、检查本地事务的状态
看其中具体的一小块代码
对于支付系统,大量用户每天会下很多订单,但是不少订单可能是一直没有进行支付的,所以一般订单系统都必须设置一个规则,当一个订单下单之后,超过比如30分钟没有支付,那么就必须订单系统自动关闭这个订单,后续如果要购买这个订单里的商品,就得重新下订单了。故订单系统就需要有一个后台线程,不停的扫描订单数据库里所有的未支付状态的订单,如果超过30分钟了还没支付,那么就必须自动把订单状态 更新为“已关闭”但是订单系统的后台线程必须要不停的扫描各种未支付的订单,这种实现方式实际上并不是很好原因:1、未支付状态的订单可能是比较多的,然后需要不停的扫描他们,可能每个未支付状态的订单要被扫描N多遍,才会发现已经超过30分钟没支付了2、很难去分布式并行扫描订单。因为假设订单数据量特别的多,然后要是打算用多台机器部署订单扫描服务,但是每台机器扫描哪些订单?怎么扫描?什么时候扫描?这都是一系列的麻烦问题。可以采用MQ里的延迟消息,意思就是订单系统在创建了一个订单之后,可以发送一条消息到MQ中,指定这条消息是延迟消息,比如要等待30分钟之后,才能被订单扫描服务给消费到当订单扫描服务在30分钟后消费到了一条消息,就可以针对这条消息的信息,去订单数据库里查询这个订单,看看过了30分钟了,此时订单是否还是未支付状态?如果订单还是未支付状态,那么进行关闭操作,否则订单如果已经支付了,什么都不用处理。这种方式就比用后台线程扫描订单的方式要好很多一个是对每个订单只会在创建30分钟后查询一次而已,不会反复扫描订单多次。另外就是如果订单数量很多,完全可以让订单扫描服务多部署几台机器,然后对于MQ中的Topic可以多指定一个MessageQueue,这样每个订单扫描服务的机器作为一个Consumer都会处理一部分订单的查询任务
2、half消息发送成功
为什么基于MQ来传输数据会出现消息乱序
7、根据事务的状态Commit或RollBack
4、Commit或者RollBack
发送延迟消息的核心,就是设置消息的 delayTimeLevel,也就是延迟级别RocketMQ默认支持一些延迟级别如下:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h所以左边代码中设置延迟级别为3,意思就是延迟10s,发送出去的消息,会过10s被消费者获取到。那么如果是订单延迟扫描场景,可以设置延迟级别为16,也就是对应上面的30分钟
解决生产者重复发送消息方案:1、业务判断法 当生产者重试发送消息时,先查询当前MQ里是否存在这条消息 缺点:RocketMQ虽然是支持查询某个消息是否存在,但是在这个环节直接从MQ查询消息是没这个必要的,他的性能也不是太好,会影响接口的性能2、基于Redis缓存的幂等性机制(状态判断法) Redis缓存来存储是否发送过消息的状态,如果成功发送了一条消息到MQ中,就在Redis缓存里写一条数据,标记这个消息已经发送过 缺点:生产者发送消息给了MQ,但是此时生产者突然崩溃,没来得及把消息发送的状态写入Redis,在极端情况下还是没法100%保证幂等性解决消费者重复消费方案:可基于 业务判断法 进行处理,消费数据之前判断数据库数据是否已经处理过举例:支付系统通过MQ通知是否支付成功,则判断订单状态是否为已支付,如果不是则进行消费 若是同一条消息重复消费,可封装生产者发送的消息,自定义唯一ID,消费者消费这条消息的时候对ID进行上锁(分布式锁)
发现:1、RocketMQ的消费者中会注册一个监听器 MessageListenerConcurrently ,当消费者获取到一批消息之后,就会回调这个监听器函数来处理这一批消息2、处理完毕之后,才会返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 作为消费成功的示意,告诉RocketMQ这批消息已经处理完毕了所以对于RocketMQ而言,只要消费者是在监听器的函数中先处理完一批消息,然后返回了消费成功的状态,接着才会去提交这批消息的offset到broker去。所以在这个情况下,如果消息都处理完毕了,然后再提交消息的offset给broker,接着消费者系统崩溃了,此时是不会丢失消息的。如果对一批消息都没提交offset给broker的话,broker不会认为已经处理完了这批消息,此时消费者的一台机器宕机了,MQ其实会感知到你消费者的一台机器作为一个Consumer挂了。接着MQ会把没处理完的那批消息交给消费者的其他机器去进行处理,所以在这种情况下,消息也绝对是不会丢失的
如果消费者系统的数据库宕机,如何用死信队列解决这种异常场景
3、执行本地事务
借鉴两阶段提交,流程如下:1、生产者发送事务消息的场景时,先发送prepare消息(half消息)给MQ2、prepare消息发送成功后,生产者执行数据库事务(本地事务)3、根据数据库事务执行的结果,再返回Commit或Rollback给MQ4、如果是Commit,MQ把消息下发给Consumer端,如果是Rollback,直接删掉prepare消息5、第3步的执行结果如果没响应或是超时的,启动定时任务回查(回调接口)事务状态(最多重试15次,超过了默认丢弃此消息),处理结果同第4步6、MQ消费的成功机制由MQ自己保证。
Kafka的消费者采用的消费的方式跟RocketMQ是有些不一样的,如果按照Kafka的消费模式,就是会产生数据丢失的风险。原因:Kafka消费者拿到一批消息,还没来得及处理,结果就提交offset到broker去了,然后消费者系统挂掉了,这批消息就再也没机会处理了,因为消费者重启之后已经不会再次获取提交过offset的消息了。
如果采用同步发送消息 + 反复多次重试的方案,会发现实际落地的时候是可以的,但是里面存在一些问题比如可能会让订单事务执行成功,结果消息没发送出去,或者是订单事务执行成功了,但是反复多次重试发送消息到MQ极为耗时,导致调用接口的用户频繁超时异常。所以真正要保证消息一定投递到MQ,同时保证业务系统之间的数据完全一致,业内最佳的方案还是用基于RocketMQ的事务消息机制。因为这个方案落地之后,可以保证订单系统的本地事务一旦成功,那么必然会投递消息到MQ去,通知红包系统去派发红包,保证业务系统的数据是一致的。而且整个流程中,没必要进行长时间的阻塞和重试。如果half消息发送失败了,就直接回滚整个流程如果half消息发送成功了,后续的rollback或者commit发送失败了,不需要自己去卡在那里反复重试,直接让代码结束即可,因为后续MQ会过来回调接口判断再次rollback or commit
生产者让消息进入一个MessageQueue
发送消息的时候,给消息设置tag、属性等多个附加的信息的
1、常见的就是部署更多的consumer机器但是要注意,Topic的MessageQueue要有对应的增加,因为如果consumer机器有5台,然后MessageQueue只有4个,那么意味着有一个consumer机器是获取不到消息的。2、增加consumer的线程数量,可以设置consumer端的参数:consumeThreadMin、consumeThreadMax,这样一台consumer机器上的消费线程越多,消费的速度就越快3、开启消费者的批量消费功能,就是设置font color=\"#f44336\
1、发送half消息
Broker消息零丢失方案:同步刷盘 + Raft协议主从同步
基于延迟消息机制优化大量订单的定时退款扫描问题
consumer是支持设置从哪里开始消费消息的,常见的有两种:1、从Topic的第一条数据开始消费 CONSUME_FROM_FIRST_OFFSET2、从最后一次消费过的消息之后开始消费 CONSUME_FROM_LAST_OFFSET一般来说,都会选择CONSUME_FROM_FIRST_OFFSET,这样刚开始就从Topic的第一条消息开始消费,但是以后每次重启,都是从上一次消费到的位置继续往后进行消费的
发送事务消息需要一个事务监听对象,它实现 TransactionListener 接口,其中有两个方法作用分别是执行本地事务和消息回查:
事务消息机制的底层实现原理
RocketMQ的顺序消息机制的代码实现
丢弃
Consumer消息零丢失方案:手动提交offset + 自动故障转移
订单扫描服务,正常会对每个订单创建的消息,在30分钟以后才获取到,然后去查询订单状态,判断如果是未支付的订单,就自动关闭这个订单
代码实现例子
Kafka消费者的数据丢失问题
同步刷盘与异步刷盘、同步复制、异步复制
同步发送,指定队列选择策略 https://www.jianshu.com/p/f1a1de6b8cee
创建一个 TransactionProducerb style=\
关键因素就是两个:第一个是发送消息的时候传入MessageQueueSelector,在里面根据订单id和MessageQueue数量去选择这个订单id的数据进入哪个MessageQueue第二个是在发送消息的时候除了带上消息以外,还要带上订单id,然后MessageQueueSelector就会根据订单id去选择一个MessageQueue发送过去,这样就可以保证一个订单的多个binlog都会进入一个MessageQueue中
RollBack 删除消息不投递
消息乱序
在消费数据的时候根据tag和属性进行过滤
参考 RocketMQ-1 图文讲解 的 同步刷盘与异步刷盘、同步复制、异步复制
消费者如何保证按照顺序来获取一个MessageQueue中的消息
在Consumer处理消息的时候,可能会因为底层存储挂了导致消息处理失败,此时可以返回RECONSUME_LATER状态,然后broker会过一会儿自动重试但是这个方案用在有序消息中是不可以的因为如果consumer获取到消息A(必须先执行的消息),结果处理失败,此时返回RECONSUME_LATER,那么这条消息会进入重试队列,过一会儿才会重试但是此时broker会直接把下一条消息(消息B),交给消费者来处理,此时万一执行成功了,就会出现消息乱序的问题所以对于有序消息的方案中,如果遇到消息处理失败的场景,就必须返回 SUSPEND_CURRENT_QUEUE_A_MOMENT 状态,意思是先等一会儿再继续处理这批消息,而不能把这批消息放入重试队列中,然后直接处理下一批消息。
Producer
要不要消费历史消息
Consumer
在消费的时候根据tag和属性进行过滤,比如可以通过下面的代码去指定,只要tag=TableA和tag=TableB的数据
如果对消息的处理有异常,可以返回RECONSUME_LATER状态意思是,消费者现在没法完成这批消息的处理,稍后过段时间再次发送这批消息进行重试消费,代码如下:
half 消息是如何对消费者不可见的?1、生产者发送了一条half状态的消息到一个Topic中,消费者也是订阅了这个Topic,从里面获取消息2、从之前的讲解中了解到,其实写入一个Topic,最终是定位到这个Topic的某个MessageQueue,然后定位到一台Broker机器上去,然后写入的是Broker上的CommitLog文件,同时将消费索引写入MessageQueue对应的ConsumeQueue文件中3、实际上消费者看不到这条消息,其本质原因就是RocketMQ一旦发现发送的是一条half消息,他不会把这条half消息的offset写入Topic的ConsumeQueue中。MQ会把这条half消息写入到自己内部的 “RMQ_SYS_TRANS_HALF_TOPIC” 这个Topic对应的一个ConsumeQueue中在什么情况下生产者会收到half消息成功的响应?必须要half消息进入到RocketMQ内部的 RMQ_SYS_TRANS_HALF_TOPIC 的ConsumeQueue文件了,此时就会认为half消息写入成功了,然后就会返回响应给生产者。假如因为各种问题,没有执行rollback或者commit会怎么样?后台有定时任务会去扫描RMQ_SYS_TRANS_HALF_TOPIC中的half消息,如果超过一定时间还是half消息,会回调生产者的接口,判断这个half消息是要rollback还是commit如果执行rollback操作的话,如何标记消息回滚?RocketMQ内部有一个OP_TOPIC,此时可以写一条rollback OP(操作)记录到这个Topic里,标记某个half消息是rollback如果执行commit操作,如何让消息对红包系统可见?1、执行commit操作之后,RocketMQ就会在OP_TOPIC里写入一条记录,标记half消息已经是commit状态了2、接着需要把放在RMQ_SYS_TRANS_HALF_TOPIC中的half消息给写入到Topic的ConsumeQueue里去,然后消费者就可以看到这条消息进行消费了
灵活的运用 tags来过滤数据
延迟消息的代码实现
重复消费
为什么解决发送消息零丢失方案,一定要使用事务消息方案
让消费者仅仅关注他想要的数据
因为给每个Topic指定多个MessageQueue,然后写入消息的时候,其实是会把消息均匀分发给不同的MessageQueue,部署多台机器组成一个Consumer Group,对于Consumer Group中的每台机器都会负责消费一部分MessageQueue的消息,处理和消费的时间不同,没有顺序。
或者也可以通过下面的语法去指定,要根据每条消息的属性的值进行过滤,此时可以支持一些语法,比如:
基于消息key来定位消息是否丢失
消费者的数据库宕机了,就必然会导致从MQ里获取到消息之后是没办法进行处理的
合理的规划Topic和里面的tags,一个Topic代表了一类业务消息数据,然后对于这类业务消息数据,如果希望继续划分一些类别,可以在发送消息的时候设置tags。举个例子,比如现在一个系统要发送外卖订单数据到MQ里去,就可以针对性的设置tags,不同的外卖数据都到一个“WaimaiOrderTopic”中。但是不同类型的外卖可以有不同的tags:“meituan_waimai”,“eleme_waimai”,“other_waimai”,等等。然后对消费“WaimaiOrderTopic”的系统,可以根据tags来筛选。
提高消费者的吞吐量
RocketMQBroker
在发送消息的时候,给消息设置tag和属性
引入rocketMq相关的依赖:
MQ消息幂等性的方案
可能产生的原因:1、出现了接口超时等问题,可能会导致生产者进行重试操作,对一个消息重复发送两条到MQ中 重试是把双刃剑:生产者重复发送消息 假设生产者发送了一条消息到MQ,其实MQ是已经接收到这条消息了,结果MQ返回响应的时候,网络有问题超时了,生产者没能及时收到MQ返回的响应,这时代码里可能会发现网络超时的异常,然后生产者会进行重试再次发送这条消息到MQ中,最后MQ必然会收到一条一模一样的消息,进而导致消息重复发送。2、消费者重复消费 假设消费者拿到了一条消息,然后都已经进行处理了,这时应该返回CONSUME_SUCCESS的状态,然后提交消费进度offset到broker的。 但是消费者刚处理完这条消息,还没来得及提交消息offset到broker,消费者就进行了一次重启 这时因为没提交这条消息的offset给broker,broker并不知道消费者已经处理完了这条消息,然后消费者重启之后,broker就会再次把这条消息交给消费者再一次进行处理,导致重复消费。
RocketMQ消费者不同之处
比如通过下面的方式设置一个消息的key为订单id:message.setKeys(orderId),这样这个消息就具备一个key了。接着这个消息到broker上,会基于key构建hash索引,这个hash索引就存放在IndexFile索引文件里。
方案
先回顾RocketMQ消费者的代码
RocketMQ是如何让消费者进行消费重试的RocketMQ会有一个针对ConsumerGroup的重试队列如果返回了RECONSUME_LATER状态,MQ会把这批消息放到这个消费组的重试队列中比如消费组的名称是“TestConsumerGroup”,那么会有一个“%RETRY%TestConsumerGroup”这个名字的重试队列默认最多是重试16次,每次重试之间的间隔时间是不一样的,这个间隔时间可以如下进行配置:messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h意思是:第一次重试是1秒后,第二次重试是5秒后,第三次重试是10秒后,第四次重试是30秒后,第五次重试是1分钟后,以此类推,最多重试16次!如果连续重试16次还是无法处理消息,然后怎么办?一批消息重试了16次还是无法成功处理,这批消息会自动进入 死信队列 (死掉的消息放到这个队列里,不再进行重试)死信队列的名字是“%DLQ%TestConsumerGroup”,可以在RocketMQ的管理后台上看到对死信队列中的消息怎么处理需要看用户的使用场景,比如可以专门开一个后台线程,订阅“%DLQ%TestConsumerGroup”这个死信队列,对死信队列中的消息,还是一直不停的重试
注意:使用的是MessageListenerOrderly,里面有Orderly这个名称,平常使用的是MessageListenerConcurrently也就是Consumer会对每一个ConsumeQueue,都仅仅用一个线程来处理其中的消息。
RocketMQ还是支持比较丰富的数据过滤语法的,如下所示:(1)数值比较,比如:>,>=,<,<=,BETWEEN,=;(2)字符比较,比如:=,<>,IN;(3)IS NULL 或者 IS NOT NULL;(4)逻辑符号 AND,OR,NOT;(5)数值,比如:123,3.1415;(6)字符,比如:'abc',必须用单引号包裹起来;(7)NULL,特殊的常量(8)布尔值,TRUE 或 FALSE
在RocketMQ的生产实践中积累的经验总结
不能在代码中对消息进行异步的处理,如下错误的示范,开启了一个子线程去处理这批消息,然后启动线程之后,就直接返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS状态
RocketMQ事务消息的实现流程
Commit 消费消息
需要警惕的地方:不能异步消费消息
消息零丢失方案
RocketMQ事务消息(发送消息零丢失方案)
如果一条数据有多次操作,要走MQ,并且有顺序,那就让其MQ消息进入一个MessageQueue可以对这条数据的某个唯一字段(比如订单数据的订单ID)对MessageQueue的数量进行取模,然后进入指定MessageQueue
5、未收到步骤4的消息,回查事务状态
RocketMQ的数据过滤机制
如果消息处理失败,走重试队列怎么办?
0 条评论
下一页
为你推荐
查看更多