Kafka知识体系
2024-06-07 20:15:02 0 举报
AI智能生成
Kafka知识体系
作者其他创作
大纲/内容
事务
http://matt33.com/2018/11/04/kafka-transaction
Kafka 事务性最开始的出发点是为了在 Kafka Streams 中实现 Exactly-Once 语义的数据处理,这个问题提出之后,在真正的方案讨论阶段,社区又挖掘了更多的应用场景,也为了尽可能覆盖更多的应用场景,在真正的实现中,在很多地方做了相应的 tradeoffs
txn.id 可以跟内部的 PID 1:1 分配,它们不同的是 txn.id 是用户提供的,而 PID 是 Producer 内部自动生成的(并且故障恢复后这个 PID 会变化),有了 txn.id 这个机制,就可以实现多 partition、跨会话的 EOS 语义
事务隔离级别(isolation.level)
read_uncommitted(默认)
消费者可以消费到 HW (High Watermark) 位置
read_committed
消费者忽略事务未提交的消息,即只能消费到 LSO (LastStableOffset) 的位置
LastStableOffset
该值会影响消息堆积量 Kafka Lag的计算,消息堆积可以通过自带工具查看 kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --group test2_consumer_group
1. 在隔离级别为 read_uncommited 的情况下,消息堆积量为 HW - ConsumerOffset
2. 在隔离级别为 read_commited 的情况下,消息堆积量为 LSO - ConsumerOffset
LSO存在一个问题,即当前若有一个 long transaction,比如其 first offset 是 1000,另外有几个已经完成的小事务操作,比如:txn1(offset:1100~1200)、txn2(offset:1400~1500),假设此时的 LSO 是 1000,也就是说这个 long transaction 还没有完成,那么已经完成的 txn1、txn2 也会对 consumer 不可见(假设都是 commit 操作),此时受 long transaction 的影响可能会导致数据有延迟。在实际的生产场景中,尽量避免 long transaction 这种操作
关于事务的部分思考
Consumer 如何过滤 abort 的事务数据
Broker 会追踪每个 Partition 涉及到的 abort transactions,Partition 的每个 log segment 都会有一个单独后缀为 .txnindex 的文件(append-only file)来存储 abort transaction 信息,因为 abort transaction 并不是很多,所以这个开销是可以可以接受的,之所以要持久化到磁盘,主要是为了故障后快速恢复,要不然 Broker 需要把这个 Partition 的所有数据都读一遍,才能知道哪些事务是 abort 的,这样的话开销太大(如果这个 Partition 没有事务操作,就不会生成这个文件)
有了这个设计,Consumer 在拉取数据时,Broker 会把这批数据涉及到的所有 abort transaction 信息都返回给 Consumer,Server 端会根据拉取的 offset 范围与 abort transaction 的 offset 做对比,返回涉及到的 abort transaction 集合
PID Snapshot 是做什么的?用来解决什么问题?
对于每个 Topic-Partition,Broker 都会在内存中维护其 PID 与 sequence number(最后成功写入的 msg 的 sequence number)的对应关系。
Broker 重启时,如果想恢复上面的状态信息,那么它读取所有的 log 文件。相比于之下,定期对这个 state 信息做 checkpoint(Snapshot),明显收益是非常大的,此时如果 Broker 重启,只需要读取最近一个 Snapshot 文件,之后的数据再从 log 文件中恢复即可
若 txn.id 长期不使用,server 端怎么处理
producer 端默认事务超时时间 为60s transaction.timeout.ms 。Producer 设置超时时间不能超过 Server,否则会抛出异常
broker端默认事务超时时间为 15min, transaction.max.timeout.ms
对于 txn.id,我们知道 TransactionCoordinator 会缓存 txn.id 的相关信息,如果没有超时机制,这个 meta 大小是无法预估的,Server 端提供了一个 transaction.id.expiration.ms 参数来配置这个超时时间(默认是 7 天),如果超过这个时间没有任何事务相关的请求发送过来,那么 TransactionCoordinator 将会使这个 txn.id 过期
Producer 幂等性
幂等性实现是事务性实现的基础,幂等性提供了单会话单 Partition Exactly-Once 语义的实现,正是因为 Idempotent Producer 不提供跨多个 Partition 和跨会话场景下的保证
只能保证 Producer 在单个会话内不丟不重,如果 Producer 出现意外挂掉再重启是无法保证的(幂等性情况下,是无法获取之前的状态信息,因此是无法做到跨会话级别的不丢不重)
幂等性不能跨多个 Topic-Partition,只能保证单个 partition 内的幂等性,当涉及多个 Topic-Partition 时,这中间的状态并没有同步。
幂等性用来解决什么问题
在 0.11.0 之前,Kafka 通过 Producer 端和 Server 端的相关配置可以做到数据不丢,也就是 at least once,但是在一些网络异常情况下,可能会导致数据重复
PID + Sequence Number
PID 用于标识每一个 producer
sequence numbers,client 发送的每条消息都会带相应的 sequence number,Server 端就是根据这个值来判断数据是否重复。sequence number 将会从 0 开始自增,每个 Topic-Partition 都会有一个独立的 sequence number
kafka的事务保证
跨会话的幂等性写入:即使中间故障,恢复后依然可以保持幂等性;
跨会话的事务恢复:如果一个应用实例挂了,启动的下一个实例依然可以保证上一个事务完成(commit 或者 abort);
跨多个 Topic-Partition 的幂等性写入,Kafka 可以保证跨多个 Topic-Partition 的数据要么全部写入成功,要么全部失败,不会出现中间状态。
TransactionCoordinator 脑裂问题(brain split)
发生场景
TransactionCoordinator 在遇到上 long FGC 时,可能会导致 脑裂 问题,FGC 时会 stop-the-world,这时候可能会与 zk 连接超时导致临时节点消失进而触发 leader 选举,如果 __transaction_state 发生了 leader 选举,TransactionCoordinator 就会切换,如果此时旧的 TransactionCoordinator FGC 完成,在还没来得及同步到最细 meta 之前,会有一个短暂的时刻,对于一个 txn.id 而言就是这个时刻可能出现了两个 TransactionCoordinator
解决方式(Fencing)
通过 CoordinatorEpoch 来判断,每个 TransactionCoordinator 都有其 CoordinatorEpoch 值,这个值就是对应 __transaction_state Partition 的 Epoch 值(每当 leader 切换一次,该值就会自增 1)
zk, raft 的解决方式
与 HDFS 不同(Fencing)
HDFS 选举得到NN后会在ZK创建一个持久节点,用于记录当前NN的地址信息。当NN会话状态发生正常切换时,该持久节点也会被删除。而异常状态下该节点不会被删除,故NN选举成功后可以检查是否存在该持久节点,存在则说明出现了脑裂的情况, 直接 Kill 旧的 NN 进程或者强制切换状态
疑问
为什么kafka不支持读写分离
1. 数据一致性,leader同步到ISR中的副本存在一定时间差,容易造成数据不一致
2. 负载不均衡
延迟消息
Timingwheel
异常
CommitFailedException
情况1:消费处理业务逻辑时间超出session.timeout.ms或者max.poll.interval.ms(看版本)超时后会导致当前consumer已崩溃或宕机,导致被group踢出组后重新发生rebalance,此时consumer再提交消息就会发现该consumer已经不在group组内
情况2:standalone consumer与consumer group冲突时:这里所说的standalone consumer指的是使用KafkaConsumer.assign()而非subscribe()的消费者。当用户系统中同时出现了standalone consumer和consumer group,并且它们的group id相同时,此时standalone consumer手动提交位移时就会立刻抛出此异常。这是因为group coordinator无法识别这个具有相同group id的consumer,从而向它返回“你不是一个合法的组成员”错误
版本历史
0.7
基本消息队列,无副本机制
0.8
引入副本机制,真正意义上完备的分布式高可靠消息队列
0.9
增加基础安全认证及权限功能
使用Java重写新版本consumer API
引入Kafka Connect组件用于实现高性能的数据抽取
新的Comsumer API
Kafka可以自行维护Offset、消费者的Position。
也可以开发者自己来维护Offset,实现相关的业务需求。
新的ComsumerAPI不再有high-level、low-level之分了,而是自己维护offset。这样做的好处是避免应用出现异常时,数据未消费成功,但Position已经提交,导致消息未消费的情况发生。通过查看API
自行控制Consumer消费消息的位置
可以使用外部存储记录Offset,如数据库之类的
可以使用多线程进行消费
0.10
引入Kafka Streams,正式成为流式处理框架
内置机架感知以便隔离副本,提高可用性
消息格式增加时间戳
引入max.poll.records参数,允许开发者控制返回消息的条数
0.11
实现幂等性producer
支持事务
重构消息格式
新的分配算法 Sticky
重构controller
老版本controller在执行多步操作时,若其中一步出错,则无法回滚之前的操作
多线程同时访问controller上下文,重构后采用单线程+基于事件队列的方式
1.x (2017.11)
改进Streams Api
改进Connect的度量指标
支持Java9,实现更快的TLS和CRC32, 加快加密速度降低计算开销
调整了 SASL 认证模块的错误处理逻辑,原先的
认证错误信息现在被清晰地记录到日志当中
更好地支持磁盘容错,更优雅地处理磁盘错误,
单个 JBOD 上的磁盘错误不会导致整个集群崩溃。
0.11.0 版本中引入的幂等性生产者需要将max.in.flight.requests.per.connection 参数设置为 1,这对吞吐量造成了一定的限制。而在 1.0.0 版本里,这个参数最大可以被设置为 5,极大提升了吞吐量范围。
消费顺序
Kafka仅提供单个分区内的消费顺序,而不会维护全局的消费顺序
若要实现全局的消费顺序就只能通过让每个consumer group下只包含一个consumer实例来间接实现
Broker
zookeeper路径
/brokers
/ids
/<broker.id>保存成员节点信息
version
host
port
jmx_port
timestamp
endpoints
指明传输协议类型、主机名及端口号,如PLAINTEXT://host1:9092
rack
机架信息,若设置了该信息则Kafka在分配副本时会考虑把
某个分区的多个副本分配到多个机架上,以保证高可用
listener_security_protocol_map
broker与外界通信的安全协议类型
/topics
/seqid
/controller
/admin
/delete_topics
/reassign_partitions
/preferred_replica_election
/isr_change_notification
/config
/topics
/changes
/clients
/users
/brokers
/cluster
保存j集群的简要信息
/controller_epoch
保存controller组件版本号,Kafka使用该版本号来隔离无效的controller请求
副本同步
术语及概念
AR(Assigned Replicas)
分区中的所有副本统称为 AR
ISR(In-Sync Replicas)
如何判定ISR
0.9.0版本之前
replica.lag.max.messages 用于控制follower落后leader副本的消息数
瞬时高峰流量可能会大于该消息数,故会导致
follower被不断的踢出加入,再踢出再加入
replica.lag.time.max.ms 表示若follower副本无法在该时间内向leader请求数据,则会被踢出ISR
0.9.0版本之后
统一使用参数 replica.lag.time.max.ms,默认10s
follower与leader不同步的原因
1. 请求速度追不上
2. 进程卡住,如频繁GC或程序bug
3. 创建新副本
OSR (Out-of-Sync Replicas )
与 leader 副本同步滞后过多的副本
起始位移 (base offset / LogStartOffset)
该副本第一条消息的offset
logStartOffset 受日志清除策略影响,注意 LogStartOffset 不可以缩写为 LSO,因为在 Kafka 中,LSO 特指 LogStableOffset(与事务实现息息相关)
高水位值 (High Watermark, HW)
表示该副本最新一条已提交消息的下一条位移,消费端只能拉取到此offset之前的消息
更新机制
leader会尝试更新HW的情况
1. follower成为leader时
2. broker出现崩溃导致副本被踢出ISR时
3. producer向leader写入消息时
4. leader处理follower的FETCH请求时:leader处理follower的FETCH请求时,首先会从底层的log读取数据,之后再尝试更新分区的HW
leader更新HW
leader broker尝试确定分区HW时,会选出所有满足条件的副本比较他们的LEO以及自己的LEO,并选择最小的LEO最为HW
follower更新HW
follower更新HW发生在其更新LEO之后,一旦follower向log写完数据,它就会尝试更新HW。具体算法是比较当前LEO与FETCH响应中leader的HW值,取两者的最小值作为HW
完整流程
情况1:leader副本写入消息后,
follower副本发送FETCH请求
1. leader接收到生产者消息后将其写入到底层日志,同时更新自身LEO
2. 尝试更新leader的HW:follower尚未发起FETCH请求,故leader保存的remote LEO=0,将remote LEO与leader LEO对比取最小值即 HW = 0,与原HW相同,故不更新分区HW
3. leader 接收到follower的FETCH请求,开始读取底层log数据,并根据FETCH请求中的fetch offset=0 更新remote LEO
4. leader再次尝试更新HW,leader LEO=1,remote LEO =0,故分区HW=0,此时consumer无可消费的消息
5. leader将数据及当前HW返回给follower
6. follower接收到FETCH响应后将消息写入log日志并更新自身 LEO = 1
7. follower根据响应返回的HW与本地LEO做对比,取其中最小的值来更新HW,故HW=0。此时consumer仍无可消费的消息
8. follower发起第二轮FETCH请求,同时带上参数fetch offset=1(自身的LEO=1)发送给leader
9. leader 接收到请求后读取底层log数据,并更新自身的remote LEO=1
10. leader 尝试更新分区HW,对比自身LEO=1 与 remote LEO=1 ,取最小值. 即HW=1, 原HW=0,此时HW被更新
11. leader将数据(空数据)及当前的HW返回给follower
12. follower接收到响应后将消息写入log,因为
是空数据所有无消息可写,LEO仍然为1
13. follower更新自身HW,对比本地LEO=1与fetch请求返回的HW=1,获取最小值即1,故follower 的HW=1。此时消息已经被成功拷贝到leader及follower的log中且分区HW=1,所以consumer可消费offset=0的消息
情况2:FETCH请求保存在
purgatory中时生产者发来消息
原理与情况1基本类似
缺陷
备份数据丢失
前提是min.insync.repicas=1
场景
1. 有两个副本A,B,A为leader。生产者发送两条消息给A,A写入成功并成功返回
2. 此时leader及follower都已写入两条消息,且分区HW已更新为2,
但follower的HW未被更新,需等到下一轮FETCH请求方可更新。此时 A(hw=leo=2),B(hw=1,leo=2)
3. 此时B宕机,则重启B后自动将LEO调整为之前的HW,故B会做日志截取,
将offset=1的消息从log中删除,并调整LEO=1. 即 B (HW=LEO=1)
4. B重启后需要向A发起FETCH请求,但此时A宕机,B成为新的leader。
而当A重启后又向B发起FETCH请求,对比自身LEO与leader的HW,发现
应该取leader的HW=1,故A做日志截取,将offset=1的消息删除。此时
offset=1的消息在集群所有副本中被永久删除
总结
延迟一轮FETCH请求更新HW值的设计使得follower HW值是异步延迟更新的,若这个过程中leader发生变更,那么成为新leader的follower的HW值就有可能是过期的
备份数据不一致
前提是min.insync.repicas=1
场景
1. 有两个副本A,B,A为leader。生产者发送两条消息给A,A写入成功并成功返回
2. 此时leader及follower都已写入两条消息,且分区HW已更新为2,但follower的HW未被更新,需等到下一轮FETCH请求方可更新。此时 A(hw=leo=2),B(hw=1,leo=2)
3. A,B同时宕机,若B先重启,成为leader,分区HW=1。此时producer发送第三条消息给B,B写入成功也提交成功,因为只有B自己一个副本, 即 B(HW=LEO=2)
4. 此时A重启,执行FETCH请求,发现自身LEO=2,而leader的HW=2,两者相同无需更新。这就造成leader中offset=1的消息与follower中offset=1的消息不一致问题
LW ( Low Watermark)
AR 集合中最小的 logStartOffset 值
日志末端位移 (Log End Offset, LEO)
副本日志中下一条待写入消息的offset。每当leader接收到producer推送的消息,它会更新自己的LEO,通常是加1
每个副本都由LEO,不论leader与follower。leader除保存自己的LEO以外还保存这所有follower的LEO
同步流程
1. broker上的leader副本接收到消息后将自己的LEO值加一
2. follower副本各自发送请求给leader
3. leader分别将该消息推送给follower副本
4. follower副本接收到消息后各自将更新自己的LEO,加一
5. leader副本接收到其他follower副本的响应后,更新HW。也即代表该条消息可以被consumer消息
日志存储
每个topic都会有对应的子目录,目录名称为 <topic>_<分区号>
文件结构
日志段 .log
以该文件的第一条消息对应的offset来命名
log.segment.bytes 指定文件大小,默认1G.日志段被填满后,Kafka会自行切分,创建出新的日志段及索引文件
索引文件
分类
.index 位移索引文件
Kafka强制要求必须是8的整数倍,因为每个索引项占用8字节
.timeindex 时间戳索引文件(0.10.0引入)
Kafka强制要求必须是12的整数倍. 因为每个索引项占用12字节
log.index.size.max.bytes 索引文件大小,默认10M.
方便通过二分查找快速定位(O(LogN))
日志清除策略
1. 基于时间清除
默认清除7天前的日志数据(包含日志段文件及两个索引文件)
log.retention.{hours|minutes|ms} 可配置时间间隔,ms优先级最高,minutes次之,hours最后
0.10版本之前使用当前时间与文件最近修改时间做比较,而该时间会时常变动。因此0.10开始取日志段首条消息的时间做对比
2. 基于文件大小清除
log.retention.bytes, 默认-1,对log大小不做限制
3. 定时清除
在 Kafka 的日志管理器中会有一个专门的日志删除任务来周期性地检测和删除不符合保留条件的日志分段文件,这个周期可以通过 broker 端参数 log.retention.check.interval.ms 来配置,默认值为 300000,即 5 分钟
LogSegment保留策略
基于时间的保留策略
基于日志大小的保留策略
基于日志起始偏移量的保留策略
基于 logStartOffset 来实现
日志压缩(compaction)
对于相同key不同value的消息,仅保留最新的一条,多余的会被清除
必须设置可以,否则无法执行该操作
使用场景
高可用日志化
数据库变更订阅
时间溯源
相关参数
log.cleanup.policy
log compaction策略
0.10.1之前有两种取值:delete(默认), compact
0.10.1开始同时支持两种策略
log.cleaner.enable
是否启用Cleaner
0.9.0及之前默认为false,之后默认为true
log.cleaner.min.compaction.lag.ms
表示比某个时间段新的数据无需清理,默认为0
controller
某个broker被选举出来管理集群所有分区状态并执行相应的管理操作
维护的状态分两类
分区状态
状态
New
分区创建完成,且分区已确定副本列表,但尚未选出leader和ISR
Online
分区leader被选出
Offline
崩溃状态
NonExisten
分区不存在或已删除
副本状态
状态
New
controller创建副本时的状态,只能成为follower
Online
在线状态,既可成为follower也可成为leader
Offline
崩溃状态
DeletionStarted
若开启了topic删除操作,则topic下所有分区的所有副本都会被删除
DeletionIneligible
副本删除失败
NonExisten
副本删除成功
职责
更新集群元数据
当有分区信息发生变更时,controller将变更后的信息封装进UpdateMetadataRequests请求发给集群中的broker
创建、删除topic
分区重分配
对topic的所有分区重新分配副本所在broker的位置,以期望实现更均匀的分配效果
broker崩溃或加入
preferred leader副本选举
在众多 leader 的转移过程中,就会产生 leader 不均衡现象,可能一小部分 broker 上有大量的 leader,影响了整个集群的性能,所以就需要把 leader 调整会最初的 broker 上,这就需要 preferred leader 选举
topic分区扩展
若某些topic的现有分区数不足以支撑clients的业务量,因此需要增加分区
controller leader选举
在zk创建一个临时节点/controller,节点保存了当前controller所在的broker id。集群首次启动时,所有broker都会抢着创建该节点,但zk保证最终只会有一个broker创建成功
受控关闭
受控关闭可最大限度的降低broker的不一致性
仅依赖RPC实现,无需zk
组件
数据类
ControllerContext
0.11版本之前controller的设计是多线程的,故使用了大量同步机制
基础功能类
ZkClient
ControllerChannelManager
负责向其他broker发送请求
ControllerBrokerRequestBatch
RequestSendThread
ZookeeperLeaderElector
老版本设计缺陷
多线程共享状态
代码组织混乱
管理类请求与数据类请求未分开
controller同步些zk且是一个分区一个分区的写
controllerg给broker的请求无版本号信息
broker请求处理
处理请求的模式是Reactor模式
一个acceptor线程及若干个processor线程组成,num.network.threads可配置processor的线程数
流程
1. processor线程接收acceptor线程分配的新socket连接通道,然后开始监听该通道上的数据传输
2. processor将接收到的请求放入broker启动时创建的全局唯一请求队列(queued.max.requests,默认500),一旦队列已满则clients端发送给broker的请求将会被阻塞
3. 队列中的请求由KafkaRequestHandler线程池分配具体线程来处理,线程池大小可以通过 num.io.threas 配置,默认为8个线程
4. broker还会创建与processor线程数等量的响应队列,故processor线程的另一个重要的任务是实时处理各自响应队列中的响应结果
Consumer
主要参数
session.timeout.ms
consumer group 组内成员发生崩溃时,coordinator检测失败的时间,默认10s
max.poll.interval.ms
设置消息处理逻辑的最大时间
auto.offset.reset
无位移信息或位移越界时Kafka的应对策略
earliest
从最早的位移开始消息,不一定时0
latest
none
enable.auto.commit
comsumer是否自动提交位移
fetch.max.bytes
指定了consumer端单次获取数据的最大字节数
max.poll.records
heartbeat.interval.ms
通知其他成员开启新一轮rebalance的心跳间隔时间
当coordinator决定开启新一轮rebalance时,它会将这个决定以REBALANCE_IN_PROCESS异常的形式放入consumer心跳
请求的reponse中,这样其他成员拿到response后才能知道它需要重新加入group
connections.max.idle.ms
空闲socket的存活时间,默认9min,推荐设置为-1,即无需关闭空闲连接
订阅topic
订阅列表
基于正则表达式订阅
offset
每个consumer实例都会为它消费的分区维护属于
自己的位置信息来记录当前消费了多少条信息
实现消息交付语义保证的基石(message delivery semantic)
at most once
at least once
exactly once (EOS)
保存方式对比
保存在服务器端
实现简单,但有几方面影响
broker从此变成了有状态的,增加同步成本,影响伸缩性
需要引入应答机制来确认消费成功与否
由于要保存offset,故需引入复杂的数据结构,从而造成不必要的资源浪费
保存在consumer group
只需简单保存一个长整型数据即可
引入checkpointing机制,定期对offset进行持久化,简化应答机制实现
位移提交
consumer客户端需要定期向集群汇报自己消费数据的进度
旧版
consumer定期将位移信息提交到zookeeper下的固定节点中
新版
consumer把位移提交到Kafka的一个内部topic中(__consumer_offsets)
故新版consumer不再依赖zookeeper
位移管理
新版
consumer会在Kafka集群的所有broker中选择一个broker作为consumer group的coordinator,用于实现组成员管理、消费分配方案制定以及位移提交
当consumer运行一段时间后,必须提交自己的位移信息。如果consumer崩溃或关闭,它负责的分区就会被分配给其他consumer。因此一定要在其他consumer读取这些分区前就做好位移提交工作,否则会出现消息的重复消费。
提交位移的机制
consumer向所属的coordinator发送位移信息,每个位移提交请求都会往__consumer_offsets对应分区上追加写入一条信息。
消息的key由group.id,topic及分区组成,value是位移值
若consumer为同一个group的同一个topic提交了多次位移,那么__consumer_offsets对应的分区上就会有若干条key相同但value不同的消息,默认仅会关心最新一条消息,其他旧消息会在compact过程中被清理
参数
enable.auto.commit
auto.commit.interval.ms, 默认5s
旧版
consumer定期将位移信息提交到zookeeper下的固定节点中
参数
auto.commit.enable
auto.commit.interval.ms, 默认60s
多线程消费
1. 每个线程维护一个KafkaConsumer实例
优势
实现简单
速度快,无线程间交互
易于维护分区间的消息消费顺序
方便位移管理
缺陷
socket连接开销大
consumer数受限于topic分区数,扩展性差
broker端处理负载高
rebalance可能性增大
2. 单个KafkaConsumer实例 + 多 worker 线程
优势
消息获取与处理解耦
可独立扩展consumer数及worker数,伸缩性好
缺陷
难以维护分区内的消息顺序
处理l链路变长,导致位移管理困难
worker线程异常可能导致消费数据丢失
新旧版本对比
新版本
consumer group
standalone consumer
旧版本
high level consumer
zookeeper负责管理group节点
/consumers
<groupid>
/ids
记录consumer的订阅信息
/owners
保存consumer各个消费线程的Id
消费线程是新旧版本consumer在设计上的重大区别
/offsets
保存该group消费指定分区的位移信息
low level consumer
使用场景
1. 消息重演
2. 只想消费部分分区数据
3. 实现精确一次处理语义
即把数据处理与位移提交放入一个事务中处理
劣势
用户必须自行处理位移提交
storm-kafka插件就是使用low level,它把位移保存在zookeeper中特定的位置下
用户必须寻找分区的leader broker
用户必须自行处理leader变更
consumer group
定义
用于实现高伸缩、高容错的消费机制
组内多个consumer实例可以同时读取Kafka消息,一旦由某个consumer挂了,consumer group会立即将已崩溃consumer负责的分区转交给其他consumer来负责,从而保证整个group可以继续工作,不会丢失数据
rebalance
定义
规定了一个consumer group下所有consumer如何
达成一致来分配订阅topic的所有分区
触发条件
1. 组成员发生变更
新consumer加入
已有consumer主动离开
已有consumer崩溃
最常见的场景是:consumer消费逻辑过重,无法在指定时间内完成消息的处理,那么coordinator认为该consumer已经崩溃,从而引发rebalance
2. 组订阅topic数发生变更
如使用基于正则表达式的订阅
3. 组订阅topic的分区数发生变更
如使用命令行脚本增加订阅topic的分区数
分区分配策略(partition.assignment.strategy)
range策略,新版本默认
round-robin策略
sticky策略 (0.11版本引入)
避免前两种策略无视历史分配方案的缺陷
可规避极端情况下的数据倾斜并且
在两次rebalance间最大限度的维持之前的分配方案
generation(分代概念)
每个goup进行rebalance后,generation
号都会加1,表示group进入了一个新的版本号
rebalance后,若consumer提交的是rebalance前延迟的offset信息则会被group拒绝
协议
1. JoinGroup
consumer请求加入组
2. SyncGroup
group leader把分配方案同步更新到组内所有成员
3. Heartbeat
consumer定期向coordinator汇报心跳表明自己依然存活
consumer也是根据heartbeat请求的响应中是否包含
REBALANCE_IN_PROCESS来判断当前group是否开启新一轮rebalance
4. LeaveGroup
consumer主动通知coordinator该consumer即将离组
5. DescribeGroup
查看组的所有信息,包括成员信息、协议信息、分配方案以及订阅信息等。主要供管理员使用,coordinator不使用该请求执行rebalance
执行流程
确定coordinator所在的broker
1. 计算Math.abs(groupID.hashCode) % offsets.topic.num.partitions(默认50),获得分区
2. 寻找该分区在__consumer_offsets中的leader副本所在的broker,该broker即这个group的coordinator
加入组
组内所有的consumer向coordinator发送JoinGroup请求
当收集全JoinGroup后(若个别consumer指定了自定义分配策略而其他consumer都不支持,则该consumer被拒绝加入),coordinator从中选择一个consumer作为
group的leader并把所有成员信息以及他们的订阅信息发送给leader
同步更新分配方案
leader根据系统指定的分区分配策略决定每个consumer都负责哪些topic的哪些分区
分配完成后,leader会把这个分配方案装进SyncGroup请求并发送给coordinator。
组内成员都会发送SyncGroup请求,只不
过只有leader发送的请求才会包含分配方案
coordinator接收到分配方案后把属于每个consumer的方案单独抽取出来作为SyncGroup请求的response返回给各自的consumer
监听器
ConsumerRebalanceListener
反序列化
ByteArrayDeserializer
ByteBufferDeserializer
BytesDeserializer
DoubleDeserializer
IntegerDeserializer
LongDeserializer
StringDeserializer
状态
Empty
group下没有任何active consumer,但可能包含位移信息
preparingRebalance
该状态下的group依然可能保存有位移信息,
因此clients可以发起OffsetFetch及OffsetCommit请求
AwaitingSync
group所有成员都已加入并等待leader consumer发送分区分配方案。
此时依然可以获取位移,但若提交位移coordinator将会抛出REBALANCE_IN_PROCESS异常
Stable
Dead
多线程程序(新版本)
用户主线程
consumer group执行rebalance
消息获取
新版KafkaConsumer属非线程安全,若未显示增加同步锁机制,则Kafka会抛出KafkaConsumer is not safe for multi-threaded access
consumer消费完成后需要调用close方法关闭consumer
清除consumer创建的各种socket资源
通知consumer group主动离组从而更快的开启新一轮rebalance
coordinator管理
异步任务结果的处理
位移的提交
后台心跳线程
特性
高吞吐、低延时
大量使用操作系统页缓存,速度快,命中率高
以追加的方式写文件,将随机写改为顺序写
使用sendfile系统调用,实现数据零拷贝
消息持久化
解耦消息的发送与消费
实现灵活的消息处理
负载均衡与故障转移
保证高可用
伸缩性
服务器状态的保存和管理交由专门的协调服务来处理,无需集群间共享,方便集群节点扩容
消息
消息格式
V0 (0.10.0.0之前)
CRC(4byte)
版本号(1b)
属性(1b)
高五位保留
低三位用于保存压缩类型
0 无压缩
1 GZIP
2 Snappy
3 LZ4
key长度(4b)
key
value长度(4b)
value
V1(0.10.0.0)
时间戳(8byte)
属性(1byte)
第四位用于指定时间戳类型
CREATE_TIME
表示消息创建时由producer指定时间戳
LONG_APPEND_TIME
后者表示消息被发送到broker时由broker指定时间戳
低三位保存压缩类型不变
V2(0.11.0)
消息总长度(可变长度)
属性(1byte)
时间戳增量(可变长度)
位移增量(可变长度)
key lenght(可变长度)
key
value size(可变长度)
value
header个数(可变长度)
headers(可变长度)
消息集合格式
每个消息集合包含若干日志项,V2版本之前日志项称为log entry,V2版本则称为消息批次(record batch)
各版本日志项区别
V0及V1
浅层消息(shallow message)
1. 若未启用压缩,则浅层消息就是消息本身
2. 若启用压缩
Kafka会将多条消息压缩到一起封装到该条浅层消息的value字段
此时该浅层消息被称为wrapper消息,而value字段包含的消息被称为inner消息
V0, V1版本中的日志项只能包含一条浅层消息
日志项头部(log entry header)
offset (8byte)
1. 未启用压缩
该消息在Kafka分区日志中的offset
2. 启用压缩
表示wrapper消息中最后一条inner消息的offset
size (4byte)
日志项格式
1. offset (8byte)
2. size (4byte)
3. message (由size决定)
V2
日志项格式
1. offset (8byte)
2. size (4byte)
3. 分区leader版本号 (4byte)
4. 版本(1byte)
5. CRC(4byte)
6. attribute(2byte)
低三位用于保存压缩类型
0 无压缩
1 GZIP
2 Snappy
3 LZ4
第四位依然保存时间戳类型
第五位表示事务类型
第六位表示控制类型
7. 最大位移增量(4byte)
8. 起始时间戳 (8byte)
9. 最大时间戳(8byte)
10. PID (8byte)
一个幂等性producer的ID
11. Producer epoch (2byte)
PID携带的当前版本号
12. 起始序列号 (4byte)
13. 消息个数 (4byte)
14. 消息 (可变)
topic
partition
offset
replica
leader replica
follower replica
ISR (in-sync replica)
ISR至少存在一个活着的replica
只有ISR集合中的所有replica都接收到了同一条信息,Kafka才会将消息置于已提交状态
只有该集合中的replica才能被选举为leader
设计初衷:解决超大数据集的实时传输
消息引擎
流式处理框架
运维
消费者组管理 : kafka-consumer-groups.sh。旧版使用的是 ZkConsumerGroupService ,新版使用的 KafkaConsumerGroupService
查看消费者列表 --list
kafka-consumer-groups.sh --bootstrap-server xxxx:9090 --list
查看消费者组详情 --describe
kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --all-groups
该命令可以查看 Lag 消息积压数量
查询消费者成员信息 --members
kafka-consumer-groups.sh --describe --members --group test2_consumer_group --bootstrap-server xxxx:9090
查询消费者状态信息 --state
kafka-consumer-groups.sh --describe --all-groups --state --bootstrap-server xxxx:9090
删除消费者组 --delete
重置消费组的偏移量 --reset-offsets
缺陷与不足
1. 横向扩展问题
kafka 分区与 broker 强耦合, 数据迁移困
kafka broker 宕机后, kafka borkerid 不会自动转移, 需要手动维护
kafka 新加入的 Borker 无法承接旧的分区流量, 需要手动均衡
2. 读写热点问题
当 kafka 出现分区写热点问题时, 无法直接增加broker, 因为只能写到一个 leader borker 中
通过增加分区数量实现横向扩容, 但是会引发 Rebalance 风险
通过增加 broker 机器性能实现纵向扩容
kafka 读热点问题, 由于 kafka 的读写都是在 leader broker 中实现, 故无法做到读写分离, 新加入的 broker 也无法读取到旧分区数据
错误类型
可重试异常(都继承于RetrizbleException)
LeaderNotAvailableException
NotControllerException
NetworkException
不可重试异常
RecordTooLargeException
SerializationException
KafkaException
Producer
主要参数
acks : 确保写入消息的副本数
0:无需理睬leader broker是否已经写入成功,吞吐量最高
all / -1:即待ISR中的所有副本都写入成功后才返回,吞吐量最低
1:leader broker仅将消息写入本地日志,无需等待ISR中其他副本写入,默认参数
buffer.memory
用于缓存待发送消息的缓冲区大小,单位为字节,默认32M
Java版本producer启动时会首先创建一块内存缓冲区用于保存待发送的消息,然后由另一个专属线程负责从缓冲区读取消息执行真正的发送。
若producer向缓冲区写消息的速度超过了专属IO线程发送的速度,必然会造成缓冲区的不断增大。此时producer会停止手头的工作等待IO线程追上来,若一段时间之后IO线程还是无法追上producer速度,那么producer就会抛出异常并期望用户介入
compression.type
gzip
snappy
lz4
retries
重试次数,默认为0
设置该参数需注意
1. 重试可能造成消息的重复发送
0.11版本开始支持精准一次处理语义,从设计上避免了类似问题
2. 重试可能造成消息的乱序
max.in.flight.requests.per.connection
指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设为 1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。
该参数设置为1是为了防止topic同分区下的消息乱序问题
如顺序发送1,2两条消息,结果1发送失败,2发送成功。若此时该配置大于1,则producer会重试发送1,待发送成功时1与2的顺序已经混乱
retry.backoff.ms
两次重试间会停顿一段时间
batch.size
调优producer吞吐量及延时性能的重要指标
批次大小,默认大小 16384 即16k
linger.ms
表示batch中的消息是否需要延时发送,即吞吐量与延时之间的权衡
默认为0 ,即消息需要立即发送,无需关心batch是否已被填满
max.request.size
控制producer能够单个请求可发送最大消息的大小
request.timeout.ms
即producer发送请求到broker后,broker需要在该段时间内返回响应,默认30s
消息分区机制
自定义分区
实现org.apache.kafka.clients.producer.Partitioner接口,重写partition方法
消息序列化
系统序列化器
ByteArraySerializer
ByteBufferSerializer
BytesSerializer
DoubleSerializer
IntegerSerializer
LongSerializer
StringSerializer
自定义
实现org.apache.kafka.common.serialization.Serializer接口
拦截器
实现org.apache.kafka.clients.producer.ProducerInterceptor接口
onSend
运行在用户主线程,在消息被序列化及计算分区前被调用
onAcknowledgement
在消息被应答之前或消息发送失败时调用,运行在IO线程因此不要在该方法中放入比较中的逻辑,否则会拖慢消息发送速率
close
工作流程
1. 将待发送消息进行序列化并计算目标分区
2. 追加写入消息到缓冲区(accumulator)
缓冲区由HashMap结构的集合构成,该结构保存了每个topic分区下的batch队列,如 (test-0, [ batch1, batch2 ])
每个batch包含最重要的三个组件
compressor :负责执行追加写入操作
batch缓冲区
thunks:保存消息回调逻辑的集合
3. Sender线程预处理及消息发送
1. 不断轮询缓冲区寻找已做好发送准备的分区
2. 将查找到的各个batch按照目标分区所在的leader broker进行分组
3. 将分组后的batch通过底层创建的socket连接
发送给各个broker,并等待response返回
4. Sender线程处理response
无消息丢失配置
producer端
block.on.buffer.full=true
待缓冲区被填满时producer处于阻塞状态并停止接收新消息而不是抛出异常
该参数在0.9版本已经废弃,改用 max.block.ms
acks=all
retries=integer.MAX_VALUE
max.in.flight.requests.per.connection=1
使用带回调机制的send
callback逻辑中显式立即关闭producer
若不使用close(0),默认情况下producer会被允许将
未完成的消息发送出去,这样有可能造成消息乱序
broker端
unclean.leader.election.enable=false
不允许非ISR中的副本被选举为leader,从而
避免broker端日志水位截断而造成的消息丢失
replication.factor>=3
min.insync.replicas=1
确保replication.factor>min.insync.replicas
0 条评论
下一页
为你推荐
查看更多