Kafka知识点梳理
2023-12-26 18:58:14 27 举报
AI智能生成
Kafka是一个分布式流处理平台,由LinkedIn公司开发并开源。它主要用于构建实时数据管道和流应用。Kafka的核心概念包括Producer、Broker、Consumer和Topic。Producer负责生产消息,发送到Broker;Broker接收并存储消息,根据配置的分区规则将消息分发到不同的Topic;Consumer订阅特定的Topic,从Broker拉取消息进行消费。Kafka具有高吞吐量、可扩展性、持久性和容错性等特点,适用于大数据场景下的实时数据处理。
作者其他创作
大纲/内容
Controller(控制器)
概述
在 ZooKeeper 的帮助下管理和协调整个 Kafka 集群
选举
第一个成功创建 /controller 节点的 Broker 会被指定为控制器
作用
主题管理(创建、删除、增加分区)
分区重分配
Preferred 领导者选举
集群成员管理(新增 Broker、Broker 主动关闭、Broker 宕机)
故障转移(Failover)
各broker watch /controller临时节点,故障后重新选举
内部设计原理
0.11 版本后,把把多线程的方案改成了单线程加事件队列的方案
将之前同步操作 ZooKeeper 全部改为异步操作
问题
为什么kafka不支持读写分离
读写分离不能提高读性能
方便实现“Read-your-writes”
方便实现单调读(Monotonic Reads)
位移offset
__consumer_offsets
实现方式
使用 __consumer_offsets内部主题的方式来保存位移
消息格式
普通消息格式
Value格式:主要是offset的值,还包括时间戳等
Key的格式:<GroupID,主题名,分区号 >
保存Consumer Group 信息的消息
用于删除 Group 过期位移甚至是删除 Group 的消息
默认设置
如果位移主题是 Kafka 自动创建的,那么该主题的分区数是 50,副本数是 3
消息Compaction
提交位移
手动提交
直接提交最新一条消息的位移
commitSync()
Consumer 程序会处于阻塞状态,直到远端的 Broker 返回提交结果
commitAsync()
立即返回,不会阻塞,不会影响 TPS,但是有错误不会重试
精细化提交位移
commitSync(Map<TopicPartition, OffsetAndMetadata>
commitAsync(Map<TopicPartition, OffsetAndMetadata>
自动提交
参数设置
enable.auto.commit=true
auto.commit.interval.ms(默认5s)
auto.commit.interval.ms(默认5s)
提交位移时机
在开始调用 poll 方法时,提交上次 poll 返回的所有消息。
导致问题:它可能会出现重复消费
导致问题:它可能会出现重复消费
最佳实践
手动提交,组合异步提交和同步提交
CommitFailedException
发生时机
CommitFailedException 异常通常发生在手动提交位移时,
即用户显式调用 KafkaConsumer.commitSync() 方法时
即用户显式调用 KafkaConsumer.commitSync() 方法时
原因
同时出现了设置相同 group.id 值的消费者组程序和独立消费者程序
消息处理的总时间超过预设的 max.poll.interval.ms 参数值
解决方案
缩短单条消息处理的时间
增加 Consumer 端允许下游系统消费一批消息的最大时长,调大max.poll.interval.ms
减少下游系统一次性消费的消息总数,调小max.poll.records
下游系统使用多线程来加速消费
副本
作用
只有一个作用,就是提供冗余以实现高可用
Kafka的读写只能发生在leader副本上
ISR(In Sync Replicas)
判断标准 replica.lag.time.max.ms(默认10s)
Unclean 领导者选举
参数
Broker 端参数 unclean.leader.election.enable 控制是否允许 Unclean 领导者选举
选择
在这个问题上,Kafka 赋予你选择 C 或 A 的权利
建议
unclean.leader.election.enable=false,牺牲可用性换取一致性
监控
消费者组监控
lead
计算方式
=Consumer Offset -LogStartOffset
作用
lead越接近于0,那么就表示有可能要丢消息
lag
计算方式
=HW-Consumer Offset
作用
消费者当前落后于生产者的程度
Rebalance
触发时机
组成员数发生变更
新组成员加入或老组成员离开
组成员崩溃,或者网络异常被踢出组
订阅主题数发生变更
使用正则表达式来订阅topic
订阅主题的分区数发生变更
增加分数区
问题
在 Rebalance 过程中,TPS不高。所有 Consumer 实例都会停止消费,等待 Rebalance 完成
在Rebalance 时,效率不高。所有 Consumer 实例共同参与,全部重新分配所有分区
Rebalance过程太慢
Coordinator
订阅主题分区的分配由Coordinator负责
所有Broker 都有各自的 Coordinator 组件
Consumer Group 如何确定为它服务的
Coordinator 在哪台 Broker 上?
Coordinator 在哪台 Broker 上?
第 1 步:确定由位移主题的哪个分区来保存该 Group 数据:
partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)。
partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)。
第 2 步:找出该分区 Leader 副本所在的 Broker,该 Broker 即为对应的 Coordinator。
如何避免
可避免情况
Consumer 实例会被 Coordinator 错误地认为“已停止”从而被“踢出”Group
相关参数
session.timeout.ms
每个 Consumer 实例都会定期地向Coordinator 发送心跳请求,表明它还存活着
heartbeat.interval.ms
控制发送心跳请求频率
max.poll.interval.ms
它限定了 Consumer 端应用程序两次调用 poll 方法的最大时间间隔。
它的默认值是 5 分钟
它的默认值是 5 分钟
方案
设置 session.timeout.ms = 6s
设置 heartbeat.interval.ms = 2s
要保证 Consumer 实例在被判定为“dead”之前,能够发送至少 3 轮的心跳请求
设置 heartbeat.interval.ms = 2s
要保证 Consumer 实例在被判定为“dead”之前,能够发送至少 3 轮的心跳请求
max.poll.interval.ms的值要大于消费者消费的最大时长
过程
重平衡过程是如何通知到其他消费者实例的?
靠消费者端的心跳线程(Heartbeat Thread)
消息可靠性
生产者
设置ack=all
使用producer.send(msg, callback) 或者使用同步方式发送消息
设置retries 次数
消费者
enable.auto.commiot=false,确保消息消费完成再提交
Broker
设置unclean.leader.election.enable = false 分区落后太多不允许竞选为leader
replication.factor >= 3
min.insync.replicas > 1
确保 replication.factor > min.insync.replicas
性能调优
调优目标
高吞吐
Broker端
调大 num.replica.fetchers( Follower 副本用多少个线程来拉取消息)
调优GC参数避免FGC
Producer端
适当调大batch.size的值
适当增大linger.ms的值
设置compression.type为lz4或者zstd
设置ack为0或者1
设置retries为0
多线程共享Producer实例,则适当调大buffer.memory
Consumer端
采用多线程消费
增大fetch.min.size值
低延迟
调优层次
操作系统层
禁止atime mount -o noatime
选择合适的文件系统 ext4 或 XFS
swap 空间设置得比较小 sudo sysctl vm.swappiness=N
加大文件句柄数 ulimit -n
调大 vm.max_map_count
预留较大的页缓存
JVM
将JVM 堆大小设置成 6~8GB
使用 G1 收集器
大对象 增加 JVM 启动参数 -XX:+G1HeapRegionSize=N
Broker
即尽力保持客户端版本和 Broker 端版本一致
应用层
不要频繁地创建 Producer 和 Consumer 对象实例
用完及时关闭
合理利用多线程来改善性能
0 条评论
下一页