Kafka设计与实践
2021-09-13 15:54:57 7 举报
AI智能生成
《深入理解Kafka:核心设计与实践原理》 学习笔记
作者其他创作
大纲/内容
一、简介
历史
LinkedIn 捐给 Apache,一开始用的Scala写的,后面用 Java 重写了
基本概念
架构图
术语
Producer:生产者,即发消息投递到 Kafka 的一方
Consumer:消费者,即从 Kafka 拿到消息消费的一方
Broker:Kafka 的服务实例,可以理解为存储并处理消息的
ZooKeeper:Kafka 引入用于负责集群元数据的管理、控制器选举
Topic:主题,消息是挂在(归类)一个具体主题(业务)下的
Partition:分区,可以理解为多个分区组成了一个逻辑上的主题,概念上更偏向于数据存储方面,可以视作消息存储的逻辑抽象,本质上是一个个磁盘上的log文件。
多副本机制
目的:一个分区只有一份数据,利用一主多从的思想,保障稳定性
原理:一个分区会冗余多个副本,Leader 副本负责读写数据,Follower 副本只用于同步Leader 副本数据,当 Leader 副本故障时,会重新选举新的Leader,以保障稳定性
ISR 机制
目的:保障多副本机制下,副本复制时的性能
原理:多副本机制需要使用一个副本复制策略,同步和异步的复制方式均不佳,引入适当副本(ISR集合)同步后即ACK的复制机制
HW 高水位机制
目的:保障多副本机制下故障发生后的数据一致性
原理:高水位 规定消费者最大可消费的offset 保障消费进度一致,Leader Epoch 用于记录 Leader 切换时记录 LEO 信息 防止错误的日志切断发生造成消息丢失。
安装配置:略
生产和消费:展示了一堆Kafka 自带的脚本工具,略
服务端参数配置:略
二、浅析生产者
客户端开发
(ProducerRecord)消息对象结构
topic
partition
headers:留给使用者自定义的一个扩展信息,一般会塞一些应用信息
key:消息的key,作为附加字段存在。
作用1:可利用 key 来计算分区号,发往指定分区;
作用2: topic 做第一次归类,key 用作第二次归类,同 key 可以放到同一个分区
作用3: 有key 还能支持日志压缩
value:消息体
timestamp:可分为两类
CreateTime:消息创建的时间
LogAppendTime:消息追加到日志文件的时间
参数配置(允许配置自定义的序列化器)
消息的发送方式
同步
异步
发后即忘:即不关心发送结果如何
序列化:略,说的具体怎么配
分区器
作用:对未指定 partition 的消息指派分区
默认策略
若 key 为 null,则轮询
若 key 不为 null,则对 key 进行哈希分区(采用 MurmurHash2 算法)
⚠️:书上说增加分区数量时,分区和Key的映射会变,可不可以采用 Hash 环的方式来优化呢?
也允许自定义分区器
拦截器:消息发送时做一些特殊处理,比如打点之类,留的扩展点。拦截器可以是链式的
原理分析
架构图
生产者客户端由两个线程协调运行
主线程:由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)
Sender线程(发送线程):负责从RecordAccumulator中获取消息并将其发送到Kafka中。
RecordAccumulator 消息累加器
作用: 缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能
原理:维护分区为单位的双端队列(Deque<ProducerBatch>),主线程追加消息到队尾,sender 线程从队首取消息;
将 单条消息(ProducerRecord) 合并成 一个批次的消息(ProducerBatch)
将 单条消息(ProducerRecord) 合并成 一个批次的消息(ProducerBatch)
空间紧凑
IO小,吞吐高
过程
Sender 线程处理消息
1. 从 RecordAccumulator 中获取缓存的消息
2. <分区,Deque<ProducerBatch>>的保存形式转变成<Node(Broker),List< ProducerBatch>
3. <Node,List<ProducerBatch>>的形式之后,Sender 还会进一步封装成<Node,Request>
4. 请求在从Sender线程发往Kafka之前还会保存到InFlightRequests
5. InFlightRequests保存对象的具体形式为 Map<NodeId,Deque<Request>>,它的主要作用是缓存了已经发出去但还没有收到响应的请求
5.1. 关于 InFlightRequests ,可以限制 每个链接 最多缓存多少请求,一旦堆积过多,说明网络或者负载有问题
5.2. 关于 leastLoadedNode,即所有 Node 中负载最小的节点,怎么判断负载?通过 InFlightRequests 中堆积的请求数量
元数据
是什么:指Kafka集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有哪些分区,每个分区的leader副本分配在哪个节点上,follower副本分配在哪些节点上,哪些副本在AR、ISR等集合中,集群中有哪些节点,控制器节点又是哪一个等信息。
客户端怎么更新元数据:Sender 线程挑选出 leastLoadedNode,然后向这个 Node 发送 MetadataRequest 请求来获取具体的元数据信息。
重要参数
acks
含义:指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消息是成功写入的
涉及消息的可靠性和吞吐量之间的权衡。
值类型
1(默认值):只要 Leader 副本写入成功,就响应 ack
0:不做等待,直接 ack
-1/all:ISR 内的所有副本都写入成功,才响应 ack
max.request.size:消息最大值(默认1MB)
retries和retry.backoff.ms:异常情况下重试次数和重试时间
compression.type:消息压缩方式,默认“ none ”,可配置为“gzip”“snappy”...
linger.ms
含义:指定生产者发送 ProducerBatch 之前等待更多消息(ProducerRecord)加入ProducerBatch 的时间
作用:通过增加一定的消息延迟,来换取吞吐量
类似:TCP 协议的 Nagle 算法
总结:略
三、浅析消费者
消费者与消费组
Consumer Group 消费组
含义:每个 消费者 所归属的消费组(逻辑上的概念)
作用:消息发布到主题后,只会被投递给消费组中的一个消费者
为什么要定义消费组?
1. 性能:默认策略下,可以通过增加 消费者实例的个数来提高整体的消费能力,具备了横向扩展的能力。
注:默认规则下,一个分区对应一个消费者,当消费者数 大于 分区数时,会出现部分消费者无消息可消费的情况
注:默认规则下,一个分区对应一个消费者,当消费者数 大于 分区数时,会出现部分消费者无消息可消费的情况
2.功能:能同时支持 发布订阅 和 点对点两种投递模式
Pub/Sub (发布/订阅):消息一对多广播发送;有消费组情况下,消费者隶属不同消费组即可
P2P(点对点):消息点对点发送,类似队列;消费组情况下,所有消费者属于同一消费组即可
客户端开发
正常消费步骤:配置参数-> 创建消费者实例 -> 订阅主题 -> 拉取消费并消费 -> 提交消费位移 -> 关闭消费者实例
参数配置:略
订阅主题与分区
一个消费者可以订阅一个或多个主题
subscribe(Collection):订阅一组 topic,比较常用
subscribe(Pattern):正则表达式 形式的订阅
assign(Collection<TopicPartition>):允许订阅指定topic下的指定分区
反序列化:略
消息消费
常见消息消费一般有两种模式
推模式:服务端主动推送给消费者
拉模式:消费者主动拉取
Kafka 采用的是拉模式,轮询执行 poll(Duration) 方法,获取所订阅的一组消息
关于 Duration : 在可用数据时会发生阻塞,此处可控制阻塞时间
ConsumerRecord - Consumer 取到的消息结构
topic
partition
offset
timestamp
timestampType
serializedKeySize
serializedValuesSize
headers
key
value
checkSum
位移提交
消费位移(offset):对于分区中的每条消息,都有唯一的offset,用于标记其在分区中的位置。消费者会记录当前消费消息的offset,用于记录消费进度
offset 的持久化(提交)
为什么要做持久化
poll 方法返回的是未消费的消息集,所以要对消息位移做持久化保存,存在内存易丢失
分区/消费者再均衡场景,当新起消费者时,新的消费者需知道之前消费组的消费进度到哪了
持久化数据存储方式
老版本:存储于 ZK 中
新版本:存储于 Kafka 内部主题 _consumer_offsets
一些容易弄错的位点名词
position:下一次拉取消息的位置
lastConsumedOffset:当前消费到的位置
committed offset:已提交过的消费位移(一般和position一样)
位移提交的时机
消息提交时间点的不慎重可能造成 重复消费 和 消息丢失
消息丢失:Ex. 若一拉到消息就【提交】,当消息未消费完时,发生了故障。因为 offset 已经提交了,所以就再也消费不到未消费完成的消息了
重复消费:所有拉取消息(一批)都消费完成才提交,若发生故障,因为 offset 未提交,所以又会从头开始消费
默认的消费位移提交机制:自动提交
机制:消费者每隔X秒,允许将拉取到的每个分区最大消息位移进行提交
触发:自动提交在 poll 方法内完成,每次发起拉取请求前都会检查是否可以进行提交
问题
重复消费:Ex. 在两次提交的间隔期间消费者故障了,就不得不从上一次提交的 offset 继续消费
消息丢失:Ex. 消费线程 和 拉取线程独立,当消费线程故障时重新拉取消息会以最新 offset 为准。
这个 case 有点牵强,都自己独立做消费线程了却不保障好本地消息处理状态的记录,还怪自动提交
这个 case 有点牵强,都自己独立做消费线程了却不保障好本地消息处理状态的记录,还怪自动提交
允许配置手动提交
同步提交
粒度一:以 poll 方法拉取的最新位移提交
粒度二:提交指定位移
异步提交
控制或关闭消费:略,说的怎么起停 Consumer
指定位移开始消费
消费者在找不到消费位移时(位移越界也会),会根据配置从分区末尾/起始 开始消费(也可以抛异常)
seek(partition,offset) 允许从指定位移开始消费,也就相当于重置消费进度到某点
若需要从某一时间点开始消费,那么可以通过 offsetsForTimes 找到该时刻分区下的 offset
因为有了 seek 方法,所以可以通过外部介质来存储 offset 的方式来进行消费(有点累就是了)
拦截器(ConsumerInterceptor)
onConsume:poll方法返回之前
onCommit:提交完 offset 之后
close
多线程实现
Consumer 是非线程安全的,通过 CAS 来保证多线程操作下的 Fail-Fast (快速失败)
acquire() :获取当前线程号,与执行线程号比较是否一致。不一致则抛异常
release():将执行线程标记为空
在非线程安全情况下,怎么合理利用 CPU 资源实现多线程消费,提高消费能力
一、线程封闭(1)
实现1:每个(消费)线程实例化一个 KafkaConsumer 对象
说明:一个消费线程可以消费一个或多个分区的消息;当消费线程的个数大于分区数时,会有部分消费线程空闲
个人理解:本质上和开多个进程进行单线程消费并没有什么区别
二、线程封闭(2)
实现2: 多个消费线程同时消费一个分区,通过 assign、seek 指定消费的方式曲线救国,突破实例和分区数的限制
说明:逻辑复杂了。。
三、拉取消息和处理消息分离
背景:一般 poll 操作是很快的,瓶颈多来源于处理消息。所以抓重点解决处理消息的效率
实现:基于背景,考虑 poll 消息单线程,处理消费多线程,抓重点问题解决(Amdahl 定律)。还可以结合线程封闭的思路,开多个实例来提升整体消费;
缺点:消息无法顺序处理
消息提交
简述:需要有一个共享的 offset 来保证消息提交,线程封闭模式下用自动提交是OK的
但是简单的用共享 offset 会有消息丢失的情况发生:比如消费线程1消费了0~99,线程二消费了99~100并提交,此时线程1挂掉,就丢了0~99的消息
可采用滑动窗口的形式进行批量消费:批量消费,消费完成 起始窗口才迁移。
滑动窗口异常处理:若一个窗口内消息一段时间内无法消费完成,可尝试重试,重试失败则进重试队列,重试队列失败进死信队列
重要的消费者参数
fetch.min.bytes:每次 poll 最小的拉取数据量
会等待拉取数据量到达该阈值
能提高一定吞吐,但是会有一定延迟
fetch.max.wait.ms:poll 最长等待时间
max.poll.records:一次 poll 最大消息数
isolation.level:消费者事物隔离级别
read_uncommitted
read_committed
四、主题和分区
主题的管理
简述:主要说了管理台支持哪些修改操作
创建主题
一般可以通过 kafka-topics.sh / KafkaAdminClient 来实现主题的管理
分区副本的分配策略
即在哪个 broker 创建哪些分区的副本
可以自己指定,也可以按照默认规则分配。
具体默认策略:略
修改主题
允许增加主题的分区数量(只支持增加,不支持减少)
增加分区数的副作用:基于 key 计算的消息投递策略会有影响
为什么不支持减少分区
Q1:删除分区的消息如何处理
Q2:顺序、事务、状态切换 etc...
建议如果真的要,那么建立新 topic 然后复制
查看主题:略
配置管理:略
topic 参数:略
删除主题:略
初识 KafkaAdminClient
主要在说 KafkaAdminClient API 具体怎么用,略
分区的管理
优先副本的选举
背景:分区会有多个副本节点,leader 副本负责读写,当 leader 副本故障时需从 follower 节点中选举出新的 leader 副本。
那么需要如何进行选举,才能使选举后 leader 节点分布均匀,避免出现某一 broker的副本都是 leader 副本出现高负载?
那么需要如何进行选举,才能使选举后 leader 节点分布均匀,避免出现某一 broker的副本都是 leader 副本出现高负载?
怎么做:Kafka 对分区内的副本做了区分,将 AR 集合中的第一个副本定义为优先副本,Kafka 会保证优先副本的分布均匀
理想情况:优先副本就是 leader 副本,只要优先副本分布均匀就能保证 leader 分布均匀
注意:优先分布均匀并不能保障负载均衡,比如存在某些 leader 副本负载高,某些副本负载低的情况
分区自动平衡功能
作用:定时自动执行优先副本选举操作以求分区平衡
怎么做:控制器启动定时任务(默认5分钟),轮询所有 broker 节点,计算分区不平衡率(非优先副本的 leader 数 / 分区总数),比较是否超过阈值(默认 10%),若超过则执行优先副本选举
注:不建议使用,万一业务高峰搞了一波自动选举,建议还是埋点手动执行脚本
kafka-perferred-replica-election.sh
分区重分配
背景:下线 broker 节点,失效的分区副本不会自动迁移;新启的 broker 节点也不会有旧 topic 的分区自动迁移,负载不均匀
怎么做:在集群扩容、broker 下线失效场景下进行分区重分配,进行分区的迁移
依赖脚本:kafka-reassign-partitions.sh
原理:通过控制器为每个分区添加新副本,然后复制 leader 副本数据到新副本中,删除旧副本
复制限流:分区重分配本质上是数据的复制,所以会占用一定资源,通过改配置有办法能限流
kafka-config.sh
kafka-reassign-partitions.sh
同样的也可以修改副本数,也是通过 reassign 脚本实现的
分区数的选择
背景:性能和分区数有必然关系,所以怎么设置分区数才能达到最佳性能
怎么做:通过性能测试工具
生产者: kafka-producer-perf-test.sh
指标
records sent:发送消息总数
records/sec :每秒发送数量(吞吐量)
avg latency:处理平均耗时
消费者:kafka-consumer-perf-test.sh
是否分区数越多吞吐量越高(控制其他变量)
趋势:上升到某一阈值后开始下降慢慢趋于稳定
分区数的物理上限受机器文件描述符限制
结论:因地制宜(没说一样)
五、日志存储机制
文件目录布局
简述:topic、partition 都是逻辑上的存在,映射到具体物理上的存在则是具体一个分区副本(Log),即一个 Log 文件夹;
防止日志文件过大,引入 LogSegment 概念,对 Log 进行切分
防止日志文件过大,引入 LogSegment 概念,对 Log 进行切分
物理映射
topic-partition 对应一个 Log 文件夹
Log Segment 对应磁盘上的一个日志文件和两个索引文件,以及其他文件
🌰举个例子
逻辑上
topic0
partition0
Replica0
Replica1
partition1
partition2
物理上
Log 文件夹:topic-partition0-Replica0
日志文件:0-0-0.log
offset 索引文件:0-0-0.index
时间戳索引文件:0-0-0.timeindex
其他文件
具体细节
消息写入因为是顺序的,所以一个Log文件中,只有最后一个 LogSegment 才能执行写入
高效检索的需求所以引入了两个索引文件
offset 索引文件
时间戳索引文件
每个 LogSegment 都有一个基准偏移量 baseOffset,文件都是以基准偏移量命名(20位)的,如第一个 00000000000000000000.log
日志格式演变
介绍了Kafka 0.8 之后的三个消息格式版本,个人只针对最新版本做说明,想了解其他版本可看原文
0.11.0 后的消息格式图解
结构描述
Record
length:消息总长度
timestamp delta:时间戳增量,保存了与 RecordBatch 的起始时间戳的差值,节省字节数
思考:这个技巧比较常见,例如在生成核销码的时候引入了时间维度作为变量,但是过去的时间其实是毫无意义的,所以会用某一时间点为起始点开始使用
offset delta:offset 增量,保存与 RecordBatch 的起始位移差值,节省字节数
key length
key
value length
value
headers count
headers:业务扩展点,一般可以把应用层的通用属性塞进去,比如 from 哪个业务
RecordBatch
first offset
当前这个批次的起始位移
length
从 partition leader epoch 到结束的长度
partition leader epoch
分区 leader 纪元,可视作 partition leader 的版本号
magic
消息格式的版本号,目前应该是 2
crc32
校验位,防止被篡改
attributes
消息属性
低3位表示压缩格式
第4位表示时间戳类型
第5位表示是否处于事务中
第6位表示是否是控制消息
last offset delta
first offset 减去 最后一个 record 的 offset 得到的差值
first timestamp
第一条 record 的时间戳
max timestamp
最大时间戳,一般指最后一条消息
producer id
一般用于支持幂等和事务
producer epoch
一般用于支持幂等和事务
first sequence
一般用于支持幂等和事务
records count
record 的个数
records
做的优化
消息压缩:将多条消息一起压缩,也就是上边说的 Batch的形式。
端到端的压缩:生产者发送压缩数据,Broker 持久化压缩数据,Consumer 只有消费的时候才会解压
端到端的压缩:生产者发送压缩数据,Broker 持久化压缩数据,Consumer 只有消费的时候才会解压
引入变长字段减少空间
日志索引
总述
每个日志 segment 都有两个索引文件
offset index
用 offset 索引具体物理地址
timestamp index
用 timestamp 索引具体 offset
索引文件的构造和查询
以稀疏索引方式构造,并不保证每个消息都有索引项。(密度可以配置)
以二分查找的形式进行搜索,因为我们保证了消息在分区是顺序的
索引文件超出限度也会做相应的切分
个人理解:查找时会先读索引文件到内存中,在内存中进行二分查找。
一方面因为是稀疏索引内存开销不会太大;
另一方面因为内存二分所以速度会相对比较快。
一方面因为是稀疏索引内存开销不会太大;
另一方面因为内存二分所以速度会相对比较快。
offset index format
relativeOffset:相对于索引文件起始的偏移量(也就是xxxx.index 索引文件名)
postion:实际物理地址
Kafka 里还是用了比较多的相对值替代绝对值以达到节省空间目的
timestamp index format
timestamp
时间戳
relativeOffset
相对位移
日志清理
总述
磁盘空间有限,所以对历史存储的日志需要做管理(清理)
清理策略
日志删除(保留)
按照一定保留策略来淘汰不符合条件的日志分段
日志压缩
针对消息的 key 进行整合,同 key 不同 value 消息取其最后一个版本
具体清理策略的粒度可以控制到具体某个 topic 维度
日志删除
Kafka 日志管理器按照保留策略周期执行日志删除任务(默认 5min)
策略一: 基于时间,即保留超时删除,默认为七天
策略二:基于日志大小,计算存储阈值,若超过则从第一个 Segment 开始删
策略三:基于日志起始偏移量,按照消息条数的阈值进行按序删除
如何删除
1、从维护 Log Segment 的跳跃表中移除待删除的 Segment,以保证不会再有读取操作
2、将对应 Log Segment 打上 .deleted 标签
3、延迟任务 delete-file 执行删除操作
日志压缩
业务上如果只关心 同 Key 消息的最新 value,那么可以做日志压缩,将同 Key 消息进行合并。
合并后会生产新的 Log Segment,合并后消息的偏移量保持不变。
合并后会生产新的 Log Segment,合并后消息的偏移量保持不变。
本质上有点类似:redoLog 和 RDB ?
具体压缩步骤比较难理解,略
磁盘存储
Kafka 是基于磁盘文件做存储,众所周知磁盘效率读写效率低,但为什么还用
写入速度:顺序写内存 > 顺序写盘 > 随机写内存 > 随机写磁盘
所以综合来看顺序写盘也不会比随机写内存慢
Kafka 采用了 File Append 的形式来追加消息,且已写入的不允许修改,比较匹配顺序写盘模式
页缓存
介绍页缓存:操作系统会对磁盘 I/O 做优化,即采用来页缓存的机制
读取时,先读缓存,缓存未命中则读取磁盘并写到缓存
写入时,先读取页缓存,缺页则从磁盘读取,读取成功后才写数据到页缓存中。
此时我们称为该页为脏页,操作系统才合适时间将脏页数据落盘。(脏页的数据丢失问题忘了怎么解决了)
此时我们称为该页为脏页,操作系统才合适时间将脏页数据落盘。(脏页的数据丢失问题忘了怎么解决了)
这个机制在 MySQL 中用的也比较多,可以参考 InnoDB 的 checkPoint 机制
为什么 Kafka 用页缓存
进程内部往往会缓存处理的数据,但这份数据可能已在操作系统的页缓存中,即一份数据多份缓存
Java 内存开销大,垃圾回收会随堆内数据增多变得缓慢(比如触发 Full GC)
Java 对象 相比 紧凑的文件字节码而言,占用的空间更大
可以依赖操作系统来维护一致性,而不用交给应用程序维护,安全且有效
结论:所以干脆依赖 操作系统的页缓存 而不是维护进程内的缓存
磁盘 I/O 流程
操作系统知识建议掌握,略
零拷贝
实质:直接将数据从磁盘文件复制到网卡,不经过应用程序
目的:提高性能,减少内核态/用户态之前的上下文切换
实现:操作系统提供 sendFile() 方法,上层语言一般会做封装调用
一般复制输出过程
1. (本质还是内核态上操作)Disk->内核态:File A 从磁盘复制到 Read Buffer
2. 内核态->用户态:CPU 将数据从 Read Buffer 复制到应用程序下
3. 用户态->内核态:CPU 将数据从应用程序复制到 Socket Buffer
4. (本质还是内核态上操作)内核态-> Network:从 Socket Buffer 复制到网卡设备中
总结:如果对数据没有额外处理的诉求,其实直接走1-4就完成来整个流程,没有必要再绕一次2-3
零拷贝复制输出过程
Disk -> Read Buffer -> Network
没有经过应用程序做中转复制,大大提高效率
方式:DMA 技术
六、深入服务端
协议设计
协议层面没必要深究了,略
时间轮
场景:Kafka 有大量延时操作,如延时生产、延时拉取、延时删除操作,为提高延时操作的效率所以引入时间轮算法。
注:时间轮算法在其他中间件也有广泛应用。
注:时间轮算法在其他中间件也有广泛应用。
时间轮算法 vs JDK Timer/Delay Queue
JDK:插入、删除时间复杂度为 O(nlogn)
时间轮:均为 O(1)
时间轮结构
数据结构:存储定时任务的环形队列,底层为数组,数组的每个元素可以存放定时任务的双向链表,链表中的每一项都是一个定时任务
TimingWheel(时间轮-环形队列)
TimerTaskList(定时任务双向链表-环形队列元素)
TimerTaskEntry(定时任务项-任务链表元素)
TimerTask(具体的定时任务)
时间轮原理(和表盘一样)
时间轮结构(抽象)
时间轮类似表盘,由多个单元时间格组成,所以一个时间轮跨度即为单元时间*格子数
有一个表盘指针,表示时间轮当前所处时间,表针所指为恰好到期部分,需要执行对应的定时任务
如何操作
插入定时任务:当前时间轮+定时时长/单元时间 = 时间格坐标,找到坐标后插入任务即可
执行定时任务:指针轮转到指定单元格,触发单元格内为执行任务
一个时间轮可以处理的定时任务跨度:currentTime < time < currentTime + 一个时间轮跨度
当遇到了超过时间轮能处理的定时任务怎么办
(比如100W毫秒,总不能扩单元格数吧)
(比如100W毫秒,总不能扩单元格数吧)
引入了分级时间轮的概念,类似于表盘上的时/分/秒
举个🌰
一层时间轮:时间单元 1ms * 单元数 20=时长 20ms
二层时间轮:时间单元 20ms * 单元数 20 = 时长 400 ms
三层:... 也就是 400 * 20 = 8000 ms
时间轮升级:当前时间轮不能满足任务所需的时间时,就会进行时间轮升级,插入高层所对应的任务列表中
时间轮降级:由于高层时间轮粒度过大,会有到了单元时间但未到定时任务时间的情况,这个时候就需要操作时间轮降级,将任务重新提交到低层时间轮中。
如三层时间轮400ms 单元格内有400ms、450ms两个任务,时间轮到400ms时,并不能触发450ms的定时任务,所以需要下降到二层、一层时间轮中
如三层时间轮400ms 单元格内有400ms、450ms两个任务,时间轮到400ms时,并不能触发450ms的定时任务,所以需要下降到二层、一层时间轮中
时间轮指针是如何推进的?
还是借助了 JDK 的 DelayQueue
操作:
将每个用到的 TimerTaskList 插入到 DelayQueue
根据 TimerTaskList 的定时时间进行排序,短任务在前
Kafka 专门有 ExpiredOperationReaper 线程来获取队列中到期的任务列表
当获取到到期任务列表时,既可以推进时间轮,又可以执行对定时任务列表中的任务执行相关操作
前面说了 DelayQueue 插入性能差,为什么还用呢?
专人做专事
因为时间轮已经将具体某个定时任务编排到了定时任务列表了,利用其时间复杂度O(1)
DelayQueue 只要负责插入具体的任务列表(已经编排好了)即可,不用维护具体某个定时任务的插入/删除
时间轮用来执行插入/删除具体的定时任务
DelayQueue 负责时间推送
如果采用时间轮自己的每秒推进,那么会存在比较多的空推进,比如一个1ms 任务和 1000 ms 任务,空耗资源
反而 DelayQueue 获取队列头只要 O(1)
延时操作
Kafka 有多种延时操作,如延时生产、延时拉取、延时删除
延时生产:如消息 ack 需要等所有 ISR 副本写入完成才能ack,同时设置了超时时间30s。
延时拉取:当生产者无新消息写入的时候,消费者可以避免无意义的空拉取
延时操作:延时返回响应结果,有延时时间,但并非定时响应,也可提前响应
怎么实现
延时操作创建之后会被加入延时操作管理器(DelayedOperationPurgatory
管理器的定时器底层依赖时间轮
延时操作的延时由定时器来做超时管理
同时管理器配有监听池,来监听分区的外部事件,以触发延时操作
控制器
控制器的选举及异常恢复
选举、控制器节点数据变更依赖 zookeeper 来实现
当一个 broker 成为 controller 之后,会更新 controller_epoch (相当于版本号)和 brokerId 等
控制器 Broker 除正常消息读写职责外,需要担负作为控制器的责任
监听分区相关变化
分区重分配
ISR 集合变更
处理优先副本选举
监听主题变化
处理主题删减变化
处理删除主题
监听 broker 变化
broker 的增减
维护 topic、partition、broker 信息
更新元数据
管理分区状态机、副本状态机
...
如何处理这么多事件:使用单线程基于事件队列的模型
优雅关闭
进程终止会触发 kafka-shutdown-hock
正常关闭资源
ControlledShutdown 控制关闭
ControlledShutdown 流程 :略
分区 leader 的选取
场景:创建分区、分区上线时、leader 下线需要执行 分区 leader 的重新选举
策略:顺序寻找 AR 集合中第一个存活的副本,且副本在 ISR 内
为了 leader 节点均匀分布的需要,AR 集合内部顺序一般是不会变的(除了分区重分配)
发送优先副本选举时,直接将优先副本设置为 leader 即可
参数解密
说的是各式各样的 broker 参数,实际用的时候再仔细看吧,略
七、深入客户端
分区分配策略
简述:主要介绍 Consumer 与 Topic-Partition 订阅策略,如 ConsumerA 是订阅 Partition A or Partition B
RangeAssignor分配策略(默认)
策略:平均分配,将 group 内的所有 Consumer 按字典序排序,每个 Consumer 平均分配固定的分区范围
Ex:3个 Partition(0-1-2),2个Consumer(A、B)
那么 Consumer A - 分到 Partition 0-1
Consumer B - Partition 2
会出现分配不均匀的情况,因为是以 topic 为单位的,以上面的场景为例,类似的topic*3之后;
consumerA - 6个分区;consumer B - 3个分区
consumerA - 6个分区;consumer B - 3个分区
RoundRobinAssignor分配策略
策略:consumer 和 consumer 订阅所有主题的分区 排序,然后轮询将分区分配给每个 consumer。
即,排成两列,轮询连线;
即,排成两列,轮询连线;
问题点:Kafka 官方给了一个同 Group 下不同 Consumer 可以订阅不同 topic 的例子,还是会导致分配不匀;
补充一下:我目前是没看到是怎么能让 同一 group 下的 consumer 实例订阅不同 topic 的
补充一下:我目前是没看到是怎么能让 同一 group 下的 consumer 实例订阅不同 topic 的
StickyAssignor分配策略
目的:分配均匀,当发生重分配时尽量与上次保持相同
自定义分区分配策略
给了扩展点,有自定义诉求的可以看下,略
消费者协调器和组协调器
旧版消费者客户端的问题
依赖 zookeeper 来实现分区分配的,每个消费者监听 zookeeper 的变更
存在两个问题
羊群效应(Herd Effect)
一个节点的变化,触发了大量监听通知,导致通知期其他操作被延迟
脑裂问题(Split Brain)
消费者再均衡时,每个消费者都单独和 ZK 通信以判断变化,可能会导致同一时刻各消费者获取的状态不一致
新版本再均衡(rebalance)的原理
引入 GroupCoordinator 和 ConsumerCoordinator,分别管理消费组和消费者
什么时候触发再均衡
启动新消费者
消费者下线
消费者主动退出消费组
订阅的topci 分区发生变化
具体步骤
一、FIND_COORDINATOR:Consumer 和所属的消费组协调器( GroupCoordinator )建连
历史已经保存过 GroupCoordinator(一般是一个 leader 节点) 信息,建连即可
本地无信息则需要向集群中负载最小的节点发出 寻找 GroupCoordinator 的请求
有点像找爸爸,知道在哪直接去就行,不知道在哪就找个空的人帮忙定个位,拿到定位去就行了
二、JOIN_GROUP:请求加入消费组
1. Consumer 发送 Join 请求到 GroupCoordinator
2. 获取消费组的 leader
若没有则会进行 Consumer Leader 的选举,选举是随机选举的,没什么大的讲究
3. 获取分区分配策略
因为每个 Consumer 都可以定义自己的分区分配策略,对于消费组而言则需要选举一个相对合理的策略来进行执行
选举本质:拿到 Consumer 支持做多的策略
过程
收集策略,组成候选人
每个 Consumer 从候选人里找出第一个支持的策略并投票
计算并得出票数最多策略,即消费组的策略
4. 通知 JoinGroupResponse(内含分区分配策略) 给组内每一个 Consumer
三、SYNC_GROUP :同步分配方案
1. Leader Consumer 给出具体分配方案给到 GroupCoordinator
2. 各个普通 Consumer 向 GroupCoordinator 询问获取具体分区分配方案
3. GroupCoordinator 提取 Leader Consumer 的方案,并存入 __consumer_offsets主题中,最后发送响应提供分配方案
4. 各 Consumer 收到 Response 后执行分区分配,并启动心跳任务(和 GroupCoordinator)
四、HEARTBEAT:保持心跳
1. 正式消费之前 Consumer 会先获取当前分区消息拉取的起始位置 (__consumer_offsets主题 有存储)
2. 维持心跳来保证消费组关系的维护,心跳一旦停了就会被认为是下线了,会触发再均衡
consumer_offsets 剖析
用作位移提交,保存了 Consumer - Topic - Partition 的消费位移提交情况
从位移提交 Request 和 Response 的角度来解析
OffsetCommitRequest
group_id
generation_id
member_id
retention_time:消费位移能保留的时长,默认7天
topics
topic
partitions
partition
offset
metadata
OffsetCommitResponse
throttle_time_ms
responses
topic
partition_responses
partition
error_code
发起请求后,消费位移同样会以发消息的形式发送到 topic(consumer_offsets)内
消息格式
Key
version
group
topic
partition
Value
version
metadata
commit_timestamp
expire_timestamp
事务
消息传输保障
MQ 消息传输一般有三个层级
at most once:最多一次,可能会丢,但不会重
at least once:最少一次,绝不会丢,但可能会重
exactly once:恰好一次,保证一次不丢不重
Kafka 的传输层级
Kafka Producer 如果写入成功,那么消息一定不会丢;如果写入失败,会多次重试;
所以 Producer -> Broker 是 at least once
所以 Producer -> Broker 是 at least once
Broker -> Consumer 依赖具体使用方式不同。
若先提交位移后处理消息,发生故障,从最后提交开始消费,则会出现消息丢失,即 at most once;
若先处理消息后提交位移,发生故障,从上一次提交开始消费,则会出现重复消费,即 at least once;
若先提交位移后处理消息,发生故障,从最后提交开始消费,则会出现消息丢失,即 at most once;
若先处理消息后提交位移,发生故障,从上一次提交开始消费,则会出现重复消费,即 at least once;
变动:0.11.0.0 版本起,引入了幂等和事务以支持 exactly once 语义
幂等
背景:Producer 写入消息在重试时可能会重复写入消息,Kafka 引入幂等配置避免这种情况
配置:enable.idempotence (默认关闭幂等)
TODO
事务
TODO
八、如何保障可靠性
副本剖析
失效副本
失效副本:同步延迟太久或者宕机等异常的副本,所对应的分区也叫同步失效分区 (under-replicated 分区)
怎么判断是否失效副本
功能失效:直接下线了、卡住了
同步失效:没跟上 leader 副本太久、滞后消息数太多
ISR 的伸缩
LEO 和 HW
Leader Epoch的介入
日志同步机制
可靠性分析
多副本机制保证
发送消息时不同的ack机制
极端情况:ISR 只剩 leader,可以通过 min.insync.replicas 来指定 ISR 最小副本数
操作系统的日志落盘
consumer 手动提交 offset
九、应用
介绍了 Kafka 的一些工具,略
十、监控
怎么做监控,略
十一、高级应用
过期时间
消息增加过期时间属性,对于超时消息通过消费者拦截器做多样化处理
延时队列
解决方案一:将延时消息按照延时时间,投递到延时分级的delay topic中
死信队列和重试队列:啥也没说。。。
消息路由
消息轨迹
跟踪消息的生产消费,搞一个 trace_topic 即可
消息审计:监控统计
消息中间件选型
有哪些消息队列
选型时需要注意的点
十二、Spark集成

收藏

收藏
0 条评论
下一页