kafka
2021-03-11 11:29:35 38 举报
AI智能生成
Kafka是一个分布式流处理平台,由LinkedIn于2010年开源。它主要用于构建实时数据管道和流应用。Kafka的核心是一个发布/订阅模式的消息队列,它能够处理消费者网站的所有数据流。这种设计使得Kafka可以处理数以亿计的数据记录,而不影响性能。Kafka的主要特点包括高吞吐量、可扩展性、持久性和容错性。它的设计理念是基于消息传递而不是存储,这使得它可以处理大量的数据流,而不需要担心数据的存储和管理。Kafka已经被广泛应用于大数据处理、日志收集、实时监控等领域。
作者其他创作
大纲/内容
基础知识
基本概念
Broker
kafka由多个broker组成,每个broker是一个节点(伪集群模式下,多个broker运行在一个机器上时,一个broker就是一个进程)
borker负责接收和处理客户端发送过来的请求,以及对消息进行持久化
topic(主题)
承载消息的逻辑容器,在实际业务中多用来区分具体的业务
partition(分区)
一个topic可以划分为多个partition,每个partition可以存在于不同的broker上
每个分区是一组有序的消息日志
生产者生产的每条消息只会发送到一个分区中
副本
副本概念
把相同的数据拷贝到多台机器上,这些相同的数据拷贝在kafka中被称为副本(Replica)
kafka定义了两类副本:领导者副本(Leader Replica)和追随者副本(Follower Replica)
每个分区下可以配置若干个副本,其中只能有1个领导者副本和N-1个追随者副本
副本工作机制
领导者(Leader)副本:与客户端程序进行交互(生产者总是向领导者副本写消息,消费者总是从领导者副本读取消息)
追随者(follower)副本只做一件事:向领导者副本发送请求,请求领导者把最新生产的消息发送给它,这样它才能保持与领导者的同步
offset
每个消息被添加到分区时,分配唯一的offset,以此保证partition内消息的顺序性
分区位移总是从0开始,假设一个生产者向一个空分区写入了10条消息,那么这10条消息的位移依次是0、1、2、...、9
Consumer Group(消费者组)
多个消费者实例共同组成的一个组,同时消费多个分区以实现高吞吐
每个Consumer属于一个特定的Consumer Group,一条消息可以发送到多个不同的Consumer Group,但是一个Consumer Group中只能有一个Consumer能够消费该消息
Consumer Group Offset (消费者位移)
表示消费者消费进度,每个消费者都有自己的消费者位移
Rebalance(重平衡)
消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程
Rebalance是kafka消费者端实现高可用的重要手段
Producer (生产者)
向Topic发送新消息的应用程序
Consumer (消费者)
从Topic订阅新消息的应用程序
AR、ISR、OSR
AR(Assigned Replicas)
分区中所有副本统称为 AR
ISR(In-Sync Replicas)
所有与leader副本保持一定程度的同步的副本(包括leader)组成ISR,ISR是AR的一个子集
一定程度同步
replica.lag.time.max.ms(默认10秒):follower副本能落后leader副本的最长时间间隔
只要follower副本落后leader副本的时间不连续超过10秒,就认为是同步的(即使消息数相差很多)
因为消息会先发送到leader副本,然后follower副本拉取消息进行同步,这期间follower副本会有一定程度的滞后
OSR(Out-of-sync replicas)
与leader副本同步滞后太多的副本(不包括leader)组成OSR;(如果后面追上leader进度的话,会被重新加回ISR)
正常情况下,所有follower副本都应该与leader副本保持一定程度的同步,即AR=ISR,OSR为空
AR = ISR + OSR
核心参数配置
Broker
log.dir
表示单个路径
不推荐使用,线上推荐配置多个,使用log.dirs
log.dirs
含义
指定了Broker需要使用的若干个文件目录路径(无默认值,必须手动配置)
如何配置
用逗号隔开的多个路径
比如:/home/kafka1,/home/kafka2,/home/kafka3
建议
如果可以的话,建议将这些目录挂载到不同的物理磁盘上
好处
提升读写性能
多个物理磁盘同时读写有更高的吞吐量
能够实现故障转移(Failover)
kafka1.1版本加入的功能
kafka 1.1版本之前,kafka Broker使用到任何一块磁盘挂掉了,整个Broker进程都会关闭
kafka 1.1 版本之后,坏掉的磁盘上的数据会自动转移到其他正常的磁盘上
zookeeper.connect
zookeeper集群地址,多个逗号隔开
示例:zk1:2181,zk2:2181,zk3:2181
如果多个kafka集群使用同一套zookeeper集群,需要使用chroot,chroot是zookeeper的别名
zk1:2181,zk2:2181,zk3:2181/kafka1 和 zk1:2181,zk2:2181,zk3:2181/kafka2
listeners(内网)
监听器,只有内网中的服务使用
listeners:<协议名称>://<内网ip>: <端口>
例如:listeners: SASL_PLAINTEXT://192.168.0.4:9092
协议:PLAINTEXT表示明文传输,SSL表示使用SSL或TLS加密传输
advertised.listeners(外网)
是暴露给外部的listeners,如果没有设置,会用listeners
比如在内网部署kafka服务,但是生产者或者消费者在外网环境时
示例
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
listeners=INTERNAL://192.168.133.11:9092,EXTERNAL://192.168.133.11:9093
advertised.listeners=INTERNAL://192.168.133.11:9092,EXTERNAL://<公网ip>:<端口>
inter.broker.listener.name=INTERNAL
listeners=INTERNAL://192.168.133.11:9092,EXTERNAL://192.168.133.11:9093
advertised.listeners=INTERNAL://192.168.133.11:9092,EXTERNAL://<公网ip>:<端口>
inter.broker.listener.name=INTERNAL
Topic
auto.create.topics.enable
含义
是否允许自动创建Topic
建议
建议设置成false,即不允许自动创建Topic,应该由运维严格把控
unclean.leader.election.enable
含义
是否允许Unclean Leader选举
建议
建议设置成false,不让落后太多的副本竞选Leader
auto.leader.rebalance.enable
含义
是否允许定期进行Leader选举
建议
设置成false
因为换一次 Leader 代价很高的,原本向 A 发送请求的所有客户端都要切换成向 B 发送请求,而且这种换 Leader 本质上没有任何性能收益
retention.ms
规定了该Topic消息被保存的时长
默认是7天,即该Topic只保存最近7天的消息
一旦设置了这个值,会覆盖掉Broker端的全局参数值(log.retention.minutes)
retention.bytes
规定了该Topic预留多大的磁盘空间
默认值-1,表示无限使用磁盘空间
使用场景是在多租户的kafka集群中
数据存储
log.retention.{hour|minutes|ms}
含义
一条消息数据被保存多长时间
如果公司把Kafka当存储使用,这个值就要相应地调大
示例
log.retention.hour=168表示默认保存 7 天的数据,自动删除 7 天前的数据
log.retention.bytes
含义
指定Broker为消息保存的总磁盘容量大小
默认值是-1,表示在这台Broker上保存多少数据都可以
用途
这个参数的使用场景是在云上构建多租户的kafka集群,防止恶意用户使用过多的磁盘空间
message.max.bytes
含义
控制 Broker 能够接收的最大消息大小
默认值1000012太少了,不到1M,实际场景需要调大
JVM参数
堆大小
建议设置成6G
默认1G有点小,因为kafka Broker在于客户端进行交互时会在JVM堆上创建大量的ByteBuffer实例,Heap Size不能太小
垃圾回收器(GC)
java 7
如果Broker所在机器的CPU资源非常充裕,建议使用CMS收集器;使用方法:指定-XX:+UseCurrentMarkSweepGC
如果CPU资源不充裕,使用吞吐量收集器;开启方式是指定-XX:+UseParallelGC
java 8
建议使用G1收集器
G1表现比CMS出色,主要体现在更少的Full GC,需要调整的参数更少
如何设置
KAFKA_HEAP_OPTS
指定堆大小
KAFKA_JVM_PERFORMANCE_OPTS
指定 GC 参数
操作系统参数
文件描述符限制
设置参数
ulimit -n
建议
通常情况下将它设置成一个超大的值是合理的做法,比如ulimit -n 1000000
如果不设置的话,可能会经常出现"Too many open files"错误
建议所有java项目都设置下这个值
文件系统类型
含义
这里说的文件系统指的是如ext3、ext4或XFS这样的日志型文件系统
建议
根据官网的测试报告,XFS的性能要强于ext4。所以生产环境最好使用XFS
Swappiness
设置为0
将swap设置成0,即将swap完全禁掉以防止kafka进程使用swap空间
当物理内存耗尽时,操作系统会触发OOM killer这个组件,它会随机挑选一个进程然后kill掉,即根本不给用户任何的预警
建议设置为1
如果设置成一个比较小的值,当开始使用swap空间时,至少能够观测到Broker性能开始出现急剧下降,从而给出进一步调优和诊断问题的时间
基于这个考虑,个人建议将swappiness设置成一个接近0但不为0的值,比如1
提交时间(Flush落盘时间)
含义
向 Kafka 发送数据并不是真要等数据被写入磁盘才会认为成功,而是只要数据被写入到操作系统的页缓存(Page Cache)上就可以了,随后操作系统根据 LRU 算法会定期将页缓存上的“脏”数据落盘到物理磁盘上
这个定期就是由提交时间来确定的,默认是 5 秒
建议
一般情况下我们会认为这个时间太频繁了,可以适当地增加提交间隔来降低物理磁盘的写操作
当然你可能会有这样的疑问:如果在页缓存中的数据在写入到磁盘前机器宕机了,那岂不是数据就丢失了。的确,这种情况数据确实就丢失了,但鉴于 Kafka 在软件层面已经提供了多副本的冗余机制,因此这里稍微拉大提交间隔去换取性能还是一个合理的做法
架构
组成
一个典型的 kafka 集群包含若干 Producer(可以是应用节点产生的消息,也可以是通过Flume 收集日志产生的事件)
若干个 Broker(kafka 支持水平扩展)
若干个 Consumer Group
一个 zookeeper 集群(kafka 通过 zookeeper 管理集群配置及服务协同)
推送和消费机制
Producer 使用 push 模式将消息发布到 broker,consumer 通过监听使用 pull 模式从broker 订阅并消费消息
多个 broker 协同工作,producer 和 consumer 部署在各个业务逻辑中
三者通过zookeeper 管理协调请求和转发。这样就组成了一个高性能的分布式消息发布和订阅系统
图例
高可用(副本机制)
版本历史
0.8版本之前,是无HA机制的
0.8版本之后,提供了HA机制,增加了replica(复制品)副本机制
原理(副本机制)
副本定义
1. 每个topic下每个partition都有N多个副本(Replica),并且副本的数量一定小于broker数量
因为每个副本的数据必须保存在不同的broker上,否则没有意义。
因为如果一份数据的副本保存在同一个broker,那么这个broker挂了,数据依然丢失
因为如果一份数据的副本保存在同一个broker,那么这个broker挂了,数据依然丢失
2. 对于每个partition而言,每个broker上最多只有一个副本
3. kakfa还有个机制,默认会把副本均匀分布到所有的broker上
副本机制原理
容错性
1.每个partition的数据会同步到其他机器上,形成自己的多个副本(备份以partition为单位),这样可以提高容错性
副本角色
2. 每个分区在创建的时候,kafka会选举其中的某一个partition为leader,其余的都为follower。
数据同步
3. 在进行数据备份的时候,不是leader主动将数据push给follower,而是follower向leader pull数据过来
处理读写请求
4. follower副本不对外提供服务。所有的读写请求都必须发往leader副本所在的broker,由该broker处理
follower副本不处理客户端请求,唯一的任务就是从leader副本异步拉取消息,并写入到自己的提交日志中,实现与leader副本的同步
follower副本不处理客户端请求,唯一的任务就是从leader副本异步拉取消息,并写入到自己的提交日志中,实现与leader副本的同步
宕机选举
5. 当领导者(leader)副本挂了,或者说leader副本所在的Broker宕机时,kakfa依托于Zookeeper提供的监控功能能够实时感知到,并立即开启新一轮的领导者选择,从追随者(follower)副本中选举一个作为新的领导者。老leader副本重启回来后,只能作为follower副本加入到集群中
副本机制的好处
1. 方便实现“Read-your-writes”
当使用生产者API向kafka成功写入消息后,马上使用消费者API去读取刚才生产的消息
举例:比如你平时发微博时,你发完一条微博,肯定是希望能立即看到的,这就是典型的 Read-your-writes 场景
如果允许follower副本对外提供服务,由于副本同步是异步的,因为有可能出现follower副本还没有从leader副本拉取到最新的消息,从而导致客户端消费不到最新写入的消息
2. 方便实现单调读(Monotonic Reads)
单调读:对于一个消费者而言,在多次消费消息的时候,不会看到某条消息一会存在一会不存在
如果允许追随者副本提供读服务,那么假设当前有 2 个追随者副本 F1 和 F2,它们异步地拉取领导者副本数据。倘若 F1 拉取了 Leader 的最新消息而 F2 还未及时拉取,那么,此时如果有一个消费者先从 F1 读取消息之后又从 F2 拉取消息,它可能会看到这样的现象:第一次消费时看到的最新消息在第二次消费时不见了,这就不是单调读一致性。但是,如果所有的读请求都是由 Leader 来处理,那么 Kafka 就很容易实现单调读一致性
3. ISR(In-sync-Replicas)
为什么要引入ISR
由于follower副本不提供服务,只是定期异步拉取leader副本中的数据,既然是异步的,可能存在与leader不实时同步的风险
ISR的目的,kafka要明确地告诉我们,follower副本到底在什么条件下才算与leader同步
ISR定义
leader会维持一个与其保持同步的replica集合,该集合就是ISR,每一个partition都有一个ISR,它是由leader动态维护
ISR中的副本都是与leader同步的副本;相反,不在ISR中的follower副本就被认为是与leader不同步的
leader副本天然就在ISR中,ISR不只是follower副本的集合,必然包括leader副本
我们要保证kafka不丢失message,就要保证ISR这组集合存活(至少有一个存活),并且消息commit成功
判定进入或踢出ISR副本集合的标准
这个标准就是 Broker 端参数 replica.log.time.max.ms 参数值,这个参数的含义是follower副本能够落后leader副本的最长时间间隔
默认值是10秒,也就是说,只要一个follower副本落后leader副本的时间不连续超过10秒,那么kafka就认为该follower副本与leader副本是同步的,即使当前follower副本中保存的消息明显少于leader副本中的消息
在同步的过程中如果速度持续慢于leader副本的消息写入速度,那么在replica.lag.time.max.ms 时间后,此follower副本就会被认为与leader副本是不同步的。此时kafka会自动收缩ISR集合,将该副本踢出ISR
如果该副本后面慢慢追上了leader的进度,它是可以重新被加回ISR的。所以ISR是一个动态调整的集合,并非是静态不变的
Unclean领导者选择(Unclean Leader Election)
思考
如果Kafka leader副本所在broker挂了,leader副本如何选举?
分析
1. 当ISR不为空时,直接从ISR选举
2. 当ISR为空时,kafka也可以从不在ISR中的存活副本中选举,这个过程称为Unclean领导者选举
如何开启
通过Broker 端参数 unclean.leader.election.enable 控制是否允许 Unclean 领导者选举
影响
开启Unclean领导者选举
开启Unclean领导者选举可能会造成数据丢失,但好处是,它使得分区leader副本一致存在,不至于对外提供服务,提高了高可用性
禁止Unclean领导者选举
维护了数据的一致性,避免了消息丢失,但牺牲了高可用性
一个分布式系统通常只能同时满足一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)中的两个。显然,在这个问题上,Kafka 赋予你选择 C 或 A 的权利。
建议
强烈建议不要开启Unclean leader election,毕竟我们还可以通过其他的方式来提升高可用性。如果为了这点儿高可用性的改善,牺牲了数据一致性,那就非常不值当了
消息
消息格式
消息(Record)
等价于日志项(record item),在里层
消息集合(Message Set)
等价于消息批次(Record batch), 在外层,里面包含若干条消息
Producer以record batch为单位发送消息
消息压缩与解压缩
压缩的目的
时间换空间,用CPU时间去换磁盘空间或网络IO传输量
区分消息是否压缩
kafka在消息头部添加了一个描述压缩属性字节,这个字节的后两位表示消息压缩采用的编码,如果后两位是0,表示消息未被压缩
消息压缩
生产者端
怎么压缩的
把消息的公共部分抽取出来放到外层消息集合里面,例如消息的 CRC 值(这样就不用每条消息都保存这些信息了)
如何开启
代码:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 开启 GZIP 压缩
props.put("compression.type", "gzip");
Producer<String, String> producer = new KafkaProducer<>(props);
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 开启 GZIP 压缩
props.put("compression.type", "gzip");
Producer<String, String> producer = new KafkaProducer<>(props);
props.put(“compression.type”, “gzip”),表示该Producer的压缩算法使用的是Gzip
压缩的好处
Producer生产的每个消息集合都是经过GZIP压缩过的,可以节省网络传输带宽以及kafka Broker端的磁盘占用
Broker端
正常情况下,Broker从Producer端接收到消息后不会再做压缩处理
Broker重新压缩的两种情况(需要解压再压缩)
1. Broker端制定了和Producer端不同的压缩算法
Broker端也有一个参数叫compression.type,这个参数的默认值是producer,表示使用producer端的压缩算法
但是如果在Broker端设置了不同的compression.type值,就会发生Broker端重新压缩;
会导致Broker端CPU使用率飙升
会导致Broker端CPU使用率飙升
2. Broker端发生了消息格式转换
为了兼容老版本的格式,Broker端会对新版本消息执行向老版本格式的转换
这个过程会涉及到消息的解压缩和重新压缩;这种转换对性能的影响很大(会丧失零拷贝特性)(尽量避免)
消息解压缩
Broker端
每个压缩过的消息集合在Broker端写入时,都要发生解压缩,目的是为了对消息进行逐条检验(CRC)
这种解压缩对Broker端性能有一定影响(主要影响CPU使用率)(kafka后续可能会规避这一操作)
这种解压缩对Broker端性能有一定影响(主要影响CPU使用率)(kafka后续可能会规避这一操作)
跟上面消息格式转换时发生的解压缩不是同一场景
消费者端
1. Producer发送压缩消息到Broker
2. Broker原样保存起来
3. Consumer程序请求这部分消息时,Broker按原样发送出去,当消息到达Consumer端后,Consumer自行解压缩还原成之前的消息
2. Broker原样保存起来
3. Consumer程序请求这部分消息时,Broker按原样发送出去,当消息到达Consumer端后,Consumer自行解压缩还原成之前的消息
压缩算法
压缩算法的存储
kafka会将启用了哪种压缩算法封装进消息集合中,当Consumer读取到消息集合时,会知道这些消息使用了哪一种压缩算法
分类
GZIP
Snappy
LZ4
Zstandard(2.1.0版本后)(能够提供超高的压缩比)
评价指标
压缩比
原先占100份空间,压缩后占20份,压缩比就是5
压缩/解压缩吞吐量
每秒能够压缩或解压缩多少MB的数据
性能比较
吞吐量方面
LZ4 > Snappy > zstd和GZIP
压缩比方面
zstd > LZ4 > GZIP > Snappy
带宽占比
Snappy算法占用的网络带宽最多,zstd最少
CPU使用率
各个算法差不多
在压缩时Snappy算法使用的CPU较多一些
在解压缩时GZIP算法使用更多的CPU
最佳实践
开启压缩
如果客户端CPU资源富足,建议开启zstd压缩,可以极大的节省网络带宽资源
千兆网络下kafka集群容易出现带宽资源耗尽的情况
解压缩
尽量避免意料之外的解压缩
1. Broker端制定了和Producer端不同的压缩算法
2. 为了兼容老版本的格式,Broker端会对新版本消息执行向老版本格式的转换
消息的存储
topic中partition存储分布
在Kafka文件存储中,同一个topic下有多个不同partition,每个partition为一个目录,partiton命名规则为 topic名称+有序序号
第一个partiton序号从0开始,序号最大值为partitions数量减1
partiton中文件存储方式
每个partion(目录)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中
每个partiton只需要支持顺序读写就行了,segment文件生命周期由服务端配置参数决定
这样做的好处就是能快速删除无用文件,有效提高磁盘利用率
partiton中segment文件存储结构
在partition中如何通过offset查找message
消息过期
消息发送
生产者
作用:
生产者(Producer)负责向kafka发送消息
生产者(Producer)负责向kafka发送消息
实现:
KafkaProducer
KafkaProducer
生产者拦截器 (0.10.0.0 版本被引入)
作用:
既可以用来在消息发送前做一些准备工作(如按照某个规则过滤不符合条件的消息、修改消息内容等),
也可以用来在发送回调逻辑前做一些定制化的需求(如统计类工作)
既可以用来在消息发送前做一些准备工作(如按照某个规则过滤不符合条件的消息、修改消息内容等),
也可以用来在发送回调逻辑前做一些定制化的需求(如统计类工作)
生产者拦截器接口:
org.apache.kafka.clients.producer.ProducerInterceptor
org.apache.kafka.clients.producer.ProducerInterceptor
public interface ProducerInterceptor<K, V> extends Configurable {
/**
* 消息发送前调用
*/
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
/**
* 消息发送后,服务器返回结果(成功或错误)时调用
*/
public void onAcknowledgement(RecordMetadata metadata, Exception exception);
/**
* 拦截器关闭时调用
*/
public void close();
}
/**
* 消息发送前调用
*/
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
/**
* 消息发送后,服务器返回结果(成功或错误)时调用
*/
public void onAcknowledgement(RecordMetadata metadata, Exception exception);
/**
* 拦截器关闭时调用
*/
public void close();
}
onSend:
kafkaProducer在将消息序列化和计算分区之前,会调用onSend()方法来对消息进行相应的定制化操作;即在消息发送之前
kafkaProducer在将消息序列化和计算分区之前,会调用onSend()方法来对消息进行相应的定制化操作;即在消息发送之前
onAcknowledgement:
kafkaProducer会在消息被应答之前或消息发送失败时调用onAcknowledgement方法;
这个方法先于用户设定的Callback方法执行:
onAcknowledgement方法运行在Producer的IO线程中,如果逻辑复杂可能会影响消息发送速度
kafkaProducer会在消息被应答之前或消息发送失败时调用onAcknowledgement方法;
这个方法先于用户设定的Callback方法执行:
onAcknowledgement方法运行在Producer的IO线程中,如果逻辑复杂可能会影响消息发送速度
close:
close方法主要用于在关闭拦截器时执行一些资源的清理工作
close方法主要用于在关闭拦截器时执行一些资源的清理工作
自定义拦截器示例
public class ProducerInterceptorPrefix implements ProducerInterceptor<String,String> {
private volatile long sendSuccess = 0;
private volatile long sendFailure = 0;
@Override
public ProducerRecord<String, String> onSend(ProducerRecord&<String, String> record) {
String modifiedValue = "prefix1-" + record.value();
return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(),
record.key(), modifiedValue,
record.headers());
}
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
sendSuccess++;
} else {
sendFailure ++;
}
@Override
public void close() {
double successRatio = (double)sendSuccess / (sendFailure + sendSuccess);
System.out.println("[INFO] 发送成功率=" + String.format("%f", successRatio * 100) + "%");
}
}
private volatile long sendSuccess = 0;
private volatile long sendFailure = 0;
@Override
public ProducerRecord<String, String> onSend(ProducerRecord&<String, String> record) {
String modifiedValue = "prefix1-" + record.value();
return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(),
record.key(), modifiedValue,
record.headers());
}
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
sendSuccess++;
} else {
sendFailure ++;
}
@Override
public void close() {
double successRatio = (double)sendSuccess / (sendFailure + sendSuccess);
System.out.println("[INFO] 发送成功率=" + String.format("%f", successRatio * 100) + "%");
}
}
通过onSend()方法来为每条消息添加一个前缀 “prefix1-”
通过onAcknowledgement()方法来计算发送消息的成功率
properties.put(ProducerConfig.INTERCEPTOR_CLASS_CONFIG,ProducerInterceptorPrefix.class.getName())
分区器(分区机制)
概念
一个topic可以划分为多个partition
每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份
作用
为消息分配分区
实现消息的顺序读写
提供负载均衡的能力(实现系统的高伸缩性)
不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写处理请求
可以通过添加新的机器来增加整个系统的吞吐量
分区器实现的时机
消息在经过序列化之后,就需要确定它发往的分区,如果ProducerRecord中指定了partition字段(指定了推送的分区),就不需要分区器的作用,因为partition就是要发往的分区编号
分区策略
概念
分区策略就是决定生产者将消息发送到哪个分区的算法
算法
自定义分区策略
举例:按照地区实现分区策略
轮询策略(默认)
轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上
故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一
随机策略
本质上看随机策略也是力求将数据均匀地打散到各个分区,但从实际表现来看,它要逊于轮询策略,所以如果追求数据的均匀分布,还是使用轮询策略比较好
按消息键保序策略
定义
kafka默认分区策略实际上同时实现了两种策略,如果指定了key,那么默认实现按消息键保序策略;如果没有指定key,则使用轮询策略
kafka允许为每条消息定义消息键,简称为key。一旦消息被定义了key,那么就可以保证同一个key的所有消息都进入到相同的分区里面,在每个分区下的消息处理都是有序的
推送规则
在发送一条消息时,可以指定这条消息的key,producer根据这个key和partition机制来判断将这条消息发送到哪个parition
paritition机制可以通过指定producer的paritition. class这一参数来指定,该class必须实现kafka.producer.Partitioner接口(一般是对key做hash)
如果key可以被解析为整数则将对应的整数与partition总数取余,该消息会被发送到该数对应的partition。(每个parition都会有个序号)
分区的增加和减少
当一个主题被创建后,kafka只支持分区的增加,不支持减少
不支持减少分区的原因
按照Kafka现有的代码逻辑而言,此功能完全可以实现,不过也会使得代码的复杂度急剧增大。实现此功能需要考虑的因素很多,比如删除掉的分区中的消息该作何处理?如果随着分区一起消失则消息的可靠性得不到保障;如果需要保留则又需要考虑如何保留。直接存储到现有分区的尾部,消息的时间戳就不会递增,如此对于Spark、Flink这类需要消息时间戳(事件时间)的组件将会受到影响;如果分散插入到现有的分区中,那么在消息量很大的时候,内部的数据复制会占用很大的资源,而且在复制期间,此主题的可用性又如何得到保障?与此同时,顺序性问题、事务性问题、以及分区和副本的状态机切换问题都是不得不面对的。所以这个功能的收益点很低
替代方案
重新创建一个分区数较小的主题,然后将现有主题中的消息按照既定的逻辑复制过去
总结
分区是实现负载均衡以及高吞吐量的关键,故在生产者这一端就要仔细盘算合适的分区策略,避免造成消息数据的倾斜,使得某些分区称为性能瓶颈,这样极易引发下游数据消费的性能下降
TCP通信连接(2.1.0版本)
概述
kafka所有的通信都是基于TCP连接的
创建TCP连接
1. KafkaProducer 实例创建时启动 Sender 线程,从而创建与 bootstrap.servers 中所有 Broker 的 TCP 连接
2. KafkaProducer 实例首次更新元数据信息之后,还会再次创建与集群中所有 Broker 的 TCP 连接
当 Producer 尝试给一个不存在的主题发送消息时,Broker 会告诉 Producer 说这个主题不存在。此时 Producer 会发送 METADATA 请求给 Kafka 集群,去尝试获取最新的元数据信息
Producer 通过 metadata.max.age.ms 参数定期地去更新元数据信息。该参数的默认值是 300000,即 5 分钟,也就是说不管集群那边是否有变化,Producer 每 5 分钟都会强制刷新一次元数据以保证它是最及时的数据
3. 如果 Producer 端发送消息到某台 Broker 时发现没有与该 Broker 的 TCP 连接,那么也会立即创建连接
关闭TCP连接
用户主动关闭
producer.close()
kill -9
Kafka自动关闭
根据Producer端参数connections.max.idle.ms的值;
默认是9分钟,即如果在9分钟内没有任何请求“流过”某个TCP连接,那么kafka会主动帮你把该TCP连接关闭
默认是9分钟,即如果在9分钟内没有任何请求“流过”某个TCP连接,那么kafka会主动帮你把该TCP连接关闭
connections.max.idle.ms=-1则是禁掉这种机制,TCP连接将成为永久长连接;
(当然kafka创建的这些Socket连接都开启了keepalive,因为keepalive探活机制还是会遵守的)
(当然kafka创建的这些Socket连接都开启了keepalive,因为keepalive探活机制还是会遵守的)
Kafka Broker 端处理请求的全流程
Reactor 模式
定义:
Reactor模式是事件驱动架构的一种实现方式,特别适合应用于多个客户端并发向服务器端发送请求的场景
Reactor模式是事件驱动架构的一种实现方式,特别适合应用于多个客户端并发向服务器端发送请求的场景
架构图
多个客户端会发送请求到Reactor
Reactor有个请求分发线程Dispatcher,也就是图中的Acceptor,它会将不同的请求下发到多个工作线程去处理
Acceptor线程只是用于请求转发,不涉及具体的逻辑处理,非常得轻量级,因此有很高的吞吐量表现;
而工作线程可以根据实际业务处理需要任意增减,从而动态调节系统负载能力
而工作线程可以根据实际业务处理需要任意增减,从而动态调节系统负载能力
Kafka的 Reactor模式
架构图
Kafka 的 Broker 端有个 SocketServer 组件,类似于 Reactor 模式中的 Dispatcher
Acceptor 线程采用轮询的方式将入站请求公平地发到所有网络线程中
请求处理架构图
参数配置
配置参数:
Broker端参数,num.network.threads
Broker端参数,num.network.threads
默认值为3
表示每台Broker启动时会创建3个网络线程,专门处理客户端发送的请求
请求处理流程
当网络线程池拿到请求后,它不是自己处理,而是把请求放入到一个共享请求队列中
Broker端还有个IO线程池,负责从该队列中取出请求,执行真正的处理;
如果是 PRODUCE 生产请求,则将消息写入到底层的磁盘日志中;如果是 FETCH 请求,则从磁盘或页缓存中读取消息
如果是 PRODUCE 生产请求,则将消息写入到底层的磁盘日志中;如果是 FETCH 请求,则从磁盘或页缓存中读取消息
作用:
IO线程池中的线程才是执行请求逻辑的线程
IO线程池中的线程才是执行请求逻辑的线程
Broker端参数:
num.io.threads
num.io.threads
控制线程池中的线程数
默认值是8。表示每台Broker启动后自动创建8个IO线程处理请求
建议:如果CPU资源非常充裕,可以调大该参数,允许更多的并发请求同时处理
当 IO 线程处理完请求后,会将生成的响应发送到网络线程池的响应队列中,然后由对应的网络线程负责将 Response 返还给客户端
请求队列和响应队列的差别
请求队列是所有网络线程共享的,而响应队列则是每个网络线程专属的(响应队列在网络线程池中,网络线程池在Broker上)
为什么这么设计?
Dispatcher(kafka中的SocketServer) 只是用于请求分发而不负责响应回传,因此只能让每个网络线程自己发送 Response 给客户端,所以这些 Response 也就没必要放在一个公共的地方
Dispatcher(kafka中的SocketServer) 只是用于请求分发而不负责响应回传,因此只能让每个网络线程自己发送 Response 给客户端,所以这些 Response 也就没必要放在一个公共的地方
Purgatory组件
概述
这是 Kafka 中著名的“炼狱”组件。它是用来缓存延时请求(Delayed Request)的
所谓延时请求,就是那些一时未满足条件不能立刻处理的请求
举例
比如设置了 acks=all 的 PRODUCE 请求,一旦设置了 acks=all,那么该请求就必须等待 ISR 中所有副本都接收了消息后才能返回,此时处理该请求的 IO 线程就必须等待其他 Broker 的写入结果。当请求不能立刻处理时,它就会暂存在 Purgatory 中。稍后一旦满足了完成条件,IO 线程会继续处理该请求,并将 Response 放入对应网络线程的响应队列中
请求类型
数据类请求
客户端发送的 PRODUCE 请求和 FETCH 请求
控制类请求
控制类请求可以让数据类请求无效
控制类请求可以让数据类请求无效
负责更新 Leader 副本、Follower 副本以及 ISR 集合的 LeaderAndIsr 请求
负责勒令副本下线的 StopReplica 请求
消息消费
消费者
定义:
消费者是一个实际的应用实例,可以是一个线程,也可以是一个进程
消费者是一个实际的应用实例,可以是一个线程,也可以是一个进程
作用:
消费者(Consumer)负责订阅kafka中的主题(Topic),并且从订阅的主题上拉取消息
消费者(Consumer)负责订阅kafka中的主题(Topic),并且从订阅的主题上拉取消息
实现:
kafkaConsumer
kafkaConsumer
Kafka的消费者采用从broker 拉取(pull) 消息的方式获取消息
Kafka消费消息是一个不断轮询的过程,消费者只要不断重复调用poll方法即可, poll() 方法返回由生产者写入 Kafka 但还没有被消费者读取过的记录
KafkaConsumer是非线程安全的(但线程架构),poll方法中定义了一个acquire方法,用来检测当前是否只有一个线程在操作;
如果有其他线程操作会抛出ConcurrentModificationException异常
acquire方法中并没有调用锁(synchronized或juc的lock), 它通过线程操作技术标记的方式来检测线程是否发生了并发操作,以此保证只有一个线程在操作
如果有其他线程操作会抛出ConcurrentModificationException异常
acquire方法中并没有调用锁(synchronized或juc的lock), 它通过线程操作技术标记的方式来检测线程是否发生了并发操作,以此保证只有一个线程在操作
offset
Kafka集群保持所有的消息,直到它们过期(无论消息是否被消费)。实际上消费者所持有的仅有的元数据就是这个offset(偏移量),也就是说offset由消费者来控制:正常情况当消费者消费消息的时候,偏移量也线性的的增加。但是实际偏移量由消费者控制,消费者可以将偏移量重置为更早的位置,重新读取消息。可以看到这种设计对消费者来说操作自如,一个消费者的操作不会影响其它消费者对此log的处理
消息投递模式
点对点模型(消息队列模型)(Peer to Peer, P2P)
概念
同一条消息只能被下游的一个消费者消费,其他消费者不能染指
系统A发送的消息只能被系统B接收,其他任何系统都不能读取A发送的消息
原理
消费者组(Consumer Group):消费者组就是多个消费者实例共同组成一个组来消费一组主题
消费者组中的每个分区都只会被组内的一个消费者实例消费,其他消费者实例不能消费它
发布/订阅模型
这个模型可能存在多个发布者向相同的主题(Topic)发送消息,而订阅者也可以存在多个,他们都能接收到相同主题的消息
消费者拦截器
作用:
在消息到消息或者提交消费位移时进行一些定制化的操作(如按照某个规则过滤不符合条件的消息、修改消息内容等)
在消息到消息或者提交消费位移时进行一些定制化的操作(如按照某个规则过滤不符合条件的消息、修改消息内容等)
消费者拦截器接口:
org.apache.kafka.clients.consumer.ConsumerInterceptor
org.apache.kafka.clients.consumer.ConsumerInterceptor
public interface ConsumerInterceptor<K, V> extends Configurable {
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
public void close();
}
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
public void close();
}
onConsume()
KafkaConsumer会在poll()方法返回之前调用 onConsume()方法来对消息进行相应的定制化操作;(也就是在正式处理消息之前,拦截器会先拦一道,之后再返回给你)
如果onConsume()方法中抛出异常,会被捕获并记录到日志中,异常不会再向上传递
onCommit()
kafkaConsumer会在提交完消费位移只有调用 onCommit()方法,可以使用这个方法来记录所提交的位移信息
通常可以在该方法中做一些记账类的动作,比如打日志等
close()
close()方法主要用于在关闭拦截器时执行一些资源的清理工作
示例
public class ConsumerInterceptorTTL implements ConsumerInterceptor<String, String> {
private static final long EXPIRE_INTERVAL = 10000;
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
long now = System.currentTimeMillis();
Map<TopicPartition, List> ConsumerRecord<String, String >>> newRecords = new HashMap<>();
for (TopicPartition tp : records.partitions()) {
List<org.apache.kafka.clients.consumer.ConsumerRecord<String, String>> newTpRecords = new ArrayList<>();
List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
for (ConsumerRecord<String, String> record : tpRecords) {
if (now - record.timestamp() < EXPIRE_INTERVAL) {
newTpRecords.add(record);
}
}
newRecords.put(tp, newTpRecords);
}
return new ConsumerRecords<>(newRecords);
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
offsets.forEach((tp, offset) -> System.out.println(tp + ":" + offset.offset()));
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
private static final long EXPIRE_INTERVAL = 10000;
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
long now = System.currentTimeMillis();
Map<TopicPartition, List> ConsumerRecord<String, String >>> newRecords = new HashMap<>();
for (TopicPartition tp : records.partitions()) {
List<org.apache.kafka.clients.consumer.ConsumerRecord<String, String>> newTpRecords = new ArrayList<>();
List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
for (ConsumerRecord<String, String> record : tpRecords) {
if (now - record.timestamp() < EXPIRE_INTERVAL) {
newTpRecords.add(record);
}
}
newRecords.put(tp, newTpRecords);
}
return new ConsumerRecords<>(newRecords);
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
offsets.forEach((tp, offset) -> System.out.println(tp + ":" + offset.offset()));
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
使用消息的timestamp字段来判定是否过期,如果消息的时间戳与当前的时间戳相差超过10秒则判定过期
过期消息将被过滤而不投递给具体的消费者
消费者组(Consumer Group)
定义
消息组(Consumer Group)是一个逻辑上的概念,它将消费者归为一类;每一个消费者只属于一个消费组;
Consumer Group 是kafka提供的可扩展且具有容错性的消费者机制
Consumer Group 是kafka提供的可扩展且具有容错性的消费者机制
每个消费组都会有一个固定的名称,消费者在进行消费前需要指定所属消费者的名称;
参数 group.id,默认为空字符串
参数 group.id,默认为空字符串
每个消费者都有一个对应的消费组,当消费发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者
如果所有实例都属于同一个Group, 那么它实现的就是消息队列模型;如果所有实例分别属于不同的Group,那么它实现的就是发布/订阅模型
消费者数量和分区的数量关系
此处为该Group设置6个Consumer实例是比较理想的情形
当然可以设置小于等于6的实例个数,比如只有3个Consumer实例,则每个实例消费2个分区的消息
如果设置了8个消费实例,就会有2个消费实例不会被分配分区,它们会永远处于空闲状态
理想情况下,Consumer实例的数量应该等于该Group订阅主题的分区总数,因为它能最大限度地实现高伸缩性
在实际使用中,不推荐设置大于总分区数的Consumer实例,设置多余的实例只会浪费资源,而没有任何好处
示意图
Topic有4个分区,P0、P1、P2、P3
有两个消费组A和B,都订阅了这个主题
Consumer Group A 中有4个消费者(C0、C1、C2、C3),消费者B中有2个消费者(C4、C5)
按kafka默认规则,Consumer Group A 中的每个消费者分配到1个分区,Consumer Group B中的每个消费者分配到2个分区
每个消费者只能消费到所分配到的分区中的消息;即每一个分区只能被“一个消费组中的一个消费者”所消费
分区分配策略
机制
在 kafka 中,存在两种分区分配策略,一种是 Range(默认)、另 一 种 另 一 种 还 是 RoundRobin ( 轮 询 )
通过partition.assignment.strategy 这个参数来设置
Range strategy(范围分区)
RoundRobin strategy(轮询分区)
作用
消费者组的模型可以让整体的消费能力具备横向伸缩性
分区分配的演变
一个消费者C0
加入消费者C1,需要将原来C0的部分分区分配给C1
通过增加(减少)消费者的个数来提高(降低)整体的消费能力
对于分区数固定的情况,如果消费者的数量超出分区就没有意义了
Reblance(重平衡)
定义:
本质上是一种协议,规定了一个Group下的所有Consumer如何达成一致,来分配订阅Topic的每个分区
本质上是一种协议,规定了一个Group下的所有Consumer如何达成一致,来分配订阅Topic的每个分区
比如某个Group下有20个Consumer实例,它订阅了1个具有100个分区的Topic。
正常情况下,kafka平均会为每个Consumer分配5个分区。
这个分配的过程就叫Reblance
正常情况下,kafka平均会为每个Consumer分配5个分区。
这个分配的过程就叫Reblance
触发条件
1. (被动/故障导致)组成员数发生变更:有新的Consumer实例加入组或者离开组,或者有Consumer实例崩溃被"踢出"组
2. (主动运维操作)订阅主题数发生变更: 当Consumer Group使用正则表达式的方式订阅主题,在Consumer Group运行过程中,新创建了一个满足这样条件的主题,则该Group就会发生Rebalance
比如consumer.subscribe(Pattern.compile(“t.*c”)) 就表明该 Group 订阅所有以字母 t 开头、字母 c 结尾的主题
3. (主动运维操作) 订阅主题的分区数发生变更:当增加主题的分区数时,会触发订阅这个主题的Group开启Rebalance
kafka分区数只可以增加不可以减少
示例
Consumer Group 下有两个 Consumer,比如 A 和 B,当第三个成员 C 加入时,Kafka 会触发 Rebalance,并根据默认的分配策略重新为 A、B 和 C 分配分区
执行过程
概念:
在 Rebalance 过程中,所有 Consumer 实例共同参与,在协调者组件的帮助下,完成订阅主题分区的分配
在 Rebalance 过程中,所有 Consumer 实例共同参与,在协调者组件的帮助下,完成订阅主题分区的分配
影响点:
在整个Rebalance过程中,所有实例都不能消费任何消息,因此对Consumer的TPS影响很大
在整个Rebalance过程中,所有实例都不能消费任何消息,因此对Consumer的TPS影响很大
协调者(Coordinator)
作用:
专门为Consumer Group服务,它驻留在Broker端的内存找;
负责为Group执行Rebalance以及提供位移管理和组成员管理
专门为Consumer Group服务,它驻留在Broker端的内存找;
负责为Group执行Rebalance以及提供位移管理和组成员管理
Consumer端应用程序在提交位移时,其实是向Coordinator所在的Broker提交位移
Consumer应用启动时,也是向Coordinator所在的Broker发送各种请求,然后由Coordinator负责执行消费者的注册、成员管理记录等元数据管理操作
何时创建和开启:
所有Broker在启动时,都会创建和开启响应的Coordinator组件。
也就是说,所有Broker都有各自的Coordinator组件
所有Broker在启动时,都会创建和开启响应的Coordinator组件。
也就是说,所有Broker都有各自的Coordinator组件
Consumer Group如何确定为它服务的Coordinator在哪台Broker上?
1. 首先确定由位移主题的哪个分区来保存该Group的数据:
partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)
partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)
2. 找出该分区Leader副本所在的Broker,该Broker即为对应的Coordinator
在实际使用过程中,Consumer 应用程序,特别是 Java Consumer API,能够自动发现并连接正确的 Coordinator;
知晓该算法的意义:
当Consumer Group出现问题时,可以由此计算并定位到对应的Broker上,不必一台一台的盲查
当Consumer Group出现问题时,可以由此计算并定位到对应的Broker上,不必一台一台的盲查
弊端
影响Consumer端TPS:
在Rebalance中,所有Consumer实例都会停止消费,等待Rebalance
在Rebalance中,所有Consumer实例都会停止消费,等待Rebalance
为什么?
Rebalance之前会有一个通知,消费者接收到这个通知后,提交对应的消费位移;
如果不停止消费,Rebalance和消费同时进行,在Rebalance过程中的消费位移没法提交;
在Rebalance结束后,新的消费者不知道分配到的分区消费到了哪个位置,只能去查找上一次的提交;
但是Rebalance过程中,已经消费到了后面的位置,再从上一次提交开始消费,就会造成重复消费
在Rebalance结束后,新的消费者不知道分配到的分区消费到了哪个位置,只能去查找上一次的提交;
但是Rebalance过程中,已经消费到了后面的位置,再从上一次提交开始消费,就会造成重复消费
Rebalance过程很慢:
如果Group下Consumer实例过多,Rebalance过程很慢(可能长达几小时)
如果Group下Consumer实例过多,Rebalance过程很慢(可能长达几小时)
Rebalance效率不高:
目前Rebalance的设计是所有Consumer实例共同参与,全部重新分配所有分区;
(没法复用以前负责的分区,不会保留以前的分配方案,而是打乱全部重新分配)
目前Rebalance的设计是所有Consumer实例共同参与,全部重新分配所有分区;
(没法复用以前负责的分区,不会保留以前的分配方案,而是打乱全部重新分配)
重复消费:
消费者消费完某个分区的一部分消息还没有来得及提交,发生了Rebalance;
这个分区被分配给Group中的另一个消费者,原来被消费的消息会被重新消费一遍
消费者消费完某个分区的一部分消息还没有来得及提交,发生了Rebalance;
这个分区被分配给Group中的另一个消费者,原来被消费的消息会被重新消费一遍
Rebalance的场景
第一种情况:
(订阅主题数量发生变化,订阅主题的分区数发生变化),这2个都是主动运维操作引发的Rebalance,都是不可避免的,也是可以接受的
(订阅主题数量发生变化,订阅主题的分区数发生变化),这2个都是主动运维操作引发的Rebalance,都是不可避免的,也是可以接受的
第二种情况:
Consumer Group 下的 Consumer 实例数量发生变化(最常见的Rebalance)
Consumer Group 下的 Consumer 实例数量发生变化(最常见的Rebalance)
增加Consumer实例
再启动一个配置有相同group.id 值的Consumer程序时,就是想这个Group添加了一个新的Consumer实例;
此时,Coordinator会接纳这个新实例,将其加入到组中,并重新分配分区
此时,Coordinator会接纳这个新实例,将其加入到组中,并重新分配分区
增加Consumer实例一般都是计划内的,可能是出于增加TPS或者提高伸缩性的需要
增加Consumer不属于需要规避的操作
减少Consumer实例
1. 如果是自己需要关闭掉某些Consumer实例,也不属于需要规避的操作
2. 某些情况下,Consumer实例可能会被Coordinator错误地认为“已停止”而被踢出Group,导致Rebalance
心跳机制
当Group完成Rebalance之后,每个Consumer实例都会定期地向Coordinator发送心跳请求,表明自己活着
如果某个Consumer实例不能及时地发送这些心跳请求,Coordinator就会认为这个Consumer已经挂掉,从而将其移除,开启新一轮Rebalance
Consumer端参数配置
session.timeout.ms(默认10s)
如果Coordinator在10秒之内没有收到Group下某Consumer实例的心跳,就会认为这个Consumer实例已经挂了
Consumer存活的时间间隔
heartbeat.interval.ms(默认3s)
用于控制发送心跳请求频率,告诉Consumer要每3秒给Coordinator发一个心跳包
这个值设置的越小,Consumer实例发送心跳请求的频率就越高。频繁地发送心跳请求会额外消耗带宽资源,但好处是能够更快地知晓当前是否需要开启Rebalance
这个参数的真正作用是控制重平衡通知的频率;
如果你想要消费者实例更迅速地得到通知,那么就可以给这个参数设置一个非常小的值,这样消费者就能更快地感知到重平衡已经开启了
如果你想要消费者实例更迅速地得到通知,那么就可以给这个参数设置一个非常小的值,这样消费者就能更快地感知到重平衡已经开启了
max.poll.interval.ms(默认值5min)
用于控制Consumer实际消费能力对Rebalance的影响,限定了Consumer端两次调用poll方法的最大时间间隔
如果Consumer程序在5min之内无法消费完poll返回的消息,那么Consumer就会发起“离开组”的请求,导致Rebalance
避免非必要的Rebalance
第一种情况:
未能及时发送心跳,导致Consumer被踢出Group,而引发Rebalance
未能及时发送心跳,导致Consumer被踢出Group,而引发Rebalance
可能的原因
heartbeat.interval.ms 被设置的 大于 session.timeout.ms,导致心跳包未发送到
网络延时等情况,影响了Consumer发送的心跳包的到达,可能下一个heartbeat就正常了
如何避免
设置 session.timeout.ms = 6s
设置 heartbeat.interval.ms = 2s
要保证Consumer 实例在被判定为 "dead" 之前,能够发送至少3轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms
第二种情况:
Consumer 消费时间过长
Consumer 消费时间过长
可能的原因
在消费数据的过程中,如果有很重的消费逻辑,导致消费时长的增加,大于了 max.poll.interval.ms 的值
如何避免
1. 为下游的消费业务处理逻辑留下足够的时间,比如处理业务消耗5分钟,则把max.poll.interval.ms 设置为 6分钟
2. 缩短单条消息处理的时间
3. 减少下游系统一次性消费的消息总数
这取决于Consumer端参数 max.poll.records的值
当前该值的默认值是500条,表明调用一次 kafkaConsumer.poll方法,最多返回500条消息
当前该值的默认值是500条,表明调用一次 kafkaConsumer.poll方法,最多返回500条消息
可以降低此参数值
4. 下游系统使用多线程来加速消费
第三种情况:
Consumer端的频繁GC
Consumer端的频繁GC
可能在Consumer端出现了频繁的 Full GC , 导致了长时间的停顿,从而引发Rebalance
Rebalance的通知机制(如何通知各个Consumer开启Rebalance)
原理:
靠的是消费者的心跳线程
靠的是消费者的心跳线程
机制:
当协调者决定开启新一轮重平衡后,它会将“REBALANCE_IN_PROGRESS”封装进心跳请求的响应中,发还给消费者实例。当消费者实例发现心跳响应中包含了“REBALANCE_IN_PROGRESS”,就能立马知道重平衡又开始了,这就是重平衡的通知机制
当协调者决定开启新一轮重平衡后,它会将“REBALANCE_IN_PROGRESS”封装进心跳请求的响应中,发还给消费者实例。当消费者实例发现心跳响应中包含了“REBALANCE_IN_PROGRESS”,就能立马知道重平衡又开始了,这就是重平衡的通知机制
消费者组状态机(State Machine)
定义:
kafka设计了一套消费组状态机,来帮助协调者完成整个重平衡流程
kafka设计了一套消费组状态机,来帮助协调者完成整个重平衡流程
Kafka 为消费者组定义了 5 种状态
状态机的各个状态流转
消费组启动时,最开始时Empty状态,当Rebalance过程开启时,它会被置于PreparingRebalance状态等待成员加入;
之后变更到CompletingRebalance状态等待分配方案,分配完成后流转到Stable状态完成重平衡
之后变更到CompletingRebalance状态等待分配方案,分配完成后流转到Stable状态完成重平衡
当有新成员加入或者已有成员退出时,消费组的状态从Stable直接跳到PreparingRebalance状态;
此时,所有现存成员就必须全部重新申请加入组
此时,所有现存成员就必须全部重新申请加入组
当所有成员都退出组后,消费者组状态变更为Empty
Kafka会自动删除过期位移,条件就是组要处于Empty状态
重平衡流程
消费者端Rebalance流程
JoinGroup(加入组)
请求流程
当组内成员加入组时,它会向协调者发送JoinGroup请求;
在这个请求中,每个成员都要将自己订阅的主题上报,这样协调者就能收集到所有成员的订阅信息
在收集了全部成员的 JoinGroup请求后,协调者会从这些成员中选择一个(一般是第一个)担任这个消费者组的领导者
领导者消费者的任务:
收集所有成员的订阅信息,然后根据这些信息,制定具体的分区消费分配方案
收集所有成员的订阅信息,然后根据这些信息,制定具体的分区消费分配方案
选出领导者后,协调者会把消费者组的订阅信息封装进JoinGroup的响应体中发回给领导者,由领导者统一作出分配方案
目的
JoinGroup 请求的主要作用是将组成员订阅信息发送给领导者消费者,待领导者制定好分配方案后,重平衡流程进入到 SyncGroup 请求阶段
处理流程图
SyncGroup(等待领导者消费者(Leader Consumer)分配方案)
请求流程
在领导者作出分配方案后,会向协调者发送SyncGroup请求(把分配方案发给协调者)
其他成员也会向协调者发送 SyncGroup 请求,只不过请求体中并没有实际的内容
目的
让协调者接收分配方案,然后统一以 SyncGroup 响应的方式分发给所有成员,这样组内所有成员就都知道自己该消费哪些分区了;
当所有成员都成功接收到分配方案后,消费者组进入到 Stable 状态,即开始正常的消费工作
当所有成员都成功接收到分配方案后,消费者组进入到 Stable 状态,即开始正常的消费工作
处理流程图
Borker端(协调者端)Rebalance流程
新成员加入组
当协调者收到新的 JoinGroup 请求后,它会通过心跳请求响应的方式通知组内现有的所有成员,强制它们开启新一轮的重平衡
组成员主动离组
消费者实例所在线程或进程调用 close() 方法主动通知协调者它要退出。这个场景就涉及到了第三类请求:LeaveGroup 请求。协调者收到 LeaveGroup 请求后,依然会以心跳响应的方式通知其他成员
组成员崩溃离组
崩溃离组是指消费者实例出现严重故障,突然宕机导致的离组
它和主动离组是有区别的,因为后者是主动发起的离组,协调者能马上感知并处理。但崩溃离组是被动的,协调者通常需要等待一段时间才能感知到,这段时间一般是由消费者端参数 session.timeout.ms 控制的。也就是说,Kafka 一般不会超过 session.timeout.ms 就能感知到这个崩溃
重平衡时协调者对组内成员提交位移的处理
正常情况下,每个组内成员都会定期汇报位移给协调者。当重平衡开启时,协调者会给予成员一段缓冲时间,要求每个成员必须在这段时间内快速地上报自己的位移信息,然后再开启正常的 JoinGroup/SyncGroup 请求发送
消费位移管理
版本区别
以前版本
实现方式
老版本的Consumer的位移管理依赖于Zookeeper,它会将位移数据提交到Zookeeper中保存
当Consumer重启后,它能自动从Zookeeper中读取位移数据,从而在上次消费截止的地方继续消费
优点
减少了kafka Broker端的状态保存开销
服务器节点无状态,可以自由地扩容或缩容,实现超强的伸缩性
缺点
ZooKeeper这类元框架并不适合进行频繁的写更新,而Consumer Group的位移更新却是一个非常频繁的操作。
这种大吞吐量的写操作会极大地拖慢Zookeeper集群的性能
这种大吞吐量的写操作会极大地拖慢Zookeeper集群的性能
1.0.9版本以后
将Consumer的位移数据作为一条条普通的kafka消息,提交到_consumer_offset中
_consumer_offsets就是一个普通的主题,用来保存kafka消费者的位移消息(消费到了分区中某个消息所在的位置)
位移主题(__consumer_offsets)
消息格式(三种)
1. 正常消息格式
_consumer_offsets由KV对组成,Key表示消息的键值,value表示消息体
key
Group ID(唯一标识一个Consumer Group)
扩展:
也支持独立 Consumer,也称 Standalone Consumer。它的运行机制与 Consumer Group 完全不同,但是位移管理的机制却是相同的。因此,即使是 Standalone Consumer,也有自己的 Group ID 来标识它自己,所以也适用于这套消息格式
也支持独立 Consumer,也称 Standalone Consumer。它的运行机制与 Consumer Group 完全不同,但是位移管理的机制却是相同的。因此,即使是 Standalone Consumer,也有自己的 Group ID 来标识它自己,所以也适用于这套消息格式
主题名
分区号
Value
位移值
位移提交的元数据(时间戳、用户自定义数据等)
2. 用于保存Consumer group信息的消息
作用:
用于注册Consumer Group
用于注册Consumer Group
3. 用于删除Group过期位移甚至是删除Group的消息
tombstone 消息,即墓碑消息,也称 delete mark
消息体是null,即空消息体
当某个Consumer Group下的所有Consumer实例都停止了,而且它们的位移数据都已被删除时,
kafka会向位移主题的对应分区写入tombstone消息,表明要彻底删除这个Group消息
kafka会向位移主题的对应分区写入tombstone消息,表明要彻底删除这个Group消息
创建时机
自动创建位移主题(默认)
当Kafka集群中的第一个Consumer程序启动时,kafka会自动创建位移主题(_consumer_offsets)
分区数由Broker 端参数 offsets.topic.num.partitions 决定,默认值50
副本数由Broker参数 offsets.topic.num.partitions 决定,默认值3
手动创建位移主题(不建议)
何时创建:
在kafka集群尚未启动任何Consumer之前,使用kafka ApI创建它
在kafka集群尚未启动任何Consumer之前,使用kafka ApI创建它
好处:
可以自己控制分区个数和副本数
可以自己控制分区个数和副本数
位移提交
定义:
Consumer向kafka汇报自己的位移数据,这个汇报过程被称为提交位移;
把消费位移存储起来(持久化)的动作称为"提交";消费者在消费完消息之后需要执行位移提交;
因为Consumer能够同时消费多个分区的数据,所以位移提交是在分区粒度上进行的;
Consumer需要为分配给它的每个分区提交各自的位移数据;
Consumer向kafka汇报自己的位移数据,这个汇报过程被称为提交位移;
把消费位移存储起来(持久化)的动作称为"提交";消费者在消费完消息之后需要执行位移提交;
因为Consumer能够同时消费多个分区的数据,所以位移提交是在分区粒度上进行的;
Consumer需要为分配给它的每个分区提交各自的位移数据;
目的:
为了表征Consumer的消费进度,这样当Consumer发生故障重启之后,就能够从Kafka之后读取之前提交的位移值,然后从相应的位移处继续消费,从而避免整个消费过程再来一遍
为了表征Consumer的消费进度,这样当Consumer发生故障重启之后,就能够从Kafka之后读取之前提交的位移值,然后从相应的位移处继续消费,从而避免整个消费过程再来一遍
示意图
当前消费者需要提交的消费位移(commit offset)不是x,而是 x + 1;
下一条需要拉取消息的位置(position)也是 x + 1
自动提交(默认)
实现方式
开启方式:
enable.auto.commit(默认true);
Consumer自动位移提交,在后台定期提交
enable.auto.commit(默认true);
Consumer自动位移提交,在后台定期提交
kafka会在开始调用poll方法时,提交上次poll返回的所有消息
从顺序上来说,poll方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此可以保证消息不丢失
从顺序上来说,poll方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此可以保证消息不丢失
在每次拉取之前会检查是否可以进行位移提交,如果可以提交,会先提交上一次的位移
提交间隔时间:
auto.commit.interval.ms(默认值5秒)
auto.commit.interval.ms(默认值5秒)
kafka每5秒会自动提交一次位移
代码
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "2000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "2000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
问题
消息重复
场景:
假设刚提交完上一次消费位移,拉取一批(或几批)消息消费(已经消费完,还没提交位移),
在下一次自动提交之前(kafka默认每5秒提交一次位移),消费者崩溃了(或者Rebalance),消费者恢复后,又得从上一次位移提交的地方重新开始消费,造成重复消费
假设刚提交完上一次消费位移,拉取一批(或几批)消息消费(已经消费完,还没提交位移),
在下一次自动提交之前(kafka默认每5秒提交一次位移),消费者崩溃了(或者Rebalance),消费者恢复后,又得从上一次位移提交的地方重新开始消费,造成重复消费
解决方案(不能完全解决,只是减少出现的情况):
可减少 auto.commit.interval.ms 的值来提高提交频率;
但这么做只能缩小重复消费的时间窗口,不可能完全消除它
可减少 auto.commit.interval.ms 的值来提高提交频率;
但这么做只能缩小重复消费的时间窗口,不可能完全消除它
消息丢失
1. 消费者每隔3秒提交一次offset,假如偏移量成功提交了,但是数据处理失败了,这个时候就会丢数据
2. 假设消费线程消费消息放在本地缓存中,比如BlockingQueue中;
目前已经消费了10次,且已经提交(不能再消费);
处理线程当前值处理到了BlockingQueue中的第6次,此时处理线程发生异常;
待处理线程恢复的时候,本地缓存中没有了数据,消费线程从第11次开始消费消息再放入本地缓存;
那么第6-10次中间的消息就没有处理,造成消息丢失
目前已经消费了10次,且已经提交(不能再消费);
处理线程当前值处理到了BlockingQueue中的第6次,此时处理线程发生异常;
待处理线程恢复的时候,本地缓存中没有了数据,消费线程从第11次开始消费消息再放入本地缓存;
那么第6-10次中间的消息就没有处理,造成消息丢失
手动提交
1. 设置enable.auto.commit = false;
2. 调用API手动提交位移
同步提交
无参数
commitSync():
只能提交当前批次对应的position值(poll()方法返回的最新值)
commitSync():
只能提交当前批次对应的position值(poll()方法返回的最新值)
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
try {
consumer.commitSync();
} catch (CommitFailedException e) {
handle(e); // 处理提交失败异常
}
process(records); // 处理消息
try {
consumer.commitSync();
} catch (CommitFailedException e) {
handle(e); // 处理提交失败异常
}
带参数
commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets);
可以提交一个中间值,比如业务每消费N条消息就提交一次位移
commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets);
可以提交一个中间值,比如业务每消费N条消息就提交一次位移
调用时机:
处理完了poll()方法返回的所有消息之后,再提交位移;如果过早的提交了位移,会出现消费数据丢失的情况
处理完了poll()方法返回的所有消息之后,再提交位移;如果过早的提交了位移,会出现消费数据丢失的情况
阻塞消费者线程(当调用commitSync()时,Consumer程序会处于阻塞状态,直到远端的Broker返回提交结果,这个状态才会结束)
同步提交支持自动重试
异步提交
commitAsync()
commitAsync(OffsetCommitCallback callback)
提供了回调函数,可以实现提交之后的逻辑(记录日志、处理异常等)
commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
consumer.commitAsync((offsets, exception) -> {
if (exception != null)
handle(exception); // 处理提交失败异常
});
process(records); // 处理消息
consumer.commitAsync((offsets, exception) -> {
if (exception != null)
handle(exception); // 处理提交失败异常
});
执行的时候消费者线程不会被阻塞,可能在提交消费位移的结果还未返回之前就开始了新一次的拉取操作
异步提交可以使消费者的性能得到一定的增强
问题:
异步提交出现问题时不会自动重试
异步提交出现问题时不会自动重试
因为它是异步操作,消费者线程可能往后消费了更多的消息。重试时提交的位移值不是最新的值了
假如我们发起了一个异步提交commitA,此时的提交位移为2000,随后又发起了一个异步提交commitB且位移为3000;commitA提交失败但commitB提交成功,此时commitA进行重试并成功的话,会将实际上将已经提交的位移从3000回滚到2000,导致消息重复消费
此时如果发生了崩溃或者Rebalance,都会导致重复消费
异步提交下仍然重试的方案:
而如果想进行重试同时又保证提交顺序的话,一种简单的办法是使用单调递增的序号。每次发起异步提交时增加此序号,并且将此时的序号作为参数传给回调方法;当消息提交失败回调时,检查参数中的序号值与全局的序号值,如果相等那么可以进行重试提交,否则放弃(因为已经有更新的位移提交了)。
而如果想进行重试同时又保证提交顺序的话,一种简单的办法是使用单调递增的序号。每次发起异步提交时增加此序号,并且将此时的序号作为参数传给回调方法;当消息提交失败回调时,检查参数中的序号值与全局的序号值,如果相等那么可以进行重试提交,否则放弃(因为已经有更新的位移提交了)。
利用同步和异步API一起提交
思路
我们可以利用 commitSync 的自动重试来规避那些瞬时错误,比如网络的瞬时抖动,Broker 端 GC 等。因为这些问题都是短暂的,自动重试通常都会成功,因此,我们不想自己重试,而是希望 Kafka Consumer 帮我们做这件事
我们不希望程序总处于阻塞状态,影响 TPS
代码
对于常规性、阶段性的手动提交,调用commitAsync() 避免程序阻塞
而在Consumer要关闭前,我们调用commitSync() 方法执行同步阻塞式的位移提交,以确保Consumer关闭前能够保存正确的位移数据
总结
1. 自动提交会导致消息重复和消息丢失
2. 手动同步提交可以自动重试,但会阻塞消费者进程,影响TPS
3. 手动异步提交,消费者进程不会阻塞,但如果服务器返回提交失败,异步提交不会进行重试
位移过期消息的删除
背景:
如果不存在位移过期消息删除的策略,那么只要Consumer一直启动着,就会无限期往位移主题写入消息;
即使某个主题中没有消息可消费,还是会一直往位移主题中写入最后的位移信息;
所以必须要有针对位移主题消息特点的消息删除策略,否则这种消息会越来越多,最终撑爆整个磁盘
如果不存在位移过期消息删除的策略,那么只要Consumer一直启动着,就会无限期往位移主题写入消息;
即使某个主题中没有消息可消费,还是会一直往位移主题中写入最后的位移信息;
所以必须要有针对位移主题消息特点的消息删除策略,否则这种消息会越来越多,最终撑爆整个磁盘
举例:
假设 Consumer 当前消费到了某个主题的最新一条消息,位移是 100,之后该主题没有任何新消息产生,故 Consumer 无消息可消费了,所以位移永远保持在 100。由于是自动提交位移,位移主题中会不停地写入位移 =100 的消息
假设 Consumer 当前消费到了某个主题的最新一条消息,位移是 100,之后该主题没有任何新消息产生,故 Consumer 无消息可消费了,所以位移永远保持在 100。由于是自动提交位移,位移主题中会不停地写入位移 =100 的消息
解决方案:
Kafka 使用Compact 策略来删除位移主题中的过期消息,避免该主题无限期膨胀
Kafka 使用Compact 策略来删除位移主题中的过期消息,避免该主题无限期膨胀
Compact策略:
对于同一个 Key 的两条消息 M1 和 M2,如果 M1 的发送时间早于 M2,那么 M1 就是过期消息;
Compact的过程就是扫描日志的所有消息,剔除那些过期的消息,然后把剩下的消息整理在一起
对于同一个 Key 的两条消息 M1 和 M2,如果 M1 的发送时间早于 M2,那么 M1 就是过期消息;
Compact的过程就是扫描日志的所有消息,剔除那些过期的消息,然后把剩下的消息整理在一起
图中位移为 0、2 和 3 的消息的 Key 都是 K1。Compact 之后,分区只需要保存位移为 3 的消息,因为它是最新发送的
kafka提供了专门的后台线程(Log Cleaner)定期地巡检待Compact的主题,看看是否存在满足条件的可删除数据
找不到消费位移
情况分类
1. 新的消费组建立,没有可以查找的消费位移
2. 消费组内一个新的消费者订阅了一个新的主题,没有可以查找的消费位移
3. _consumer_offsets主题中有关这个消费组的位移信息过期被删除后,也没有可以查找的消费位移
处理方法
kafka的消费者找不到记录的消费位移时,会根据消费者客户端参数 auto.offset.reset 的配置来决定从何处进行消费
auto.offset.reset
重置消费位移
重置消费位移
latest(默认值):
表示将位移调整到最新末端位移(图中的9)
表示将位移调整到最新末端位移(图中的9)
earliest :
表示将位移调整到主题当前最早位移处,这个最早位移不一定是0。因为kafka会自动删除过期位移,所以earliest很可能大于0
表示将位移调整到主题当前最早位移处,这个最早位移不一定是0。因为kafka会自动删除过期位移,所以earliest很可能大于0
none :
查找不到消费位移时,直接抛出 NoOffsetForPartitionException 异常
查找不到消费位移时,直接抛出 NoOffsetForPartitionException 异常
消费者管理TCP连接
何时创建
和生产者不同,创建KafkaConsumer实例时是不会创建任何TCP连接的
TCP连接是在调用KafkaConsumer.poll 方法时被创建的;
在poll方法内部有3个时机可以创建TCP连接
在poll方法内部有3个时机可以创建TCP连接
1. 发起FindCoordinator(协调者)请求时
当消费者程序首次启动调用 poll 方法时,它需要向 Kafka 集群发送一个名为 FindCoordinator 的请求,希望 Kafka 集群告诉它哪个 Broker 是管理它的协调者
FindCoordinator 请求可以发给集群中的任意服务器(回向当前集群中负载最小的那台Broker发送请求)
如何评估负载最小?
待发送请求最少,就是负载最小
待发送请求最少,就是负载最小
2. 连接协调者时
Broker处理完上一步发送的FindCoordinator请求之后,会返还对应的相应结果,显示地告诉消费者哪个Broker是真正的协调者
消费者知晓了真正的协调者后,会创建连向该Broker的Socket连接
当成功连入协调者,协调者才能开启正常的组协调操作,比如加入组、等待组分配方案、心跳请求处理、位移获取、位移提交等
3. 消费数据时
消费者会为每个要消费的分区创建与该分区领导者副本所在Borker连接的TCP
举例:
假设消费者要消费 5 个分区的数据,这 5 个分区各自的领导者副本分布在 4 台 Broker 上,那么该消费者在消费时会创建与这 4 台 Broker 的 Socket 连接
假设消费者要消费 5 个分区的数据,这 5 个分区各自的领导者副本分布在 4 台 Broker 上,那么该消费者在消费时会创建与这 4 台 Broker 的 Socket 连接
何时关闭
主动关闭
显式地调用消费者API的方法去关闭消费者
方法:
手动调用 KafkaConsumer.close() 方法,或者是执行Kill命令 (kill -2 或者 kill -9)
手动调用 KafkaConsumer.close() 方法,或者是执行Kill命令 (kill -2 或者 kill -9)
自动关闭
由消费者端参数 connection.max.idle.ms控制(默认值是9分钟);
如果某个Socket连接上连续9分钟都没有任何请求的话,那么消费者会强行杀掉这个Socket连接
如果某个Socket连接上连续9分钟都没有任何请求的话,那么消费者会强行杀掉这个Socket连接
当第三类 TCP 连接成功创建后,消费者程序就会废弃第一类 TCP 连接,之后在定期请求元数据时,它会改为使用第三类 TCP 连接。
也就是说,最终你会发现,第一类 TCP 连接会在后台被默默地关闭掉。对一个运行了一段时间的消费者程序来说,只会有后面两类 TCP 连接存在
也就是说,最终你会发现,第一类 TCP 连接会在后台被默默地关闭掉。对一个运行了一段时间的消费者程序来说,只会有后面两类 TCP 连接存在
消费者组消费进度监控
使用 Kafka 自带的命令行工具 kafka-consumer-groups 脚本
使用 Kafka Java Consumer API 编程
使用 Kafka 自带的 JMX 监控指标
消息保障
消息的可靠性保障
消息交互可靠性保障
定义:
对Producer和Consumer要处理的消息提供什么样的承诺
对Producer和Consumer要处理的消息提供什么样的承诺
常见的承诺
最多一次(at most once):
消息可能丢失,但不会重复发送
消息可能丢失,但不会重复发送
至少一次(at least once):
消息不会丢失,但可能重复发送
消息不会丢失,但可能重复发送
精确一次(exactly once):
消息不会丢失,也不会重复发送
消息不会丢失,也不会重复发送
kafka的默认实现
kafka默认提供的交付可靠性保证是至少一次
当Prodicer无法确定消息是否成功提交时,会选择重试(可能导致重复发送)(关闭Producer重试即可实现最多一次)
精确一次(exactly once)(0.11.0 版本之后)
幂等性Producer
概述
Producer默认不是幂等性的,kafka向分区发送数据时,可能会出现同一条消息被发送了多次,导致重复消费的情况
0.11版本之后,可以创建幂等性Producer,保证消息只被发送一次
开启方式
设置参数:
props.put(“enable.idempotence”, ture)
或 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)
props.put(“enable.idempotence”, ture)
或 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)
实现原理
用空间换时间的优化思路,在Broker端多保存了一些字段,当Producer发送了具有相同字段值的消息后,Broker能够自动知晓这些消息已经重复了,会把他们丢弃掉
缺陷
1. 只能保证单分区上的幂等性
一个幂等性Producer能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性
2. 只能实现单会话上的幂等性
可以理解为Producer进程的一次运行,当重启了Producer进程之后,这种幂等性保证就丧失了
事务型Producer
概述
kafka自0.11版本开始提供对事务的支持,隔离级别是read committed
保证多条消息原子性地写入目标分区,同时也能保证Consumer只能看到事务成功提交的消息
原理
主要的机制是两阶段提交(2PC)。引入了事务协调器的组件帮助完成分布式事务
开启方式(Producer端)
设置参数1:
props.put(“enable.idempotence”, ture)
props.put(“enable.idempotence”, ture)
设置参数2:
设置 Producer 端参数 transctional. id。最好为其设置一个有意义的名字
设置 Producer 端参数 transctional. id。最好为其设置一个有意义的名字
开启方式(Consumer端)
设置参数:
isolation.level = read_committed
isolation.level = read_committed
read_committed表明Consumer只会读取事务型Producer成功提交事务写入的消息
默认值是read_uncommitted,表明Consumer能够读取到kafka写入的任何消息,不论事务型Producer提交事务还是终止事务,其写入的消息都可以读取
作用(如何选取)
幂等性Producer和事务型Producer都是kafka为实现精确一次处理语义所提供的工具
幂等性Producer只能保证单分区、单会话上的消息幂等性
事务型Producer能够保证跨分区、跨会话间的幂等性
不足
比起幂等性 Producer,事务型 Producer 的性能要更差
生产者推送消息
发送模式
发后既忘(fire and forget)
方式:
只管往kafka中发送消息而不关心消息是否正确到达;性能最高,可靠性最低
只管往kafka中发送消息而不关心消息是否正确到达;性能最高,可靠性最低
问题:
在发生不可重试异常时,会造成消息丢失
在发生不可重试异常时,会造成消息丢失
代码:
producer.send(msg)
producer.send(msg)
同步(sync)
方式:
send()方法并非是void类型,而是Future类型,所以可以通过get()方法来阻塞等待响应,实现同步
send()方法并非是void类型,而是Future类型,所以可以通过get()方法来阻塞等待响应,实现同步
问题:
性能很差,需要等待一条消息发送完之后才能发送下一条
性能很差,需要等待一条消息发送完之后才能发送下一条
代码:
producer.send(record).get()
producer.send(record).get()
Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get();
System.out.println(metadata.topic());
System.out.println(metadata.partition());
System.out.println(metadata.offset());
RecordMetadata metadata = future.get();
System.out.println(metadata.topic());
System.out.println(metadata.partition());
System.out.println(metadata.offset());
RecordMetadata对象中包含了一些消息的元数据信息;
比如:当前消息存储的主题、分区号、偏移量、时间戳等
比如:当前消息存储的主题、分区号、偏移量、时间戳等
异步(async)
producer.send(record,new Callback(){
@Override
public void onCompletion(RecordMetadata metadata,Exception exception){
if(exception != null){
exception.printStackTrace();
} else {
System.out.println(metadata.topic());
System.out.println(metadata.partition());
System.out.println(metadata.offset());
}
}
})
@Override
public void onCompletion(RecordMetadata metadata,Exception exception){
if(exception != null){
exception.printStackTrace();
} else {
System.out.println(metadata.topic());
System.out.println(metadata.partition());
System.out.println(metadata.offset());
}
}
})
onCompletion()方法中两个参数是互斥的;
消息发送成功时,metadata不为null,exception不为null;
消息发送异常时,metadata为null,exception不为null;
消息发送成功时,metadata不为null,exception不为null;
消息发送异常时,metadata为null,exception不为null;
实际应用中,发送异常时,应该做某种重试或者日志记录
异步发送的条件
当满足以下其中一个条件的时候就触发 发送
batch.num.messages 异步发送 每次批量发送的条目
queue.buffering.max.ms 异步发送的时候 发送时间间隔 单位是毫秒
batch.num.messages 异步发送 每次批量发送的条目
queue.buffering.max.ms 异步发送的时候 发送时间间隔 单位是毫秒
生产环境应该都应该采用异步方式发送消息
配置方式(老版本)
推送异常类型
可重试的异常
NetworkException、LeaderNotAvailableException、UnknownTopicOrPartitionException、
NotEnoughReplicasException、NotCoordinatorException等
NotEnoughReplicasException、NotCoordinatorException等
NetworkException表示网络异常,有可能是由于网络瞬时故障而导致的异常,可以通过重试解决
LeaderNotAvailableException表示分区的leader副本不可用,这个异常通常发生在leader副本下线而新的leader副本选举完成之前,重试之后可以重新恢复
对于可重试的异常,如果配置了retries参数,只要在规定的重试次数内自行恢复了,就不会抛出异常
不可重试的异常
比如RecordTooLargeException等
对于不可重试的异常,kafka不会对此进行任何重试,直接抛出异常
零丢失配置方案
生产者(如何保证kafka写入数据不丢失)(Producer端)
配置
设置acks=all(或者acks=-1)(Producer端参数)
扩展(具体含义)
acks=0
代表生产者只要把消息发送出去以后就认为消息发送成功了,这种方式有可能会导致数据丢失,因为有可能消息发送到服务端以后服务端存储失败了
acks=1
代表生产者把消息发送到服务端,服务端的leader replica 副本写成功以后,就返回生产者消息发送成功了,这种方式也有可能会导致丢数据,因为有可能刚好数据写入到leader replica,然后返回处理成功的响应给生产者,假如这个时候leader replica在的服务器出问题了,follower replica还没来得及同步数据,这个时候是会丢数据的。
acks=all(acks=-1)
代表生产者把消息发送到服务端,服务端的ISR列表里所有replica 都写入成功以后,才会返回成功响应给生产者。假设ISR列表里面有该分区的三个replica(一个leader replica,两个follower replica),那么acks=-1就意味着消息要写入到leader replica,并且两个follower replica从leader replica上同步数据成功,服务端才会给生产者发送消息发送成功的响应。
所以ISR列表里面的replica就非常关键。如果我们想要保证数据不丢,那么acks的值设置为-1,并且还需要保证ISR列表里面是1个副本以上,具体由哪个参数控制,看下面的服务端的配置
所以ISR列表里面的replica就非常关键。如果我们想要保证数据不丢,那么acks的值设置为-1,并且还需要保证ISR列表里面是1个副本以上,具体由哪个参数控制,看下面的服务端的配置
设置retries=MAX(Producer端的参数)
一旦写入失败就无限重试(或者此处设置较大的重试次数)
如果重试失败,需要对异常进行处理,可以把消息另外保存到其他安全的地方
推送消息使用带有回调的API
在生产中Kafka生产者的开发我们都会用异步调用的方式,异步调用方式有如下两个API:
producer.send(msg) 不带回调方法
producer.send(msg,callback) 带回调方法
记得要使用带有回调方法的API,我们可以根据回调函数得知消息是否发送成功,如果发送失败了我们要进行异常处理,比如存储到其他介质来保证消息不丢。
producer.send(msg) 不带回调方法
producer.send(msg,callback) 带回调方法
记得要使用带有回调方法的API,我们可以根据回调函数得知消息是否发送成功,如果发送失败了我们要进行异常处理,比如存储到其他介质来保证消息不丢。
原理
生产者等待broker的ack,当partition的leader接收到消息,并且ISR中所有的follower都同步到消息之后,才会发送ack给生产者,表示发送成功。如果不满足这个条件,生产者会自动不断的重试,重试无限次
服务器(kafka本身丢失)(Broker端)
配置
1. unclean.leader.election.enable=false (Broker端的参数)
关闭unclean领导者选举,即不允许非ISR中的副本被选举为leader,以避免数据丢失
2. replication.factor >1 (Broker端参数)
或者 replication.factor >= 3
或者 replication.factor >= 3
参数的含义是partition的副本数量大于等于3
3. min.insync.replicas >1 (Broker端参数)
消息至少被写入到多少个副本才算是“已提交”
设置成大于1可以提升消息持久性
在实际环境中千万不要使用默认值1
设置成大于1可以提升消息持久性
在实际环境中千万不要使用默认值1
这个参数要跟生产者里的acks参数配合使用,当生产者acks=-1时,服务端的ISR列表里的所有副本都写入成功,才会给生产者返回成功的响应。而min.insync.replicas这个参数就是控制ISR列表的,假设min.insync.replicas=1,这就意味着ISR列表里可以只有一个副本,这个副本就是leader replica,这个时候即使acks设置的是-1,但其实消息只发送到leader replica,以后就返回成功的响应了。
因为ISR只有一个副本,我们知道这种情况是有可能会丢数据的,所以min.insync.replicas这个值需要大于1的(如果ISR列表里面副本的个数小于min.insync.replicas,生产者发送消息是失败的),并且是min.insync.replicas <= replication.factor
因为ISR只有一个副本,我们知道这种情况是有可能会丢数据的,所以min.insync.replicas这个值需要大于1的(如果ISR列表里面副本的个数小于min.insync.replicas,生产者发送消息是失败的),并且是min.insync.replicas <= replication.factor
4. 确保replication.factor > min.insync.replicas
推荐设置成replication.factor = min.insync.replicas + 1
推荐设置成replication.factor = min.insync.replicas + 1
如果两者相等,那么只要有一个副本挂了,整个分区就无法正常工作了
消费者(Consumer端)
配置
enable.auto.commit=false
关闭手动提交offset(消费端保证手动ACK+消费幂等)
消费者是可以自动提交offset的,但是如果是自动提交offset,可能会丢数据,比如消费者每隔3秒提交一次offset,假如偏移量成功提交了,但是数据处理失败了,这个时候就会丢数据。所以把enable.auto.commit设置成false就行
总结
Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证
生产者端:使用带回调的API / acks=all / retries=MAX
kafka服务器端:unclean.leader.election.enable=false/ replication.factor >1 / min.insync.replicas >1
消费者端:enable.auto.commit=false(手动提交offset)
生产者端:使用带回调的API / acks=all / retries=MAX
kafka服务器端:unclean.leader.election.enable=false/ replication.factor >1 / min.insync.replicas >1
消费者端:enable.auto.commit=false(手动提交offset)
如何保证消息不丢失不被重复消费
消息的发送机制
机制
kafka的消息发送机制分为同步和异步机制(配置同步和异步,可以通过producer.type属性进行配置)
使用同步模式时,有三种状态来保证消息的安全生产。可以通过配置 request.required.acks属性
0: 表示不进行消息接收是否成功的确认(但同时,acks值为0会得到最大的系统吞吐量)
1:表示当Leader接收成功时确认
-1:表示Leader和Follower都接收成功时确认
消息丢失的场景
当acks = 0的时候,不和Kafka集群进行消息接收确认,则当网络异常、缓冲区满了等情况时,消息可能丢失
acks=1的时候,只保证leader写入成功。当leader partition挂了的时候,数据就有可能发生丢失
使用异步模式的时候,如果配置acks=0,消息还没有接受到确认,当缓冲区满了,就会自动清空缓冲池里的消息
如何保证消息不丢失
同步模式下只需要将确认机制设置为-1,让消息写入leader和所有的副本,就可以保证消息安全生产
异步模式下,如果消息发出去了,但还没有收到确认的时候,缓冲池满了,在配置文件中将阻塞超时的时间设置为不限制,也就说让生产端一直阻塞,这样也能保证数据不会丢失
设置block.on.buffer.full = true。 这样producer将一直等待缓冲区直至其变为可用。缓冲区满了就阻塞
设置block.on.buffer.full = true。 这样producer将一直等待缓冲区直至其变为可用。缓冲区满了就阻塞
消息的接受机制
设置enable.auto.commit为false
关闭自动提交位移
消息的重复消费如何解决
一方面需要消息中间件来进行保证。另一方面需要自己的处理逻辑来保证消息的幂等性
有可能代码消费了消息,但服务器突然宕机,未来得及提交offset。所以我们可以在代码保证消息消费的幂等性
可以通过redis的原子性来保证,也可以通过数据库的唯一id来保证
集群运维
线上部署
磁盘系统
考量点
操作系统I/O模型
建议
将kafka部署在Linux系统上
kafka客户端底层使用了java的selector,selector在linux上的实现机制是epoll,而在windows平台上的实现机制是select。因此kafka部署在linux上是优势的,因为能够获得更高效的I/O性能
在 Linux 部署 Kafka 能够享受到零拷贝技术所带来的快速数据传输特性。
磁盘
考量点
磁盘I/O性能
建议
普通环境使用机械磁盘,不需要搭建RAID
磁盘容量
考量点
根据消息数、留存时间预估磁盘容量
建议
实际使用中建议预留20%到30%的磁盘空间
带宽
考量点
根据实际带宽资源和业务SLA预估服务器数量
建议
对于千兆网络,建议每台服务器按照700Mbps来计算,避免大流量下的丢包
控制器
基本概念
控制器(Controller)是Kafka的核心组件,主要作用是在zookeeper的帮助下管理和协调整个Kafka集群
集群中任意一台Broker都能作为控制器;
在正常运行过程中,只有一个Broker能成为控制器,行使管理和协调的职责
在正常运行过程中,只有一个Broker能成为控制器,行使管理和协调的职责
选举策略
当Broker在启动时,都会尝试去zookeeper上创建 /controller 节点;
Kafka选举控制器的规则为:第一个成功创建 /controller 节点的Borker就是当前集群的控制器
作用(协调作用)
主题管理(创建、删除、增加分区)
控制器帮助完成对Kafka主题的创建、删除以及增加分区的操作;
执行Kafka-topics.sh脚本时大部分工作都是由控制器完成
分区重分配
主要指使用 kafka-reassign-partitions 脚本 对已有分区进行细粒度的分配功能
Preferred 领导者选举
kafka为了避免部分Broker负载过重而提高的一种换Leader的方案
.集群成员管理(新增 Broker、Broker 主动关闭、Broker 宕机)
自动检测新增Broker、Broker主动关闭及Broker宕机等(依赖Zookeeper的Watch功能和临时节点组合实现)
示例
新增Broker:
当有新Broker启动时,会在zookeeper的 /brokers/ids 下创建专属的znode节点,zookeeper会通过Watch机制将消息推送给控制器,控制器就能自动感知到创建节点的变化,进而后续的新增Broker操作
当有新Broker启动时,会在zookeeper的 /brokers/ids 下创建专属的znode节点,zookeeper会通过Watch机制将消息推送给控制器,控制器就能自动感知到创建节点的变化,进而后续的新增Broker操作
检测Broker存活性:
Broker启动时在 /brokers/ids 下创建的是 临时znode,当Broker宕机或者主动关闭后,与zookeeper的回话结束,这个znode会被自动删除;
同理 zookeeper的 Watch 机制会将这个变更推送给控制器,控制器进行后续处理
Broker启动时在 /brokers/ids 下创建的是 临时znode,当Broker宕机或者主动关闭后,与zookeeper的回话结束,这个znode会被自动删除;
同理 zookeeper的 Watch 机制会将这个变更推送给控制器,控制器进行后续处理
数据服务
向其他Broker提高数据服务
控制器上保存了最全的集群元数据,控制器会定期地往其他Broker发送请求,从而更新自己内存中的元数据
控制器保存的数据
这些数据在zookeeper上也保存了一份,控制器初始化时,从zookeeper读取数据保存到自己的内存中
所有主题信息:
包括具体的分区信息,比如领导者副本是谁,ISR集合中有哪些副本等
包括具体的分区信息,比如领导者副本是谁,ISR集合中有哪些副本等
所有Broker信息:
包括当前都有哪些运行中的Broker,哪些正在关闭中的 Broker等
包括当前都有哪些运行中的Broker,哪些正在关闭中的 Broker等
所有涉及运维任务的分区:
包括当前正在进行 Preferred 领导者选举以及分区重分配的分区列表
包括当前正在进行 Preferred 领导者选举以及分区重分配的分区列表
故障转移(Failover)
概念:
故障转移指的是,当运行中的控制器突然宕机或意外终止时,Kafka 能够快速地感知到,并立即启用备用控制器来代替之前失败的控制器
故障转移指的是,当运行中的控制器突然宕机或意外终止时,Kafka 能够快速地感知到,并立即启用备用控制器来代替之前失败的控制器
过程
zookeeper通过Watch机制快速感知到控制器所在的Broker宕机,并马上删除 /controller 节点
存活的Broker开始竞争新的控制器,第一个在zookeeper上创建 /controller 节点的Broker当选控制器
新的控制器从zookeeper中读取集群元数据信息,并初始化到自己的内存中;Failover 完成
其他
Broker
持久化数据
1. Kafka使用消息数据(Log)来保存数据,一个日志就是磁盘上一个只能追加写(Append-only)消息的物理文件
2. 因为只能追加写入,故避免了缓慢的随机I/O操作,改为性能较好的顺序I/O写操作,这也是kafka实现高吞吐量的一个手段
定期删除数据
原理就是通过日志段(Log Segment)机制
在kafka底层,一个日志又近一步细分成多个日志段,消息被追加写到当前最新的日志段中,当写满一个日志段后,kafka会自动切分出一个新的日志段,并将老的日志段封存起来。kafka在后台还有定时任务会定期地检查老的日志段是否能够被删除,从而实现回收磁盘空间的目的
zookeeper
作用
它是一个分布式协调框架,负责协调管理并保存 Kafka 集群的所有元数据信息,比如集群都有哪些 Broker 在运行、创建了哪些 Topic,每个 Topic 都有多少分区以及这些分区的 Leader 副本都在哪些机器上等信息
高水位(High Watermark)
作用
定义消息可见性,即用来标识分区下的哪些消息是可以被消费者消费的
帮助 Kafka 完成副本同步
HW、LEO 基本概念
HW
HW是 High Watemark的缩写,也叫高水位,标识了一个特定的消息偏移量(offset)
已提交消息
消息位移保存起来(持久化)的消息
分区高水位以下的消息被认为是已提交消息
消费者只能消费已提交消息,即下图中位移小于8的所有消息
在分区高水位以下的消息被认为是已提交消息,反之就是未提交消息
LEO
LEO是Log End Offset的缩写,标识日志末端位移,也标识副本写入下一条消息的位移
数字15的方框是虚线,说明当前副本只有15条消息,位移值是0到14,下一条新消息的位移是15
如果LEO=10,那么表示该副本保存了10条消息,位移值范围是[0, 9]
特点
每个Kafka副本对象都有两个重要的属性:LEO和HW。注意是所有的副本,而不只是leader副本
同一个副本对象,HW值不会大于LEO值
一个分区中,ISR中最小的LEO为分区的HW
Kafka 使用 Leader 副本的高水位来定义所在分区的高水位。换句话说,分区的高水位就是其 Leader 副本的高水位
高水位更新机制
概念
每个副本对象都保存了一组高水位值和LEO值;
在Leader副本所在的Broker上,还保存了其他Follower副本的LEO值
在Leader副本所在的Broker上,还保存了其他Follower副本的LEO值
Broker 0 上保存了某分区的 Leader 副本和所有 Follower 副本的 LEO 值;
而 Broker 1 上仅仅保存了该分区的某个 Follower 副本
而 Broker 1 上仅仅保存了该分区的某个 Follower 副本
Kafka 把 Broker 0 上保存的这些 Follower 副本又称为远程副本(Remote Replica),帮助Leader副本确定其高水位
更新机制:
Kafka 副本机制在运行过程中,会更新 Broker 1 上 Follower 副本的高水位和 LEO 值,同时也会更新 Broker 0 上 Leader 副本的高水位和 LEO 以及所有远程副本的 LEO,但它不会更新远程副本的高水位值,也就是我在图中标记为灰色的部分
Kafka 副本机制在运行过程中,会更新 Broker 1 上 Follower 副本的高水位和 LEO 值,同时也会更新 Broker 0 上 Leader 副本的高水位和 LEO 以及所有远程副本的 LEO,但它不会更新远程副本的高水位值,也就是我在图中标记为灰色的部分
HW和LEO的更新时机
follower副本何时更新LEO
kafka有2套follower副本LEO
一套LEO保存在follower副本所在的Broker的副本管理机制中;(第一套)
帮助follower副本更新其HW值
另一套LEO保存在leader副本所在broker的副本管理机制中 (第二套)
帮助leader副本更新其HW使用
follower副本端的follower副本LEO何时更新?
follower副本端的LEO值就是底层日志的LEO值,也就是说每当写入一条消息,其LEO值就会被更新(类似于LEO += 1);
当follower发送FETCH请求后,leader将数据返回给follower,此时follower开始向底层log写数据,从而自动地更新LEO值
当follower发送FETCH请求后,leader将数据返回给follower,此时follower开始向底层log写数据,从而自动地更新LEO值
leader副本端的follower副本LEO何时更新?
发生在leader副本处理follower FETCH请求的时候;
一旦leader接收到follower发送的FETCH请求,它首先会从自己的log中读取相应的数据,但是在给follower返回数据之前它先去更新follower的LEO(该LEO是上面说的第二套LEO)
一旦leader接收到follower发送的FETCH请求,它首先会从自己的log中读取相应的数据,但是在给follower返回数据之前它先去更新follower的LEO(该LEO是上面说的第二套LEO)
follower副本何时更新HW
follower更新HW发生在其更新LEO之后,一旦follower向log写完数据,它会尝试更新它自己的HW值
具体算法是 比较当前LEO值与FETCH响应中leader的HW值,取两者的小者作为新的HW值
leader副本何时更新LEO
和follower更新LEO道理相同,leader写log时就会自动地更新它自己的LEO值
leader副本何时更新HW值
机制:
leader的HW值就是分区HW值,它直接影响了分区数据对于Consumer的可见性
leader的HW值就是分区HW值,它直接影响了分区数据对于Consumer的可见性
以下4种情况leader会尝试去更新分区HW(尝试更新,有可能因为不满足条件而不做任何更新)
副本成为leader副本时
当某个副本成为了分区的leader副本,kafka会尝试更新分区HW
broker出现崩溃导致副本被踢出ISR时
若有broker奔溃则必须查看下是否会涉及此分区
producer向leader副本写入消息时(leader处理producer请求)
因为写入消息会更新leader的LEO,故有必要再查看下HW值是否也需要修改
leader处理follower FETCH请求时
当leader处理follower的FETCH请求时首先从底层的log读取数据,之后尝试更新分区HW值
总结:当kafka Broker都正常工作时,分区HW值的更新时机有两个:leader处理produce请求时和leader处理FETCH请求时
如何确定分区HW
当尝试确定分区HW时,它会选出所有满足条件的副本,比较它们的LEO(当然也包括leader自己的LEO),并选择最小的LEO值作为HW值
满足条件
处于ISR中
副本LEO落后于leader LEO的时长不大于replica.lag.time.max.ms参数值(默认是10s)
具体步骤分析
初始状态
初始时leader和follower的HW和LEO都是0
leader中的remote LEO指的就是leader端保存的follower LEO,也被初始化成0
场景一:follower发送FETCH请求,在leader处理完PRODUCE请求之后
producer给该topic分区发送了一条消息(此时follower尚未发送FETCH请求)
leader接收到PRODCER请求主要做两件事
将消息写入底层log(同时也就自动更新了leader的LEO)(leader LEO 更新成 1)
尝试更新leader HW值,min(leader LEO,remote LEO) = 0,取最小值0,与当前HW值相同,所以不会更新分区HW值;(leader LEO = 1,但是 remote LEO = 0,所以leader HW = 0)
总结: leader HW = 0,leader LEO = 1,remote LEO = 0
此时follower发送FETCH请求(leader处理follower的FETCH请求)
(或者说follower早已发送了FETCH请求,只不过在broker的请求队列中排队)
(或者说follower早已发送了FETCH请求,只不过在broker的请求队列中排队)
当follower发送FETCH请求时,leader端的处理依次如下
读取底层log数据
更新remote LEO = 0;
(为什么是0?因为此时follower还没有写入这条消息)
(leader如何确认follower还未写入呢?这是通过follower发送的FETCH请求中的fetch offset来确定的)
(为什么是0?因为此时follower还没有写入这条消息)
(leader如何确认follower还未写入呢?这是通过follower发送的FETCH请求中的fetch offset来确定的)
尝试更新分区HW值 = 0;
此时leader LEO = 1,remote LEO = 0,所以分区HW值 = min(leader LEO,follower remote LEO) = 0
此时leader LEO = 1,remote LEO = 0,所以分区HW值 = min(leader LEO,follower remote LEO) = 0
把数据和当前分区HW值(依然是0)发送给follower副本
总结:leader HW = 0,Leader LEO = 1,remote LEO = 0
follower副本接收到FETCH response后
follower副本端依次如下操作
写入本地log(同时更新follower LEO = 1)
更新follower HW
比较本地LEO 和 当前leader HW 取最小值,min(follower LEO,leader HW) = 0,所以 follower HW = 0
比较本地LEO 和 当前leader HW 取最小值,min(follower LEO,leader HW) = 0,所以 follower HW = 0
总结:follower HW = 0,follower LEO = 1
此时,第一轮FETCH RPC结束
虽然leader和follower都已经在log中保存了这条消息,但分区HW值尚未被更新
leader HW = 0 , leader LEO = 1 , remote LEO = 0 , follower HW = 0 , follower LEO = 1
然后 follower发起第二轮FETCH 请求(此时无数据需要同步)
leader端收到后依次如下操作
读取底层log数据
更新 remote LEO = 1
这次为什么是1?
因为这轮FETCH RPC请求携带的fetch offset是1,
为什么这次携带的是1呢?
因为上一轮结束后follower LEO被更新为1了
因为这轮FETCH RPC请求携带的fetch offset是1,
为什么这次携带的是1呢?
因为上一轮结束后follower LEO被更新为1了
尝试更新分区HW = 1
此时leader LEO = 1,remote LEO = 1,所以分区HW = min(leader LEO , follower remote LEO) = 1
把数据(实际上没有数据)和当前分区HW值(已更新为1)发送给follower副本
总结:remote LEO = 1, leader HW = 1
follower副本接收到FETCH repsonse后
follower副本依次操作如下
写入本地log(当然无数据可写,所以follower LEO也不会变化,依然是1)
更新follower HW
比较本地LEO和当前leader HW取小者。由于此时两者都是1,故更新follower HW = 1
总结:follower LEO = 1, follower HW = 1
producer端发送消息后broker端完整的处理流程就讲完了。此时消息已经成功地被复制到leader和follower的log中且分区HW是1,表明consumer能够消费offset = 0的这条消息
场景二:FETCH请求保存在purgatory中PRODUCE请求到来
机制
follower发送过来的FETCH请求因为无数据而暂时会被寄存到leader端的purgatory中,待500ms(replica.fetch.wait.max.ms参数)超时后会强制完成。倘若在寄存期间producer端发送过来数据,那么会Kafka会自动唤醒该FETCH请求,让leader继续处理之
leader端处理
leader写入本地log
尝试唤醒在purgatory中寄存的FETCH请求
尝试更新分区HW
问题:
kafka使用HW值来决定副本备份的进度,而HW值的更新通常需要额外一轮FETCH RPC才能完成,故而这种设计可能会带来问题
(Follower 副本的高水位更新需要一轮额外的拉取请求才能实现。如果把上面那个例子扩展到多个 Follower 副本,情况可能更糟,也许需要多轮拉取请求)
kafka使用HW值来决定副本备份的进度,而HW值的更新通常需要额外一轮FETCH RPC才能完成,故而这种设计可能会带来问题
(Follower 副本的高水位更新需要一轮额外的拉取请求才能实现。如果把上面那个例子扩展到多个 Follower 副本,情况可能更糟,也许需要多轮拉取请求)
备份数据丢失
前提条件
min.insync.replicas=1
(消息至少被写入到多少个副本才算是“已提交”)
(消息至少被写入到多少个副本才算是“已提交”)
举例
上图中有两个副本:A和B。开始状态是A是leader
我们假设producer端min.insync.replicas设置为1,那么当producer发送两条消息给A后,A写入到底层log,此时Kafka会通知producer说这两条消息写入成功
但是在broker端,leader和follower底层的log虽都写入了2条消息且分区HW已经被更新到2,但follower HW尚未被更新;
倘若此时副本B所在的broker宕机,那么重启回来后B会自动把LEO调整到之前的HW值,故副本B会做日志截断(log truncation),将offset = 1的那条消息从log中删除,并调整LEO = 1,此时follower副本底层log中就只有一条消息,即offset = 0的消息
倘若此时副本B所在的broker宕机,那么重启回来后B会自动把LEO调整到之前的HW值,故副本B会做日志截断(log truncation),将offset = 1的那条消息从log中删除,并调整LEO = 1,此时follower副本底层log中就只有一条消息,即offset = 0的消息
B重启之后需要给A发FETCH请求,但若A所在broker机器在此时宕机,那么Kafka会令B成为新的leader,而当A重启回来后也会执行日志截断,将HW调整回1。这样,位移=1的消息就从两个副本的log中被删除,即永远地丢失了
这个场景丢失数据的前提是在min.insync.replicas=1时,一旦消息被写入leader端log即被认为是“已提交”,而延迟一轮FETCH RPC更新HW值的设计使得follower HW值是异步延迟更新的,倘若在这个过程中leader发生变更,那么成为新leader的follower的HW值就有可能是过期的,使得clients端认为是成功提交的消息被删除
引入Leader Epoch之后
Follower 副本 B 重启回来后,需要向 A 发送一个特殊的请求去获取 Leader 的 LEO 值。在这个例子中,该值为 2
当获知到 Leader LEO=2 后,B 发现该 LEO 值不比它自己的 LEO 值小,而且缓存中也没有保存任何起始位移值 > 2 的 Epoch 条目,因此 B 无需执行任何日志截断操作
此时副本A宕机,B成为 Leader。同样地,当 A 重启回来后,执行与 B 相同的逻辑判断,发现也不用执行日志截断,至此位移值为 1 的那条消息在两个副本中均得到保留
后面当生产者程序向 B 写入新消息时,副本 B 所在的 Broker 缓存中,会生成新的 Leader Epoch 条目:[Epoch=1, Offset=2]
之后,副本 B 会使用这个条目帮助判断后续是否执行日志截断操作
这样,通过 Leader Epoch 机制,Kafka 完美地规避了这种数据丢失场景
备份数据不一致
现象
leader端log和follower端log的数据可能不一致
举例
A依然是leader,A的log写入了2条消息,但B的log只写入了1条消息。分区HW更新到2,但B的HW还是1,同时producer端的min.insync.replicas = 1
这次我们让A和B所在机器同时挂掉,然后假设B先重启回来,因此成为leader,分区HW = 1。假设此时producer发送了第3条消息(绿色框表示)给B,于是B的log中offset = 1的消息变成了绿色框表示的消息,同时分区HW更新到2(A还没有回来,就B一个副本,故可以直接更新HW而不用理会A)之后A重启回来,需要执行日志截断,但发现此时分区HW=2而A之前的HW值也是2,故不做任何调整。此后A和B将以这种状态继续正常工作
显然,这种场景下,A和B底层log中保存在offset = 1的消息是不同的记录,从而引发不一致的情形出现
引入leader Epoch之后
解决方案(Leader Epoch)
方案
Kafka 0.11引入了leader epoch来取代HW值;
Leader端多开辟一段内存区域专门保存leader的epoch信息
Leader端多开辟一段内存区域专门保存leader的epoch信息
数据结构
Leader Epoch由两部分数据组成
Leader Epoch由两部分数据组成
Epoch。一个单调增加的版本号。每当副本领导权发生变更时,都会增加该版本号。
小版本号的Leader被认为是过期的Leader,不能再行使Leader权利
小版本号的Leader被认为是过期的Leader,不能再行使Leader权利
起始位移(Start Offset)。 Leader副本在该Epoch值上写入的首条消息的位移
leader epoch实际上是一对值:(epoch,offset)。epoch表示leader的版本号,从0开始,当leader变更过1次时epoch就会+1,而offset则对应于该epoch版本的leader写入第一条消息的位移
举例
Leader Epoch<0, 0> 和 <1, 120>
第一个 Leader Epoch 表示版本号是 0,这个版本的 Leader 从位移 0 开始保存消息,一共保存了 120 条消息。之后,Leader 发生了变更,版本号增加到 1,新版本的起始位移是 120
原理
Kafka Broker 会在内存中为每个分区都缓存 Leader Epoch 数据,同时它还会定期地将这些信息持久化到一个 checkpoint 文件中
当 Leader 副本写入消息到磁盘时,Broker 会尝试更新这部分缓存。如果该 Leader 是首次写入消息,那么 Broker 会向缓存中增加一个 Leader Epoch 条目,否则就不做更新
每次有 Leader 变更时,新的 Leader 副本会查询这部分缓存,取出对应的 Leader Epoch 的起始位移,以避免数据丢失和不一致的情况
举例说明(看上图)
ZooKeeper在Kafka中的作用是什么
Zookeeper在Kafka集群中主要用于协调管理,主要作用:
zookeeper上保存了kafka的元数据信息(控制器上也保存了一份)
broker
当前存活、正在关闭的Broker列表
某个Broker的所有分区、所有副本
分区
每个分区的副本列表
正在进行leader选举的分区
当前存活的所有副本
每个分区的leader和ISR信息
正在进行重分配的分区列表
某组分区下的所有副本
topic
topic列表
某个topic的所有副本
某个topic的所有分区
移除某个topic的所有信息
Broker注册
每个Broker在启动时,都会到Zookeeper上进行注册,即到/brokers/ids下创建属于自己的节点,如/brokers/ids/[0...N]
zookeeper会通过Watch机制将消息推送给控制器
每个Broker就会将自己的IP地址和端口信息记录到该节点中去
其中,Broker创建的节点类型是临时节点,一旦Broker宕机,则对应的临时节点也会被自动删除
controller(控制器)注册
当Broker在启动时,都会尝试去zookeeper上创建 /controller 节点;
第一个成功创建 /controller 节点的Borker就是当前集群的控制器
故障转移:
zookeeper通过Watch机制快速感知到控制器所在的Broker宕机,并马上删除 /controller 节点
重新选举后,新的控制器从zookeeper中读取集群元数据信息,并初始化到自己的内存中
topic注册
在Kafka中,同一个Topic的消息会被分成多个分区并将其分布在多个Broker上,这些分区信息及与Broker的对应关系也都是由Zookeeper在维护,由专门的节点来记录,如:/borkers/topics
生产者负载均衡
生产者需要将消息合理地发送到这些分布式的Broker上;
Kafka支持传统的四层负载均衡,也支持Zookeeper方式实现负载均衡
Kafka支持传统的四层负载均衡,也支持Zookeeper方式实现负载均衡
kafka的四层负载均衡
根据生产者的IP地址和端口来为其确定一个相关联的Broker
此方法不是真正的负载均衡,如果有些生产者产生的消息远多于其他生产者的话,那么会导致不同的Broker接收到的消息总数差异巨大;
同时,生产者也无法实时感知到Broker的新增和删除;
同时,生产者也无法实时感知到Broker的新增和删除;
zookeeper
由于每个Broker启动时,都会完成Broker注册过程,生产者会通过该节点的变化来动态地感知到Broker服务器列表的变更,这样就可以实现动态的负载均衡机制
消费者负载均衡
每条消息都只会发送给分组中的一个消费者
不同的Consumer Group消费自己特定的Topic下面的消息,互不干扰
分区和消费者的关系
在Kafka中,规定了每个消息分区 只能被同组的一个消费者进行消费
因此,需要在 Zookeeper 上记录 消息分区 与 Consumer 之间的关系,每个消费者一旦确定了对一个消息分区的消费权力,需要将其Consumer ID 写入到 Zookeeper 对应消息分区的临时节点上,例如:
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]
消费进度 offset记录(新版本脱离zk)
在消费者对指定消息分区进行消息消费的过程中,需要定时地将分区消息的消费进度Offset记录到Zookeeper上,以便在该消费者进行重启或者其他消费者重新接管该消息分区的消息消费后,能够从之前的进度开始继续进行消息消费
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
1.0.9版本以后,修改为保存到kafka内部(将Consumer的位移数据作为一条条普通的kafka消息,提交到_consumer_offset中)
消费者注册以及重平衡
每个消费者服务器启动时,都会到Zookeeper的指定节点下创建一个属于自己的消费者节点。例如/consumers/[group_id]/ids/[consumer_id]
完成节点创建后,消费者就会将自己订阅的Topic信息写入该临时节点
0 条评论
下一页