RocketMQ-3
2022-06-18 16:48:34 6 举报
RocketMQ
作者其他创作
大纲/内容
4、在订阅消息的时候,对于Consumer也是同理的,在构造函数的第二个参数设置为true,就是开启了消费时候的轨迹追踪消费到Consumer端之后,会上报一些轨迹数据到内置的RMQ_SYS_TRACE_TOPIC中,包括:Consumer的信息、投递消息的时间、这是第几轮投递消息、消息消费是否成功、消费这条消息的耗时等5、接着如果想要查询消息轨迹,在RocketMQ控制台里,在导航栏里就有一个消息轨迹,在里面可以创建查询任务,可以根据messageId、message key或者Topic来查询,查询任务执行完毕之后,就可以看到消息轨迹的界面了。在消息轨迹的界面里就会展示出来刚才上面说的Producer、Broker、Consumer上报的一些轨迹数据了。
如何对线上生产环境的RocketMQ集群进行消息轨迹的追踪
1、如果这些消息是允许丢失的,那么此时就可以紧急修改消费者系统的代码,在代码里对所有的消息都获取到就直接丢弃,不做任何的处理,这样可以迅速的让积压在MQ里的百万消息被处理掉,只不过处理方式就是全部丢弃而已2、但对很多系统而言,不能简单粗暴的丢弃这些消息,所以最常见的办法,还是先等待消费者系统先恢复了,之后可以根据线上Topic的MessageQueue的数量来看看如何后续处理 1)假如Topic有20个MessageQueue,但只有4个消费者系统在消费,那么每个消费者系统会从5个MessageQueue里获取消息,所以此时如果仅仅依靠4个消费者系统是肯定不够的,毕竟MQ里积压了百万消息了。 所以此时可以临时申请16台机器多部署16个消费者系统的实例,然后20个消费者系统同时消费,每个消费者消费一个MessageQueue的消息,此时会发现消费的速度提高了5倍,积压的百万消息很快都会被处理完毕 2)如果Topic总共就只有4个MessageQueue,然后就只有4个消费者系统,这时就没办法扩容消费者系统了,因为加再多的消费者系统,还是只有4个MessageQueue,没法并行消费。 所以此时往往是临时修改那4个消费者系统的代码,让他们获取到消息不处理业务,而是直接把消息写入一个新的Topic,这个速度是很快的,因为仅仅是读写MQ而已。 然后新的Topic有20个MessageQueue,然后再部署20台临时增加的消费者系统,去消费新的Topic后处理数据,这样也可以迅速的增加消费者系统的并行处理能力,使用一个新的Topic来允许更多的消费者系统并行处理
如何针对RocketMQ集群崩溃设计高可用方案
假设公司本来线上的MQ用的主要是Kafka,现在要从Kafka迁移到RocketMQ去,那么这个迁移的过程应该怎么做呢?应该采用什么样的技术方案来做迁移呢?MQ集群迁移过程中的双写+双读技术方案1、首先要做到双写,也就是在所有的Producer系统中,要引入一个双写的代码,让他同时往Kafka和RocketMQ中去写入消息,起码双写要持续个1周左右,因为MQ一般都是实时数据,里面数据也就最多保留一周2、当双写持续一周过后,会发现Kafka和RocketMQ里的数据看起来是几乎一模一样了,因为MQ反正也就保留最近几天的数据3、但是只有双写还是不够的,还需要同时进行双读,也就是在双写的同时,所有的Consumer系统都需要同时从Kafka和RocketMQ里获取消息,分别都用一模一样的逻辑处理一遍4、只不过从Kafka里获取到的消息还是走核心逻辑处理,然后可以落入数据库或者是别的存储,但是对于RocketMQ里获取到的消息,你以用一样的逻辑处理,但是不能把处理结果具体的落入数据库之类的地方5、Consumer系统在同时从Kafka和RocketMQ进行消息读取的时候,需要统计每个MQ当日读取和处理的消息的数量,这点非常的重要,同时对于RocketMQ读取到的消息处理之后的结果,可以写入一个临时的存储中6、同时要观察一段时间,当发现持续双写和双读一段时间之后,如果所有的Consumer系统通过对比发现,从Kafka和RocketMQ读取和处理的消息数量一致,同时处理之后得到的结果也都是一致的,此时就可以判断说当前Kafka和RocketMQ里的消息是一致的,而且计算出来的结果也都是一致的。7、这个时候就可以实施正式的切换了,可以停机Producer系统,再重新修改后上线,全部修改为仅仅写RocketMQ,这个时候数据不会丢,因为之前已经双写了一段时间了,然后所有的Consumer系统可以全部下线后修改代码再上线,全部基于RocketMQ来获取消息,计算和处理,结果写入存储中
由于消费系统故障导致的RocketMQ百万消息积压问题,应该如何处理
类似金融级的系统的数据很重要,但是MQ集群崩溃,针对这种场景,通常都会在发送消息到MQ的系统中设计高可用的降级方案,这个降级方案通常的思路是:需要在发送消息到MQ代码里去try catch捕获异常,如果发现发送消息到MQ有异常,此时需要进行重试如果发现连续重试 比如超过3次还是失败,说明此时可能就是MQ集群彻底崩溃了,此时必须把这条重要的消息写入到本地存储中去,可以是写入数据库,还是机器的本地磁盘文件,或者是NoSQL存储中,具体要根据具体情况来决定之后要不停的尝试发送消息到MQ去,一旦发现MQ集群恢复了,必须有一个后台线程可以把之前持久化存储的消息都查询出来,然后依次按照顺序发送到MQ集群里去,这样才能保证消息不会因为MQ彻底崩溃会丢失很关键的注意点:就是把消息写入存储中暂存时,一定要保证其顺序性,比如按照顺序一条一条的写入本地磁盘文件去暂存消息。而且一旦MQ集群故障了,后续的所有写消息的代码必须严格的按照顺序把消息写入到本地磁盘文件中暂存,这个顺序性是要严格保证的。
RocketMQ支持的消息轨迹功能,配置过程:1、在broker的配置文件里开启 traceTopicEnable=true 这个选项,此时就会开启消息轨迹追踪的功能消息到Broker端之后,Broker端会记录消息的轨迹数据,包括:消息存储的Topic、消息存储的位置、消息的key、消息的tags等2、当开启了上述的选项之后,启动Broker的时候会自动创建出来一个内部的Topic(RMQ_SYS_TRACE_TOPIC),这个Topic就是用来存储所有的消息轨迹追踪的数据的3、创建Producer的时候要用如下的方式,下面构造函数中的第二个参数,就是enableMsgTrace参数,设置为true,就是可以对消息开启轨迹追踪发送消息时上报Producer的信息、发送消息的时间、消息是否发送成功、发送消息的耗时等信息
设计一套Kafka到RocketMQ的双写+双读技术方案,实现无缝迁移
0 条评论
下一页