RocketMQ消息类型(延迟消息/事务消息)
2021-03-09 22:13:12 1 举报
AI智能生成
RocketMQ消息类型(延迟消息/事务消息)
作者其他创作
大纲/内容
微信扫一扫关注公众号喜欢收藏+点赞👍 谢谢
顺序消息
概念
消息有序指的是可以按照消息的发送顺序来消费(FIFO)
如何保证消息有序
全局有序:发送和消费参与的queue只有一个(没什么用)
分区有序:控制发送的顺序消息只依次发送到同一个queue中,消费时只从这个queue上依次拉取,则可保证顺序
分区有序的实现原理
生产者构建 消息队列选择器 new MessageQueueSelector(),通过订单号路由消息
消费者用 单线程的监听器 new MessageListenerOrderly(),消费队列中的有序消息
广播消息
集群模式(CLUSTERING),一条消息在同一个消费者组下,只会有一个消费者来消费(默认模式)
广播模式(BROADCASTING),不管消费者组的概念,一条消息过来就会推送给所有订阅了该topic的消费者
关键代码
consumer.setMessageModel(MessageModel.BROADCASTING); //设置消费者为广播模式
延时消息
Producer发送消息到Broker后,等待一段时间(可设置)再发送给消费者,可用作定时任务
使用限制
rocketmq定义了18个延迟级别:messageDelayLevel = \"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h\";开源版本并不支持任意时间的延时,只能设置几个固定的延时等级,不过可以通过修改broker.conf来自定义符合自己预期的18个级别
阿里商业版的RocketMQ在延时消息模块没有等级划分,取而代之得是 setStartDeliverTime(long value) 方法,自定义开始时间
msg.setDelayTimeLevel(3); //取值范围 1~18
批量消息
Batch机制
把多条消息合成为一条批量消息,一次发过去
减少网络IO,能显著提高传递消息的性能
批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息、事务消息
批量消息的单次发送总大小不能超过4MB,如果超过4M则需要对消息进行分割
生产者批量发送消息
过滤消息
过滤有两种方式
Tag过滤
SQL过滤
enablePropertyFilter = true
一个消息只能有一个标签,这对于复杂的场景可能不起作用
在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算
生产者发送消息时可以通过msg.putUserProperty来设置消息的属性
消费者消费消息时可以用MessageSelector.bySql来使用sql筛选消息
事务消息
防止生产者丢消息
同步发送+多次尝试
事务消息机制 推荐使用
(1) 生产者发送 half 消息到 MQ(对消费者不可见)
(2) MQ服务端收到 half 消息后记录消息并回复生产者
(3) 生产者根据MQ响应结果执行本地事务,并发送本地事务的执行状态
(4) MQ服务器根据本地事务状态执行Commit或者Rollback
Commit操作提交half消息,使消费者可见
RollBack是进行回滚操作,删除half消息
(5) 对没有发送状态的事务消息,MQ服务端会发起“回查”(默认回查15次,如果仍然失败则丢弃消息)
(6) 生产者收到回查消息,检查对应的本地事务的状态,重新Commit或者Rollback
防止 Broker 丢消息
刷盘策略
默认为异步刷盘,修改为同步刷盘,存入磁盘后再返回写入成功
通过Broker配置文件里的 flushDiskType 参数设置
ASYNC_FLUSH
SYNC_FLUSH
集群同步
默认为异步同步(master写成功就返回)修改为同步到slave再返回成功
通过Broker配置文件里的 brokerRole 参数设置
ASYNC_MASTER
SYNC_MASTER
因此可以通过同步刷盘策略+同步双写策略+主从的方式解决丢失消息的可能
提高消息的可靠性但是会降低性能
防止消费者丢消息
消费者收到消息后先执行本地事务,再修改offset,然后通知Broker,如果通知失败则重试
不要使用异步处理逻辑,如果收到消息后开启线程异步处理,就返回成功,很容易导致消息丢失
事务消息状态
提交状态
提交事务,它允许消费者消费此消息
回滚状态
回滚事务,它代表该消息将被删除,不允许被消费
中间状态
中间状态,它代表需要检查消息队列来确定状态
0 条评论
回复 删除
下一页