RocketMQ(角色/集群/消息种类)
2021-02-23 17:11:20 0 举报
AI智能生成
角色/集群/消息种类
作者其他创作
大纲/内容
RocketMQ是由阿里研发的,基于Java,后来交给Apache孵化,是一款分布式、队列模型的消息中间件
优点
支持事务消息
严格保证消息顺序
提供丰富的消息拉取模式
高效的订阅者水平扩展能力
实时的消息订阅机制
吞吐量仅次于Kafka,亿级的消息堆积能力与Kafka相当
角色
架构
Producer:消息的发送者
Consumer:消息接收者
Broker:暂存和传输消息
NameServer:路由中心(管理Broker)
类似注册中心
类似注册中心
NameServer挂了怎么办?
只要有一台NameServer存活就可以通信
NameServer全都挂了呢?
RocketMQ不可用,生产者发消息会失败
Topic:区分消息的种类; 一个发送者可以发送消息给一个或者多个Topic; 一个消息的接收者可以订阅一个或者多个Topic消息
Message Queue:相当于是Topic的子分区;用于并行发送和接收消息
集群
特点
NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步
Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的Brokerld来定义, Brokerld为0表示Master, 非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信 息到所有NameServer
Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。 Producer完全无状态, 可集群部署
Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master、Slave建 立长连接,且定时向Master、 Slave发送心跳。 Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定
模式
单Master模式
Broker重启或者宕机时,会导致整个服务不可用
多Master模式
2m-noslave
2m-noslave
全是Master,没有Slave
优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘
非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高
非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高
缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响
多主多从模式(异步)
2m-2s-async
2m-2s-async
主备消息同步采用异步复制方式,Master成功后立即响应,然后异步发送到从节点,主备有短暂消息延迟(毫秒级)
优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,
而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样
而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样
缺点:Master宕机,磁盘损坏情况下会丢失少量消息
多主多从模式(同步)
2m-2s-sync
2m-2s-sync
主备消息同步采用同步双写方式,只有主备都写成功,才向应用返回成功
优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高
缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机
Dledger集群
支持高可用
工作流程
1. 启动NameServer,NameServer起来后监听端口,等待Broker、Producer. Consumer连上来,相当于一个路由控制中心
2. Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,
NameServer集群中就有Topic跟Broker的映射关系
NameServer集群中就有Topic跟Broker的映射关系
3. 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic
4. Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列
列表中选择一个队列, 然后与队列所在的Broker建立长连接从而向Broker发消息
列表中选择一个队列, 然后与队列所在的Broker建立长连接从而向Broker发消息
5. Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费
消息发送
消息的生产/消费
生产消息
1. 创建消息生产者producer,并制定生产者组名
2. 指定Nameserver地址
3. 启动producer
4. 创建消息对象,指定主题Topic、Tag和消息体
5. 发送消息
6. 关闭生产者producer
2. 指定Nameserver地址
3. 启动producer
4. 创建消息对象,指定主题Topic、Tag和消息体
5. 发送消息
6. 关闭生产者producer
消费消息
1. 创建消费者Consumer,制定消费者组名
2. 指定Nameserver地址
3. 订阅主题Topic和Tag
4. 设置回调函数,处理消息
5. 启动消费者consumer
2. 指定Nameserver地址
3. 订阅主题Topic和Tag
4. 设置回调函数,处理消息
5. 启动消费者consumer
消费模式
负载均衡(默认)
广播模式(consumer.setMessageModel(MessageModel.BROADCASTING);)
消息种类
顺序消息
概念
消息有序指的是可以按照消息的发送顺序来消费(FIFO)
如何保证消息有序
全局有序:发送和消费参与的queue只有一个
分区有序:控制发送的顺序消息只依次发送到同一个queue中,
消费的时候只从这个queue上依次拉取,则保证顺序
消费的时候只从这个queue上依次拉取,则保证顺序
分区有序的实现原理
生产者构建 消息队列选择器,通过订单号路由消息
消费者用 单线程的监听器,消费队列中的有序消息
延时消息
使用限制
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级:1 到 18
现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级:1 到 18
用途举例:一小时后检查订单状态,如未付款就取消订单释放库存
实现原理
生产者的消息设置超时时间
消费者没有特殊改动
批量消息
Batch机制
把许多小的消息,合成为一条批量消息,一次发过去
减少网络IO,能显著提高传递消息的性能,限制4M
使用限制
批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息
批量消息的单次发送总大小不能超过4MB,如果超过4M则需要对消息进行分割
实现原理
生产者批量发送消息
消费者没有特殊变化
过滤消息
在大多数情况下,TAG是一个简单而有用的设计,其可以来选择您想要的消息。例如:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC"); //消费者将接收包含TAGA或TAGB或TAGC的消息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC"); //消费者将接收包含TAGA或TAGB或TAGC的消息
使用限制
一个消息只能有一个标签,这对于复杂的场景可能不起作用
在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算
实现原理
生产者发送消息时,可以通过`putUserProperty`来设置消息的属性
消费者消费消息时可以用MessageSelector.bySql来使用sql筛选消息
事务消息
防止生产者丢消息
同步发送+多次尝试
事务消息机制
推荐使用
推荐使用
(1) 生产者发送 half 消息到 MQ(对消费者不可见)
(2) MQ服务端收到 half 消息后记录消息并回复生产者
(3) 生产者根据MQ响应结果执行本地事务,并发送本地事务的执行状态
(4) MQ服务器根据本地事务状态执行Commit或者Rollback
Commit操作提交half消息,使消费者可见
RollBack是进行回滚操作,删除half消息
(5) 对没有发送状态的事务消息,MQ服务端会发起“回查”(默认回查15次,如果仍然失败则丢弃消息)
(6) 生产者收到回查消息,检查对应的本地事务的状态,重新Commit或者Rollback
防止 Broker 丢消息
刷盘策略
默认为异步刷盘,修改为同步刷盘,存入磁盘后再返回写入成功
通过Broker配置文件里的 flushDiskType 参数设置
ASYNC_FLUSH
SYNC_FLUSH
集群同步
默认为异步同步(master写成功就返回)修改为同步到slave再返回成功
通过Broker配置文件里的 brokerRole 参数设置
ASYNC_MASTER
SYNC_MASTER
因此可以通过同步刷盘策略+同步双写策略+主从的方式解决丢失消息的可能
防止消费者丢消息
消费者收到消息后先执行本地事务,再修改offset,然后通知Broker,如果通知失败则重试
不要使用异步处理逻辑,如果收到消息后开启线程异步处理,就返回成功,很容易导致消息丢失
事务消息状态
提交状态
提交事务,它允许消费者消费此消息
回滚状态
回滚事务,它代表该消息将被删除,不允许被消费
中间状态
中间状态,它代表需要检查消息队列来确定状态
问题
为什么要发送 half 消息?
验证MQ服务器的可靠性
half 消息写入 MQ 失败怎么办?
说明MQ服务器异常,不执行本地事务
half 消息成功了 MQ 没有收到后续响应怎么办?
MQ服务器回查事务状态
整个 MQ 服务器挂了怎么保证消息零丢失?
发送消息时增加降级方案:缓存
下单成功后如何等待支付成功?
下单成功后标记为unknow,利用RocketMQ的消息回查机制实现
0 条评论
下一页