消息中间件RocketMQ
2021-12-30 00:18:48 34 举报
AI智能生成
消息中间件RocketMQ学习笔记
作者其他创作
大纲/内容
1.概念和特性
●概念(Concept): 介绍RocketMQ的基本概念模型。
1消息模型(Message Model)
2消息生产者(Producer)
3消息消费者(Consumer)
4主题(Topic)
5代理服务器(Broker Server)
6名字服务(Name Server)
7拉取式消费(Pull Consumer)
8推动式消费(Push Consumer)
9生产者组(Producer Group)
10消费者组(Consumer Group)
11集群消费(Clustering)
12广播消费(Broadcasting)
13普通顺序消息(Normal Ordered Message)
14严格顺序消息(Strictly Ordered Message)
15消息(Message)
16标签(Tag)
●特性(Features): 介绍RocketMQ实现的功能特性。
1 订阅与发布
2 消息顺序
3 消息过滤
4 消息可靠性
5 至少一次
6 回溯消费
7 事务消息
8 定时消息
9 消息重试
10 消息重投
11 流量控制
12 死信队列
2.架构设计
架构(Architecture):介绍RocketMQ部署架构和技术架构。
●设计(Design): 介绍RocketMQ关键机制的设计原理,主要包括消
1 消息存储
1.1 消息存储整体架构
(1) CommitLog:消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。
(2) ConsumeQueue
Consumer即可根据ConsumeQueue来查找待消费的消息
(3) IndexFile:IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。
1.2 页缓存与内存映射
页缓存(PageCache)是OS对文件的缓存,用于加速对文件的读写。
1.3 消息刷盘
2 通信机制
2.1 Remoting通信类结构
2.2 协议设计与编解码
2.3 消息的通信方式和流程
2.4 Reactor多线程设计
3 消息过滤
4 负载均衡
4.1 Producer的负载均衡
4.2 Consumer的负载均衡
1、Consumer端的心跳包发送
2、Consumer端实现负载均衡的核心类—RebalanceImpl
5 事务消息
5.1 RocketMQ事务消息流程概要
5.2 RocketMQ事务消息设计
1.事务消息在一阶段对用户不可见
2.Commit和Rollback操作以及Op消息的引入
3.Op消息的存储和对应关系
4.Half消息的索引构建
5.如何处理二阶段失败的消息?
6 消息查询
6.1 按照MessageId查询消息
6.2 按照Message Key查询消息
3.样例
)1基本样例
1.1加入依赖:
1.2消息发送
1、Producer端 发送同步消息
2、发送异步消息
3、单向发送消息
1.3消费消息
) 2顺序消息样例
■2.1顺序消息生产
sh bin/tools.sh org.apache.rocketmq.example.ordermessage.Producer
2.2顺序消费消息
sh bin/tools.sh org.apache.rocketmq.example.ordermessage.Consumer
3延时消息样例
3.1启动消费者等待传入订阅消息
3.2发送延时消息
3.3验证
■3.4延时消息的使用场景
3.5延时消息的使用限制
)4批量消息样例
■4.1发送批量消息
■4.2消息列表分割
5过滤消息样例
5.1基本语法
5.2使用样例
■1、生产者样例
2、消费者样例
6消息事务样例
6.1发送事务消息样例
■1、创建事务性生产者
2、实现事务的监听接口
6.2事务消息使用上的限制
1. 事务消息不支持延时消息和批量消息。
2. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionalMessageCheckListener 类来修改这个行为。
3. 事务消息将在 Broker 配置文件中的参数 transactionTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionTimeout 参数。
4. 事务性消息可能不止一次被检查或消费。
5. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
6. 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
7 Logappender样例
7.1 log4j样例
7.2 log4j2样例
7.3 logback样例
8 OpenMessaging样例
8.1 OMSProducer样例
8.2 OMSPullConsumer
8.3 OMSPushConsumer
4.最佳实践
最佳实践(Best Practice)
1 生产者
1.1 发送消息注意事项
1.2 消息发送失败处理方式
1.3选择oneway形式发送
2 消费者
2.1 消费过程幂等
2.2 消费速度慢的处理方式
1 提高消费并行度
2 批量方式消费
3 跳过非重要消息
4 优化每条消息消费过程
2.3 消费打印日志
2.4 其他消费建议
3 Broker
3.1 Broker 角色
3.2 FlushDiskType
3.3 Broker 配置
4 NameServer
5 客户端配置
5.1 客户端寻址方式
5.2 客户端配置
6 系统配置
6.1 JVM选项
6.2 Linux内核参数
消息轨迹指南(Message Trace):
权限管理(Auth Management):个
Dledger快速搭建(Quick Start) :
集群部署(Cluster Deployment):
4 种高可用 RocketMQ 集群搭建方案!
5.运维管理
●集群部署(Operation): .
集群搭建
nohup sh bin/mqnamesrv &
nohup sh bin/mqbroker -n localhost:9876 &
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
1 集群搭建
1.1 单Master模式
1.2 多Master模式
1.3 多Master多Slave模式-异步复制
1.4 多Master多Slave模式-同步双写
2 mqadmin管理工具
3 运维常见问题
6. 《RocketMQ分布式消息中间件:核心原理与最佳实践》
第1章RoketMQ综述
1.1什么是消息队列
1.2为什么需要消息队列
1.3常见消息队列
1.4 RocketMQ的发展史与未来
第2章RocketMQ的生产者原理和最佳实践
2.1生产者原理
2.2生产者启动流程
2.3消息发送流程
2.4发送消息最佳实践
2.5生产者最佳实践总结
第3章RocketMQ的消费流程和最佳实践
3.1消费者概述
3.2消费者启动机制
1. DefaultLitePullConsumer
3.3消费者的Rebalance机制
1. 这里我们主要讲消费者实例在收到Broker通知后是怎么执行Rebalance的
2. 目前队列分配策略有以下5种实现方法:
AllocateMessageQueueStrategy
AllocateMessageQueueAveragely:平均分配,也是默认使用的策略(强烈推荐)。
AllocateMessageQueueAveragelyByCircle:环形分配策略。
AllocateMessageQueueByConfig:手动配置。
AllocateMessageQueueConsistentHash:一致性Hash分配。
AllocateMessageQueueByMachineRoom:机房分配策略。
3.4消费进度保存机制
从 3.1.2 节可知,在消费者启动时会同时启动位点管理器,那么位点具体是怎么管理的呢?RocketMQ 设计了远程位点管理和本地位点管理两种位点管理方式。集群消费时,位点由客户端提交给Broker保存,具体实现代码在RemoteBrokerOffsetStore.java文件中;广播消费时,位点保存在消费者本地磁盘上,实现代码在LocalFileOffsetStore.java文件中。
3.5消费方式
1. Pull方式
2. Push 方式
3.5.1 Pull消费流程
3.6消息过滤
3.6.1 为什么要设计过滤功能
RocketMQ 设计了消息过滤,来解决大量无意义流量的传输:即对于客户端不需要的消息,Broker就不会传输给客户端,以免浪费宽带。
3.6.1 为什么要设计过滤功能
3.7消费者最佳实践总结
第4章RocketMQ架构和部署最佳实践
4.1 RocketMQ架构
4.2常用的部署拓扑和部署实践
4.2.1 常用的拓扑图
4.2.2 同步复制、异步复制和同步刷盘、异步刷盘
4.2.3 部署实践
第5章Namesrv
5.1 Namesrv概述
5.1.1 什么是Namesrv
Namesrv的主要功能是临时保存、管理Topic路由信息,各个Namesrv节点是无状态的,即每两个Namesrv节点之间不通信,互相不知道彼此的存在。在Broker、生产者、消费者启动时,轮询全部配置的 Namesrv 节点,拉取路由信息。
5.1.2 Namesrv核心数据结构和API
5.1.3 Namesrv和Zookeeper
5.2 Namesrv架构
5.3 RocketMQ的路由原理
第6章Broker存储机制
6.1 Broker概述
6.1.1 什么是Broker
6.1.2 Broker存储目录结构
6.1.3 Broker启动和停止流程
6.2 Broker存储机制
6.2.1 Broker消息存储结构
6.2.2 Broker消息存储机制
1.Broker消息存储的流程
2.内存映射机制与高效写磁盘
3.文件刷盘机制
6.2.3 Broker读写分离机制
6.3 Broker CommitLog索引机制
6.3.1 索引的数据结构
6.3.2 索引的构建过程
1.创建Consume Queue和Index File
2.索引创建失败怎么办
6.3.3 索引如何使用
1.按照位点查消息
2.按照时间段查消息
3.按照key查询消息
6.4 Broker过期文件删除机制
6.4.1 CommitLog文件的删除过程
6.4.2 Consume Queue、Index File文件的删除过程
6.5 Broker主从同步机制
6.5.1 主从同步概述
6.5.2 主从同步流程
1.名词解释
2.配置数据同步流程
3.CommitLog数据同步流程
6.6 Broker的关机恢复机制
6.6.1 Broker关机恢复概述
6.6.2 Broker关机恢复流程
Broker 在启动时会初始化 abort、checkpoint 两个文件。正常关闭进程时会删除 abort文件,将 checkpoint 文件刷盘;异常关闭时,通常来不及删除abort 文件。由此,在重新启动Broker时会根据abort判断是否需要异常停止进程,而后恢复数据。
第7章RocketMQ特性一事 务消息与延迟消息机制
7.1事务消息概述
7.2事务消息机制
7.3延迟消息概述
7.4延迟消息机制
第8章RocketMQ源代码阅读
8.1 RocketMQ源代码结构概述
8.2 RocketMQ源代码编译
8.3如何阅读源代码
8.4源代码阅读范例:通过消息id查询消息
第9章RocketMQ企业最佳实践
9.1 RocketMQ落地概述
9.2 RocketMQ集群管理
9.3 RocketMQ集群监控和报警
9.4 RocketMQ集群迁移
9.5 RocketMQ测试环境实践
9.6 RocketMQ接入实践
0 条评论
下一页