RocketMQ脑图
2021-09-03 12:57:20 1 举报
AI智能生成
rocketmq技术脑图
作者其他创作
大纲/内容
OS内核参数调整
vm.overcommit_memory
0
表示内核将检查是否有足够的可用内存供应用进程使用;
如果有足够的可用内存,内存申请允许;
否则,内存申请失败,并把错误返回给应用进程
如果有足够的可用内存,内存申请允许;
否则,内存申请失败,并把错误返回给应用进程
1
表示内核允许分配所有的物理内存,
而不管当前的内存状态如何
而不管当前的内存状态如何
2
表示内核允许分配超过所有物理内存和交换空间总和的内存
修改命令
echo 'vm.overcommit_memory=1' >> /etc/sysctl.conf
vm.max_map_count
可以开启的线程的数量
默认65536
可以调大10倍655360
修改命令
echo 'vm.max_map_count=655360' >> /etc/sysctl.conf
vm.swappiness
用来控制进程的swap行为
0
尽量别把任何一个进程放到磁盘swap区域去,尽量大家都用物理内存
默认60
可能会导致我们的中间件运行不活跃的时候被迫腾出内存空间然后放磁盘swap区域去
100
尽量把一些进程给放到磁盘swap区域去,内存腾出来给活跃的进程使用
通常调小一点,比如设置10
尽量用物理内存,别放磁盘swap区域去
修改命令
echo 'vm.swappiness=10' >> /etc/sysctl.conf。
ulimit
控制linux上的最大文件链接数
默认值是1024
不够,很可能出现error:too many open files
修改命令
echo 'ulimit -n 1000000' >> /etc/profile
sendMessageThreadPoolNums
默认值16
下面的目录里有dledger
rocketmq/distribution/target/apache-rocketmq/conf/dledger
RocketMQ内部用来发送消息的线程池的线程数量
根据CPU核数进行分配,比如24核分配为24
jvm参数调优
在rocketmq/distribution/target/apache-rocketmq/bin目录下,就有对应的启动脚本,比如mqbroker是用来启动Broker的,mqnamesvr是用来启动NameServer的。
runbroker.sh脚本的JAVA_OPT参数
-server
用服务器模式启动
-Xms8g -Xmx8g -Xmn4g
默认的堆大小是8g内存,新生代是4g内存
需要根据机器调整,堆内存20g,新生代10g
-XX:+UseG!GC -XX:G1HeapRegionSize=16m
选用了G1垃圾回收器来做分代回收
对新生代和老年代都是用G1回收
G1的region大小设置了16m
默认是2m,内存大会导致region数量过多
-XX:G1ReservePercent=25
在G1管理的老年代里预留25%的空闲内存,
保证新生代对象晋升到老年代的时候有足够空间
避免老年代内存都满了
新生代有对象要进入老年代没有充足内存了
保证新生代对象晋升到老年代的时候有足够空间
避免老年代内存都满了
新生代有对象要进入老年代没有充足内存了
默认值是10%
-XX:InitiatingHeapOccupancyPercent=30
当堆内存的使用率达到30%之后就会自动启动G1的并发垃圾回收,
开始尝试回收一些垃圾对象
开始尝试回收一些垃圾对象
默认值是45%
提高了GC的频率,避免了垃圾对象过多,一次垃圾回收耗时过长的问题
-XX:SoftRefLRUPolicyMSPerMB=0
默认值为0
调整为1000
避免频繁回收一些软引用的Class对象
-verbose:gc -Xloggc:/dev/shm/mq_gc_%p.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m
这一堆参数都是控制GC日志打印输出的,
确定了gc日志文件的地址,
要打印哪些详细信息,
然后控制每个gc日志文件的大小是30m,
最多保留5个gc日志文件。
确定了gc日志文件的地址,
要打印哪些详细信息,
然后控制每个gc日志文件的大小是30m,
最多保留5个gc日志文件。
-XX:-OmitStackTraceInFastThrow
有时候JVM会抛弃一些异常堆栈信息,
因此这个参数设置之后,就是禁用这个特性,
要把完整的异常堆栈信息打印出来
因此这个参数设置之后,就是禁用这个特性,
要把完整的异常堆栈信息打印出来
-XX:+AlwaysPreTouch
参数的意思是我们刚开始指定JVM用多少内存,
不会真正分配给他,会在实际需要使用的时候再分配给他
不会真正分配给他,会在实际需要使用的时候再分配给他
使用这个参数之后,
强制让JVM启动的时候直接分配我们指定的内存,
不要等到使用内存的时候再分配
强制让JVM启动的时候直接分配我们指定的内存,
不要等到使用内存的时候再分配
-XX:MaxDirectMemorySize=15g
RocketMQ里大量用了NIO中的direct buffer,
这里限定了direct buffer最多申请多少,
如果你机器内存比较大,可以适当调大这个值
这里限定了direct buffer最多申请多少,
如果你机器内存比较大,可以适当调大这个值
-XX:-UseLargePages -XX:-UseBiasedLocking
这两个参数的意思是禁用大内存页和偏向锁,
这两个参数对应的概念每个要说清楚都得一篇文章,
所以这里大家直接知道人家禁用了两个特性即可。
这两个参数对应的概念每个要说清楚都得一篇文章,
所以这里大家直接知道人家禁用了两个特性即可。
检查机器负载情况
查看CPU使用率
top命令
观察cpu load
uptime
查看内存使用率
free
JVM GC频率
jstat
磁盘IO负载
top命令
wa
磁盘IO等待在CPU执行时间中的百分比
比例太高说明CPU执行的时候大部分时间都在等待执行IO,说明IO负载很高,导致大量的IO等待
网卡流量
sar -n DEV 1 2
可以看到每秒钟网卡读写数据量
作用
降低系统耦合
流量削峰
异步化提升性能
重要组成部分
producer(生产者)
负责生产消息
多种发送方式
同步发送
接收sendResult对象
producer.send(msg)
异步发送
producer.send(msg,sendCallback)
处理sendCallback
顺序发送
单向发送
sendOneway(msg)
发送给mq,不关心结果是否成功
Producer Group
同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。
如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费
cosumer(消费者)
负责消费消息
两种消费形式
Pull拉取式消费
Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。
Push推动式消费
Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。
当消费者发送请求到Broker去拉取消息的时候,如果有新的消息可以消费那么就会立马返回一批消息到消费机器去处理,处理完之后会接着立刻发送请求到Broker机器去拉取下一批消息
本质上是消费者不停的发送请求到broker去拉取一批一批的消息
请求挂起和长轮询
当请求发送到Broker,如果没有新的消息处理,就会让请求线程挂起,默认是15秒,期间会有后台线程每隔一会就去检查一下是否有新的消息,如果有新的消息到达了会主动唤醒挂起的线程,把消息发送给消费端
Consumer Group
同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。
消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。
要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。
RocketMQ 支持两种消息模式
集群消费(Clustering)
集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息
即组下的集群机器一条消息只会被一台机器接收
默认模式
一个MessageQueue只能被一个消费机器去处理,但是一台消费者机器可以负责多个MessageQuque的消息处理
广播消费(Broadcasting)
广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息
consumer.setMessageModel(MessageModel.BROADCASTING)
比较少用
不同的系统应该设置不同的消费组,如果不同的消费组订阅了同一个Topic,对Topic里的一条消息,每个消费组都会获取到这条消息
topic
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位
NameServer
名称服务充当路由消息的提供者
生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表
多个Namesrv实例组成集群,但相互独立,没有信息交换
broker server
代理服务器
消息中转角色,负责存储消息、转发消息
在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备
存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等
Message
消息系统所传输信息的物理载体
生产和消费数据的最小单位,每条消息必须属于一个主题
RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。
系统提供了通过Message ID和Key查询消息的功能。
顺序消息分类
普通顺序消息(Normal Ordered Message)
普通顺序消费模式下,消费者通过同一个消息队列( Topic 分区,称作 Message Queue) 收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。
严格顺序消息(Strictly Ordered Message)
严格顺序消息模式下,消费者收到的所有消息均是有顺序的。
tag
为消息设置的标志,用于同一主题下区分不同类型的消息。
来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。
标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。
消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。
特性
订阅与发布
消息顺序
全局顺序
对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费
适用场景:性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景
分区顺序
对于指定的一个 Topic,所有消息根据 sharding key 进行区块分区
同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费
Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念
适用场景:性能要求高,以 sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。
消息过滤
RocketMQ的消费者可以根据Tag进行消息过滤,也支持自定义属性过滤。
消息过滤目前是在Broker端实现的,优点是减少了对于Consumer无用消息的网络传输,缺点是增加了Broker的负担、而且实现相对复杂。
消息可靠性
1.Broker非正常关闭
2.Broker异常Crash
3.OS Crash
4.机器掉电,但是能立即恢复供电情况
5.机器无法开机(可能是cpu、主板、内存等关键设备损坏)
6.磁盘设备损坏
1)、2)、3)、4) 四种情况都属于硬件资源可立即恢复情况,RocketMQ在这四种情况下能保证消息不丢,或者丢失少量数据(依赖刷盘方式是同步还是异步)
5)、6)属于单点故障,且无法恢复,一旦发生,在此单点上的消息全部丢失。RocketMQ在这两种情况下,通过异步复制,可保证99%的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点,同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如与Money相关的应用。注:RocketMQ从3.0版本开始支持同步双写。
至少一次
至少一次(At least Once)指每个消息必须投递一次
Consumer先Pull消息到本地,消费完成后,才向服务器返回ack,如果没有消费一定不会ack消息,所以RocketMQ可以很好的支持此特性。
回溯消费
回溯消费是指Consumer已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker在向Consumer投递成功消息后,消息仍然需要保留。
并且重新消费一般是按照时间维度,例如由于Consumer系统故障,恢复后需要重新消费1小时前的数据,那么Broker要提供一种机制,可以按照时间维度来回退消费进度。
RocketMQ支持按照时间回溯消费,时间维度精确到毫秒。
事务消息
RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。
RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。
定时消息
定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。
broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。
可以配置自定义messageDelayLevel。
level有三种情况
level == 0,消息为非延迟消息
1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s
level > maxLevel,则level== maxLevel,例如level==20,延迟2h
定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。
需要注意的是,定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。
消息重试
Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次。
Consumer消费消息失败通常可以认为有以下几种情况
由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。这种错误通常需要跳过这条消息,再消费其它消息,而这条失败的消息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过10秒后再重试。
由于依赖的下游应用服务不可用,例如db连接不可用,外系统网络不可达等。遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用sleep 30s,再消费下一条消息,这样可以减轻Broker重试消息的压力。
RocketMQ会为每个消费组都设置一个Topic名称为“%RETRY%+consumerGroup”的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而导致Consumer端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ对于重试消息的处理是先保存至Topic名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至“%RETRY%+consumerGroup”的重试队列中。
消息重投
生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway没有任何保证。消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在RocketMQ中是无法避免的问题。消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。另外,生产者主动重发、consumer负载变化也会导致重复消息
设置消息重试策略
retryTimesWhenSendFailed:同步发送失败重投次数,默认为2,因此生产者会最多尝试发送retryTimesWhenSendFailed + 1次。不会选择上次失败的broker,尝试向其他broker发送,最大程度保证消息不丢。超过重投次数,抛出异常,由客户端保证消息不丢。当出现RemotingException、MQClientException和部分MQBrokerException时会重投。
retryTimesWhenSendAsyncFailed:异步发送失败重试次数,异步重试不会选择其他broker,仅在同一个broker上做重试,不保证消息不丢。
retryAnotherBrokerWhenNotStoreOK:消息刷盘(主或备)超时或slave不可用(返回状态非SEND_OK),是否尝试发送到其他broker,默认false。十分重要消息可以开启。
流量控制
生产者流控
因为broker处理能力达到瓶颈
commitLog文件被锁时间超过osPageCacheBusyTimeOutMills时,参数默认为1000ms,返回流控。
如果开启transientStorePoolEnable == true,且broker为异步刷盘的主机,且transientStorePool中资源不足,拒绝当前send请求,返回流控。
broker每隔10ms检查send请求队列头部请求的等待时间,如果超过waitTimeMillsInSendQueue,默认200ms,拒绝当前send请求,返回流控。
broker通过拒绝send 请求方式实现流量控制。
注意,生产者流控,不会尝试消息重投。
消费者流控
因为消费能力达到瓶颈
消费者本地缓存消息数超过pullThresholdForQueue时,默认1000。
消费者本地缓存消息大小超过pullThresholdSizeForQueue时,默认100MB
消费者本地缓存消息跨度超过consumeConcurrentlyMaxSpan时,默认2000
消费者流控的结果是降低拉取频率。
死信队列
死信队列用于处理无法被正常消费的消息
当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
RocketMQ将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。在RocketMQ中,可以通过使用console控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。
推荐配置
Producer中开启一个开关:sendLatencyFaultEnable
作用:会有一个自动容错机制,一个Broker故障之后,自动回避一段时间不要访问这个Broker,过段时间再去访问
Broker数据存储
CommitLog消息顺序写入
每个CommitLog限制1GB
基于操作系统的PageCache和顺序写两个机制提升写入CommitLog文件的性能
异步刷盘可以提高吞吐量,但是会有数据丢失的可能
同步刷盘可以保证数据不丢失,但是会降低吞吐量
ConsumeQueue中的一个物理位置其实是对CommitLog文件中一个消息的引用
基于os cache
Topic的每个MessageQueue都对应了Broker机器上的多个ConsumeQueue文件,保存了MessageQuque的所有消息在CommitLog文件中的物理位置,也就是offset偏移量
从Broker读取数据
master broker/slave broker读取
master负载太高的时候会通知客户端下次从slave去拉取
master内存不够用会通知从slave拉取
os cache读取
有内存大小限制,只会有最新的数据在os cache
超过os cache的会写入磁盘文件
磁盘文件读取
os cache已经写入磁盘会从这里读
可能是消费端消费太慢,导致消息在os cache存取不下,刷盘到了磁盘文件
DLedger
基于Raft协议选举Leader Broker
获取(机器数量/2)+1票数则为Leader
核心机制:一轮选举不出来,都进行休眠,先苏醒过来的人会投票给自己,发送给其他人,其他人苏醒后发现自己收到选票了,就会直接投票给那个人
数据同步阶段
uncommitted
commited
数据同步机制
Leader Broker上的DLedger收到一条数据之后,会标记为uncommitted
然后会通过自己的DLedgerServer组件把这个uncommitted数据发送给Follower Broker的DLedgerServer
接着Follower Broker的DLedgerServer收到uncommitted消息之后,必须返回一个ack给Leader Broker的DLedgerServer
然后如果Leader Broker收到超过半数的Follower Broker返回ack之后,就会将消息标记为committed状态
然后Leader Broker上的DLedgerServer就会发送committed消息给Follower Broker机器的DLedgerServer,让他们也把消息标记为committed状态
网络通信
Reactor主线程在端口上监听Producer建立连接的请求,建立长连接
默认3个线程
Reactor线程池并发的监听多个连接的请求是否到达
Worker请求并发的对多个请求进行预处理
默认8个线程
业务线程池并发的对多个请求进行磁盘读写业务操作
可配置
配置越多处理消息的吞吐量越高
网络通信图
https://processon.com/diagraming/610b2b39e401fd6714bf0e49
磁盘文件读写
核心技术:mmap
避免多次拷贝
只会有一次拷贝
磁盘文件映射到内存
JDK NIO包下的MappedByteBuffer的map()函数
有大小限制1.5GB~2GB之间
所以CommitLog单个文件在1GB
ConsumeQueue文件在5.72MB
文件预热
对一些文件完成映射之后加载到内存中
https://processon.com/diagraming/610c7b3b1e0853337b1ee99d
消息丢失的情况
生产者发送消息网络故障没有到达mq
发送消息进行消息确认
异步刷盘的模式,mq重启会导致没有把内存数据写入磁盘
使用同步刷盘,性能会降低
磁盘文件摔坏,没有slave和备份
进行集群部署和备份
消费者获取到数据处理异常
消费消息进行消息确认
消息0丢失方案
生产端
1.发送half消息到MQ
2.MQ返回half消息响应
3.生产者更新订单数据
4.生产者发送rollback or commit给MQ
commit之后则消费者系统接收到消息
rollback之后则对half消息进行op标记
消费者系统接收不到消息
5.配置同步刷盘
调整broker的配置文件
flushDiskType
SYNC_FLUSH
同步刷盘
ASYNC_FLUSH
默认值
异步刷盘
避免在os cache中的时候机器突然宕机
6.主从同步
基于DLedger技术和Raft协议
避免master磁盘损坏导致写入磁盘的数据丢失
补偿机制:MQ回调接口判断消息状态
当生产者系统没有rollback or commit
当half消息超时的时候
当生产者发送rollback or commit异常的时候
最多回调15次,15次之后都没法告知half消息的状态,就自动把消息标记为rollback
消费端
1.先处理业务逻辑
2.再提交CONSUME_SUCCESS
注意点
消息是按批的,如果其中一个处理失败,整批都会回滚,下次再整批推送给消费端
需要做好消费幂等性
原因
生产端回查机制可能会有重复数据推送到MQ
消费端提交offset到broker异常
如何做幂等
Redis缓存幂等
基于数据库查询
业务处理逻辑不能用异步
消费异常
返回RECONSUME_LATER状态进入重试队列
最多重试16次
超过16次则进入死信队列
每次重试间隔messageDelayLevel
重试间隔逐渐加大
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
消费乱序
产生原因
消息进入不同的MessageQueue
多个consumer并行消费引发
解决办法
有顺序相关的数据流进入同一个MessageQueue
一个MessageQueue只能交给一个Consumer进行处理
消费处理失败不能返回RECONSUME_LATER
会进入重试队列导致乱序
只能返回SUSPEND_CURRENT_QUEUE_A_MOMENT
先等待一阵再处理这批消息
优劣势
优势
消息不会丢失
劣势
吞吐量大幅度下降
同步记录磁盘
同步到slave机器并记录磁盘
同步的网络和通信开销
折中方案
同步发送消息+反复重试机制
主从架构+异步刷盘
适用于非100%0丢失的方案
消息过滤
给消息设置tag
Message的第二个参数
给消息设置属性
Message.putUserProperty
数据过滤语法
数值比较
>, >=, <, <=, BETWEEN, =;
字符比较
=, <>, IN
IS NULL 或者 IS NOT NULL
逻辑符号
AND, OR, NOT
数值
123, 3.1415
字符
'abc', 必须用单引号包裹起来
NULL,特殊的常量
布尔值
TRUE或FALSE
MQ的延迟消息
创建一个消息,到了指定时间之后才会发送给消费者
场景
订单超时关闭
https://processon.com/diagraming/6122e45b0e3e743b327a150c
生产使用经验总结
1.灵活运用tags来过滤数据
2.基于消息key来定位消息是否丢失
3.消息零丢失方案的补充
mq集群挂了之后写入本地磁盘或数据库暂存
4.提高消费者的吞吐量
部署更多的consumer机器
MessageQueue得有对应得增加1:1的比例
设置consumer端的参数
consumeThreadMin
consumerThreadMax
开启消费者的批量消费功能
consumeMessageBatchMaxSize
默认值1
可以设置多一些
sql进行批量处理
5.消费历史消息
从Topic的第一条数据开始消费
CONSUME_FROM_FIRST_OFFSET
后面重启都是从最后消费的消息进行消费
最后一次消费过的消息之后开始消费
CONSUME_FROM_LAST_OFFSET
ACL对Topic进行权限控制
AclClientRPCHook
如何配置
1.在每个Broker的配置文件里需要设置aclEnable=true
2.在每个Broker部署机器的${ROCKETMQ_HOME}/store/config目录下放一个plain_acl.yml配置文件
配置项
globalWhiteRemoteAddresses
全局白名单
定义的ip地址,都是可以访问Topic的
accounts
定义账号,每个账号在这里配置对哪些Topic具有操作权限
accessKey
用户名的意思
secretKey
用户名密码的意思
whiteRemoteAddress
当前这个用户名下哪些机器要加入白名单
admin
这个账号是不是管理员账号
true/false
defaultTopicPerm
默认情况下这个账号的Topic权限
DENY
defaultGroupPerm
默认情况下这个账号的ConsumerGroup权限
SUB
topicPerms
CreateOrderInformTopic
PUB|SUB|DENY
PUB就是发布消息的权限
SUB就是订阅消息的权限
DENY就是拒绝这个账号访问这个Topic
PaySuccessInformTopic
PUB|SUB|DENY
具体的一些账号权限
groupPerms
对ConsumerGroup的权限
消息轨迹
broker的配置文件里开启traceTopicEnable=true
开启之后启动Broker会自动创建一个内部的Topic,RMQ_SYS_TRACE_TOPIC
会记录消息的轨迹
消息存储的Topic
消息存储的位置
消息的key
消息的tags
创建Producer的时候将第二个参数enableMsgTrace参数为true
对消息开启轨迹追踪
会将信息上报到创建的内部Topic
Producer的信息
发送消息的时间
消息是否发送成功
发送消息的耗时
创建Consumer也需要在构造函数的第二个参数设置为true
对消息开启轨迹追踪
会将信息上报到创建的内部Topic
Consumer的消息
投递消息的时间
这是第几轮投递消息
消息消费是否成功
消费这条消息的耗时
在RocketMQ控制台里,在导航栏里就有一个消息轨迹
在里面可以创建任务
可以根据messageId/messageKey或者Topic来查询
消息挤压处理
如果消息可以丢弃
增加消费程序,获取消息直接返回成功
不可以丢弃
增加消费者程序并行消费
RocketMQ集群崩溃处理
发送重试,如果重试3次还是失败,则把消息顺序写入到其他存储中去
数据库
本地磁盘
nosql
必须有一个后台线程把之前持久化存储的消息都查询出来
然后依次按照顺序发送到MQ集群里去
注意点
存储和查询重发都需要按顺序
需要检测MQ是否已经恢复
恢复之后的切换
MQ限流
根据压测结果做限流
令牌桶算法
0 条评论
下一页