RocketMQ
2021-01-31 14:34:54 1 举报
AI智能生成
RocketMQ
作者其他创作
大纲/内容
RocketMQ
高可用集群
单Master
优点:除了配置简单没什么优点
缺点:不可靠,该机器重启或宕机,将导致整个服务不可用
多Master
优点:配置简单,性能最高
缺点:可能会有少量消息丢失(配置相关),单台机器重启或宕机期间,该机器下未被消费的消息在机器恢复前不可订阅,影响消息实时性
多Master多Slave异步模式
每个Master配一个Slave,有多对Master-Slave,集群采用异步复制方式,主备有短暂消息延迟,毫秒级
优点:性能同多Master几乎一样,实时性高,主备间切换对应用透明,不需人工干预
缺点:Master宕机或磁盘损坏时会有少量消息丢失
多Master多Slave同步模式
每个Master配一个Slave,有多对Master-Slave,集群采用同步双写方式,主备都写成功,向应用返回成功
优点:服务可用性与数据可用性非常高
缺点:性能比异步集群略低,当前版本主宕备不能自动切换为主
在RocketMQ里面,1台机器只能要么是Master,要么是Slave。这个在初始的机器配置里面,就定死了。不会像kafka那样存在master动态选举的功能。其中Master的broker id = 0,Slave的broker id > 0。有点类似于mysql的主从概念,master挂了以后,slave仍然可以提供读服务,但是由于有多主的存在,当一个master挂了以后,可以写到其他的master上
主从复制原理
1 )主服务器启动,并在特定端口上监昕从服务器的连接
2 )从服务器主动连接主服务器,主服务器接收客户端 的连接,并建立相关 TCP 连接
3 )从服务器主动向 主服务器发送待拉取消息偏移量 ,主服务器解析请求并返回消息给从服务器
4 )从服务器保存消息并继续发送新的消息同步请求 。
RocketMQ 读写分离
消费者首先向主服务器发起拉取消息请求,然后主服务器返回一批消息
根据主服务器负载压力与主从同步情况,向从服务器建议下次消息拉取是从主服务器还是从从服务器拉取
RocketMQ保证消息不丢失
Producer
消息丢失
如果消息未能正确的存储在MQ中,或者消费者未能正确的消费到这条消息,都是消息丢失。
保障消息不丢失
默认情况下,可以通过同步的方式阻塞式的发送,check SendStatus,状态是OK,表示消息一定成功的投递到了Broker,状态超时或者失败,则会触发默认的2次重试。此方法的发送结果,可能Broker存储成功了,也可能没成功。
重试次数同步是1 + this.defaultMQProducer.getRetryTimesWhenSendFailed(),其他方式默认1次
// 同步发送消息重试次数,默认为 2mqProducer.setRetryTimesWhenSendFailed(3);// 异步发送消息重试次数,默认为 2mqProducer.setRetryTimesWhenSendAsyncFailed(3);
多主broker,写A broker时宕机了重试路由到其他broker
异步发送消息一定要注意重写回调方法,在回调方法中检查发送结果
RocketMQ支持 日志的索引,如果一条消息发送之后超时,也可以通过查询日志的API,来check是否在Broker存储成功
Broker
宕机、磁盘崩溃
消息支持持久化到Commitlog里面,即使宕机后重启,未消费的消息也是可以加载出来的
Broker自身支持同步刷盘、异步刷盘的策略,同步刷盘可以保证接收到的消息一定存储在本地的内存中
Broker集群支持 1主N从的策略,支持同步复制和异步复制的方式,同步复制可以保证即使Master 磁盘崩溃,消息仍然不会丢失
# master 节点配置flushDiskType = SYNC_FLUSHbrokerRole=SYNC_MASTER# slave 节点配置brokerRole=slaveflushDiskType = SYNC_FLUSH
Producer发消息到Broker后,Broker的Master节点先持久化到磁盘中,然后同步数据给Slave节点,Slave节点同步完且落盘完成后才会返回给Producer说消息ok了。
Consumer
如果消息已经完成持久化了,但是Consumer取了,但是未消费成功且没有反馈,就是消息丢失
业务逻辑完成后手动进行ack确认,这时候才会真正的代表消费完成
消息消费失败自动重试。如果消费消息失败了,没有进行ack确认,则会自动重试,重试策略和次数(默认15次)如下配置
/** * Broker可以配置的所有选项 */public class org.apache.rocketmq.store.config.MessageStoreConfig { private String messageDelayLevel = \"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h\";}
RocketMQ事务消息
事务消息发送流程
事务消息发送原理
整体流程实现
0 条评论
回复 删除
下一页