Kafka设计与实践
2021-09-13 15:54:57 7 举报
AI智能生成
《深入理解Kafka:核心设计与实践原理》 学习笔记
作者其他创作
大纲/内容
深入理解Kafka:核心设计与实践原理
一、简介
历史
LinkedIn 捐给 Apache,一开始用的Scala写的,后面用 Java 重写了
基本概念
架构图
术语
Producer:生产者,即发消息投递到 Kafka 的一方
Consumer:消费者,即从 Kafka 拿到消息消费的一方
Broker:Kafka 的服务实例,可以理解为存储并处理消息的
ZooKeeper:Kafka 引入用于负责集群元数据的管理、控制器选举
Topic:主题,消息是挂在(归类)一个具体主题(业务)下的
Partition:分区,可以理解为多个分区组成了一个逻辑上的主题,概念上更偏向于数据存储方面,可以视作消息存储的逻辑抽象,本质上是一个个磁盘上的log文件。
多副本机制
目的:一个分区只有一份数据,利用一主多从的思想,保障稳定性
原理:一个分区会冗余多个副本,Leader 副本负责读写数据,Follower 副本只用于同步Leader 副本数据,当 Leader 副本故障时,会重新选举新的Leader,以保障稳定性
ISR 机制
目的:保障多副本机制下,副本复制时的性能
原理:多副本机制需要使用一个副本复制策略,同步和异步的复制方式均不佳,引入适当副本(ISR集合)同步后即ACK的复制机制
HW 高水位机制
目的:保障多副本机制下故障发生后的数据一致性
原理:高水位 规定消费者最大可消费的offset 保障消费进度一致,Leader Epoch 用于记录 Leader 切换时记录 LEO 信息 防止错误的日志切断发生造成消息丢失。
安装配置:略
生产和消费:展示了一堆Kafka 自带的脚本工具,略
服务端参数配置:略
二、浅析生产者
客户端开发
(ProducerRecord)消息对象结构
topic
partition
headers:留给使用者自定义的一个扩展信息,一般会塞一些应用信息
key:消息的key,作为附加字段存在。
作用1:可利用 key 来计算分区号,发往指定分区;
作用2: topic 做第一次归类,key 用作第二次归类,同 key 可以放到同一个分区
作用3: 有key 还能支持日志压缩
value:消息体
timestamp:可分为两类
CreateTime:消息创建的时间
LogAppendTime:消息追加到日志文件的时间
参数配置(允许配置自定义的序列化器)
消息的发送方式
同步
异步
发后即忘:即不关心发送结果如何
序列化:略,说的具体怎么配
分区器
作用:对未指定 partition 的消息指派分区
默认策略
若 key 为 null,则轮询
若 key 不为 null,则对 key 进行哈希分区(采用 MurmurHash2 算法)
⚠️:书上说增加分区数量时,分区和Key的映射会变,可不可以采用 Hash 环的方式来优化呢?
也允许自定义分区器
拦截器:消息发送时做一些特殊处理,比如打点之类,留的扩展点。拦截器可以是链式的
原理分析
生产者客户端由两个线程协调运行
主线程:由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)
Sender线程(发送线程):负责从RecordAccumulator中获取消息并将其发送到Kafka中。
RecordAccumulator 消息累加器
作用: 缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能
原理:维护分区为单位的双端队列(Deque<ProducerBatch>),主线程追加消息到队尾,sender 线程从队首取消息;将 单条消息(ProducerRecord) 合并成 一个批次的消息(ProducerBatch)
空间紧凑
IO小,吞吐高
过程
Sender 线程处理消息
1. 从 RecordAccumulator 中获取缓存的消息
2. <分区,Deque<ProducerBatch>>的保存形式转变成<Node(Broker),List< ProducerBatch>
3. <Node,List<ProducerBatch>>的形式之后,Sender 还会进一步封装成<Node,Request>
4. 请求在从Sender线程发往Kafka之前还会保存到InFlightRequests
5. InFlightRequests保存对象的具体形式为 Map<NodeId,Deque<Request>>,它的主要作用是缓存了已经发出去但还没有收到响应的请求
5.1. 关于 InFlightRequests ,可以限制 每个链接 最多缓存多少请求,一旦堆积过多,说明网络或者负载有问题
5.2. 关于 leastLoadedNode,即所有 Node 中负载最小的节点,怎么判断负载?通过 InFlightRequests 中堆积的请求数量
元数据
是什么:指Kafka集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有哪些分区,每个分区的leader副本分配在哪个节点上,follower副本分配在哪些节点上,哪些副本在AR、ISR等集合中,集群中有哪些节点,控制器节点又是哪一个等信息。
客户端怎么更新元数据:Sender 线程挑选出 leastLoadedNode,然后向这个 Node 发送 MetadataRequest 请求来获取具体的元数据信息。
重要参数
acks
含义:指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消息是成功写入的
涉及消息的可靠性和吞吐量之间的权衡。
值类型
1(默认值):只要 Leader 副本写入成功,就响应 ack
0:不做等待,直接 ack
-1/all:ISR 内的所有副本都写入成功,才响应 ack
max.request.size:消息最大值(默认1MB)
retries和retry.backoff.ms:异常情况下重试次数和重试时间
compression.type:消息压缩方式,默认“ none ”,可配置为“gzip”“snappy”...
linger.ms
含义:指定生产者发送 ProducerBatch 之前等待更多消息(ProducerRecord)加入ProducerBatch 的时间
作用:通过增加一定的消息延迟,来换取吞吐量
类似:TCP 协议的 Nagle 算法
总结:略
三、浅析消费者
消费者与消费组
Consumer Group 消费组
含义:每个 消费者 所归属的消费组(逻辑上的概念)
作用:消息发布到主题后,只会被投递给消费组中的一个消费者
为什么要定义消费组?
1. 性能:默认策略下,可以通过增加 消费者实例的个数来提高整体的消费能力,具备了横向扩展的能力。注:默认规则下,一个分区对应一个消费者,当消费者数 大于 分区数时,会出现部分消费者无消息可消费的情况
2.功能:能同时支持 发布订阅 和 点对点两种投递模式
Pub/Sub (发布/订阅):消息一对多广播发送;有消费组情况下,消费者隶属不同消费组即可
P2P(点对点):消息点对点发送,类似队列;消费组情况下,所有消费者属于同一消费组即可
正常消费步骤:配置参数-> 创建消费者实例 -> 订阅主题 -> 拉取消费并消费 -> 提交消费位移 -> 关闭消费者实例
参数配置:略
订阅主题与分区
一个消费者可以订阅一个或多个主题
subscribe(Collection):订阅一组 topic,比较常用
subscribe(Pattern):正则表达式 形式的订阅
assign(Collection<TopicPartition>):允许订阅指定topic下的指定分区
反序列化:略
消息消费
常见消息消费一般有两种模式
推模式:服务端主动推送给消费者
拉模式:消费者主动拉取
Kafka 采用的是拉模式,轮询执行 poll(Duration) 方法,获取所订阅的一组消息
关于 Duration : 在可用数据时会发生阻塞,此处可控制阻塞时间
ConsumerRecord - Consumer 取到的消息结构
offset
timestamp
timestampType
serializedKeySize
serializedValuesSize
headers
key
value
checkSum
位移提交
消费位移(offset):对于分区中的每条消息,都有唯一的offset,用于标记其在分区中的位置。消费者会记录当前消费消息的offset,用于记录消费进度
offset 的持久化(提交)
为什么要做持久化
poll 方法返回的是未消费的消息集,所以要对消息位移做持久化保存,存在内存易丢失
分区/消费者再均衡场景,当新起消费者时,新的消费者需知道之前消费组的消费进度到哪了
持久化数据存储方式
老版本:存储于 ZK 中
新版本:存储于 Kafka 内部主题 _consumer_offsets
一些容易弄错的位点名词
position:下一次拉取消息的位置
lastConsumedOffset:当前消费到的位置
committed offset:已提交过的消费位移(一般和position一样)
位移提交的时机
消息提交时间点的不慎重可能造成 重复消费 和 消息丢失
消息丢失:Ex. 若一拉到消息就【提交】,当消息未消费完时,发生了故障。因为 offset 已经提交了,所以就再也消费不到未消费完成的消息了
重复消费:所有拉取消息(一批)都消费完成才提交,若发生故障,因为 offset 未提交,所以又会从头开始消费
默认的消费位移提交机制:自动提交
机制:消费者每隔X秒,允许将拉取到的每个分区最大消息位移进行提交
触发:自动提交在 poll 方法内完成,每次发起拉取请求前都会检查是否可以进行提交
问题
重复消费:Ex. 在两次提交的间隔期间消费者故障了,就不得不从上一次提交的 offset 继续消费
消息丢失:Ex. 消费线程 和 拉取线程独立,当消费线程故障时重新拉取消息会以最新 offset 为准。这个 case 有点牵强,都自己独立做消费线程了却不保障好本地消息处理状态的记录,还怪自动提交
允许配置手动提交
同步提交
粒度一:以 poll 方法拉取的最新位移提交
粒度二:提交指定位移
异步提交
控制或关闭消费:略,说的怎么起停 Consumer
指定位移开始消费
消费者在找不到消费位移时(位移越界也会),会根据配置从分区末尾/起始 开始消费(也可以抛异常)
若需要从某一时间点开始消费,那么可以通过 offsetsForTimes 找到该时刻分区下的 offset
因为有了 seek 方法,所以可以通过外部介质来存储 offset 的方式来进行消费(有点累就是了)
拦截器(ConsumerInterceptor)
onConsume:poll方法返回之前
onCommit:提交完 offset 之后
close
多线程实现
Consumer 是非线程安全的,通过 CAS 来保证多线程操作下的 Fail-Fast (快速失败)
acquire() :获取当前线程号,与执行线程号比较是否一致。不一致则抛异常
release():将执行线程标记为空
在非线程安全情况下,怎么合理利用 CPU 资源实现多线程消费,提高消费能力
一、线程封闭(1)
实现1:每个(消费)线程实例化一个 KafkaConsumer 对象
说明:一个消费线程可以消费一个或多个分区的消息;当消费线程的个数大于分区数时,会有部分消费线程空闲
个人理解:本质上和开多个进程进行单线程消费并没有什么区别
二、线程封闭(2)
实现2: 多个消费线程同时消费一个分区,通过 assign、seek 指定消费的方式曲线救国,突破实例和分区数的限制
说明:逻辑复杂了。。
三、拉取消息和处理消息分离
背景:一般 poll 操作是很快的,瓶颈多来源于处理消息。所以抓重点解决处理消息的效率
实现:基于背景,考虑 poll 消息单线程,处理消费多线程,抓重点问题解决(Amdahl 定律)。还可以结合线程封闭的思路,开多个实例来提升整体消费;
缺点:消息无法顺序处理
消息提交
简述:需要有一个共享的 offset 来保证消息提交,线程封闭模式下用自动提交是OK的
但是简单的用共享 offset 会有消息丢失的情况发生:比如消费线程1消费了0~99,线程二消费了99~100并提交,此时线程1挂掉,就丢了0~99的消息
可采用滑动窗口的形式进行批量消费:批量消费,消费完成 起始窗口才迁移。
滑动窗口异常处理:若一个窗口内消息一段时间内无法消费完成,可尝试重试,重试失败则进重试队列,重试队列失败进死信队列
重要的消费者参数
fetch.min.bytes:每次 poll 最小的拉取数据量
会等待拉取数据量到达该阈值
能提高一定吞吐,但是会有一定延迟
fetch.max.wait.ms:poll 最长等待时间
max.poll.records:一次 poll 最大消息数
isolation.level:消费者事物隔离级别
read_uncommitted
read_committed
四、主题和分区
主题的管理
简述:主要说了管理台支持哪些修改操作
创建主题
一般可以通过 kafka-topics.sh / KafkaAdminClient 来实现主题的管理
分区副本的分配策略
即在哪个 broker 创建哪些分区的副本
可以自己指定,也可以按照默认规则分配。
具体默认策略:略
修改主题
允许增加主题的分区数量(只支持增加,不支持减少)
增加分区数的副作用:基于 key 计算的消息投递策略会有影响
为什么不支持减少分区
Q1:删除分区的消息如何处理
Q2:顺序、事务、状态切换 etc...
建议如果真的要,那么建立新 topic 然后复制
查看主题:略
配置管理:略
topic 参数:略
删除主题:略
初识 KafkaAdminClient
主要在说 KafkaAdminClient API 具体怎么用,略
分区的管理
优先副本的选举
背景:分区会有多个副本节点,leader 副本负责读写,当 leader 副本故障时需从 follower 节点中选举出新的 leader 副本。那么需要如何进行选举,才能使选举后 leader 节点分布均匀,避免出现某一 broker的副本都是 leader 副本出现高负载?
怎么做:Kafka 对分区内的副本做了区分,将 AR 集合中的第一个副本定义为优先副本,Kafka 会保证优先副本的分布均匀
理想情况:优先副本就是 leader 副本,只要优先副本分布均匀就能保证 leader 分布均匀
注意:优先分布均匀并不能保障负载均衡,比如存在某些 leader 副本负载高,某些副本负载低的情况
分区自动平衡功能
作用:定时自动执行优先副本选举操作以求分区平衡
怎么做:控制器启动定时任务(默认5分钟),轮询所有 broker 节点,计算分区不平衡率(非优先副本的 leader 数 / 分区总数),比较是否超过阈值(默认 10%),若超过则执行优先副本选举
注:不建议使用,万一业务高峰搞了一波自动选举,建议还是埋点手动执行脚本
kafka-perferred-replica-election.sh
分区重分配
背景:下线 broker 节点,失效的分区副本不会自动迁移;新启的 broker 节点也不会有旧 topic 的分区自动迁移,负载不均匀
怎么做:在集群扩容、broker 下线失效场景下进行分区重分配,进行分区的迁移
依赖脚本:kafka-reassign-partitions.sh
原理:通过控制器为每个分区添加新副本,然后复制 leader 副本数据到新副本中,删除旧副本
复制限流:分区重分配本质上是数据的复制,所以会占用一定资源,通过改配置有办法能限流
kafka-config.sh
kafka-reassign-partitions.sh
同样的也可以修改副本数,也是通过 reassign 脚本实现的
分区数的选择
背景:性能和分区数有必然关系,所以怎么设置分区数才能达到最佳性能
怎么做:通过性能测试工具
生产者: kafka-producer-perf-test.sh
指标
records sent:发送消息总数
records/sec :每秒发送数量(吞吐量)
avg latency:处理平均耗时
消费者:kafka-consumer-perf-test.sh
是否分区数越多吞吐量越高(控制其他变量)
趋势:上升到某一阈值后开始下降慢慢趋于稳定
分区数的物理上限受机器文件描述符限制
结论:因地制宜(没说一样)
五、日志存储机制
文件目录布局
简述:topic、partition 都是逻辑上的存在,映射到具体物理上的存在则是具体一个分区副本(Log),即一个 Log 文件夹;防止日志文件过大,引入 LogSegment 概念,对 Log 进行切分
物理映射
topic-partition 对应一个 Log 文件夹
Log Segment 对应磁盘上的一个日志文件和两个索引文件,以及其他文件
🌰举个例子
逻辑上
topic0
partition0
Replica0
Replica1
partition1
partition2
物理上
Log 文件夹:topic-partition0-Replica0
日志文件:0-0-0.log
offset 索引文件:0-0-0.index
时间戳索引文件:0-0-0.timeindex
其他文件
具体细节
消息写入因为是顺序的,所以一个Log文件中,只有最后一个 LogSegment 才能执行写入
高效检索的需求所以引入了两个索引文件
offset 索引文件
时间戳索引文件
每个 LogSegment 都有一个基准偏移量 baseOffset,文件都是以基准偏移量命名(20位)的,如第一个 00000000000000000000.log
日志格式演变
介绍了Kafka 0.8 之后的三个消息格式版本,个人只针对最新版本做说明,想了解其他版本可看原文
0.11.0 后的消息格式图解
结构描述
Record
length:消息总长度
timestamp delta:时间戳增量,保存了与 RecordBatch 的起始时间戳的差值,节省字节数
思考:这个技巧比较常见,例如在生成核销码的时候引入了时间维度作为变量,但是过去的时间其实是毫无意义的,所以会用某一时间点为起始点开始使用
offset delta:offset 增量,保存与 RecordBatch 的起始位移差值,节省字节数
key length
value length
headers count
headers:业务扩展点,一般可以把应用层的通用属性塞进去,比如 from 哪个业务
RecordBatch
first offset
当前这个批次的起始位移
length
从 partition leader epoch 到结束的长度
partition leader epoch
分区 leader 纪元,可视作 partition leader 的版本号
magic
消息格式的版本号,目前应该是 2
crc32
校验位,防止被篡改
attributes
消息属性
低3位表示压缩格式
第4位表示时间戳类型
第5位表示是否处于事务中
第6位表示是否是控制消息
last offset delta
first offset 减去 最后一个 record 的 offset 得到的差值
first timestamp
第一条 record 的时间戳
max timestamp
最大时间戳,一般指最后一条消息
producer id
一般用于支持幂等和事务
producer epoch
first sequence
records count
record 的个数
records
做的优化
消息压缩:将多条消息一起压缩,也就是上边说的 Batch的形式。端到端的压缩:生产者发送压缩数据,Broker 持久化压缩数据,Consumer 只有消费的时候才会解压
引入变长字段减少空间
日志索引
总述
每个日志 segment 都有两个索引文件
offset index
用 offset 索引具体物理地址
timestamp index
用 timestamp 索引具体 offset
索引文件的构造和查询
以稀疏索引方式构造,并不保证每个消息都有索引项。(密度可以配置)
以二分查找的形式进行搜索,因为我们保证了消息在分区是顺序的
索引文件超出限度也会做相应的切分
个人理解:查找时会先读索引文件到内存中,在内存中进行二分查找。一方面因为是稀疏索引内存开销不会太大;另一方面因为内存二分所以速度会相对比较快。
offset index format
relativeOffset:相对于索引文件起始的偏移量(也就是xxxx.index 索引文件名)
postion:实际物理地址
Kafka 里还是用了比较多的相对值替代绝对值以达到节省空间目的
timestamp index format
时间戳
relativeOffset
相对位移
日志清理
磁盘空间有限,所以对历史存储的日志需要做管理(清理)
清理策略
日志删除(保留)
按照一定保留策略来淘汰不符合条件的日志分段
日志压缩
针对消息的 key 进行整合,同 key 不同 value 消息取其最后一个版本
具体清理策略的粒度可以控制到具体某个 topic 维度
日志删除
Kafka 日志管理器按照保留策略周期执行日志删除任务(默认 5min)
策略一: 基于时间,即保留超时删除,默认为七天
策略二:基于日志大小,计算存储阈值,若超过则从第一个 Segment 开始删
策略三:基于日志起始偏移量,按照消息条数的阈值进行按序删除
如何删除
1、从维护 Log Segment 的跳跃表中移除待删除的 Segment,以保证不会再有读取操作
2、将对应 Log Segment 打上 .deleted 标签
3、延迟任务 delete-file 执行删除操作
业务上如果只关心 同 Key 消息的最新 value,那么可以做日志压缩,将同 Key 消息进行合并。合并后会生产新的 Log Segment,合并后消息的偏移量保持不变。
本质上有点类似:redoLog 和 RDB ?
具体压缩步骤比较难理解,略
磁盘存储
Kafka 是基于磁盘文件做存储,众所周知磁盘效率读写效率低,但为什么还用
写入速度:顺序写内存 > 顺序写盘 > 随机写内存 > 随机写磁盘
所以综合来看顺序写盘也不会比随机写内存慢
Kafka 采用了 File Append 的形式来追加消息,且已写入的不允许修改,比较匹配顺序写盘模式
页缓存
介绍页缓存:操作系统会对磁盘 I/O 做优化,即采用来页缓存的机制
读取时,先读缓存,缓存未命中则读取磁盘并写到缓存
写入时,先读取页缓存,缺页则从磁盘读取,读取成功后才写数据到页缓存中。此时我们称为该页为脏页,操作系统才合适时间将脏页数据落盘。(脏页的数据丢失问题忘了怎么解决了)
这个机制在 MySQL 中用的也比较多,可以参考 InnoDB 的 checkPoint 机制
为什么 Kafka 用页缓存
进程内部往往会缓存处理的数据,但这份数据可能已在操作系统的页缓存中,即一份数据多份缓存
Java 内存开销大,垃圾回收会随堆内数据增多变得缓慢(比如触发 Full GC)
Java 对象 相比 紧凑的文件字节码而言,占用的空间更大
可以依赖操作系统来维护一致性,而不用交给应用程序维护,安全且有效
结论:所以干脆依赖 操作系统的页缓存 而不是维护进程内的缓存
磁盘 I/O 流程
操作系统知识建议掌握,略
零拷贝
实质:直接将数据从磁盘文件复制到网卡,不经过应用程序
目的:提高性能,减少内核态/用户态之前的上下文切换
实现:操作系统提供 sendFile() 方法,上层语言一般会做封装调用
一般复制输出过程
1. (本质还是内核态上操作)Disk->内核态:File A 从磁盘复制到 Read Buffer
2. 内核态->用户态:CPU 将数据从 Read Buffer 复制到应用程序下
3. 用户态->内核态:CPU 将数据从应用程序复制到 Socket Buffer
4. (本质还是内核态上操作)内核态-> Network:从 Socket Buffer 复制到网卡设备中
总结:如果对数据没有额外处理的诉求,其实直接走1-4就完成来整个流程,没有必要再绕一次2-3
零拷贝复制输出过程
Disk -> Read Buffer -> Network
没有经过应用程序做中转复制,大大提高效率
方式:DMA 技术
六、深入服务端
协议设计
协议层面没必要深究了,略
时间轮
场景:Kafka 有大量延时操作,如延时生产、延时拉取、延时删除操作,为提高延时操作的效率所以引入时间轮算法。注:时间轮算法在其他中间件也有广泛应用。
时间轮算法 vs JDK Timer/Delay Queue
JDK:插入、删除时间复杂度为 O(nlogn)
时间轮:均为 O(1)
时间轮结构
数据结构:存储定时任务的环形队列,底层为数组,数组的每个元素可以存放定时任务的双向链表,链表中的每一项都是一个定时任务
TimingWheel(时间轮-环形队列)
TimerTaskList(定时任务双向链表-环形队列元素)
TimerTaskEntry(定时任务项-任务链表元素)
TimerTask(具体的定时任务)
时间轮原理(和表盘一样)
时间轮结构(抽象)
时间轮类似表盘,由多个单元时间格组成,所以一个时间轮跨度即为单元时间*格子数
有一个表盘指针,表示时间轮当前所处时间,表针所指为恰好到期部分,需要执行对应的定时任务
如何操作
插入定时任务:当前时间轮+定时时长/单元时间 = 时间格坐标,找到坐标后插入任务即可
执行定时任务:指针轮转到指定单元格,触发单元格内为执行任务
一个时间轮可以处理的定时任务跨度:currentTime < time < currentTime + 一个时间轮跨度
当遇到了超过时间轮能处理的定时任务怎么办(比如100W毫秒,总不能扩单元格数吧)
引入了分级时间轮的概念,类似于表盘上的时/分/秒
举个🌰
一层时间轮:时间单元 1ms * 单元数 20=时长 20ms
二层时间轮:时间单元 20ms * 单元数 20 = 时长 400 ms
三层:... 也就是 400 * 20 = 8000 ms
时间轮升级:当前时间轮不能满足任务所需的时间时,就会进行时间轮升级,插入高层所对应的任务列表中
时间轮降级:由于高层时间轮粒度过大,会有到了单元时间但未到定时任务时间的情况,这个时候就需要操作时间轮降级,将任务重新提交到低层时间轮中。如三层时间轮400ms 单元格内有400ms、450ms两个任务,时间轮到400ms时,并不能触发450ms的定时任务,所以需要下降到二层、一层时间轮中
时间轮指针是如何推进的?
还是借助了 JDK 的 DelayQueue
操作:
将每个用到的 TimerTaskList 插入到 DelayQueue
根据 TimerTaskList 的定时时间进行排序,短任务在前
Kafka 专门有 ExpiredOperationReaper 线程来获取队列中到期的任务列表
当获取到到期任务列表时,既可以推进时间轮,又可以执行对定时任务列表中的任务执行相关操作
前面说了 DelayQueue 插入性能差,为什么还用呢?
专人做专事
因为时间轮已经将具体某个定时任务编排到了定时任务列表了,利用其时间复杂度O(1)
DelayQueue 只要负责插入具体的任务列表(已经编排好了)即可,不用维护具体某个定时任务的插入/删除
时间轮用来执行插入/删除具体的定时任务
DelayQueue 负责时间推送
如果采用时间轮自己的每秒推进,那么会存在比较多的空推进,比如一个1ms 任务和 1000 ms 任务,空耗资源
反而 DelayQueue 获取队列头只要 O(1)
延时操作
Kafka 有多种延时操作,如延时生产、延时拉取、延时删除
延时生产:如消息 ack 需要等所有 ISR 副本写入完成才能ack,同时设置了超时时间30s。
延时拉取:当生产者无新消息写入的时候,消费者可以避免无意义的空拉取
延时操作:延时返回响应结果,有延时时间,但并非定时响应,也可提前响应
怎么实现
延时操作创建之后会被加入延时操作管理器(DelayedOperationPurgatory
管理器的定时器底层依赖时间轮
延时操作的延时由定时器来做超时管理
同时管理器配有监听池,来监听分区的外部事件,以触发延时操作
控制器
控制器的选举及异常恢复
选举、控制器节点数据变更依赖 zookeeper 来实现
当一个 broker 成为 controller 之后,会更新 controller_epoch (相当于版本号)和 brokerId 等
控制器 Broker 除正常消息读写职责外,需要担负作为控制器的责任
监听分区相关变化
ISR 集合变更
处理优先副本选举
监听主题变化
处理主题删减变化
处理删除主题
监听 broker 变化
broker 的增减
维护 topic、partition、broker 信息
更新元数据
管理分区状态机、副本状态机
...
如何处理这么多事件:使用单线程基于事件队列的模型
优雅关闭
进程终止会触发 kafka-shutdown-hock
正常关闭资源
ControlledShutdown 控制关闭
ControlledShutdown 流程 :略
分区 leader 的选取
场景:创建分区、分区上线时、leader 下线需要执行 分区 leader 的重新选举
策略:顺序寻找 AR 集合中第一个存活的副本,且副本在 ISR 内
为了 leader 节点均匀分布的需要,AR 集合内部顺序一般是不会变的(除了分区重分配)
发送优先副本选举时,直接将优先副本设置为 leader 即可
参数解密
说的是各式各样的 broker 参数,实际用的时候再仔细看吧,略
七、深入客户端
分区分配策略
简述:主要介绍 Consumer 与 Topic-Partition 订阅策略,如 ConsumerA 是订阅 Partition A or Partition B
RangeAssignor分配策略(默认)
策略:平均分配,将 group 内的所有 Consumer 按字典序排序,每个 Consumer 平均分配固定的分区范围
Ex:3个 Partition(0-1-2),2个Consumer(A、B)
那么 Consumer A - 分到 Partition 0-1
Consumer B - Partition 2
会出现分配不均匀的情况,因为是以 topic 为单位的,以上面的场景为例,类似的topic*3之后;consumerA - 6个分区;consumer B - 3个分区
RoundRobinAssignor分配策略
策略:consumer 和 consumer 订阅所有主题的分区 排序,然后轮询将分区分配给每个 consumer。即,排成两列,轮询连线;
问题点:Kafka 官方给了一个同 Group 下不同 Consumer 可以订阅不同 topic 的例子,还是会导致分配不匀;补充一下:我目前是没看到是怎么能让 同一 group 下的 consumer 实例订阅不同 topic 的
StickyAssignor分配策略
目的:分配均匀,当发生重分配时尽量与上次保持相同
自定义分区分配策略
给了扩展点,有自定义诉求的可以看下,略
消费者协调器和组协调器
旧版消费者客户端的问题
依赖 zookeeper 来实现分区分配的,每个消费者监听 zookeeper 的变更
存在两个问题
羊群效应(Herd Effect)
一个节点的变化,触发了大量监听通知,导致通知期其他操作被延迟
脑裂问题(Split Brain)
消费者再均衡时,每个消费者都单独和 ZK 通信以判断变化,可能会导致同一时刻各消费者获取的状态不一致
新版本再均衡(rebalance)的原理
引入 GroupCoordinator 和 ConsumerCoordinator,分别管理消费组和消费者
什么时候触发再均衡
启动新消费者
消费者下线
消费者主动退出消费组
订阅的topci 分区发生变化
具体步骤
一、FIND_COORDINATOR:Consumer 和所属的消费组协调器( GroupCoordinator )建连
历史已经保存过 GroupCoordinator(一般是一个 leader 节点) 信息,建连即可
本地无信息则需要向集群中负载最小的节点发出 寻找 GroupCoordinator 的请求
有点像找爸爸,知道在哪直接去就行,不知道在哪就找个空的人帮忙定个位,拿到定位去就行了
二、JOIN_GROUP:请求加入消费组
1. Consumer 发送 Join 请求到 GroupCoordinator
2. 获取消费组的 leader
若没有则会进行 Consumer Leader 的选举,选举是随机选举的,没什么大的讲究
3. 获取分区分配策略
因为每个 Consumer 都可以定义自己的分区分配策略,对于消费组而言则需要选举一个相对合理的策略来进行执行
选举本质:拿到 Consumer 支持做多的策略
收集策略,组成候选人
每个 Consumer 从候选人里找出第一个支持的策略并投票
计算并得出票数最多策略,即消费组的策略
4. 通知 JoinGroupResponse(内含分区分配策略) 给组内每一个 Consumer
三、SYNC_GROUP :同步分配方案
1. Leader Consumer 给出具体分配方案给到 GroupCoordinator
2. 各个普通 Consumer 向 GroupCoordinator 询问获取具体分区分配方案
3. GroupCoordinator 提取 Leader Consumer 的方案,并存入 __consumer_offsets主题中,最后发送响应提供分配方案
4. 各 Consumer 收到 Response 后执行分区分配,并启动心跳任务(和 GroupCoordinator)
四、HEARTBEAT:保持心跳
1. 正式消费之前 Consumer 会先获取当前分区消息拉取的起始位置 (__consumer_offsets主题 有存储)
2. 维持心跳来保证消费组关系的维护,心跳一旦停了就会被认为是下线了,会触发再均衡
consumer_offsets 剖析
用作位移提交,保存了 Consumer - Topic - Partition 的消费位移提交情况
从位移提交 Request 和 Response 的角度来解析
OffsetCommitRequest
group_id
generation_id
member_id
retention_time:消费位移能保留的时长,默认7天
topics
partitions
metadata
OffsetCommitResponse
throttle_time_ms
responses
partition_responses
error_code
发起请求后,消费位移同样会以发消息的形式发送到 topic(consumer_offsets)内
消息格式
Key
version
group
Value
commit_timestamp
expire_timestamp
事务
消息传输保障
MQ 消息传输一般有三个层级
at most once:最多一次,可能会丢,但不会重
at least once:最少一次,绝不会丢,但可能会重
exactly once:恰好一次,保证一次不丢不重
Kafka 的传输层级
Kafka Producer 如果写入成功,那么消息一定不会丢;如果写入失败,会多次重试;所以 Producer -> Broker 是 at least once
Broker -> Consumer 依赖具体使用方式不同。若先提交位移后处理消息,发生故障,从最后提交开始消费,则会出现消息丢失,即 at most once;若先处理消息后提交位移,发生故障,从上一次提交开始消费,则会出现重复消费,即 at least once;
变动:0.11.0.0 版本起,引入了幂等和事务以支持 exactly once 语义
幂等
背景:Producer 写入消息在重试时可能会重复写入消息,Kafka 引入幂等配置避免这种情况
配置:enable.idempotence (默认关闭幂等)
TODO
八、如何保障可靠性
副本剖析
失效副本
失效副本:同步延迟太久或者宕机等异常的副本,所对应的分区也叫同步失效分区 (under-replicated 分区)
怎么判断是否失效副本
功能失效:直接下线了、卡住了
同步失效:没跟上 leader 副本太久、滞后消息数太多
ISR 的伸缩
LEO 和 HW
Leader Epoch的介入
日志同步机制
可靠性分析
多副本机制保证
发送消息时不同的ack机制
极端情况:ISR 只剩 leader,可以通过 min.insync.replicas 来指定 ISR 最小副本数
操作系统的日志落盘
consumer 手动提交 offset
九、应用
介绍了 Kafka 的一些工具,略
十、监控
怎么做监控,略
十一、高级应用
过期时间
消息增加过期时间属性,对于超时消息通过消费者拦截器做多样化处理
延时队列
解决方案一:将延时消息按照延时时间,投递到延时分级的delay topic中
死信队列和重试队列:啥也没说。。。
消息路由
消息轨迹
跟踪消息的生产消费,搞一个 trace_topic 即可
消息审计:监控统计
消息中间件选型
有哪些消息队列
选型时需要注意的点
十二、Spark集成
收藏
收藏
0 条评论
回复 删除
下一页