0604 - 分布式消息中间件
2021-05-12 23:06:22 1 举报
AI智能生成
系统架构、Java技术栈、面试宝典
作者其他创作
大纲/内容
基础知识
使用场景
应用解耦合
队列可以对应用进行解耦合,应用之间不用直接调用。
服务通信
可以通过队列来传递消息,完成通信。
异步任务
队列也可以用来执行异步任务,任务提交方无需等待结果。
消减填谷
队列的另一个作用是削峰填谷,在突发流量时,可以通过队列做缓冲,不会对后端服务产生较大压力,当峰值过去时,可以逐渐消费堆积的数据,填平流量低谷。
消息广播
消息队列一般还提供了一写多读的能力,可以用来做消息的多播与广播。
消息协议
JMS
MS 是 Java 的消息服务,规定了 Java 使用消息服务的 API,在前面 Spring 的课时提到过,Spring 提供了支持 JMS 的组件。
AMQP
AMQP 是高级消息队列协议,是应用层协议的一个开放标准,AMQP 不从 API 层进行限定,而是直接定义网络交换的数据格式,因此支持跨语言的能力,例如 RabbitMQ 就使用了 AMQP 实现。
常见MQ对比
RabbitMQ
使用 Erlang 开发的开源消息队列,通过 Erlang 的 Actor 模型实现了数据的稳定可靠传输。支持 AMQP、XMPP、SMTP 等多种协议,因此也比较重量级。由于采用 Broker 代理架构,发送给客户端时先在中心队列排队,疑似 RabbitMQ 的单机吞吐量在万级,不算很高。
Redis
Redis是一个基于Key-Value对的NoSQL数据库,开发维护很活跃。虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。实验表明:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10K,Redis则慢的无法忍受;出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis。
ActiveMQ
ActiveMQ是Apache下的一个子项目。 类似于ZeroMQ,它能够以代理人和点对点的技术实现队列。同时类似于RabbitMQ,它少量代码就可以高效地实现高级应用场景。
RocketMQ
阿里开源的消息中间件,单机能够支持 10w 级的吞吐量,使用 Java 开发,具有高吞吐量、高可用性的特点、适合在大规模分布式系统中应用。
Kafka/Jafka
Kafka是Apache下的一个子项目,是一个高性能跨语言分布式发布/订阅消息队列系统,而Jafka是在Kafka之上孵化而来的,即Kafka的一个升级版。具有以下特性:快速持久化,可以在O(1)的系统开销下进行消息持久化;高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现负载均衡;支持Hadoop数据并行加载,对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka通过Hadoop的并行加载机制统一了在线和离线的消息处理。Apache Kafka相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。
MQ介绍
消息队列是一种“先进先出”的数据结构
应用解耦
流量削峰
数据分发
优点和缺点
优点:解耦、消峰、数据分发
缺点:系统可用性降低、系统复杂度提高、一致性问题
RocketMQ
RocketMQ集群搭建
各角色介绍
- Producer:消息的发送者;举例:发信者
- Consumer:消息接收者;举例:收信者
- Broker:暂存和传输消息;举例:邮局
- NameServer:管理Broker;举例:各个邮局的管理机构
- Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者可以订阅一个或者多个Topic消息
- Message Queue:相当于是Topic的分区;用于并行发送和接收消息
- Consumer:消息接收者;举例:收信者
- Broker:暂存和传输消息;举例:邮局
- NameServer:管理Broker;举例:各个邮局的管理机构
- Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者可以订阅一个或者多个Topic消息
- Message Queue:相当于是Topic的分区;用于并行发送和接收消息
集群部署流程图
集群搭建方式
集群特点
- NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
- Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。
- Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
- Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。
- Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。
- Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
- Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。
集群模式
单Master模式
这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。
多Master模式
优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
多Master多Slave模式(异步)
优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样;
缺点:Master宕机,磁盘损坏情况下会丢失少量消息。
多Master多Slave模式(同步)
优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;
缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。
双主双从集群搭建
消息高可用采用2m-2s(同步双写)方式
2m-2s同步双写方式
详解RocketMQ监控与运维
集群管理
mqadmin管理工具
集群监控平台
git clone https://github.com/apache/rocketmq-externals
cd rocketmq-console
mvn clean package -Dmaven.test.skip=true
cd rocketmq-console
mvn clean package -Dmaven.test.skip=true
打包前在rocketmq-console中配置namesrv集群地址:
rocketmq.config.namesrvAddr=192.168.25.135:9876;192.168.25.136:9876
rocketmq.config.namesrvAddr=192.168.25.135:9876;192.168.25.136:9876
java -jar rocketmq-console-ng-1.0.0.jar
详解普通消息、顺序消息、事务消息、定时消息
基本样例
消息发送
发送同步消息
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
发送异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
单向发送消息
这种方式主要用在不特别关心发送结果的场景,例如日志发送。
消费消息
负载均衡模式
消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同
广播模式
消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的
顺序消息
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。
顺序消息
顺序消费消息
延时消息
比如电商里,提交了一个订单就可以发送一个延时消息,
1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18
1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18
发送延时消息
消费延时消息
批量消息
批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,
相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。
相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。
如果您每次只发送不超过4MB的消息,则很容易使用批处理,样例如上
过滤消息
事务消息
上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。
1)事务消息发送及提交
(1) 发送消息(half消息)。
(2) 服务端响应消息写入结果。
(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
(1) 发送消息(half消息)。
(2) 服务端响应消息写入结果。
(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
2)事务补偿
(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
(2) Producer收到回查消息,检查回查消息对应的本地事务的状态
(3) 根据本地事务状态,重新Commit或者Rollback
其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况
(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
(2) Producer收到回查消息,检查回查消息对应的本地事务的状态
(3) 根据本地事务状态,重新Commit或者Rollback
其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况
3)事务消息状态
事务消息共有三种状态,提交状态、回滚状态、中间状态:
- TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
- TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
- TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。
事务消息共有三种状态,提交状态、回滚状态、中间状态:
- TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
- TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
- TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。
创建事务性生产者:使用 TransactionMQProducer类创建生产者,并指定唯一的 ProducerGroup,就可以设置自定义线程池来
处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。
处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。
实现事务的监听接口:当发送半消息成功时,我们使用 executeLocalTransaction 方法来执行本地事务。它返回前一节中提到的三个事务状态之一。checkLocalTranscation 方法用于检查本地事务状态,并回应消息队列的检查请求。
1. 事务消息不支持延时消息和批量消息。
2. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。
3. 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。
4. 事务性消息可能不止一次被检查或消费。
5. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
6. 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
2. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。
3. 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。
4. 事务性消息可能不止一次被检查或消费。
5. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
6. 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
高级功能
消息存储
概述
分布式队列因为有高可靠性的要求,所以数据要进行持久化存储。
1. 消息生成者发送消息
2. MQ收到消息,将消息进行持久化,在存储中新增一条记录
3. 返回ACK给生产者
4. MQ push 消息给对应的消费者,然后等待消费者返回ACK
5. 如果消息消费者在指定时间内成功返回ack,那么MQ认为消息消费成功,在存储中删除消息,即执行第6步;
如果MQ在指定时间内没有收到ACK,则认为消息消费失败,会尝试重新push消息,重复执行4、5、6步骤
6. MQ删除消息
2. MQ收到消息,将消息进行持久化,在存储中新增一条记录
3. 返回ACK给生产者
4. MQ push 消息给对应的消费者,然后等待消费者返回ACK
5. 如果消息消费者在指定时间内成功返回ack,那么MQ认为消息消费成功,在存储中删除消息,即执行第6步;
如果MQ在指定时间内没有收到ACK,则认为消息消费失败,会尝试重新push消息,重复执行4、5、6步骤
6. MQ删除消息
存储介质
关系型数据库
Apache下开源的另外一款MQ—ActiveMQ(默认采用的KahaDB做消息存储)可选用JDBC的方式来做消息持久化,通过简单的xml配置信息即可实现JDBC消息存储。由于,普通关系型数据库(如Mysql)在单表数据量达到千万级别的情况下,其IO读写性能往往会出现瓶颈。在可靠性方面,该种方案非常依赖DB,如果一旦DB出现故障,则MQ的消息就无法落盘存储会导致线上故障
文件系统
目前业界较为常用的几款产品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化(刷盘一般可以分为异步刷盘和同步刷盘两种模式)。消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据持久化方式。除非部署MQ机器本身或是本地磁盘挂了,否则一般是不会出现无法持久化的故障问题。
消息的存储和发送
消息存储
磁盘如果使用得当,磁盘的速度完全可以匹配上网络 的数据传输速度。目前的高性能磁盘,顺序写速度可以达到600MB/s, 超过了一般网卡的传输速度。但是磁盘随机写的速度只有大概100KB/s,和顺序写的性能相差6000倍!因为有如此巨大的速度差别,好的消息队列系统会比普通的消息队列系统速度快多个数量级。RocketMQ的消息用顺序写,保证了消息存储的速度。
消息发送
Linux操作系统分为【用户态】和【内核态】,文件操作、网络操作需要涉及这两种形态的切换,免不了进行数据复制。
一台服务器 把本机磁盘文件的内容发送到客户端,一般分为两个步骤:
1)read;读取本地文件内容;
2)write;将读取的内容通过网络发送出去。
这两个看似简单的操作,实际进行了4 次数据复制,分别是:
1. 从磁盘复制数据到内核态内存;
2. 从内核态内存复制到用户态内存;
3. 然后从用户态内存复制到网络驱动的内核态内存;
4. 最后是从网络驱动的内核态内存复 制到网卡中进行传输。
数据 ------------> 内核态 -------------> 用户态 -----------> 网络驱动内核 ---------> 网卡 ----------> 数据
通过使用mmap的方式,可以省去向用户态的内存复制,提高速度。这种机制在Java中是通过MappedByteBuffer实现的
RocketMQ充分利用了上述特性,也就是所谓的“零拷贝”技术,提高消息存盘和网络发送的速度。
这里需要注意的是,采用MappedByteBuffer这种内存映射的方式有几个限制,其中之一是一次只能映射1.5~2G 的文件至用户态的虚拟内存,这也是为何RocketMQ默认设置单个CommitLog日志数据文件为1G的原因了
一台服务器 把本机磁盘文件的内容发送到客户端,一般分为两个步骤:
1)read;读取本地文件内容;
2)write;将读取的内容通过网络发送出去。
这两个看似简单的操作,实际进行了4 次数据复制,分别是:
1. 从磁盘复制数据到内核态内存;
2. 从内核态内存复制到用户态内存;
3. 然后从用户态内存复制到网络驱动的内核态内存;
4. 最后是从网络驱动的内核态内存复 制到网卡中进行传输。
数据 ------------> 内核态 -------------> 用户态 -----------> 网络驱动内核 ---------> 网卡 ----------> 数据
通过使用mmap的方式,可以省去向用户态的内存复制,提高速度。这种机制在Java中是通过MappedByteBuffer实现的
RocketMQ充分利用了上述特性,也就是所谓的“零拷贝”技术,提高消息存盘和网络发送的速度。
这里需要注意的是,采用MappedByteBuffer这种内存映射的方式有几个限制,其中之一是一次只能映射1.5~2G 的文件至用户态的虚拟内存,这也是为何RocketMQ默认设置单个CommitLog日志数据文件为1G的原因了
消息存储结构
RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成 的,消息真正的物理存储文件是CommitLog,
ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每 个Topic下的每个
Message Queue都有一个对应的ConsumeQueue文件。
ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每 个Topic下的每个
Message Queue都有一个对应的ConsumeQueue文件。
- CommitLog:存储消息的元数据
- ConsumerQueue:存储消息在CommitLog的索引
- IndexFile:为了消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程
- ConsumerQueue:存储消息在CommitLog的索引
- IndexFile:为了消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程
刷盘机制
RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复, 又可以让存储的消息量超出内存的限制。
RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ的时候,
有两种写磁盘方式,分布式同步刷盘和异步刷盘。
RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ的时候,
有两种写磁盘方式,分布式同步刷盘和异步刷盘。
1)同步刷盘
在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写 成功的状态。
2)异步刷盘
在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。
3)配置
同步刷盘还是异步刷盘,都是通过Broker配置文件里的flushDiskType 参数设置的,这个参数被配置成SYNC_FLUSH、ASYNC_FLUSH中的 一个。
在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写 成功的状态。
2)异步刷盘
在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。
3)配置
同步刷盘还是异步刷盘,都是通过Broker配置文件里的flushDiskType 参数设置的,这个参数被配置成SYNC_FLUSH、ASYNC_FLUSH中的 一个。
高可用性机制
概述
RocketMQ分布式集群是通过Master和Slave的配合达到高可用性的。
Master和Slave的区别:
在Broker的配置文件中,参数 brokerId的值为0表明这个Broker是Master,大于0表明这个Broker是 Slave,同时
brokerRole参数也会说明这个Broker是Master还是Slave。
Master角色的Broker支持读和写,Slave角色的Broker仅支持读,也就是 Producer只能和Master角色的Broker连
接写入消息;Consumer可以连接 Master角色的Broker,也可以连接Slave角色的Broker来读取消息。
在Broker的配置文件中,参数 brokerId的值为0表明这个Broker是Master,大于0表明这个Broker是 Slave,同时
brokerRole参数也会说明这个Broker是Master还是Slave。
Master角色的Broker支持读和写,Slave角色的Broker仅支持读,也就是 Producer只能和Master角色的Broker连
接写入消息;Consumer可以连接 Master角色的Broker,也可以连接Slave角色的Broker来读取消息。
消息消费高可用
在Consumer的配置文件中,并不需要设置是从Master读还是从Slave 读,当Master不可用或者繁忙的时候,Consumer会被自动切换到从Slave 读。有了自动切换Consumer这种机制,当一个Master角色的机器出现故障后,Consumer仍然可以从Slave读取消息,不影响Consumer程序。这就达到了消费端的高可用性。
消息发送高可用
在创建Topic的时候,把Topic的多个Message Queue创建在多个Broker组上(相同Broker名称,不同
brokerId的机器组成一个Broker组),这样当一个Broker组的Master不可用后,其他组的Master仍然
可用,Producer仍然可以发送消息。 RocketMQ目前还不支持把Slave自动转成Master,如果机器资源
不足, 需要把Slave转成Master,则要手动停止Slave角色的Broker,更改配置文 件,用新的配置文件
启动Broker。
brokerId的机器组成一个Broker组),这样当一个Broker组的Master不可用后,其他组的Master仍然
可用,Producer仍然可以发送消息。 RocketMQ目前还不支持把Slave自动转成Master,如果机器资源
不足, 需要把Slave转成Master,则要手动停止Slave角色的Broker,更改配置文 件,用新的配置文件
启动Broker。
消息主从复制
如果一个Broker组有Master和Slave,消息需要从Master复制到Slave 上,
有同步和异步两种复制方式。
有同步和异步两种复制方式。
1)同步复制
同步复制方式是等Master和Slave均写 成功后才反馈给客户端写成功状态;
在同步复制方式下,如果Master出故障, Slave上有全部的备份数据,容易恢复,但是同步复制会增大数据写入 延迟,降低系统吞吐量。
在同步复制方式下,如果Master出故障, Slave上有全部的备份数据,容易恢复,但是同步复制会增大数据写入 延迟,降低系统吞吐量。
2)异步复制
异步复制方式是只要Master写成功 即可反馈给客户端写成功状态。
在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果Master出了故障,有些数据因为没有被写 入Slave,有可能会丢失;
在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果Master出了故障,有些数据因为没有被写 入Slave,有可能会丢失;
3)配置
同步复制和异步复制是通过Broker配置文件里的brokerRole参数进行设置的,这个参数可以被设置成ASYNC_MASTER、 SYNC_MASTER、SLAVE三个值中的一个。
4)总结
实际应用中要结合业务场景,合理设置刷盘方式和主从复制方式, 尤其是SYNC_FLUSH方式,由于频繁地触发磁盘写动作,会明显降低 性能。通常情况下,应该把Master和Save配置成ASYNC_FLUSH的刷盘方式,主从之间配置成SYNC_MASTER的复制方式,这样即使有一台 机器出故障,仍然能保证数据不丢,是个不错的选择。
负载均衡
Producer负载均衡
Producer端,每个实例在发消息的时候,默认会轮询所有的message queue发送,以达到让消息平均
落在不同的queue上。而由于queue可以散落在不同的broker,所以消息就发送到不同的broker下,如
下图:
落在不同的queue上。而由于queue可以散落在不同的broker,所以消息就发送到不同的broker下,如
下图:
Consumer负载均衡
集群模式
在集群消费模式下,每条消息只需要投递到订阅这个topic的Consumer Group下的一个实例即可。
RocketMQ采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定拉取哪一条message queue。
而每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照queue的数量和实例的数量平均分配queue给每个实例。
默认的分配算法是AllocateMessageQueueAveragely,如下图:
RocketMQ采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定拉取哪一条message queue。
而每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照queue的数量和实例的数量平均分配queue给每个实例。
默认的分配算法是AllocateMessageQueueAveragely,如下图:
需要注意的是,集群模式下,queue都是只允许分配只一个实例,这是由于如果多个实例同时消费一个
queue的消息,由于拉取哪些消息是consumer主动控制的,那样会导致同一个消息在不同的实例下被
消费多次,所以算法上都是一个queue只分给一个consumer实例,一个consumer实例可以允许同时分
到不同的queue。
通过增加consumer实例去分摊queue的消费,可以起到水平扩展的消费能力的作用。而有实例下线的
时候,会重新触发负载均衡,这时候原来分配到的queue将分配到其他实例上继续消费。
但是如果consumer实例的数量比message queue的总数量还多的话,多出来的consumer实例将无法
分到queue,也就无法消费到消息,也就无法起到分摊负载的作用了。所以需要控制让queue的总数量
大于等于consumer的数量。
queue的消息,由于拉取哪些消息是consumer主动控制的,那样会导致同一个消息在不同的实例下被
消费多次,所以算法上都是一个queue只分给一个consumer实例,一个consumer实例可以允许同时分
到不同的queue。
通过增加consumer实例去分摊queue的消费,可以起到水平扩展的消费能力的作用。而有实例下线的
时候,会重新触发负载均衡,这时候原来分配到的queue将分配到其他实例上继续消费。
但是如果consumer实例的数量比message queue的总数量还多的话,多出来的consumer实例将无法
分到queue,也就无法消费到消息,也就无法起到分摊负载的作用了。所以需要控制让queue的总数量
大于等于consumer的数量。
广播模式
由于广播模式下要求一条消息需要投递到一个消费组下面所有的消费者实例,
所以也就没有消息被分摊消费的说法。
所以也就没有消息被分摊消费的说法。
在实现上,其中一个不同就是在consumer分配queue的时候,所有consumer都分到所有的queue。
消息重试
顺序消息的重试
对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会
出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。
出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。
无序消息的重试
对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,您可以通过设置返回状态达到消息重试的结果。
无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。
无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。
重试次数
消息队列 RocketMQ 默认允许每条消息最多重试 16 次,每次重试的间隔时间如下:
如果消息重试 16 次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在
一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将
不再重试投递。
注意: 一条消息无论重试多少次,这些重试消息的 Message ID 不会改变。
如果消息重试 16 次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在
一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将
不再重试投递。
注意: 一条消息无论重试多少次,这些重试消息的 Message ID 不会改变。
配置方式
消费失败后,重试配置方式
集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置(三种
方式任选一种):
返回 Action.ReconsumeLater (推荐)
返回 Null
抛出异常
方式任选一种):
返回 Action.ReconsumeLater (推荐)
返回 Null
抛出异常
消费失败后,不重试配置方式
集群消费方式下,消息失败后期望消息不重试,需要捕获消费逻辑中可能抛出的异常,最终返回
Action.CommitMessage,此后这条消息将不会再重试。
Action.CommitMessage,此后这条消息将不会再重试。
自定义消息最大重试次数
消息队列 RocketMQ 允许 Consumer 启动的时候设置最大重试次数,重试时间间隔将按照如下策略:
最大重试次数小于等于 16 次,则重试时间间隔同上表描述。
最大重试次数大于 16 次,超过 16 次的重试时间间隔均为每次 2 小时。
最大重试次数小于等于 16 次,则重试时间间隔同上表描述。
最大重试次数大于 16 次,超过 16 次的重试时间间隔均为每次 2 小时。
注意:
消息最大重试次数的设置对相同 Group ID 下的所有 Consumer 实例有效。
如果只对相同 Group ID 下两个 Consumer 实例中的其中一个设置了 MaxReconsumeTimes,那
么该配置对两个 Consumer 实例均生效。
配置采用覆盖的方式生效,即最后启动的 Consumer 实例会覆盖之前的启动实例的配置
消息最大重试次数的设置对相同 Group ID 下的所有 Consumer 实例有效。
如果只对相同 Group ID 下两个 Consumer 实例中的其中一个设置了 MaxReconsumeTimes,那
么该配置对两个 Consumer 实例均生效。
配置采用覆盖的方式生效,即最后启动的 Consumer 实例会覆盖之前的启动实例的配置
获取消息重试次数
消费者收到消息后,可按照如下方式获取消息的重试次数:
死信队列
当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次数后,若消费依
然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消
息丢弃,而是将其发送到该消费者对应的特殊队列中。
在消息队列 RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter
Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。
然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消
息丢弃,而是将其发送到该消费者对应的特殊队列中。
在消息队列 RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter
Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。
特性
死信消息具有以下特性
不会再被消费者正常消费。
有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3 天内及时处理。
死信队列具有以下特性:
一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。
一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。
不会再被消费者正常消费。
有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3 天内及时处理。
死信队列具有以下特性:
一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。
一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。
一条消息进入死信队列,意味着某些因素导致消费者无法正常消费该消息,
因此,通常需要您对其进行特殊处理。排查可疑因素并解决问题后,可以在
消息队列 RocketMQ 控制台重新发送该消息,让消费者重新消费一次。
因此,通常需要您对其进行特殊处理。排查可疑因素并解决问题后,可以在
消息队列 RocketMQ 控制台重新发送该消息,让消费者重新消费一次。
消费幂等
消息队列 RocketMQ 消费者在接收到消息以后,有必要根据业务上的
唯一Key对消息做幂等处理的必要性。
唯一Key对消息做幂等处理的必要性。
消费幂等的必要性
在互联网应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ 的消息
有可能会出现重复,这个重复简单可以概括为以下情况:
有可能会出现重复,这个重复简单可以概括为以下情况:
发送时消息重复
当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务
端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收
到两条内容相同并且 Message ID 也相同的消息。
端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收
到两条内容相同并且 Message ID 也相同的消息。
投递时消息重复
消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络
闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投
递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投
递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
负载均衡时消息重复
(包括但不限于网络抖动、Broker 重启以及订阅方应用重启)
当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者
可能会收到重复消息。
当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者
可能会收到重复消息。
处理方式
因为 Message ID 有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以 Message ID
作为处理依据。 最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消
息 Key 进行设
作为处理依据。 最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消
息 Key 进行设
源码分析
环境搭建
源代码结构
源码目录结构:
broker: broker 模块(broke 启动进程)
client :消息客户端,包含消息生产者、消息消费者相关类
common :公共包
dev :开发者信息(非源代码)
distribution :部署实例文件夹(非源代码)
example: RocketMQ 例代码
filter :消息过滤相关基础类
filtersrv:消息过滤服务器实现相关类(Filter启动进程)
logappender:日志实现相关类
namesrv:NameServer实现相关类(NameServer启动进程)
openmessageing:消息开放标准
remoting:远程通信模块,给予Netty
srcutil:服务工具类
store:消息存储实现相关类
style:checkstyle相关实现
test:测试相关类
tools:工具类,监控命令相关实现类
broker: broker 模块(broke 启动进程)
client :消息客户端,包含消息生产者、消息消费者相关类
common :公共包
dev :开发者信息(非源代码)
distribution :部署实例文件夹(非源代码)
example: RocketMQ 例代码
filter :消息过滤相关基础类
filtersrv:消息过滤服务器实现相关类(Filter启动进程)
logappender:日志实现相关类
namesrv:NameServer实现相关类(NameServer启动进程)
openmessageing:消息开放标准
remoting:远程通信模块,给予Netty
srcutil:服务工具类
store:消息存储实现相关类
style:checkstyle相关实现
test:测试相关类
tools:工具类,监控命令相关实现类
架构设计
消息中间件的设计思路一般是基于主题订阅发布的机制,消息生产者(Producer)发送某一个主题到消
息服务器,消息服务器负责将消息持久化存储,消息消费者(Consumer)订阅该兴趣的主题,消息服
务器根据订阅信息(路由信息)将消息推送到消费者(Push模式)或者消费者主动向消息服务器拉去
(Pull模式),从而实现消息生产者与消息消费者解耦。为了避免消息服务器的单点故障导致的整个系
统瘫痪,通常会部署多台消息服务器共同承担消息的存储。那消息生产者如何知道消息要发送到哪台消
息服务器呢?如果某一台消息服务器宕机了,那么消息生产者如何在不重启服务情况下感知呢?
NameServer就是为了解决以上问题设计的。
息服务器,消息服务器负责将消息持久化存储,消息消费者(Consumer)订阅该兴趣的主题,消息服
务器根据订阅信息(路由信息)将消息推送到消费者(Push模式)或者消费者主动向消息服务器拉去
(Pull模式),从而实现消息生产者与消息消费者解耦。为了避免消息服务器的单点故障导致的整个系
统瘫痪,通常会部署多台消息服务器共同承担消息的存储。那消息生产者如何知道消息要发送到哪台消
息服务器呢?如果某一台消息服务器宕机了,那么消息生产者如何在不重启服务情况下感知呢?
NameServer就是为了解决以上问题设计的。
Broker消息服务器在启动的时向所有NameServer注册,消息生产者(Producer)在发送消息时之前先
从NameServer获取Broker服务器地址列表,然后根据负载均衡算法从列表中选择一台服务器进行发
送。NameServer与每台Broker保持长连接,并间隔30S检测Broker是否存活,如果检测到Broker宕
机,则从路由注册表中删除。但是路由变化不会马上通知消息生产者。这样设计的目的是为了降低
NameServer实现的复杂度,在消息发送端提供容错机制保证消息发送的可用性。
NameServer本身的高可用是通过部署多台NameServer来实现,但彼此之间不通讯,也就是
NameServer服务器之间在某一个时刻的数据并不完全相同,但这对消息发送并不会造成任何影响,这
也是NameServer设计的一个亮点,总之,RocketMQ设计追求简单高效。
从NameServer获取Broker服务器地址列表,然后根据负载均衡算法从列表中选择一台服务器进行发
送。NameServer与每台Broker保持长连接,并间隔30S检测Broker是否存活,如果检测到Broker宕
机,则从路由注册表中删除。但是路由变化不会马上通知消息生产者。这样设计的目的是为了降低
NameServer实现的复杂度,在消息发送端提供容错机制保证消息发送的可用性。
NameServer本身的高可用是通过部署多台NameServer来实现,但彼此之间不通讯,也就是
NameServer服务器之间在某一个时刻的数据并不完全相同,但这对消息发送并不会造成任何影响,这
也是NameServer设计的一个亮点,总之,RocketMQ设计追求简单高效。
NameServer
Producer
消息存储
Consumer
Kafka
第一章 Kafka 概述
定义
Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。
使用消息队列的好处
解耦
在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
缓冲
在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行———写入队列的处理会尽可能的快速。有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
灵活性&峰值处理能力
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
异步通信
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
冗余(副本)
有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。
扩展性
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。
顺序保证
在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。Kafka保证一个Partition内的消息的有序性。
消息队列的两种模式
(1)点对点模式(一对一,消费者主动拉取数据,消息收到后消息消除)
消息生产者生产消息发送到Queue中,然后消息消费者从Queue中取出并且消费消息。
消息被消费以后,queue 中不再有存储,所以消息消费者不可能消费到已经被消费的消息。
Queue 支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
消息被消费以后,queue 中不再有存储,所以消息消费者不可能消费到已经被消费的消息。
Queue 支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
(2)发布/订阅模式(一对多,消费者消费数据之后不会清除数据)
消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消息。
和点对点方式不同,发布到 topic 的消息会被所有订阅者消费。
和点对点方式不同,发布到 topic 的消息会被所有订阅者消费。
Kafka 架构
架构图
术语解释
Producer
生产者即数据的发布者,该角色将消息发布到Kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。
生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition
生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition
Consumer
消息消费者,向Kafka Broker取消息的客户端
Broker
一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker可以容纳多个 topic。
Consumer Group
消费者组,由多个 consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。
所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处) 类似于数据库的表名,可以理解为一个队列,生产者和消费者面向的都是一个topic;
Partition
topic中的数据分割为一个或多个partition。每个topic至少有一个partition。每个partition中的数据使用多个segment文件存储。partition中的数据是有序的,不同partition间的数据丢失了数据的顺序。如果topic有多个partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1
Replica
副本,为了保证集群中的某个节点发生故障时,该节点上的partition数据不丢失,且kafka仍然能够继续工作,kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个leader和若干个follower
Leader
每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。
Follower
Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中删除,重新创建一个Follower
原理
首先 Kafka 消息队列由三个角色组成,左面的是消息的生产方 Producer;中间是 Kafka 集群, Kafka 集群由多台 Kafka server 组成,每个 Server 称为一个 Broker,也就是消息代理;右面的是消息的消费方 Consumer。
Kafka 中消息是按照 Topic 进行划分的,一个 Topic 就是一个 Queue。在实际应用中,不同业务数据就可以设置为不同的 Topic。一个 Topic 可以有多个消费方,当生产方在某个 Topic 发出一条消息后,所有订阅了这个 Topic 的消费方都可以收到这条消息。
为了提高并行能力,Kafka 为每个 Topic 维护了多个 Partition 分区,每个分区可以看作一份追加类型的日志。 每个分区中的消息保证 ID 唯一且有序,新消息不断追加到尾部。Partition 实际存储数据时,会对按大小进行分段(Segment),来保证总是对较小的文件进行写操作,提高性能,方便管理。
如图中间部分,Partition 分布于多个 Broker 上。图中绿色的模块表示 Topic1 被分为了 3 个 Partition。每个 Partition 会被复制多份存在于不同的 Broker 上,如图中红色的模块,这样可以保证主分区出现问题时进行容灾。每个 Broker 可以保存多个 Topic 的多个 Partition。
Kafka 只保证一个分区内的消息有序,不能保证一个 Topic 的不同分区之间的消息有序。为了保证较高的处理效率,所有的消息读写都是在主 Partition 中进行,其他副本分区只会从主分区复制数据。Kafka 会在 ZooKeeper 上针对每个 Topic 维护一个称为 ISR(in-sync replica),就是已同步的副本集。如果某个主分区不可用了,Kafka 就会从 ISR 集合中选择一个副本作为新的主分区。
Kafka 中消息是按照 Topic 进行划分的,一个 Topic 就是一个 Queue。在实际应用中,不同业务数据就可以设置为不同的 Topic。一个 Topic 可以有多个消费方,当生产方在某个 Topic 发出一条消息后,所有订阅了这个 Topic 的消费方都可以收到这条消息。
为了提高并行能力,Kafka 为每个 Topic 维护了多个 Partition 分区,每个分区可以看作一份追加类型的日志。 每个分区中的消息保证 ID 唯一且有序,新消息不断追加到尾部。Partition 实际存储数据时,会对按大小进行分段(Segment),来保证总是对较小的文件进行写操作,提高性能,方便管理。
如图中间部分,Partition 分布于多个 Broker 上。图中绿色的模块表示 Topic1 被分为了 3 个 Partition。每个 Partition 会被复制多份存在于不同的 Broker 上,如图中红色的模块,这样可以保证主分区出现问题时进行容灾。每个 Broker 可以保存多个 Topic 的多个 Partition。
Kafka 只保证一个分区内的消息有序,不能保证一个 Topic 的不同分区之间的消息有序。为了保证较高的处理效率,所有的消息读写都是在主 Partition 中进行,其他副本分区只会从主分区复制数据。Kafka 会在 ZooKeeper 上针对每个 Topic 维护一个称为 ISR(in-sync replica),就是已同步的副本集。如果某个主分区不可用了,Kafka 就会从 ISR 集合中选择一个副本作为新的主分区。
消息发布/消费流程
来看图中的例子,这个 Topic 分为 4 个 Partition,就是图中绿色的 P1到 P4,上部的生产方根据规则选择一个 Partition 进行写入,默认规则是轮询策略。也可以由生产方指定 Partition 或者指定 key 来根据 Hash 值选择 Partition。
消息的发送有三种方式:同步、异步以及 oneway。
(1)同步模式下后台线程中发送消息时同步获取结果,这也是默认模式。
(2)异步的模式允许生产者批量发送数据,可以极大的提高性能,但是会增加丢失数据的风险。
(3)oneway 模式只发送消息不需要返回发送结果,消息可靠性最低,但是低延迟、高吞吐,适用于对可靠性要求不高的场景。
来看消息的消费,Consumer 按照 Group 来消费消息,Topic 中的每一条消息可以被多个 Consumer Group 消费,如上图中的 GroupA 和 GroupB。Kafka 确保每个 Partition 在一个 Group 中只能由一个 Consumer 消费。Kafka 通过 Group Coordinator 来管理 Consumer 实际负责消费哪个 Partition,默认支持 Range 和轮询分配。
Kafka 在 ZK 中保存了每个 Topic 中每个 Partition 在不同 Group 的消费偏移量 offset,通过更新偏移量保证每条消息都被消费。
注意:用多线程来读取消息时,一个线程相当于一个 Consumer 实例。当 Consumer 的数量大于分区的数量的时候,有的 Consumer 线程会读取不到数据。
消息的发送有三种方式:同步、异步以及 oneway。
(1)同步模式下后台线程中发送消息时同步获取结果,这也是默认模式。
(2)异步的模式允许生产者批量发送数据,可以极大的提高性能,但是会增加丢失数据的风险。
(3)oneway 模式只发送消息不需要返回发送结果,消息可靠性最低,但是低延迟、高吞吐,适用于对可靠性要求不高的场景。
来看消息的消费,Consumer 按照 Group 来消费消息,Topic 中的每一条消息可以被多个 Consumer Group 消费,如上图中的 GroupA 和 GroupB。Kafka 确保每个 Partition 在一个 Group 中只能由一个 Consumer 消费。Kafka 通过 Group Coordinator 来管理 Consumer 实际负责消费哪个 Partition,默认支持 Range 和轮询分配。
Kafka 在 ZK 中保存了每个 Topic 中每个 Partition 在不同 Group 的消费偏移量 offset,通过更新偏移量保证每条消息都被消费。
注意:用多线程来读取消息时,一个线程相当于一个 Consumer 实例。当 Consumer 的数量大于分区的数量的时候,有的 Consumer 线程会读取不到数据。
设计目标
以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。
高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。
支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输。
同时支持离线数据处理和实时数据处理。
Scale out:支持在线水平扩展
高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。
支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输。
同时支持离线数据处理和实时数据处理。
Scale out:支持在线水平扩展
第二章 Kafka集群搭建与使用
集群部署
集群规划
hadoop102 hadoop103 hadoop104
zk zk zk
kafka kafka kafka
zk zk zk
kafka kafka kafka
http://kafka.apache.org/downloads.html
1)解压安装包 $ tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/
2)修改解压后的文件名称 $ mv kafka_2.11-0.11.0.0/ kafka
3)在opt/module/kafka目录下创建logs文件夹 $ mkdir logs
4)修改配置文件 $ cd config/ $ vi server.properties
#broker 的全局唯一编号,不能重复
broker.id=0
#删除 topic 功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘 IO 的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka 运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#topic 在当前 broker 上的分区个数
num.partitions=1
#用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接 Zookeeper 集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181
broker.id=0
#删除 topic 功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘 IO 的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka 运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#topic 在当前 broker 上的分区个数
num.partitions=1
#用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接 Zookeeper 集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181
5)配置环境变量 $ sudo vi /etc/profile
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
$ source /etc/profile
6)分发安装包 $ xsync kafka/
注意:分发之后记得配置其他机器的环境变量
7)分别在 hadoop103 和 hadoop104 上修改配置文件/opt/module/kafka/config/server.properties中的 broker.id=1、broker.id=2
8) 启动集群
依次在hadoop102、 hadoop103、 hadoop104节点上启动 kafka
$ bin/kafka server start.sh daemon config/server.properties
9) 关闭集群
$ bin/kafka server stop.s h stop
10) kafka群起脚本
for i in hadoop102 hadoop 103 hadoop104
do
echo "========== $i =========="
ssh $i '/opt/module/kafka/bin/kafka server start.sh daemon
/opt/module/kafka/config/server.
done
do
echo "========== $i =========="
ssh $i '/opt/module/kafka/bin/kafka server start.sh daemon
/opt/module/kafka/config/server.
done
命令行操作
(1)查看当前服务器中的所有topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh zookeeper hadoop102:2181 list
(2)创建topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic first
--topic 定义 topic名
--replication-factor 定义副本数
--partitions 定义分区数
--replication-factor 定义副本数
--partitions 定义分区数
(3)删除topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop10 2:2181 --delete --topic first
需要server.properties中设置 delete.topic.enable=true否则只是标记删除
(4)发送消息
[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first
(5)消费消息
[atguigu@hadoop10 2 kafka]$ bin/kafka-console-consumer.sh \--zookeeper hadoop102: 2181 --topic first
(6)查看某个Topic的详情
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --describe --topic first
(7)修改分区数
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --alter --topic first --partitions 6
第三章 Kafka 架构设计原理分析
Kafka工作流程及文件存储机制
Kafka中消息是以topic进行分类的,生产者生产消息,消费者消费消息,都是面向topic的。
--------------------------------------------------------------------------------------------
topic 是逻辑上的概念,而partition 是物理上的概念,每个partition 对应于一个log 文件,
该log 文件中存储的就是producer 生产的数据。
Producer 生产的数据会被不断追加到该log 文件末端,且每条数据都有自己的offset。
消费者组中的每个消费者,都会实时记录自己消费到了哪个offset,以便出错恢复时,
从上次的位置继续消费。
--------------------------------------------------------------------------------------------
topic 是逻辑上的概念,而partition 是物理上的概念,每个partition 对应于一个log 文件,
该log 文件中存储的就是producer 生产的数据。
Producer 生产的数据会被不断追加到该log 文件末端,且每条数据都有自己的offset。
消费者组中的每个消费者,都会实时记录自己消费到了哪个offset,以便出错恢复时,
从上次的位置继续消费。
由于生产者生产的消息会不断追加到log 文件末尾,为防止log 文件过大导致数据定位
效率低下,Kafka 采取了分片和索引机制,将每个partition 分为多个segment。
每个segment对应两个文件--“.index”文件和“.log”文件。这些文件位于一个文件
夹下,该文件夹的命名规则为:topic 名称+分区序号。
例如,first 这个topic 有三个分区,则其对应的文件夹为first-0,first-1,first-2
效率低下,Kafka 采取了分片和索引机制,将每个partition 分为多个segment。
每个segment对应两个文件--“.index”文件和“.log”文件。这些文件位于一个文件
夹下,该文件夹的命名规则为:topic 名称+分区序号。
例如,first 这个topic 有三个分区,则其对应的文件夹为first-0,first-1,first-2
index 和log 文件以当前segment 的第一条消息的offset 命名。上图为index 文件和log文件的结构示意图。
“.index”文件存储大量的 索引信息 ,“.log”文件存储大量的 数据 ,索引文件中的元数据指向对应
数据文件中 message的物理偏移地址 。
“.index”文件存储大量的 索引信息 ,“.log”文件存储大量的 数据 ,索引文件中的元数据指向对应
数据文件中 message的物理偏移地址 。
Kafka副本机制与选举原理详解
Kafka生产者
分区策略
分区的原因
(1)方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic
又可以又多个Partition组成,因此整个集群就可以适应任意大小的数据了;
又可以又多个Partition组成,因此整个集群就可以适应任意大小的数据了;
(2)可以提高并发,因为可以以Partition为单位读写了。
分区的原则
我们需要将producer发送的数据封装成一个 ProducerRecord对象 。
(1)指明partition的情况下,直接将指明的值直接作为partiton值;
(2)没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值;
(3)既没有partition值又没有key值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),
将这个值与 topic 可用的 partition 总数取余得到 partition 值,也就是常说的 round-robin 算法。
(2)没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值;
(3)既没有partition值又没有key值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),
将这个值与 topic 可用的 partition 总数取余得到 partition 值,也就是常说的 round-robin 算法。
数据可靠性保证
原理
为保证producer发送的数据,能可靠的发送到指定的 topic topic的每个partition收到producer发送的数据后, 都需要向producer
发送ack (acknowledgement)确认收到 ,如果producer收到 ack 就会进行下一轮的发送,否则重新发送数据。
发送ack (acknowledgement)确认收到 ,如果producer收到 ack 就会进行下一轮的发送,否则重新发送数据。
1、副本数据同步策略
Kafka 选择了第二种方案,原因如下:
1、同样为了容忍n 台节点的故障,第一种方案需要2n+1个副本,而第二种方案只需要n+1
个副本,而Kafka的每个分区都有大量的数据,第一种方案会造成大量数据的冗余。
2、虽然第二种方案的网络延迟会比较高,但网络延迟对Kafka的影响较小。
1、同样为了容忍n 台节点的故障,第一种方案需要2n+1个副本,而第二种方案只需要n+1
个副本,而Kafka的每个分区都有大量的数据,第一种方案会造成大量数据的冗余。
2、虽然第二种方案的网络延迟会比较高,但网络延迟对Kafka的影响较小。
2、ISR
采用第二种方案之后,设想以下情景:leader 收到数据,所有follower 都开始同步数据,
但有一个follower,因为某种故障,迟迟不能与leader 进行同步,那leader 就要一直等下去,
直到它完成同步,才能发送ack。这个问题怎么解决呢?
但有一个follower,因为某种故障,迟迟不能与leader 进行同步,那leader 就要一直等下去,
直到它完成同步,才能发送ack。这个问题怎么解决呢?
Leader 维护了一个动态的in-sync replica set (ISR),意为和leader保持同步的follower集合。
当ISR 中的follower 完成数据的同步之后,leader就会给follower 发送ack。如果follower
长时间未向leader 同步数据, 则该follower 将被踢出ISR , 该时间阈值由replica.lag.time.max.ms
参数设定。Leader 发生故障之后,就会从ISR 中选举新的leader。
当ISR 中的follower 完成数据的同步之后,leader就会给follower 发送ack。如果follower
长时间未向leader 同步数据, 则该follower 将被踢出ISR , 该时间阈值由replica.lag.time.max.ms
参数设定。Leader 发生故障之后,就会从ISR 中选举新的leader。
3、ack应答机制
对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR 中的follower 全部接收成功。
所以Kafka 为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置:
0:producer不等待broker的ack,这一操作提供了一个最低的延迟,broker 一接收到还没有写入磁盘就已经返回,当broker 故障时有可能丢失数据;
1:producer 等待broker的ack,partition 的leader 落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据;
-1(all):producer 等待broker的ack,partition的leader和follower全部落盘成功后才返回ack。但是如果在follower 同步完成后,broker发送
ack之前,leader发生故障,那么会造成数据重复。
所以Kafka 为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置:
0:producer不等待broker的ack,这一操作提供了一个最低的延迟,broker 一接收到还没有写入磁盘就已经返回,当broker 故障时有可能丢失数据;
1:producer 等待broker的ack,partition 的leader 落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据;
-1(all):producer 等待broker的ack,partition的leader和follower全部落盘成功后才返回ack。但是如果在follower 同步完成后,broker发送
ack之前,leader发生故障,那么会造成数据重复。
4、故障处理细节
LEO:指的是每个副本最大的offset;
HW:指的是消费者能见到的最大的offset,ISR 队列中最小的LEO。
HW:指的是消费者能见到的最大的offset,ISR 队列中最小的LEO。
(1)follower故障
follower 发生故障后会被临时踢出ISR,待该follower恢复后,follower会读取本地磁盘
记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步。
等该follower的LEO大于等于该Partition的HW,即follower追上leader之后,就可以重
新加入ISR了。
记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步。
等该follower的LEO大于等于该Partition的HW,即follower追上leader之后,就可以重
新加入ISR了。
(2)leader故障
leader发生故障之后,会从ISR中选出一个新的leader,之后,为保证多个副本之间的数据一致性
其余的follower会先将各自的log文件高于HW的部分截掉 ,然后从新的leader同步数据。
注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。
其余的follower会先将各自的log文件高于HW的部分截掉 ,然后从新的leader同步数据。
注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。
Exactly Once语义
将服务器的ACK级别设置为 -1,可以保证Producer到Server之间不会丢失数据,即At Least Once语义 。
相对的将服务器ACK级别设置为 0,可以保证生产者每条消息只会被发送一次,即At Most Once语义。
相对的将服务器ACK级别设置为 0,可以保证生产者每条消息只会被发送一次,即At Most Once语义。
At Least Once可以保证数据不丢失,但是不能保证数据不重复;相对的, At Least Once可以保证数据不重复,但是不能保证数据不丢失。 但是,对于一些非常重要的息,比如说交易数据,下游数据消费者要求数据既不重复也不丢失,即 Exactly Once语义。
在 0.11版本以前的 Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。对于多个下游应用的情况,每个都需要单独做全局去重,这就对性能造成了很大影响。
0.11版本的 Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指 Producer不论向 Server发送多少次重复数据, Server端都只会持久化一条。幂等性结合 At Least Once语义,就构成了 Kafka的 Exactly Once语义。即:At Least Once + 幂等性 = Exactly Once
要启用幂等性,只需要将Producer的参数中 enable.idompotence设置为 true即可。 Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的 Producer在初始化的时候会被分配一个 PID,发往同一 Partition的消息会附带 Sequence Number。而Broker端会对 <PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时, Broker只会持久化一条。
但是PID重启就会变化,同时不同的 Partition也具有不同主键,所以幂等性无法保证跨分区跨会话的 Exactly Once。
在 0.11版本以前的 Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。对于多个下游应用的情况,每个都需要单独做全局去重,这就对性能造成了很大影响。
0.11版本的 Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指 Producer不论向 Server发送多少次重复数据, Server端都只会持久化一条。幂等性结合 At Least Once语义,就构成了 Kafka的 Exactly Once语义。即:At Least Once + 幂等性 = Exactly Once
要启用幂等性,只需要将Producer的参数中 enable.idompotence设置为 true即可。 Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的 Producer在初始化的时候会被分配一个 PID,发往同一 Partition的消息会附带 Sequence Number。而Broker端会对 <PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时, Broker只会持久化一条。
但是PID重启就会变化,同时不同的 Partition也具有不同主键,所以幂等性无法保证跨分区跨会话的 Exactly Once。
Kafka消费者
消费方式
consumer采用pull(拉)模式从broker中读取数据
pull 模式不足之处是,如果kafka 没有数据,消费者可能会陷入循环中,一直返回空数据。
针对这一点,Kafka 的消费者在消费数据时会传入一个时长参数timeout,如果当前没有
数据可供消费,consumer 会等待一段时间之后再返回,这段时长即为timeout。
针对这一点,Kafka 的消费者在消费数据时会传入一个时长参数timeout,如果当前没有
数据可供消费,consumer 会等待一段时间之后再返回,这段时长即为timeout。
push(推)模式很适应消费速率不同的消费者,因为消息发送速率是由broker决定的
它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer来不及处理消息,
典型的表现就是拒绝服务以及网络拥塞。而pull 模式则可以根据consumer 的消费能力以适
当的速率消费消息。
典型的表现就是拒绝服务以及网络拥塞。而pull 模式则可以根据consumer 的消费能力以适
当的速率消费消息。
分区分配策略
一个consumer group 中有多个consumer,一个 topic 有多个partition,所以必然会涉及
到partition 的分配问题,即确定那个partition 由哪个consumer 来消费。
Kafka 有两种分配策略,一是RoundRobin,一是Range。
到partition 的分配问题,即确定那个partition 由哪个consumer 来消费。
Kafka 有两种分配策略,一是RoundRobin,一是Range。
(1)RoundRobin
(2)Range
offset的维护
Kafka0.9版本之前,consumer默认将offset保存在Zookeeper中,从0.9 版本开始,consumer默认
将offset保存在Kafka一个内置的topic 中,该topic 为consumer_offsets。
将offset保存在Kafka一个内置的topic 中,该topic 为consumer_offsets。
由于consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,
需要从故障前的位置的继续消费,所以consumer 需要实时记录自己消费到了
哪个offset,以便故障恢复后继续消费。
需要从故障前的位置的继续消费,所以consumer 需要实时记录自己消费到了
哪个offset,以便故障恢复后继续消费。
(1) 修改配置文件 consumer.properties exclue.internal.topics=false
(2) 读取offset
0.11.0.0之前版本
bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper hadoop 102 :2181 --formatter
"kafka.coordinator.GroupMetadataManager \$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
"kafka.coordinator.GroupMetadataManager \$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
0.11.0.0之后版本
bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper hadoop 102 :2181 --formatter
"kafka.coordinator.group.GroupMetadataManager \\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
"kafka.coordinator.group.GroupMetadataManager \\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
Kafka 高效读写数据
(1)顺序写磁盘
Kafka的producer生产数据,要写入到 log文件中,写的过程是一直追加到文件末端,为顺序写 。 官网有数据表明,同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间 。
(2)零复制技术
Zookeeper在Kafka中的作用
Kafka 集群中有一个broker会被选举为Controller,负责管理集群broker的上下线,
所有topic的分区副本分配和leader选举等工作。
Controller的管理工作都是依赖于Zookeeper的。
所有topic的分区副本分配和leader选举等工作。
Controller的管理工作都是依赖于Zookeeper的。
Kafka 事务
Kafka从0.11版本开始引入了事务支持。事务可以保证Kafka 在Exactly Once语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败。
Producer事务
为了实现跨分区跨会话的事务,需要引入一个全局唯一的Transaction ID,并将 Producer获得的 PID和 Transaction ID绑定。这样当 Producer重启后就可以通过正在进行的 Transaction ID获得原来的 PID。
为了管理Transaction Kafka引入了一个新的组件 Transaction Coordinator。 Producer就是通过和 Transaction Coordinator交互获得 Transaction ID对应的任务状态。 Transaction Coordinator还负责将事务所有写入 Kafka的一个内部 Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。
为了管理Transaction Kafka引入了一个新的组件 Transaction Coordinator。 Producer就是通过和 Transaction Coordinator交互获得 Transaction ID对应的任务状态。 Transaction Coordinator还负责将事务所有写入 Kafka的一个内部 Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。
Consumer事务
上述事务机制主要是从Producer方面考虑,对于 Consumer而言,事务的保证就会相对较弱,尤其时无法保证 Commit的信息被精确消费。这是由于 Consumer可以通过 offset访问任意信息,而且不同的 Segment File生命周期不同,同一事务的消息可能会出现重启后被删除的情况。
第四章 Kafka API
Producer API
消息发送流程
Kafka的 Producer发送消息采用的是 异步发送 的方式。在消息发送的过程中,涉及到了两个线程 main线程和 Sender线程 ,以及 一个线程共享变量 RecordAccumulator。main线程将消息发送给 RecordAccumulator Sender线程不断从 RecordAccumulator中拉取消息发送到 Kafka broker。
相关参数:
batch.size:只有数据积累到batch.size 之后,sender 才会发送数据。
linger.ms:如果数据迟迟未达到batch.size,sender 等待linger.time 之后就会发送数据。
batch.size:只有数据积累到batch.size 之后,sender 才会发送数据。
linger.ms:如果数据迟迟未达到batch.size,sender 等待linger.time 之后就会发送数据。
异步发送API
(1)导入依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
(2)编写代码
需要用到的类:
KafkaProducer:需要创建一个生产者对象,用来发送数据
ProducerConfig:获取所需的一系列配置参数
ProducerRecord:每条数据都要封装成一个ProducerRecord 对象
KafkaProducer:需要创建一个生产者对象,用来发送数据
ProducerConfig:获取所需的一系列配置参数
ProducerRecord:每条数据都要封装成一个ProducerRecord 对象
代码举例
不带回调函数的API
带回调函数的 API
回调函数会在producer收到ack时调用,为异步调用, 该方法有两个参数,分别是RecordMetadata 和
Exception ,如果 Exception 为 null ,说明消息发送成功,如果Exception 不为 null ,说明消息发送失败。
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
回调函数会在producer收到ack时调用,为异步调用, 该方法有两个参数,分别是RecordMetadata 和
Exception ,如果 Exception 为 null ,说明消息发送成功,如果Exception 不为 null ,说明消息发送失败。
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
同步发送的意思就是,一条消息发送之后,会阻塞当前线程, 直至返回ack。
由于send方法返回的是一个Future对象,根据Futrue对象 的特点,我们也可以实现同步发送的效果 ,只需在调用 Future
对象的 get方发即可。
由于send方法返回的是一个Future对象,根据Futrue对象 的特点,我们也可以实现同步发送的效果 ,只需在调用 Future
对象的 get方发即可。
ConsumerAPI
Consumer消费数据时的可靠性是很容易保证的,因为数据在Kafka中是持久化的,故不用担心数据丢失问题。
由于consumer在消费过程中可能会出现断电宕机等故障, consumer恢复后,需要从故障前的位置的继续消费,
所以consumer需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。
所以offset的维护是Consumer消费数据是必须考虑的问题。
由于consumer在消费过程中可能会出现断电宕机等故障, consumer恢复后,需要从故障前的位置的继续消费,
所以consumer需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。
所以offset的维护是Consumer消费数据是必须考虑的问题。
自动提交offset
导入依赖
<dependcy>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka clients</artifactId>
<version>0.11.0.0</version>
</dependcy>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka clients</artifactId>
<version>0.11.0.0</version>
</dependcy>
需要用到的类
KafkaConsumer:需要创建一个 消费 者对象,用来 消费 数据
ConsumerConfig:获取所需的一系列配置参数
ConsuemrRecord:每条数据都要封装成一个 ConsumerRecord对象
为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交 offset的功能 。
自动提交offset的相关参数:
enable.auto.commit 是否开启自动提交 offset功能
auto.commit.interval.ms 自动提交 offset的时间间隔
ConsumerConfig:获取所需的一系列配置参数
ConsuemrRecord:每条数据都要封装成一个 ConsumerRecord对象
为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交 offset的功能 。
自动提交offset的相关参数:
enable.auto.commit 是否开启自动提交 offset功能
auto.commit.interval.ms 自动提交 offset的时间间隔
编写代码
自动提交 offset
自动提交 offset 的代码
手动提交 offset
虽然自动提交offset十分简介便利,但由于其是基于时间提交的 开发人员难以把握offset提交的时机。因此 Kafka还提供了手动提交 offset的 API。
手动提交offset的方法有两种:分别是 commitSync(同步提交 和 commitAsync(异步提交) 。
两者的相同点是,都会将本次poll的一批数据最高的偏移量提交 ;
不同点是commitSync阻塞当前线程,一直到提交成功,并且会自动失败重试( 由不可控因素导致,也会出现提交失败 );
而 commitAsync则没有失败重试机制,故有可能提交失败。
手动提交offset的方法有两种:分别是 commitSync(同步提交 和 commitAsync(异步提交) 。
两者的相同点是,都会将本次poll的一批数据最高的偏移量提交 ;
不同点是commitSync阻塞当前线程,一直到提交成功,并且会自动失败重试( 由不可控因素导致,也会出现提交失败 );
而 commitAsync则没有失败重试机制,故有可能提交失败。
同步提交 offset
由于同步提交offset有失败重试机制,故更加可靠 ,以下为同步提交 offset的示例。
由于同步提交offset有失败重试机制,故更加可靠 ,以下为同步提交 offset的示例。
异步提交offset
虽然同步 提交 offset更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会
收到很大的影响。因此更多的情况下,会选用异步提交 offset的方式。
虽然同步 提交 offset更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会
收到很大的影响。因此更多的情况下,会选用异步提交 offset的方式。
数据漏消费和重复消费分析
无论是同步提交还是异步提交offset,都有可能会造成数据的漏消费或者重复消费。先提交 offset后消费,有可能造成数据的漏消费;而先消费后提交 offset,有可能会造成数据的重复消费。
自定义存储 offset
Kafka 0.9版本之前, offset存储在zookeeper0.9版本及之后,默认将offset存储在Kafka的一个内置的topic中。除此之外, Kafka还可以选择自定义存储offset。
offset的维护是相当繁琐的, 因为需要考虑到消费者的 Rebalace。
当有新的消费者加入消费者组、已有的消费者推出消费者组 或者所订阅的主题的分区发生变化 ,就会触发到分区的重新分配,重新分配的过程叫做 Rebalance。
消费者发生Rebalance之后,每个消费者消费的分区就会发生变 化。 因此消费者要首先获取到自己被重新分配到的分区,并且定位到每个分区最近提交的
offset位置继续消费。
要实现自定义存储offset,需要借助 ConsumerRebalanceListener 以 下为 示例代码 ,其中提交和获取 offset的方法,需要根据所选的 offset存储系统自行实现。
offset的维护是相当繁琐的, 因为需要考虑到消费者的 Rebalace。
当有新的消费者加入消费者组、已有的消费者推出消费者组 或者所订阅的主题的分区发生变化 ,就会触发到分区的重新分配,重新分配的过程叫做 Rebalance。
消费者发生Rebalance之后,每个消费者消费的分区就会发生变 化。 因此消费者要首先获取到自己被重新分配到的分区,并且定位到每个分区最近提交的
offset位置继续消费。
要实现自定义存储offset,需要借助 ConsumerRebalanceListener 以 下为 示例代码 ,其中提交和获取 offset的方法,需要根据所选的 offset存储系统自行实现。
自定义Interceptor
拦截器原理
Producer拦截器 (interceptor)是在 Kafka 0.10版本被引入的,主要用于实现 clients端的定制化控制逻辑。
对于producer而言, interceptor使得用户在消息发送前以及 producer回调逻辑前有机会对消息做一些定制化需求,比如 修改消息等。同时, producer允许用户指定多个 interceptor按序作用于同一条消息从而形成一个拦截链 (interceptor chain)。 Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其 定义的方法包括:
(1)configure(configs) 获取配置信息 和 初始化数据时调用 。
(2)onSend(ProducerRecord) 该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中。 Producer确保在消息被序列化以及计算分区前调用该方法。
用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的 topic和分区, 否则会影响目标分区的计算。
(3)onAcknowledgement(RecordMetadata, Exception) 该方法会在消息从RecordAccumulator成功发送到Kafka Broker之后,或者在发送过程中失败时调用。 并且通常都是在 producer回调逻辑触发之前。 onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢 producer的消息
发送效率 。
(4) close 关闭interceptor,主要用于执行一些资源清理工作如前所述,interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外 倘若指定了多个 interceptor,则 producer将按照指定顺序调用它们 ,并仅仅是捕获每个 interceptor可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。
对于producer而言, interceptor使得用户在消息发送前以及 producer回调逻辑前有机会对消息做一些定制化需求,比如 修改消息等。同时, producer允许用户指定多个 interceptor按序作用于同一条消息从而形成一个拦截链 (interceptor chain)。 Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其 定义的方法包括:
(1)configure(configs) 获取配置信息 和 初始化数据时调用 。
(2)onSend(ProducerRecord) 该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中。 Producer确保在消息被序列化以及计算分区前调用该方法。
用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的 topic和分区, 否则会影响目标分区的计算。
(3)onAcknowledgement(RecordMetadata, Exception) 该方法会在消息从RecordAccumulator成功发送到Kafka Broker之后,或者在发送过程中失败时调用。 并且通常都是在 producer回调逻辑触发之前。 onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢 producer的消息
发送效率 。
(4) close 关闭interceptor,主要用于执行一些资源清理工作如前所述,interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外 倘若指定了多个 interceptor,则 producer将按照指定顺序调用它们 ,并仅仅是捕获每个 interceptor可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。
拦截器案例
需求:实现一个简单的双interceptor组成的拦截链。第一个 interceptor会在消息发送前将时间戳信息加到
消息value的最前部;第二个 interceptor会在消息发送后更新成功发送消息数或失败发送消息数。
消息value的最前部;第二个 interceptor会在消息发送后更新成功发送消息数或失败发送消息数。
第五章 Kafka 监控
Kafka Eagle
第六章 Flume 对接 Kafka
(1)配置 flume(flume-kafka.conf)
(2)启动 kafkaIDEA消费者
(3)进入 flume根目录下,启动 flume $ bin/flume ng agent c conf/ n a1 f jobs/flume kafka.conf
(4)向 /opt/module/data/flume.log里追加数据,查看 kafka消费者消费情况
$ echo hello >>> /opt/module/data/flume.log
(2)启动 kafkaIDEA消费者
(3)进入 flume根目录下,启动 flume $ bin/flume ng agent c conf/ n a1 f jobs/flume kafka.conf
(4)向 /opt/module/data/flume.log里追加数据,查看 kafka消费者消费情况
$ echo hello >>> /opt/module/data/flume.log
##################### flume-kafka.conf ############
# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail F c +0 /opt/module/data/flume.log
a1.sources.r1.shell = /bin/bash c
# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:90 92,hadoop104:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail F c +0 /opt/module/data/flume.log
a1.sources.r1.shell = /bin/bash c
# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:90 92,hadoop104:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
第七章 Kafka 面试问题
1、Kafka中的 ISR(InSyncRepli)、 OSR(OutSyncRepli)、 AR(AllRepli)代表什么?
2、Kafka中的 HW、 LEO等分别代表什么?
3、Kafka中是怎么体现消息顺序性的?
4、Kafka中的分区器、序列化器、拦截器是否了解?它们之间的处理顺序是什么?
5、Kafka生产者客户端的整体结构是什么样子的?使用了几个线程来处理?分别是什么?
6、“消费组中的消费者个数如果超过 topic的分区,那么就会有消费者消费不到数据”这句话是否正确?
7、消费者提交消费位移时提交的是当前消费到的最新消息的 offset还是 offset+1
8、有哪些情形会造成重复消费?
9、那些情景会造成消息漏消费?
10、当你使用kafka-topics.sh 创建(删除)了一个topic 之后,Kafka 背后会执行什么逻辑?
(1) 会在zookeeper 中的/brokers/topics 节点下创建一个新的topic 节点, 如:/brokers/topics/first
(2)触发Controller 的监听程序
(3)kafka Controller 负责topic 的创建工作,并更新metadata cache
(1) 会在zookeeper 中的/brokers/topics 节点下创建一个新的topic 节点, 如:/brokers/topics/first
(2)触发Controller 的监听程序
(3)kafka Controller 负责topic 的创建工作,并更新metadata cache
11、topic 的分区数可不可以增加?如果可以怎么增加?如果不可以,那又是为什么?
12、topic 的分区数可不可以减少?如果可以怎么减少?如果不可以,那又是为什么?
13、Kafka 有内部的topic 吗?如果有是什么?有什么所用?
14、Kafka 分区分配的概念?
15、简述Kafka 的日志目录结构?
16、如果我指定了一个offset,Kafka Controller 怎么查找到对应的消息?
17、聊一聊Kafka Controller 的作用?
18、Kafka 中有那些地方需要选举?这些地方的选举策略又有哪些?
19、失效副本是指什么?有那些应对措施?
20、Kafka 的哪些设计让它有如此高的性能?
第八章 最佳实践
基于Kafka的大规模日志系统实现原理分析
亿级流量生成系统Kafka性能优化最佳实践
面试问题
为什么使用消息队列
解耦
有个服务专门给设备推送数据,数据以Protobuf形式放入到消息队列中,公司不定期会有新的定制设置需要推送数据,这样新的设备就可以从消息队列获取数据,不需要更改原推送代码。
异步
推送消息到设备,其中有一次一次性要给几十万台设备推送消息,要是同步一条条去推,那要等多久才能完成,最后推送的设备很久才能收到消息,如果有了消息队列我们就可以把消息放到多个mq中,可以同时从多个消息队列中消风,大大提高了推送时间。
消峰
有一些特殊时段,服务器请求的并发量如果暴增,也就是所谓的请求高峰期。服务器处理能力有有限,如果高峰期并发量超过了限定,那么服务器就会崩溃,所以先把请求都压在MQ中,然后分批消费。往往高峰期只有某个时间段或者很短的时间。
消息队列有什么优点和缺点
优点
解耦、异步、消峰
缺点
系统可用性降低
引入外部依赖越多,系统约容易挂掉
系统复杂性提高
加个MQ进来,需要考虑消息重复、丢失、传输顺序问题
一致性问题
如果多个系统写入数据,其中某个失败了,怎么保证这些系统的一致性问题
Kafka、ActiveMQ、RabbitMQ、RocketMQ有什么优缺点
ActiveMQ
最早大家使用ActiveMQ,现在用的少了,社区也不是很活跃,不推荐使用
RabbitMQ
erlang语言阻止了大量的Java工程师去深入研究和掌握它,对公司而言几乎处于不可控的状态,但是确认是开源的,比较稳定的支持,活跃度也高
RocketMQ
越来越多的公司去用RocketMQ,确实不错,比较是阿里出品,但社区可能有突入黄掉的风险
Kafka
大数据领域的实时工具,日志采集场景用Kafka是业内标准的
如果保证消息队列的高可用
RabbitMQ的高可用
基于主从(非分布式)做高可用
集群搭建
普通集群模式
无高可用,rabbitMQ部署多个实例分别在iyge服务器上;每个rabbitMQ对应一个queue;只有一个queue放消息,其他queue只同步queue元数据
镜像集群模式
你创建的queue,无论元数据还是queue里的消息都会存在于多个实例上,每次你写消息到queue的时候,都会自动把消息同步到多个实例的queue上
如何开启这个镜像集群模式
后台新增、镜像集群群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点
镜像的配置通过policy策略的方法
all所有的节点都将同步;exactly(指定个数的节点被同步);nodes(指定的名称的节点被同步)
Kakfa的高可用性
天然的分布式消息队列
架构
Kafka8.0后,提供了HA机制(副本机制);
每个partition的数据都会同步到其他集群上,形成自己的多个副本(replica);
副本(replica)会选举一个leader出来,其他副本就是follower;
当leader挂掉时,从follower中重新选举出leader,读和写只有Leader负责,follower只用来备份数据;
在写的时候leader会负责把数据同步到所有的follower上去,所有的follower都同步成功后返回ack。
每个partition的数据都会同步到其他集群上,形成自己的多个副本(replica);
副本(replica)会选举一个leader出来,其他副本就是follower;
当leader挂掉时,从follower中重新选举出leader,读和写只有Leader负责,follower只用来备份数据;
在写的时候leader会负责把数据同步到所有的follower上去,所有的follower都同步成功后返回ack。
搭建集群
如果保证消息不被重复消费(使用消息队列如何保证幂等性)
kafka消费数据的过程
consumer消费了数据之后,每隔一段时间(定时定期)会把自己消费过的消息offset提交到Zoopkeeper,又给kafka
幂等性
通俗的说,每一个数据,或者一个请求,让你重复来过你次,你得确保队对应的数据不会变,不能出差。
解决方案
如何保证MQ的消息是幂等性的,需要结合具体的业务来看,例需要生产者发送每条数据的时候,里面加一个全局意唯一的ID,先根据这个ID去比如Redis里查一下,如果存在跳过消费,消费并将ID写入redis
如何处理消息丢失的问题
场景
绝对不会把计费消息给弄丢,弄丢会造成经济损失
消息丢失的情况
生成者弄丢数据
消息传入MQ过程中丢失
由于网络原因等问题
解决方案
RabbitMQ
方案一 事务机制(同步、损耗性能吞吐量下降)
使用RabbitMQ提供的事务功能
生产者发送数据之前开启RabbitMQ事务channel.txSelect
发送消息,如过消息没有成功被RabbitMQ接收到,那么生产者收到异常报错
此时就可以回滚事务channel.txCommit
生产者发送数据之前开启RabbitMQ事务channel.txSelect
发送消息,如过消息没有成功被RabbitMQ接收到,那么生产者收到异常报错
此时就可以回滚事务channel.txCommit
方案二 confirm机制(异步,推荐使用)
生产者那里设置开启confirm模式
每次写的消息都会分配一个唯一的ID
如果写入了RabbitMQ中,RabbitMQ会给你回传一个ack(回执)消息告诉你说这个消息OK
如果RabbitMQ没有写入这个消息,会回调你的一个nack接口,告诉你这个消息接收失败,你可以重试
每次写的消息都会分配一个唯一的ID
如果写入了RabbitMQ中,RabbitMQ会给你回传一个ack(回执)消息告诉你说这个消息OK
如果RabbitMQ没有写入这个消息,会回调你的一个nack接口,告诉你这个消息接收失败,你可以重试
Kafka
acks=all
MQ弄丢数据
RabbitMQ
场景
MQ挂掉还没有消费的消息会丢失
解决方案
丢失可能性不大,可以开启RabbitMQ的持久化
设置持久化:创建queue的时候,设置其持久化,持久化queue的元数据,但是它是不会持久化queue里的数据的,发送消息的时候将消息的deliveryModel默认值设置为2
当写入RabbitMQ后,还没持久化到挂掉会丢数据,可以使用confirm机制,等待久化在返回ack
Kafka
场景
follower同步leader数据时,leader挂掉了
解决方案
给topic设置replication.factor参数:这个值必须小于1,要求每个partition必须有至少2个步骤
Kafka服务器端min.insync.replicas参数,这个值必须大于1,leader至少感知到有至少一个follower正常
producer端满足ack=all,消息写入所有replica之后,才能认为是写成功了
在Producer端设备retries=MXA,很大很大的一个道理,无限制重置的意思
Kafka服务器端min.insync.replicas参数,这个值必须大于1,leader至少感知到有至少一个follower正常
producer端满足ack=all,消息写入所有replica之后,才能认为是写成功了
在Producer端设备retries=MXA,很大很大的一个道理,无限制重置的意思
消费者丢弃数据
消费者消费消息的过程中挂掉,导致消息丢失
解决方案
RabbitMQ
ack机制
刚消费到还没有处理结果进程来了
使用rabbitSQ提供的ack机制
必须关闭RabbitMQ的自动ack机制,可以通过一个api来调用就行
然后每次你自己代码里确保处理完的的时候,再在程序里手动ack
使用rabbitSQ提供的ack机制
必须关闭RabbitMQ的自动ack机制,可以通过一个api来调用就行
然后每次你自己代码里确保处理完的的时候,再在程序里手动ack
kafka
场景-自动提交了offset消息还没处理完,消费者就挂掉了
关闭自动提交offset
在处理完之后自己手动提交offset
保证消息的顺序性
顺序会错乱的场景
RabbitMQ
场景1:一个queue,有多个consumer去消费,consumer从MQ里面读取数据是有序的,每个consumer的执行时间是不固定的,造成数据顺序顺利错误。
场景2:一个queue对应一个consumer,但是consumer里面进行了多线程消费,这样也会造成消息消费顺序错误
KafKa
Kafka一个topic,一个partiton,一个consumer,但是consumer内部进行多线程消费
解决方案
方案一
在消费者处可以把消息写入多个内存queue,需要顺序执行的消息写入同一个queue然后N个线程分别消费一个内存queue
方案二
吧需要顺序执行的消息放入同一个queue中,然后一个queue对应一个消费者去消费
如何解决消息队列的延时导致消息大量积压
场景
消费者出现问题,或者直接挂掉,消息累积,大量积压
解决方案
临时紧急扩容
先修复consumer的问题,确保其恢复消费速度,然后将现有consumer都停掉;
新建一个topic,partition是原理的10倍,临时建立好原先10倍的queue数量;
然后写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接轮询写入临时建立好的10倍数量的queue;
接着临时征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的数据;
等到快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的consumer集器来消费信息
新建一个topic,partition是原理的10倍,临时建立好原先10倍的queue数量;
然后写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接轮询写入临时建立好的10倍数量的queue;
接着临时征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的数据;
等到快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的consumer集器来消费信息
子主题
MQ中的消息过期失效了
场景
RabbitMQ是可以设置过期时间TTL
消息在queue中积压超过一定时间就会被RabbitMQ给清理掉
解决方案
批量重导
写程序,将丢失的那批数据写个临时程序,一点一点的查出来,然后重新灌入mq里面去
0 条评论
下一页
为你推荐
查看更多