rocketMq
2023-01-12 15:25:59 0 举报
AI智能生成
RocketMQ是一款开源的分布式消息中间件,具有高性能、高可靠、高实时性等特点。它采用了发布-订阅模式,支持多种消息类型,包括普通消息、顺序消息、事务消息等。RocketMQ提供了丰富的API和工具,方便用户进行消息的生产和消费。同时,它还具有良好的扩展性和可定制性,可以满足不同场景下的消息传递需求。RocketMQ广泛应用于金融、电商、物流等领域,为企业提供了高效、稳定的信息交流渠道。
作者其他创作
大纲/内容
可以保证数据不丢失。
也能保证高可用性,即集群部署的时候部分机器宕机可以继续运行。
支持部分高级功能,比如死信队列,消息重试之类的。
官方文档很详细
优点
吞吐量比较低,一般每秒几万级别,所以遇到特别高并发的情况,支撑起来很困难。
集群扩展很麻烦。
还有一个致命缺陷是开发语言是erlang,国内少有精通erlang语言的工程师,因此也没法阅读源码,甚至修改源码。
缺点
一些不需要很高吞吐量,也不需要部署特别大规模集群,无需阅读和修改rabbitmq源码的中小型公司
适用场景
普通集群/镜像集群
高可用架构
RabbitMQ
吞吐量高,常规机器配置,一台机器可达每秒十几万的QPS,相当强悍。
性能高,基本上发送给kafka都是毫秒级的性能。可用性很高。
可支持集群部署,部分机器宕机可以继续运行。
kafka收到消息后会写入磁盘缓冲区里,没有直接落到物理磁盘上去,所以机器本身故障了,可能导致磁盘缓冲区里的数据丢失。
数据丢失问题
功能单一,主要支持发送消息,消费消息。其他场景比较受限制。
各大公司一般把kafka用在用户行为日志的采集和传输上,比如大数据团队要收集APP上用户的一些日志,这种日志就是用kafka来收集和传输的,因为这种日志适当丢失数据也没有关系,而且一般量特别大,要求吞吐量高,一般就是收发消息,不需要太多高级功能。所以比较适合这种场景。
顺序读写
索引
批量读写和文件压缩(清除已经更新消息的历史版本)
用户空间和内核空间
传统I/O过程
拓展知识点
零拷贝
kafka快的原因
topic存储结构
Kafka
RocketMQ是阿里的开源消息中间件,久进沙场,非常可靠,几乎同时解决了kafka和Rabbitmq的缺陷。
吞吐量很高,单机可达10万QPS以上
可以保证高可用性,性能很高,而且支持通过配置保证数据不丢失,可以部署大规模的集群。
可支持各种高级功能,比如延迟消息,事务消息,消息回溯,死信队列,消息积压等。
基于java开发,符合国内大多数公司技术栈,很容易阅读他的源码,甚至修改他的源码。
官方文档相对简单。
国内很多一线互联网大厂都切换使用RocketMQ了,他们需要rocketMQ的高吞吐量,大规模部署能力,还有各种高阶功能去支撑自己的各种业务场景,同时还可以根据自己的需求定制修改RocketMQ的源码。
运用场景
RocketMQ
总结
技术选型
官方文档
RocketMq是一个由阿里巴巴开源的消息中间件,2012年开源,2017年成为apache顶级项目。每秒处理几十万请求,它的核心设计借鉴了Kafka
RocketMQ主要由Producer、Broker、Consumer三部分组成,其中Producer负责生产消息,Consumer负责消费消息,Broker负责存储消息。
消息模型(MessageModel)
负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。
font color=\"#F44336\
消息生产者(Producer)
消费者有两种消费方式:集群消费(消息轮询)、广播消费(全部收到相同副本)负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。
相同ConsumerGroup的每个Consumer实例平均分摊消息。设置消费者模型 consumer.setMessageModel(**)
集群消费
全部收到相同副本相同ConsumerGroup的每个Consumer实例都接收全量的消息
广播消费
类型
消息消费者(Consumer)
Topic是一个逻辑概念,消息不是按Topic划分存储的。如果Topic不存在,会自动创建表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。
子主题
主题(Topic)
消息标签,用于区分同一个主题下同一个类型的消息
tag
对于每个topic都可以设置一定数量的消息队列用于数据的读取kafka:partition
MessageQueue
负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。
消息中转角色,负责存储消息、转发消息。
broker可集群部署【为了提升Broker的可用性(防止单点故障),以及提升服务器的性能(实现负载)】,每个Broker可以有自己的副本(slave)【提升可靠性(防止数据丢失)】
Broker每隔30秒会向所有的NameServer发送心跳信息
主从注册,定时探活:
作用跟partition类似
写队列的数量决定了有几个MessageQueue,读队列的数量决定了有几个线程来消费这些MessageQueue(只是用来负载的)。服务端创建一个Topic默认8个队列(BrokerConfig):
writeQueueNums:写队列数量readQueueNums:读队列数量
子模块
代理服务器(BrokerServer)
NameServer作为一个名称服务,需要提供服务注册、服务剔除、服务发现这些基本功能,但是NameServer节点之间并不通信,无法进行数据复制。RocketMQ采取的策略是,在Broker节点在启动的时候,轮训NameServer列表,与每个NameServer节点建立长连接,发起注册请求。NameServer内部会维护一个Broker表,用来动态存储Broker的信息。
名字服务(NameServer)
RocketMq架构组成
Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer消费消息失败通常可以认为有以下几种情况:由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。这种错误通常需要跳过这条消息,再消费其它消息,而这条失败的消息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过10秒后再重试。由于依赖的下游应用服务不可用,例如db连接不可用,外系统网络不可达等。遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用sleep30s,再消费下一条消息,这样可以减轻Broker重试消息的压力。RocketMQ会为每个消费组都设置一个Topic名称为“%RETRY%+consumerGroup”的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而导致Consumer端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ对于重试消息的处理是先保存至Topic名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至“%RETRY%+consumerGroup”的重试队列中。
消息重试
NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。注意:当前RocketMQ版本在部署架构上支持一Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载。
集群工作流程
Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取。
启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
部署
当无法拉取到消息后,可以等下一次消息拉取,同时服务端也支持长轮询模式,如果一个消息拉取请求未拉取到消息,Broker允许等待30s的时间,只要这段时间内有新消息到达,将直接返回给消费端。这里,RocketMQ的具体做法是,使用Broker端的后台服务线程—ReputMessageService不停地分发请求并异步构建ConsumeQueue(逻辑消费队列)和IndexFile(索引文件)数据。
页缓存(PageCache)是OS对文件的缓存,用于加速对文件的读写。
页缓存与内存映射
只有在消息真正持久化至磁盘后RocketMQ的Broker端才会真正返回给Producer端一个成功的ACK响应。同步刷盘对MQ消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用该模式较多。
同步刷盘
能够充分利用OS的PageCache的优势,只要消息写入PageCache即可将成功的ACK返回给Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。
异步刷盘
图示
消息刷盘
同步消息重试机制:发送同步消息,如果发生异常,则重试,如果重试超过一定次数仍然失败,则可以先入库,后续再补偿。
方案一(同步发送消息+反复多次重试)
事务消息机制:使用事务消息能保证生产者一定会将消息写入MQ,但是会降低吞吐量。
方案二(事务消息机制),两者都有保证消息发送零丢失的效果,但是经过分析,事务消息方案整体会更好一些
发送消息零丢失(生产者->cache)
默认刷盘方式为异步,发送消息时,broker写入到cache后就返回成功了,而producer只要获取到ACK就说明消息发送成功了,然后再异步由cache到broker持久化
刷盘:异步改同步
主从复制(同步):异步改同步 通过主从架构模式避免磁盘故障导致的消息丢失
开启同步刷盘策略+主从架构同步机制,,只要让一个Broker收到消息之后同步写入磁盘,同时复制给其他Broker,然后再返回响应给生产者说写入成功,此时就可以保证MQ自己不会再弄丢消息。
broker零丢失buffer->broker broker的master->slave
手动提交offset:只有正确处理了消息之后,才手动提交offset,通知Broker可以将消息删除。
消费消息零丢失broker->消费者
不能有任何差错的核心链路采用消息零丢失方案,非核心链路采用同步发送消息+反复重试几次的方案。
系统复杂度变高
缺点:
消息零丢失方案
重复消费
支付成功后->扣库存->物流下单 ->消息发送
场景
需要由三个阶段去保障PBC消息顺序发送、顺序存储、顺序消费消息发送保持顺序、消息存储保持和发送一致、消息被消费保持和存储顺序一致
如何保证消息的顺序性?
一个Topic内所有的消息按照先进先出的顺序进行发布和消费
是什么?
为什么要设置读写队列数量为1呢?假设读写队列有多个,消息就会存储在多个队列中,消费者负载时可能会分配到多个消费队列同时进行消费,多队列并发消费时,无法保证消息消费顺序性
设置topic下读写queue队列数量为1
实现
全局有序(性能差)
要保证在一个线程内顺序发送,在上一个消息发送成功后,在进行下一个消息的发送。对应到mq中,消息发送方法就得使用同步发送,异步发送无法保证顺序性
(1)首先是生产者发送消息,是单线程同步发送的。
根据消息的shardingkey路由到不同的queue
(2)其次是消息要路由到相同的queue。
同一个queue就只能被一个消费者所消费,因此对broker中消费队列加锁是无法避免的。同一时刻,一个消费队列只能被一个消费者消费,消费者内部,也只能有一个消费线程来消费该队列。即,同一时刻,一个消费队列只能被一个消费者中的一个线程消费
(3)最后在消费者,需要保证一个队列只有一个线程消费。
局部有序
实现方式
发送顺序消息无法利用集群的Failover特性,因为不能更换MessageQueue进行重试 因为发送的路由策略导致的热点问题,可能某一些MessageQueue的数据量特别大 消费的并行读依赖于分区数量 消费失败时无法跳过
RocketMQ的顺序消息怎么实现呢?
开源版本的RocketMQ中,对延迟消息并不支持任意时间的延迟设定(商业版本中支持),而是只支持18个固定的延迟级别,1到18分别对应:messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。
将延迟消息通过一个临时存储进行暂存,到期后才投递到目标Topic中。
Message message = new Message(\"TopicTest\
延迟消息队列怎么实现?
半消息是一种特殊的消息类型,该状态的消息暂时不能被Consumer消费。当一条事务消息被成功投递到Broker上,但是Broker并没有接收到Producer发出的二次确认时,该事务消息就处于\"暂时不可被消费\"状态,该状态的事务消息被称为半消息。
Half(Prepare) Message——半消息(预处理消息)
由于网络抖动、Producer重启等原因,可能导致Producer向Broker发送的二次确认消息没有成功送达。如果Broker检测到某条事务消息长时间处于半消息状态,则会主动向Producer端发起回查操作,查询该事务消息在Producer端的事务状态(Commit 或 Rollback)。可以看出,Message Status Check主要用来解决分布式事务中的超时问题。
Message Status Check——消息状态回查
Step1:Producer向Broker端发送Half Message;【半消息】
Step2:Broker 发送ACK确认,Half Message发送成功;
Step3:Producer执行本地事务;
Step4:本地事务完毕,根据事务的状态,Producer向Broker发送二次确认消息,确认该Half Message的Commit或者Rollback状态。
Step5:Broker收到二次确认消息后, 1、对于Commit状态,将半事务消息标记为可投递,直接发送到Consumer端执行消费逻辑, 2、而对于Rollback则直接标记为失败,则删除半事务消息,订阅方将不会接受该消息。 3、如果超时未收到broker的本地事务提交通知消息(即一段时间后Broker仍没有收到Producer的二次确认消息),Broker主动向Producer发起消息回查;Step6:Producer处理回查消息,返回对应的本地事务的执行结果;Step7:Broker针对回查消息的结果,执行Commit或Rollback操作,同Step4。
实现原理
发送半消息
执行本地事物
二次确认消息
支付成功后,需要保存订单状态,物流下单
应用
事务消息
架构设计决定只需要一个轻量级的原数据的服务器,只需要保持最终一致,而不需要像ZK一样强一致性。
为什么不用ZK,而重复造轮子,用自己设计的NameServer?
二阶段提交消息回查:消息队列RocketMQ版服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback),该询问过程即消息回查。
事物消息
获取到nameServer之后,客户端会有一个缓存,不会影响正常的连接
所有的nameServer全部挂掉了怎么办?
因此,保持订阅关系一致意味着同一个消费者GroupID下所有的实例需在以下两方面均保持一致:订阅的Topic必须一致订阅的Topic中的Tag必须一致(包括Tag的数量和Tag的顺序)
消费者组订阅问题?
后果:消息延迟解决:消费耗时eg:读写外部数据库、缓存、下游系统调用慢消费并发度eg:客户端消费并发度由单节点线程数和节点数量共同决定,一般情况下需要优先调整单节点的线程数,若单机硬件资源达到上限,则必须通过扩容节点来提高消费并发度。
消息堆积会带来什么后果?如何解决?
如何解决消息堆积和延迟问题?
nameserver集群NameServer作为一个名称服务,需要提供服务注册、服务剔除、服务发现这些基本功能,但是NameServer节点之间并不通信,无法进行数据复制。RocketMQ采取的策略是,在Broker节点在启动的时候,轮训NameServer列表,与每个NameServer节点建立长连接,发起注册请求。NameServer内部会维护一个Broker表,用来动态存储Broker的信息。Broker集群一个Broker集群由多组Master/Slave组成,Master可写可读,Slave只可以读,Master将写入的数据同步给Slave。每个Broker节点,在启动时,都会遍历NameServer列表,与每个NameServer建立长连接,注册自己的信息,之后定时上报。Producer集群:消息的生产者,通过NameServer集群获得Topic的路由信息,包括Topic下面有哪些Queue,这些Queue分布在哪些Broker上等。Producer只会将消息发送到Master节点上,因此只需要与Master节点建立连接。Consumer集群:消息的消费者,通过NameServer集群获得Topic的路由信息,连接到对应的Broker上消费消息。注意,由于Master和Slave都可以读取消息,因此Consumer会与Master和Slave都建立连接。
1、服务注册2、服务发现3发送消息4消费消息
namerserver通信?
在RocketMQ的早期版本也是依赖Zookeeper的rocketmq只需要保证最终一致,不需要Zookeeper这样的强一致性解决方案,不需要再依赖另一个中间件,从而减少整体维护成本
为什么用用nemeserver?
路由注册Broker节点在启动的时候,轮训NameServer列表,与每个NameServer节点建立长连接,发起注册请求。NameServer内部会维护一个Broker表,用来动态存储Broker的信息。之后每隔30s发送心跳包,心跳包中包含BrokerId、Broker地址、Broker名称、Broker所属集群名称等等,NameServer接收到心跳包后,会更新时间戳,记录这个Broker的最新存活时间。并发问题:NameServer在处理心跳包的时候,存在多个Broker同时操作一张Broker表,为了防止并发修改Broker表导致不安全,路由注册操作引入了ReadWriteLock读写锁,这个设计亮点允许多个消息生产者并发读,保证了消息发送时的高并发,但是同一时刻NameServer只能处理一个Broker心跳包,多个心跳包串行处理。这也是读写锁的经典使用场景,即读多写少。路由剔除正常情况下,如果Broker关闭,则会与NameServer断开长连接,Netty的通道关闭监听器会监听到连接断开事件,然后会将这个Broker信息剔除掉。异常情况下,NameServer中有一个定时任务,每隔10秒扫描一下Broker表,如果某个Broker的心跳包最新时间戳距离当前时间超多120秒,也会判定Broker失效并将其移除。路由发现对于生产者,可以发送消息到多个Topic,因此一般是在发送第一条消息时,才会根据Topic获取从NameServer获取路由信息。对于消费者,订阅的Topic一般是固定的,所在在启动时就会拉取。
NameServer如何保证数据的最终一致?
master可读可写,slave只可以读,master通过主从复制的方式将数据同步给slave,支持同步复制和异步复制两种复制方式,目前master宕机后,slave不能自动切换为master。
Brocker
producer发送时会同步等待broker返回一个发送状态。如果失败会重试。吞吐量最低,但安全可靠性同步地发送方式使用比较广泛,比如:重要的消息通知,短信通知。
//5.发送消息 SendResult result = producer.send(msg); //发送状态 SendStatus staus = result.getSendStatus();
代码实现
同步消息
producer在发送后去做自己的事情,异步接受broker的回调结果异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
异步消息
关键点就是使用producer.sendOneWay方式来发送消息,这个方法没有返回值,也没有回调。就是只管把消息发出去就行了。吞吐量非常高,但容易丢消息主要用在不特别关心发送结果的场景,比如,日志发送
单向发送消息
发送的特点分
顺序消息
延迟
事务
性能好,在消息量特别大的时候,性能明显
缺点:没有顺序,无法实现顺序消息
同一批次的消息应该具有相同的主题、相同的消息配置
不支持延迟消息
同一个批次的大小最好不要超过1M
限制
批量
按照使用功能特点分
消息发送类型
消费者主动去通过offset偏移量去拉取Broker上拉取消息拉消息要根据主题和tag过滤
自己管理offset:由于每次拉取消息个数有限,需要多次拉取。offset偏移量,记录着上次拉取消息的位置。目的是防止消息被重复消费。使用时可以把offset存在redis中,这样就可以灵活控制消息的消费位置。
// 不指定offset,直接拉取 DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(\"lite_pull_consumer_test\"); litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); litePullConsumer.subscribe(\"TopicTest\
不需管理offset的方式,默认一次拉取32条
两种方式
消息延迟:间隔时间长空轮训:间隔时间短,且没消息
拉模式
RocketMQ的push模式是对拉模式进行的一层包装通过 Pull 不断不断不断轮询 Broker 获取消息。当不存在新消息时,Broker 会挂起请求,直到有新消息产生,取消挂起,返回新消息。这样,基本和 Broker 主动 Push 做到接近的实时性(当然,还是有相应的实时性损失)。原理类似 长轮询( Long-Polling )
实现形式是:消费端注册一个监听器MessageListenerConcurrently,监听着broker上的消息,如果broker上有新消息,则触发监听器MessageListenerConcurrently。broker会自动往监听器中的ConsumeMessage()方法中推送消息
可以尽可能实时地将消息发送给消费者进行消费。但是,在消费者的处理消息的能力较弱的时候(比如,消费者端的业务系统处理一条消息的流程比较复杂,其中的调用链路比较多导致消费时间比较久。概括起来地说就是“慢消费问题”),而MQ不断地向消费者Push消息,消费者端的缓冲区可能会溢出,导致异常;
实时性高
Broker收到消息后,主动推送到消费者上
推模式
消息接收模式
常见问题
rocketMq
MQ
0 条评论
回复 删除
下一页