RocketMQ
2020-09-10 13:39:10 4 举报
AI智能生成
RocketMq的总结,实战+原理
作者其他创作
大纲/内容
RocketMQ
Netty
Namesrv
Topic路由注册中心
BrokerName
Client
生产者/producer
三种模式
同步
重要业务才使用同步
案例
异步
尽量使用异步
oneway
这是一种变相的异步,只不过不要异步返回的结果
选择发送的队列
容错机制
记录上一次失败的brokerName,lastBrokerName,下次选择messageQueue的时候,主动避让这个borker
消息重投
负载均衡
消费者/consumer
consumeFromWhere消费对消费启动点
CONSUME_FROM_LAST_OFFSET
CONSUME_FROM_LAST_OFFSET :消费客户拿起它以前停止。 如果它是一个新开机的消费客户,根据消费group的年代,一般有两种情况:如果创建消费群所以最近,最早的消息所预订尚未到期,这意味着消费群体代表了最近推出的业务,将消费从最开始启动;如果最早的消息所预订已过期,将从最新消息开始消费,意在时间戳之前产生的消息将被忽略。
CONSUME_FROM_FIRST_OFFSET
从最早可到的消息开始消费
CONSUME_FROM_TIMESTAMP
从指定的时间戳开始消费。
消费轨迹
是否打开消费轨迹
定制化消费轨迹
广播消息
只和消费者有关
广播模式表示集群/group下面所有的节点都会消费到这个消息
两种使用方式
DefaultMQPullConsumer
案例2
MQPullConsumerScheduleService
DefaultMQPushConsumer
建议使用这种方式
消息的类型
单体消息
顺序消息
集群消息
RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
原理
在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
批量消息
只和发送者Producer有关
不能是延时消息,一批消息的总大小不应超过4MB
案例1
大于4MB的时候,消息分割
延时消息
broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。可以配置自定义messageDelayLevel。注意,messageDelayLevel是broker的属性,不属于某个topic。发消息时,设置delayLevel等级即可:msg.setDelayLevel(level)。level有以下三种情况:- level == 0,消息为非延迟消息- 1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s- level > maxLevel,则level== maxLevel,例如level==20,延迟2h
事务消息
事务消息共有三种状态,提交状态、回滚状态、中间状态:- TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。- TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。- TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。
限制
filter
SqlFilter
这是一个以SQL92 为标准的 MessageQueue 过滤器,并且consume会帮你过滤掉不符合规则的消息
发送消息时,你能通过`putUserProperty`来设置消息的属性,这个属性同样可以用在SQL92标准中
SQL92标准
TagFilter
消费指定Tag的消息,多种类型的Tag使用 “||” 分割开。
所谓的filter就是一个queue选择器,RocketMq为我们封装了如上两种queue选择器在producer中我们需要传递tag,底层会根据tag选择适当的queue,然后consumer也根据相通的规则,找到对应的queue,从queue中拉取数据拉取消息下来之后,会再帮你做一次过滤,过滤符合你提的filter规则的消息,然后才会给你
需要在Broker中打开filter开关 enablePropertyFilter=true
主要是使用这个接口来调用
Message
字段
topic:String
flag:int
properties:Map
tag:String,消息tag,用于消息过滤
keys:Collection<String>,RocketMq可以根据这些key快速检索到消息
waitStoreMsgOk:boolean,消息发送时是否等待消息储存完成再返回
delayTimeLevel:int
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
body:byte[]
序列化问题
RPCHook
一个类 implements RPCHook,实现指定的方法,然后提交给producer和consumer,底层会帮我们和broker交互之前或之后执行
比如:AclClientRPCHook
Broker
1. Remoting Module
整个Broker的实体,负责处理来自clients端的请求。
2. Client Manage
负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息
3. Store Service
提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
储存文件
1. commitLog
消息主体以及元数据的存储主体
这种所有的topic的数据保存为一个commitLog的方式,在业界称为混合型的存储结构
实现原理: RandomAccessFile,FileChannel,Buffer
RocketMQ主要通过MappedByteBuffer对文件进行读写操作。其中,利用了NIO中的FileChannel模型将磁盘上的物理文件直接映射到用户态的内存地址中(这种Mmap的方式减少了传统IO将磁盘文件数据在操作系统内核地址空间的缓冲区和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销),将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率(正因为需要使用内存映射机制,故RocketMQ的文件存储都使用定长结构来存储,方便一次将整个文件映射至内存)。
RocketMq的内存要够大。
刷盘
1. 同步刷盘
性能不好,提高了可靠性
实现: 提交一次,flush一次
2. 异步刷盘
更好的性能和吞吐量,降低了可靠性
实现:提交之后,立刻返回,其它线程在适当的时候帮你flush
2. conusmeQueue
消息消费队列,引入的目的主要是提高消息消费的性能
ConsumeQueue是commitLog文件的索引,保存了指定Topic下对应的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。
consumequeue文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}
实现原理:1. 有一个后台ReputMessageService,一直在分发 commitLog 里面的数据到各个consumequeue文件下2. consume
3. index
IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。
这个常常在RocketMq-Console中的查询消息之中用到。
Index文件的存储位置是:$HOME \\store\\index\\${fileName}
架构
4. HA Service
高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。
5. Index Service
根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。
DLedger
DLedger是自动容灾切换的 RocketMQ 集群
部署
RocketMQ-Console
消费者
重置消费位点
回溯消费是指Consumer已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker在向Consumer投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于Consumer系统故障,恢复后需要重新消费1小时前的数据,那么Broker要提供一种机制,可以按照时间维度来回退消费进度。RocketMQ支持按照时间回溯消费,时间维度精确到毫秒。
子主题
消息
重新发送消息
运维
开启从节点读的功能
slaveReadEnable=true
在某些情况下,Consumer需要将消费位点重置到1-2天前,这时在内存有限的Master Broker上,CommitLog会承载比较重的IO压力,影响到该Broker的其它消息的读与写。可以开启`slaveReadEnable=true`,当Master Broker发现Consumer的消费位点与CommitLog的最新值的差值的容量超过该机器内存的百分比(`accessMessageInMemoryMaxRatio=40%`),会推荐Consumer从Slave Broker中去读取数据,降低Master Broker的IO。
开启异步刷盘
transientStorePoolEnable = true
默认同步刷盘
同步刷盘建议使用重入锁
useReentrantLockWhenPutMessage=true
消费的时候
transferMsgByHeap
其它
消息轨迹
记录消息的消费记录,在追踪问题的时候非常有用
需要打开消费轨迹traceTopicEnable=true
对于消息轨迹数据量较大的场景,可以在RocketMQ集群中选择其中一个Broker节点专用于存储消息轨迹,使得用户普通的消息数据与消息轨迹数据的物理IO完全隔离,互不影响。在该模式下,RockeMQ集群中至少有两个Broker节点,其中一个Broker节点定义为存储消息轨迹数据的服务端。
enableMsgTrace=true可以打开全局的消费轨迹
自定义存储消息轨迹Topic
namespace
生产者
pull
push
个人对rocketmq的namespace理解和nacos一致,都是架构级别的,比如开发环境,测试环境,使用同一套RocketMQ但是不同的namespace可以隔离开
权限控制
通过AclClientRPCHook (一个rpcHook class)封装的 SessionCredentials 实现
vip通道
应对重要业务的使用建议使用vip通道,它有独有的线程池,独有的通道,不用和别的请求一起挤。
OpenMessaging
旨在建立消息和流处理规范,以为金融、电子商务、物联网和大数据领域提供通用框架及工业级指导方案。在分布式异构环境中,设计原则是面向云、简单、灵活和独立于语言。符合这些规范将帮助企业方便的开发跨平台和操作系统的异构消息传递应用程序。
RocketMQ对OpenMessaging做了适配
流量控制
生产者流控
生产者流控,因为broker处理能力达到瓶颈
生产者流控:- commitLog文件被锁时间超过osPageCacheBusyTimeOutMills时,参数默认为1000ms,返回流控。- 如果开启transientStorePoolEnable == true,且broker为异步刷盘的主机,且transientStorePool中资源不足,拒绝当前send请求,返回流控。- broker每隔10ms检查send请求队列头部请求的等待时间,如果超过waitTimeMillsInSendQueue,默认200ms,拒绝当前send请求,返回流控。- broker通过拒绝send 请求方式实现流量控制。
注意,生产者流控,不会尝试消息重投。
消费者流控
消费者流控,因为消费能力达到瓶颈
消费者流控:- 消费者本地缓存消息数超过pullThresholdForQueue时,默认1000。- 消费者本地缓存消息大小超过pullThresholdSizeForQueue时,默认100MB。- 消费者本地缓存消息跨度超过consumeConcurrentlyMaxSpan时,默认2000。
死信队列
死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
RocketMQ将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。在RocketMQ中,可以通过使用console控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。
经验总结
1. messageQueue为什么有多个,为什么读写nums有多个
commitLog保存的时候,是多线程操作,需要保持数据一致性,需要加锁,多个queue就可以减少锁的力度,减少碰撞
所以如果消息读写越频繁,建议适当加大队列读写的数目
0 条评论
下一页