《深入理解Kafka》读书笔记
2021-04-11 19:50:55 2 举报
AI智能生成
kafka
作者其他创作
大纲/内容
初识Kafka
术语
producer
consumer
broker
topic
partition
一个topic可以指定多个分区
replica
leader副本
写入和读取数据
follower副本
仅仅同步leader的数据
AR
assigned replicas
分区中所有的副本
ISR
in-sync replicas
leader副本和跟leader数据保持同步的follower副本集合
leader宕机,只有ISR中的副本可以参与选举
OSR
out-of-sync replicas
和leader副本滞后太多的副本集合
offset
LEO
log end offset
日志文件中下一条待写入消息的offset
HW
high watermark(高水位)
ISR中LEO最小的就是HW
消费者只能拉取HW之前的消息
服务端参数
server.properties
zookeeper.connect
listeners
客户端连接broker的地址列表
broker.id
指定kafka集群中broker的唯一id
log.dir
日志路径,也就是消息存放的路径,默认/tmp/kafka-logs
message.max.bytes
指定消息的最大值,默认1000012,约977KB
生产者
客户端开发
步骤
配置生产者客户端参数,创建生产者实例
构建消息
消息包含主题(必须),分区,key、value(必须),消息头和时间戳
发送消息
关闭生产者实例
KafkaProducer
线程安全的,可以在多个线程中共享单个KafkaProducer实例
生产者拦截器
实现ProducerInterceptor接口
onSend方法
消息序列化和分区之前对消息进行拦截操作
onAcknowledge方法
消息应答之前或者失败,优先于自定义callback方法之前执行
序列化器
把消息转成字节数组才能发送到kafka上
分区器
消息没有指定分区,那么通过分区器计算其目标分区
默认DefaultPartioner
如果key为null,则轮询分配到所有分区。如果key不是null,则哈希计算得出分区,相同key指向同一个分区
自定义分区器
实现Partitioner接口
参数
buffer.memory
客户端中用于缓存消息的缓冲区大小,默认33554432(32MB)
max.reuest.size
客户端能发送消息的最大值,默认1048576B(1MB)
小于broker端的配置message.max.bytes
batch.size
批量发送消息大小,默认16384(16KB)
linger.ms
指定批量发送消息等待时间,默认0
acks
指定分区中有多少副本收到消息,才认为消息发送成功
取值
"0"
消息发送后无需等待服务端响应。会造成消息丢失,吞吐量最高
"1"
默认,只要leader副本写入成功就能收到服务端响应
"-1"或"all"
ISR中所有副本都写入成功才能够够收到服务端的成功响应
min.insync.replicas
最小副本写入成功数量
max.block.ms
发送消息的最大阻塞时间,默认60000
一般缓冲区已满或者没有找到元数据会阻塞
request.timeout.ms
指定等待服务端响应超时时间,默认3000
retries
重试次数,默认为0
retry.backoff.ms
重试间隔,默认100
compression.type
消息压缩方式,默认为“none”
connections.max.idle.ms
默认540000,即9分钟
receive.buffer.bytes
send.buffer.bytes
bootstrap.servers
配置broker的地址,多个用逗号隔开。
无需配置所有的broker,因为会从给定的broker中查找其他borker信息
无需配置所有的broker,因为会从给定的broker中查找其他borker信息
key.serializer
value.serializer
enble.idempotence
是否开启幂等性,默认false
interceptor.classes
配置自定义的拦截器
partitioner.classes
配置自定义的分区器
metadata.max.age.ms
指定时间没有更新元数据则强制更新,默认300000(5分钟)
transactional.id
事务id,必须唯一,默认null
client.id
默认为""
原理
整体架构
两个线程
主线程
创建消息,通过可能的拦截器,序列化器和分区器后,缓存到客户端的消息收集器中
负责读取元数据信息
sender线程
负责从消息收集器中获取消息发送到broker
负责更新元数据
消息收集器
可以缓存消息,批量发送,减少网络传输的消耗,提升性能
元数据更新
元数据就是包含topic的所有分区,leader副本所在broker的地址,端口等信息
消费者
消费者和消费者组
消费者
订阅主题
主题的分区会被分配给消费者,如果分区数小于消费者数,
有的消费者就无法分配到分区而不会消费消息
有的消费者就无法分配到分区而不会消费消息
消费者组
每个消费者都有一个对应的消费者组
投递模式
一对一
广播
多个消费者组订阅主题
客户端开发
步骤
配置消费者客户端参数以及创建消费者实例
订阅主题
拉取(poll())消息并消费
提交消费位移
旧版本消费位移保存在zookeeper,新的则保存在kafka内部主题__consumer__offsets
提交消费位移是下一次消费位移,即当前位移+1
关闭消费者实例
参数
bootstrap.servers
group.id
消费者所在的消费者组名称
client.id
key.deserializer
value.deserializer
fetch.min.bytes
消费者一次拉取最小数据量,如果数据小于该值,则需要等待,直到满足条件。
湿度调大可以提高一定吞吐量,但造成一定延迟,默认1(B)
湿度调大可以提高一定吞吐量,但造成一定延迟,默认1(B)
fetch.max.wait.ms
指定时间没有达到fetch.min.bytes的要求,则拉取到消息,默认500
fetch.max.bytes
消费者一次拉取最大的数据量,默认5242880(B),即50MB
max.partition.fetch.bytes
每个分区里返回给消费者的最大数据量,默认1048576(1MB)
max.poll.records
消费者一次拉取请求最大条数,默认500
connections.max.idle.ms
指定多久之后关闭限制的连接,默认540000
request.timeout.ms
消费者等待请求响应的超时时间,默认30000
metadata.max.age.ms
配置元数据的过期时间,默认300000.如果再限定时间没有更新,则强制更新
reconnect.backoff.ms
尝试重连指定主机之前的等待时间,默认50
retry.backoff.ms
配置尝试重新发送失败的请求到指定的主题分区之前的等待时间,默认100
isolation.level
消费者事务隔离级别
取值
read_uncommitted
默认值,可以消费到HW的位置
read_committed
只能消费到LSO位置
heartbeat.interval.ms
心跳到消费者协调器之间的预计时间,默认3000
session.timeout.ms
组管理协议中用来检测消费者是否失效的超时时间,默认10000
max.poll.interval.ms
当通过消费者组管理消费者时,配置指定拉取消息线程最长空闲时间,默认300000.
超过这个时间没有发起poll,则消费者组认为消费者离开消费者组,会触发重平衡
超过这个时间没有发起poll,则消费者组认为消费者离开消费者组,会触发重平衡
auto.offset.reset
取值
earliest
latest
默认
none
enable.auto.commit
是否自动提交消费位移,默认true
auto.commit.interval.ms
enable.auto.commit为ture时候生效,表示自动提交消费位移的时间间隔,默认5000
interceptor.class
消费者拦截器
消费者再平衡
定义
分区的所属权从一个消费者转移到另一个消费者的行为
好处
为消费组的高可用和伸缩性提供保障
缺点
- 期间消费者无法读取消息
- 当一个分区发生消费者转移,之前消费者状态丢失。导致重复消费
解决
再均衡监听器ConsumerRebalanceListener
实现接口,可以在再均衡之前和之后调用,可以提交位移,防止重复消费
消费者拦截器
实现Consumerlnterceptor接口
可以在消息消费前,消息提交后做一些操作
多线程实现
KafkaConsumer是非线程安全的
如何多线程?
方式一
每个线程实例化一个KafkaConsumer
受限于分区个数
方式二
单线程拉取消息,多线程处理消息
对消息的顺序处理比较困难
主题与分区
主题管理
创建主题
查看主题
修改主题
修改分区个数
目前只支持增加不支持减少,减少会抛出异常
指定key的消息,修改分区数,会影响到消息的顺序
删除主题
设置delete.topic.enable为true(默认)
内部主题无法删除
配置管理
kafka-configs.sh
在运行状态修改配置
KafkaAdminClient
分区管理
优先副本的选举
优先副本:AR列表中的第一个副本就是优先副本
目的
优先副本选举为leader副本,促进集群负载均衡
分区重分配
集群扩容,broker节点失效场景下,让分区副本再次进行合理分配
kafka-reassign-partitions.sh
复制限流
修改副本因子
可以增加和减少副本因子数
kafka-reassign-partitions.sh
如何选择合适分区数
性能测试工具
测试生产者
kafka-producer-perf-test.sh
测试消费者
kafka-consumer-perf-test.sh
分区数越多吞吐量就越高吗
随着分区数增加而增加,达到一定阈值则开始下降
分区上限
超过一定值会造成kafka崩溃
文件描述符的限制
查看当前应用占用文件描述符
ls /proc/进程号/fd I we -l
查看文件描述符限制
ulimit -Sn
ulimit -Hn
调大文件描述符限制
ulimit -n 数量
经验
日志存储
文件目录布局
日志命名
日志中第一条消息的offset作为baseOffset的20位数字命名,不够左边补0
默认每个文件达到1GB分段,可以通过log.segment.bytes配置
索引命名
同日志命名,后缀为.timeindex和.index
日志索引
偏移量索引
offset到物理地址的映射关系
相对偏移量
消息相对于baseOffset的偏移量,占用4个字节
物理地址
消息在日志分段文件中的物理地址,占用4个字节
时间戳索引
时间戳到offset的映射关系
日志清理
清理方式
日志删除
默认,通过log.cleanup.policy设置为delete
日志删除任务周期性检测和删除不符合保留条件的日志分段文件
周期设置
log.retention.check.interval.ms,默认300000
保留策略
基于时间
参数
log.retention.hours
默认配置,默认值168,即保留7天
log.retention.minutes
log.retention.ms
基于日志大小
log.retention.byte
默认值-1,表示无穷大
基于日志起始偏移量
日志压缩
通过log.cleanup.policy设置为compact
有相同key的不同value值, 只保留最后一个版本
磁盘存储
页缓存
顺序I/O
消息日志采用追加写入,属于顺序写,所以很快
零拷贝
传统模式下是先把文件读取到内核缓冲区,然后拷贝到用户缓冲区,然后拷贝到内核socket缓冲区
零拷贝无需从内核拷贝到用户缓冲区,少了内核态和用户态的切换
深入服务端
协议设计
时间轮
延时操作
控制器
什么是控制器
kafka集群中有一个或多个broker,其中一个会被选举为控制器
控制器选举
依赖zookeeper,第一个创建/controller的临时节点(内容为brokerId,版本和时间戳)成功的broker即为控制器。
其它broker会将控制器的brokerId保存到内存中
其它broker会将控制器的brokerId保存到内存中
zookeeper还有一个/controller_epoch的持久节点,用来记录当前的epoch,没变更一次控制器,则+1
职责
监听分区相关变化
负责leader的选举
监听主题相关变化
监听broker相关变化
从zk中读取获取当前所有主题,分区,以及broker相关的信息并进行相应管理
启动并管理分区状态机和副本状态机
更新集群元数据信息
....
深入客户端
分区分配策略
RangeAssignor
RoundRobinAssignor
StickyAssignor
自定义分区分配策略
消费者协调器和组协调器
GroupCoordinator
是kafka服务端中用于管理消费者组的组件
ConsumerCoordinator
是消费者客户端中的组件,和组协调器进行交互
主要作用
负责执行消费者再平衡的操作,包括期间的分区分配工作
消费者再平衡
触发条件
新的消费者加入消费者组
消费者宕机下线,不一定是真下线,可能是长时间GC,网络延迟等导致消费者长时间没有向组协调器发送心跳
消费者主动退出消费者组
消费者组对应的组协调器节点发送变更
消费者组内订阅的主题或者分区数发生变更
阶段
第一阶段
消费者找到所属消费组对应的组协调器所在broker,并与其建立网络通信
第二阶段
找到组协调器后就进入加入消费组的阶段
组协调器从消费组中选举出leader
消费组中消费者投票选举出分区分配策略
第三阶段
leader消费者从第二阶段选举出的分区分配策略进行分区分配,将方案通过组协调器同步给各个消费者
第四阶段
消费者向组协调器发送心跳,异常则会被组协调器认为死亡,触发再平衡
_consumer_offsets
内部主题,保存消费者提交的位移
默认副本因子为3,分区数为50
消费位移对应的内容格式
消息传输保障
传输保障层级
at most once
至多一次,消息可能会丢失,但不会重复传输
at least once
至少一次,消息不会丢失,但可能会重复传输
exactly once
恰好一次,每条消息肯定且只会被传输一次
kafka生产者重试即at least once
kafka消费者提交位移但是还没消费完成宕机,重新上线后则会从下一条消息消费,则丢失了消息,即at most once
kafka消费者已经完成消费,还没提交位移则宕机,重新上线则会继续消费,导致重复消费,即at least once
引入幂等和事务特性后,实现ESO(exactly once semantics,精准一次处理语义)
幂等
默认关闭,设置enable.idempotence(生产者配置)为true开启幂等
相关参数
retries必须大于0
acks必须为“-1”或者“all”
max.in.flignt.request.per.connection不能大于5,默认为5
实现原理
每个生产者都会被分配一个PID,维护一个<PID,分区>和序列号(从0开始,每发送一次消息+1)映射
broker也会维护<PID,分区>和序列号的映射
broker收到消息会拿生产者的SN_new和自己的SN_old对比
SN_new = SN_old + 1,则接收消息
SN_new < SN_old + 1,说明重复消息,直接丢弃
SN_new > SN_old + 1,说明有消息未写入,出现乱序,可能有消息丢失,抛出OutOfOrderSequenceException
引入序列号实现幂等只针每一对<PID,分区>而言,也就是说如果重发到其它分区就不会保证幂等了
事务
可以跨分区,保证消息发送,消息消费,提交消费位移当原子操作,要么都成功要么都失败
可靠性探究
副本剖析
失效副本
功能失效(比如下线宕机)
同步失效
判定
broker参数relica.lag.time.max.ms,默认10000。
当ISR中的副本滞后leader副本的时间超多指定的时间则被判定同步失效,会剔除ISR,进入OSR
ISR的伸缩
ISR两个相关定时任务
isr-expiration
周期检测每个分区是否需要缩减ISR集合,检测到有失效副本,则收缩ISR,会将变更后的数据写入到zookeeper。
还会将变更后的记录缓存到isrChangeSet
还会将变更后的记录缓存到isrChangeSet
周期为relica.lag.time.max.ms一半,默认5s
isr-change-propagation
周期性检测(固定2500ms)isrChangeSet,如果有ISR变更记录,在zookeeper创建持久节点,并把isrChangeSet信息保存到节点上。kafka控制器会在节点上添加watcher,当节点变化则watcher通知控制器更新相关元数据,并向他管理的broker发送更新元数据请求,最后删除处理过的节点
当OSR中副本LEO不小于HW则会重新进入到ISR中,扩充后的ISR也会更新zookeeper上节点和isrChangeSet,之后通ISR收缩
为什么不支持读写分离
数据一致性问题
延时问题
kafka的优秀的架构设计,读写分离带来收益不大
日志同步机制
可靠性分析
建立合适个数的副本
生产者acks配置为“all”
生产者重试,retries>0
broker参数
log.flush.interval.messages
log.flush.interval.ms
消费端手动提交消费位移
回溯消费,可以对漏掉的消息进行回补
0 条评论
下一页