rocketMq
2023-01-12 15:25:59 0 举报
AI智能生成
RocketMQ是一款开源的分布式消息中间件,具有高性能、高可靠、高实时性等特点。它采用了发布-订阅模式,支持多种消息类型,包括普通消息、顺序消息、事务消息等。RocketMQ提供了丰富的API和工具,方便用户进行消息的生产和消费。同时,它还具有良好的扩展性和可定制性,可以满足不同场景下的消息传递需求。RocketMQ广泛应用于金融、电商、物流等领域,为企业提供了高效、稳定的信息交流渠道。
作者其他创作
大纲/内容
技术选型
RabbitMQ
优点
可以保证数据不丢失。
也能保证高可用性,即集群部署的时候部分机器宕机可以继续运行。
支持部分高级功能,比如死信队列,消息重试之类的。
官方文档很详细
缺点
吞吐量比较低,一般每秒几万级别,所以遇到特别高并发的情况,支撑起来很困难。
集群扩展很麻烦。
还有一个致命缺陷是开发语言是erlang,国内少有精通erlang语言的工程师,因此也没法阅读源码,甚至修改源码。
适用场景
一些不需要很高吞吐量,也不需要部署特别大规模集群,无需阅读和修改rabbitmq源码的中小型公司
高可用架构
普通集群/镜像集群
Kafka
优点
吞吐量高,常规机器配置,一台机器可达每秒十几万的QPS,相当强悍。
性能高,基本上发送给kafka都是毫秒级的性能。可用性很高。
可支持集群部署,部分机器宕机可以继续运行。
官方文档很详细
缺点
数据丢失问题
kafka收到消息后会写入磁盘缓冲区里,没有直接落到物理磁盘上去,所以机器本身故障了,可能导致磁盘缓冲区里的数据丢失。
功能单一,主要支持发送消息,消费消息。其他场景比较受限制。
适用场景
各大公司一般把kafka用在用户行为日志的采集和传输上,比如大数据团队要收集APP上用户的一些日志,这种日志就是用kafka来收集和传输的,因为这种日志适当丢失数据也没有关系,而且一般量特别大,要求吞吐量高,一般就是收发消息,不需要太多高级功能。所以比较适合这种场景。
kafka快的原因
顺序读写
索引
批量读写和文件压缩(清除已经更新消息的历史版本)
零拷贝
拓展知识点
用户空间和内核空间
传统I/O过程
topic存储结构
RocketMQ
优点
RocketMQ是阿里的开源消息中间件,久进沙场,非常可靠,几乎同时解决了kafka和Rabbitmq的缺陷。
吞吐量很高,单机可达10万QPS以上
可以保证高可用性,性能很高,而且支持通过配置保证数据不丢失,可以部署大规模的集群。
可支持各种高级功能,比如延迟消息,事务消息,消息回溯,死信队列,消息积压等。
基于java开发,符合国内大多数公司技术栈,很容易阅读他的源码,甚至修改他的源码。
缺点
官方文档相对简单。
运用场景
国内很多一线互联网大厂都切换使用RocketMQ了,他们需要rocketMQ的高吞吐量,大规模部署能力,还有各种高阶功能去支撑自己的各种业务场景,同时还可以根据自己的需求定制修改RocketMQ的源码。
总结
rocketMq
官方文档
RocketMq是一个由阿里巴巴开源的消息中间件,2012年开源,2017年成为apache顶级项目。
每秒处理几十万请求,它的核心设计借鉴了Kafka
每秒处理几十万请求,它的核心设计借鉴了Kafka
RocketMq架构组成
消息模型(MessageModel)
RocketMQ主要由Producer、Broker、Consumer三部分组成,其中Producer负责生产消息,Consumer负责消费消息,Broker负责存储消息。
消息生产者(Producer)
负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。
与nameserver建立长连接,定时从namesercer获取topic路由信息.-->向指定broke建立TCP长连接,发送消息到broker
Producer写数据只能操作master节点。
Producer写数据只能操作master节点。
消息消费者(Consumer)
消费者有两种消费方式:集群消费(消息轮询)、广播消费(全部收到相同副本)
负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。
负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。
与nameserver建立长连接,获取topic路由信息.-->向指定broke建立TCP长连接,从指定的broker拉取消息
由于Master和Slave都可以读取消息,因此Consumer会与Master和Slave都建立连接。
由于Master和Slave都可以读取消息,因此Consumer会与Master和Slave都建立连接。
类型
集群消费
相同ConsumerGroup的每个Consumer实例平均分摊消息。
设置消费者模型 consumer.setMessageModel(**)
设置消费者模型 consumer.setMessageModel(**)
广播消费
全部收到相同副本
相同ConsumerGroup的每个Consumer实例都接收全量的消息
相同ConsumerGroup的每个Consumer实例都接收全量的消息
主题(Topic)
Topic是一个逻辑概念,消息不是按Topic划分存储的。如果Topic不存在,会自动创建
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。
子主题
tag
消息标签,用于区分同一个主题下同一个类型的消息
MessageQueue
对于每个topic都可以设置一定数量的消息队列用于数据的读取
kafka:partition
kafka:partition
代理服务器(BrokerServer)
消息中转角色,负责存储消息、转发消息。
负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。
broker可集群部署【为了提升Broker的可用性(防止单点故障),以及提升服务器的性能(实现负载)】,每个Broker可以有自己的副本(slave)【提升可靠性(防止数据丢失)】
Broker每隔30秒会向所有的NameServer发送心跳信息
主从注册,定时探活:
broker与每个NameServer建立TCP长连接,注册自己的信息之后每隔30s发送心跳信息,注册topic路由。(服务续约)
每个NameServer每隔10s扫描各个Broker列表的最近一次心跳时间,如果发现某个Broker超过120s都没发送心跳,就认为这个Broker已经挂掉了,会将其从路由信息里移除。
每个NameServer每隔10s扫描各个Broker列表的最近一次心跳时间,如果发现某个Broker超过120s都没发送心跳,就认为这个Broker已经挂掉了,会将其从路由信息里移除。
子主题
子模块
作用跟partition类似
writeQueueNums:写队列数量
readQueueNums:读队列数量
readQueueNums:读队列数量
写队列的数量决定了有几个MessageQueue,
读队列的数量决定了有几个线程来消费这些MessageQueue(只是用来负载的)。
服务端创建一个Topic默认8个队列(BrokerConfig):
读队列的数量决定了有几个线程来消费这些MessageQueue(只是用来负载的)。
服务端创建一个Topic默认8个队列(BrokerConfig):
子主题
名字服务(NameServer)
NameServer作为一个名称服务,需要提供服务注册、服务剔除、服务发现这些基本功能,
但是NameServer节点之间并不通信,无法进行数据复制。RocketMQ采取的策略是,在Broker节点在启动的时候,轮训NameServer列表,与每个NameServer节点建立长连接,发起注册请求。NameServer内部会维护一个Broker表,用来动态存储Broker的信息。
但是NameServer节点之间并不通信,无法进行数据复制。RocketMQ采取的策略是,在Broker节点在启动的时候,轮训NameServer列表,与每个NameServer节点建立长连接,发起注册请求。NameServer内部会维护一个Broker表,用来动态存储Broker的信息。
消息重试
Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次。
Consumer消费消息失败通常可以认为有以下几种情况:
由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。这种错误通常需要跳过这条消息,再消费其它消息,而这条失败的消息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过10秒后再重试。
由于依赖的下游应用服务不可用,例如db连接不可用,外系统网络不可达等。遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用sleep30s,再消费下一条消息,这样可以减轻Broker重试消息的压力。
RocketMQ会为每个消费组都设置一个Topic名称为“%RETRY%+consumerGroup”的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而导致Consumer端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ对于重试消息的处理是先保存至Topic名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至“%RETRY%+consumerGroup”的重试队列中。
Consumer消费消息失败通常可以认为有以下几种情况:
由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。这种错误通常需要跳过这条消息,再消费其它消息,而这条失败的消息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过10秒后再重试。
由于依赖的下游应用服务不可用,例如db连接不可用,外系统网络不可达等。遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用sleep30s,再消费下一条消息,这样可以减轻Broker重试消息的压力。
RocketMQ会为每个消费组都设置一个Topic名称为“%RETRY%+consumerGroup”的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而导致Consumer端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ对于重试消息的处理是先保存至Topic名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至“%RETRY%+consumerGroup”的重试队列中。
部署
NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。注意:当前RocketMQ版本在部署架构上支持一Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载。
Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取。
Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取。
集群工作流程
集群工作流程
启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
当无法拉取到消息后,可以等下一次消息拉取,同时服务端也支持长轮询模式,如果一个消息拉取请求未拉取到消息,Broker允许等待30s的时间,只要这段时间内有新消息到达,将直接返回给消费端。这里,RocketMQ的具体做法是,使用Broker端的后台服务线程—ReputMessageService不停地分发请求并异步构建ConsumeQueue(逻辑消费队列)和IndexFile(索引文件)数据。
页缓存与内存映射
页缓存(PageCache)是OS对文件的缓存,用于加速对文件的读写。
消息刷盘
同步刷盘
只有在消息真正持久化至磁盘后RocketMQ的Broker端才会真正返回给Producer端一个成功的ACK响应。同步刷盘对MQ消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用该模式较多。
异步刷盘
能够充分利用OS的PageCache的优势,只要消息写入PageCache即可将成功的ACK返回给Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。
图示
消息零丢失方案
发送消息零丢失
(生产者->cache)
(生产者->cache)
方案一(同步发送消息+反复多次重试)
同步消息重试机制:发送同步消息,如果发生异常,则重试,如果重试超过一定次数仍然失败,则可以先入库,后续再补偿。
方案二(事务消息机制),
两者都有保证消息发送零丢失的效果,但是经过分析,事务消息方案整体会更好一些
两者都有保证消息发送零丢失的效果,但是经过分析,事务消息方案整体会更好一些
事务消息机制:使用事务消息能保证生产者一定会将消息写入MQ,但是会降低吞吐量。
broker零丢失
buffer->broker broker的master->slave
buffer->broker broker的master->slave
刷盘:异步改同步
默认刷盘方式为异步,
发送消息时,broker写入到cache后就返回成功了,而producer只要获取到ACK就说明消息发送成功了,
然后再异步由cache到broker持久化
发送消息时,broker写入到cache后就返回成功了,而producer只要获取到ACK就说明消息发送成功了,
然后再异步由cache到broker持久化
主从复制(同步):异步改同步 通过主从架构模式避免磁盘故障导致的消息丢失
总结
开启同步刷盘策略+主从架构同步机制,,只要让一个Broker收到消息之后同步写入磁盘,同时复制给其他Broker,然后再返回响应给生产者说写入成功,此时就可以保证MQ自己不会再弄丢消息。
消费消息零丢失
broker->消费者
broker->消费者
手动提交offset:只有正确处理了消息之后,才手动提交offset,通知Broker可以将消息删除。
子主题
不能有任何差错的核心链路采用消息零丢失方案,非核心链路采用同步发送消息+反复重试几次的方案。
缺点:
系统复杂度变高
子主题
重复消费
幂等:使用业务唯一id,例如订单Id,。在消费之前判断唯一键是否在三方库(关系数据库、redis)中存在。如果不存在则插入,并消费,否则跳过。
RocketMQ的顺序消息怎么实现呢?
场景
支付成功后->扣库存->物流下单
->消息发送
->消息发送
如何保证消息的顺序性?
需要由三个阶段去保障PBC
消息顺序发送、顺序存储、顺序消费
消息发送保持顺序、消息存储保持和发送一致、消息被消费保持和存储顺序一致
消息顺序发送、顺序存储、顺序消费
消息发送保持顺序、消息存储保持和发送一致、消息被消费保持和存储顺序一致
实现方式
全局有序(性能差)
是什么?
一个Topic内所有的消息按照先进先出的顺序进行发布和消费
实现
设置topic下读写queue队列数量为1
为什么要设置读写队列数量为1呢?
假设读写队列有多个,消息就会存储在多个队列中,消费者负载时可能会分配到多个消费队列同时进行消费,多队列并发消费时,无法保证消息消费顺序性
假设读写队列有多个,消息就会存储在多个队列中,消费者负载时可能会分配到多个消费队列同时进行消费,多队列并发消费时,无法保证消息消费顺序性
局部有序
实现
一个Topic内多个queue,
(1)首先是生产者发送消息,是单线程同步发送的。
要保证在一个线程内顺序发送,在上一个消息发送成功后,在进行下一个消息的发送。
对应到mq中,消息发送方法就得使用同步发送,异步发送无法保证顺序性
对应到mq中,消息发送方法就得使用同步发送,异步发送无法保证顺序性
(2)其次是消息要路由到相同的queue。
mq的topic下会存在多个queue,要保证消息的顺序存储,同一个业务编号的消息需要被发送到一个queue中。
对应到mq中,通过MessageQueueSelector,只要大家使用相同的hashkey,根据路由规则算法,就会路由到相同的queue
producer.sendOneway(recketMsg,messageQueueSelector,hashKey);
即对业务编号进行hash,然后根据队列数量对hash值取余,将消息发送到一个queue中
对应到mq中,通过MessageQueueSelector,只要大家使用相同的hashkey,根据路由规则算法,就会路由到相同的queue
producer.sendOneway(recketMsg,messageQueueSelector,hashKey);
即对业务编号进行hash,然后根据队列数量对hash值取余,将消息发送到一个queue中
根据消息的shardingkey路由到不同的queue
子主题
(3)最后在消费者,需要保证一个队列只有一个线程消费。
同一个queue就只能被一个消费者所消费,因此对broker中消费队列加锁是无法避免的。同一时刻,一个消费队列只能被一个消费者消费,消费者内部,也只能有一个消费线程来消费该队列。即,同一时刻,一个消费队列只能被一个消费者中的一个线程消费
RocketMq消费端有两种类型:pull和push,本质上底层都是通过pull拉机制去实现的,pushConsumer 是一种API封装
pull模式用户从broker拿到一批消息后自己保证消费顺序
push模式由用户注册MessageListener来消费消息,客户端保证调用MessageListener时消息的顺序性
pull模式用户从broker拿到一批消息后自己保证消费顺序
push模式由用户注册MessageListener来消费消息,客户端保证调用MessageListener时消息的顺序性
缺点
发送顺序消息无法利用集群的Failover特性,因为不能更换MessageQueue进行重试
因为发送的路由策略导致的热点问题,可能某一些MessageQueue的数据量特别大
消费的并行读依赖于分区数量
消费失败时无法跳过
因为发送的路由策略导致的热点问题,可能某一些MessageQueue的数据量特别大
消费的并行读依赖于分区数量
消费失败时无法跳过
延迟消息队列怎么实现?
开源版本的RocketMQ中,对延迟消息并不支持任意时间的延迟设定(商业版本中支持),而是只支持18个固定的延迟级别,1到18分别对应:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。
将延迟消息通过一个临时存储进行暂存,到期后才投递到目标Topic中。
步骤说明如下:
1.producer要将一个延迟消息发送到某个Topic中
2.Broker判断这是一个延迟消息后,修改topic,tag将其存储到broker。
3.Broker内部通过一个延迟服务(delayservice)检查消息是否到期,将到期的消息还原topic和tag。
这个的延迟服务名字为delayservice,不同消息中间件的延迟服务模块名称可能不同。
4.消费者消费目标topic中的延迟投递的消息
1.producer要将一个延迟消息发送到某个Topic中
2.Broker判断这是一个延迟消息后,修改topic,tag将其存储到broker。
3.Broker内部通过一个延迟服务(delayservice)检查消息是否到期,将到期的消息还原topic和tag。
这个的延迟服务名字为delayservice,不同消息中间件的延迟服务模块名称可能不同。
4.消费者消费目标topic中的延迟投递的消息
Message message = new Message("TopicTest", "TagA", ("Hello MQ:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
message.setDelayTimeLevel(3);
SendResult result = producer.send(message);
message.setDelayTimeLevel(3);
SendResult result = producer.send(message);
事务消息
Half(Prepare) Message——半消息(预处理消息)
半消息是一种特殊的消息类型,该状态的消息暂时不能被Consumer消费。当一条事务消息被成功投递到Broker上,但是Broker并没有接收到Producer发出的二次确认时,该事务消息就处于"暂时不可被消费"状态,该状态的事务消息被称为半消息。
Message Status Check——消息状态回查
由于网络抖动、Producer重启等原因,可能导致Producer向Broker发送的二次确认消息没有成功送达。如果Broker检测到某条事务消息长时间处于半消息状态,则会主动向Producer端发起回查操作,查询该事务消息在Producer端的事务状态(Commit 或 Rollback)。可以看出,Message Status Check主要用来解决分布式事务中的超时问题。
实现原理
Step1:Producer向Broker端发送Half Message;【半消息】
Step2:Broker 发送ACK确认,Half Message发送成功;
Step3:Producer执行本地事务;
Step4:本地事务完毕,根据事务的状态,Producer向Broker发送二次确认消息,确认该Half Message的Commit或者Rollback状态。
Step5:Broker收到二次确认消息后,
1、对于Commit状态,将半事务消息标记为可投递,直接发送到Consumer端执行消费逻辑,
2、而对于Rollback则直接标记为失败,则删除半事务消息,订阅方将不会接受该消息。
3、如果超时未收到broker的本地事务提交通知消息(即一段时间后Broker仍没有收到Producer的二次确认消息),Broker主动向Producer发起消息回查;
Step6:Producer处理回查消息,返回对应的本地事务的执行结果;
Step7:Broker针对回查消息的结果,执行Commit或Rollback操作,同Step4。
1、对于Commit状态,将半事务消息标记为可投递,直接发送到Consumer端执行消费逻辑,
2、而对于Rollback则直接标记为失败,则删除半事务消息,订阅方将不会接受该消息。
3、如果超时未收到broker的本地事务提交通知消息(即一段时间后Broker仍没有收到Producer的二次确认消息),Broker主动向Producer发起消息回查;
Step6:Producer处理回查消息,返回对应的本地事务的执行结果;
Step7:Broker针对回查消息的结果,执行Commit或Rollback操作,同Step4。
应用
支付成功后,需要保存订单状态,物流下单
常见问题
.Zookeeper实现了CP,NameServer选择了AP,放弃了实时一致性,一致性问题如何解决?
考虑跟broker、producter、consumer三者关系。
跟broker:主从注册,定时探活(broker与每个NameServer建立TCP长连接,注册自己的信息之后每隔30s发送心跳信息,注册topic路由。
每个NameServer每隔10s扫描各个Broker列表的最近一次心跳时间,如果发现某个Broker超过120s都没发送心跳,就认为这个Broker已经挂掉了,会将其从路由信息里移除。)
跟producter、consumer而且生产者和消费者会定期更新路由信息,所以可以获取最新的信息。(因为NameServer不会主动推送服务信息给客户端,客户端也不会发送心跳到Nameserver,所以在建立连接之后,需要生产者和消费者定期更新。在MQClientlnstance类(生产者消费者通用)的start方法中,启动了一个定时任务,默认是30秒定期更新NameServer信息的)
跟broker:主从注册,定时探活(broker与每个NameServer建立TCP长连接,注册自己的信息之后每隔30s发送心跳信息,注册topic路由。
每个NameServer每隔10s扫描各个Broker列表的最近一次心跳时间,如果发现某个Broker超过120s都没发送心跳,就认为这个Broker已经挂掉了,会将其从路由信息里移除。)
跟producter、consumer而且生产者和消费者会定期更新路由信息,所以可以获取最新的信息。(因为NameServer不会主动推送服务信息给客户端,客户端也不会发送心跳到Nameserver,所以在建立连接之后,需要生产者和消费者定期更新。在MQClientlnstance类(生产者消费者通用)的start方法中,启动了一个定时任务,默认是30秒定期更新NameServer信息的)
为什么不用ZK,而重复造轮子,用自己设计的NameServer?
架构设计决定只需要一个轻量级的原数据的服务器,只需要保持最终一致,而不需要像ZK一样强一致性。
事物消息
二阶段提交
消息回查:消息队列RocketMQ版服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback),该询问过程即消息回查。
消息回查:消息队列RocketMQ版服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback),该询问过程即消息回查。
所有的nameServer全部挂掉了怎么办?
获取到nameServer之后,客户端会有一个缓存,不会影响正常的连接
消费者组订阅问题?
因此,保持订阅关系一致意味着同一个消费者GroupID下所有的实例需在以下两方面均保持一致:
订阅的Topic必须一致
订阅的Topic中的Tag必须一致(包括Tag的数量和Tag的顺序)
订阅的Topic必须一致
订阅的Topic中的Tag必须一致(包括Tag的数量和Tag的顺序)
消息堆积会带来什么后果?如何解决?
后果:消息延迟
解决:消费耗时eg:读写外部数据库、缓存、下游系统调用慢
消费并发度eg:客户端消费并发度由单节点线程数和节点数量共同决定,一般情况下需要优先调整单节点的线程数,若单机硬件资源达到上限,则必须通过扩容节点来提高消费并发度。
解决:消费耗时eg:读写外部数据库、缓存、下游系统调用慢
消费并发度eg:客户端消费并发度由单节点线程数和节点数量共同决定,一般情况下需要优先调整单节点的线程数,若单机硬件资源达到上限,则必须通过扩容节点来提高消费并发度。
如何解决消息堆积和延迟问题?
通过消息队列RocketMQ版提供的监控报警功能,动态调整线程数,节点数
namerserver通信?
1、服务注册2、服务发现3发送消息4消费消息
子主题
子主题
nameserver集群
NameServer作为一个名称服务,需要提供服务注册、服务剔除、服务发现这些基本功能,但是NameServer节点之间并不通信,无法进行数据复制。RocketMQ采取的策略是,在Broker节点在启动的时候,轮训NameServer列表,与每个NameServer节点建立长连接,发起注册请求。NameServer内部会维护一个Broker表,用来动态存储Broker的信息。
Broker集群
一个Broker集群由多组Master/Slave组成,Master可写可读,Slave只可以读,Master将写入的数据同步给Slave。
每个Broker节点,在启动时,都会遍历NameServer列表,与每个NameServer建立长连接,注册自己的信息,之后定时上报。
Producer集群:
消息的生产者,通过NameServer集群获得Topic的路由信息,包括Topic下面有哪些Queue,这些Queue分布在哪些Broker上等。Producer只会将消息发送到Master节点上,因此只需要与Master节点建立连接。
Consumer集群:
消息的消费者,通过NameServer集群获得Topic的路由信息,连接到对应的Broker上消费消息。注意,由于Master和Slave都可以读取消息,因此Consumer会与Master和Slave都建立连接。
NameServer作为一个名称服务,需要提供服务注册、服务剔除、服务发现这些基本功能,但是NameServer节点之间并不通信,无法进行数据复制。RocketMQ采取的策略是,在Broker节点在启动的时候,轮训NameServer列表,与每个NameServer节点建立长连接,发起注册请求。NameServer内部会维护一个Broker表,用来动态存储Broker的信息。
Broker集群
一个Broker集群由多组Master/Slave组成,Master可写可读,Slave只可以读,Master将写入的数据同步给Slave。
每个Broker节点,在启动时,都会遍历NameServer列表,与每个NameServer建立长连接,注册自己的信息,之后定时上报。
Producer集群:
消息的生产者,通过NameServer集群获得Topic的路由信息,包括Topic下面有哪些Queue,这些Queue分布在哪些Broker上等。Producer只会将消息发送到Master节点上,因此只需要与Master节点建立连接。
Consumer集群:
消息的消费者,通过NameServer集群获得Topic的路由信息,连接到对应的Broker上消费消息。注意,由于Master和Slave都可以读取消息,因此Consumer会与Master和Slave都建立连接。
为什么用用nemeserver?
在RocketMQ的早期版本也是依赖Zookeeper的
rocketmq只需要保证最终一致,不需要Zookeeper这样的强一致性解决方案,不需要再依赖另一个中间件,从而减少整体维护成本
rocketmq只需要保证最终一致,不需要Zookeeper这样的强一致性解决方案,不需要再依赖另一个中间件,从而减少整体维护成本
NameServer如何保证数据的最终一致?
路由注册
Broker节点在启动的时候,轮训NameServer列表,与每个NameServer节点建立长连接,发起注册请求。NameServer内部会维护一个Broker表,用来动态存储Broker的信息。
之后每隔30s发送心跳包,心跳包中包含BrokerId、Broker地址、Broker名称、Broker所属集群名称等等,NameServer接收到心跳包后,会更新时间戳,记录这个Broker的最新存活时间。
并发问题:NameServer在处理心跳包的时候,存在多个Broker同时操作一张Broker表,为了防止并发修改Broker表导致不安全,路由注册操作引入了ReadWriteLock读写锁,这个设计亮点允许多个消息生产者并发读,保证了消息发送时的高并发,但是同一时刻NameServer只能处理一个Broker心跳包,多个心跳包串行处理。这也是读写锁的经典使用场景,即读多写少。
路由剔除
正常情况下,如果Broker关闭,则会与NameServer断开长连接,Netty的通道关闭监听器会监听到连接断开事件,然后会将这个Broker信息剔除掉。
异常情况下,NameServer中有一个定时任务,每隔10秒扫描一下Broker表,如果某个Broker的心跳包最新时间戳距离当前时间超多120秒,也会判定Broker失效并将其移除。
路由发现
对于生产者,可以发送消息到多个Topic,因此一般是在发送第一条消息时,才会根据Topic获取从NameServer获取路由信息。
对于消费者,订阅的Topic一般是固定的,所在在启动时就会拉取。
Broker节点在启动的时候,轮训NameServer列表,与每个NameServer节点建立长连接,发起注册请求。NameServer内部会维护一个Broker表,用来动态存储Broker的信息。
之后每隔30s发送心跳包,心跳包中包含BrokerId、Broker地址、Broker名称、Broker所属集群名称等等,NameServer接收到心跳包后,会更新时间戳,记录这个Broker的最新存活时间。
并发问题:NameServer在处理心跳包的时候,存在多个Broker同时操作一张Broker表,为了防止并发修改Broker表导致不安全,路由注册操作引入了ReadWriteLock读写锁,这个设计亮点允许多个消息生产者并发读,保证了消息发送时的高并发,但是同一时刻NameServer只能处理一个Broker心跳包,多个心跳包串行处理。这也是读写锁的经典使用场景,即读多写少。
路由剔除
正常情况下,如果Broker关闭,则会与NameServer断开长连接,Netty的通道关闭监听器会监听到连接断开事件,然后会将这个Broker信息剔除掉。
异常情况下,NameServer中有一个定时任务,每隔10秒扫描一下Broker表,如果某个Broker的心跳包最新时间戳距离当前时间超多120秒,也会判定Broker失效并将其移除。
路由发现
对于生产者,可以发送消息到多个Topic,因此一般是在发送第一条消息时,才会根据Topic获取从NameServer获取路由信息。
对于消费者,订阅的Topic一般是固定的,所在在启动时就会拉取。
Brocker
master可读可写,slave只可以读,master通过主从复制的方式将数据同步给slave,支持同步复制和异步复制两种复制方式,目前master宕机后,slave不能自动切换为master。
消息发送类型
发送的特点分
同步消息
场景
producer发送时会同步等待broker返回一个发送状态。如果失败会重试。吞吐量最低,但安全
可靠性同步地发送方式使用比较广泛,比如:重要的消息通知,短信通知。
可靠性同步地发送方式使用比较广泛,比如:重要的消息通知,短信通知。
代码实现
//5.发送消息
SendResult result = producer.send(msg);
//发送状态
SendStatus staus = result.getSendStatus();
SendResult result = producer.send(msg);
//发送状态
SendStatus staus = result.getSendStatus();
异步消息
场景
producer在发送后去做自己的事情,异步接受broker的回调结果
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
代码实现
单向发送消息
场景
关键点就是使用producer.sendOneWay方式来发送消息,这个方法没有返回值,也没有回调。就是只管把消息发出去就行了。吞吐量非常高,但容易丢消息
主要用在不特别关心发送结果的场景,比如,日志发送
主要用在不特别关心发送结果的场景,比如,日志发送
代码实现
按照使用功能特点分
顺序消息
延迟
事务
批量
性能好,在消息量特别大的时候,性能明显
缺点:没有顺序,无法实现顺序消息
限制
同一批次的消息应该具有相同的主题、相同的消息配置
不支持延迟消息
同一个批次的大小最好不要超过1M
消息接收模式
拉模式
消费者主动去通过offset偏移量去拉取Broker上拉取消息
拉消息要根据主题和tag过滤
拉消息要根据主题和tag过滤
两种方式
自己管理offset:由于每次拉取消息个数有限,需要多次拉取。offset偏移量,记录着上次拉取消息的位置。
目的是防止消息被重复消费。使用时可以把offset存在redis中,这样就可以灵活控制消息的消费位置。
目的是防止消息被重复消费。使用时可以把offset存在redis中,这样就可以灵活控制消息的消费位置。
不需管理offset的方式,默认一次拉取32条
// 不指定offset,直接拉取
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
litePullConsumer.subscribe("TopicTest", "*");
litePullConsumer.start();
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
litePullConsumer.subscribe("TopicTest", "*");
litePullConsumer.start();
缺点
消息延迟:间隔时间长
空轮训:间隔时间短,且没消息
空轮训:间隔时间短,且没消息
推模式
Broker收到消息后,主动推送到消费者上
RocketMQ的push模式是对拉模式进行的一层包装
通过 Pull 不断不断不断轮询 Broker 获取消息。当不存在新消息时,Broker 会挂起请求,直到有新消息产生,取消挂起,返回新消息。这样,基本和 Broker 主动 Push 做到接近的实时性(当然,还是有相应的实时性损失)。原理类似 长轮询( Long-Polling )
通过 Pull 不断不断不断轮询 Broker 获取消息。当不存在新消息时,Broker 会挂起请求,直到有新消息产生,取消挂起,返回新消息。这样,基本和 Broker 主动 Push 做到接近的实时性(当然,还是有相应的实时性损失)。原理类似 长轮询( Long-Polling )
实现形式是:消费端注册一个监听器MessageListenerConcurrently,监听着broker上的消息,
如果broker上有新消息,则触发监听器MessageListenerConcurrently。
broker会自动往监听器中的ConsumeMessage()方法中推送消息
如果broker上有新消息,则触发监听器MessageListenerConcurrently。
broker会自动往监听器中的ConsumeMessage()方法中推送消息
缺点
可以尽可能实时地将消息发送给消费者进行消费。但是,在消费者的处理消息的能力较弱的时候(比如,消费者端的业务系统处理一条消息的流程比较复杂,其中的调用链路比较多导致消费时间比较久。概括起来地说就是“慢消费问题”),而MQ不断地向消费者Push消息,消费者端的缓冲区可能会溢出,导致异常;
优点
实时性高
0 条评论
下一页