Kafka核心技术与实战
2023-10-28 09:34:02 1 举报
AI智能生成
Kafka核心技术实战
作者其他创作
大纲/内容
关键配置参数
Broker 端参数
log.retention.{hour|minutes|ms}
控制一条消息数据被保存多长时间。
从优先级上来说 ms 设置最高、minutes 次之、hour 最低。
log.retention.bytes
指定 Broker 为消息保存的总磁盘容量大小。
值默认是 -1,表明你想在这台 Broker 上保存多少数据都可以(至少Broker不会进行限制)
message.max.bytes
控制 Broker 能够接收的最大消息大小。
默认的 1000012 太少了,还不到 1MB,实际可以开大一些
Topic 级别参数
retention.ms
该 Topic 消息被保存的时长。默认是 7 天,即该 Topic 只保存最近 7 天的消息。
一旦设置了这个值,它会覆盖掉 Broker 端的全局参数值。
retention.bytes
规定了要为该 Topic 预留多大的磁盘空间。和全局参数作用相似,这个值通常在多租户的 Kafka 集群中会有用武之地。
当前默认值是 -1,表示可以无限使用磁盘空间。
消息分区机制
用途
提供负载均衡的能力,为了实现系统的高伸缩性(Scalability)
不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。
并且,我们还可以通过添加新的节点机器来增加整体系统的吞吐量。
分发策略
轮询策略
Round-robin 策略,即顺序分配
Kafka Java 生产者 API 默认提供的分区策略
如果你未指定partitioner.class参数,那么你的生产者程序会按照轮询的方式在主题的所有分区间均匀地“码放”消息。
随机策略
Randomness 策略,随意地将消息放置到任意一个分区上
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());
return ThreadLocalRandom.current().nextInt(partitions.size());
实际表现来看,它要逊于轮询策略,所以如果追求数据的均匀分布,还是使用轮询策略比较好。
Key策略
官网没提到,直接按照消息的key进行分发,相同的key能够被发往同一个partition
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();
return Math.abs(key.hashCode()) % partitions.size();
在需要保证顺序,同时又需要多分区带来的伸缩性
自定义分区策略
显式地配置生产者端的参数partitioner.class
在编写生产者程序时,编写一个具体的类实现org.apache.kafka.clients.producer.Partitioner接口
只定义了两个方法:partition()和close(),通常你只需要实现最重要的 partition 方法
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
这里的topic、key、keyBytes、value和valueBytes都属于消息数据,cluster则是集群信息(比如当前 Kafka 集群共有多少主题、多少 Broker 等)
同时设置partitioner.class参数为你自己实现类的 Full Qualified Name,那么生产者程序就会按照你的代码逻辑对消息进行分区
生产者压缩算法
时间去换空间的经典 trade-off 思想
希望以较小的 CPU 开销带来更少的磁盘占用或更少的网络 I/O 传输
Kafka 的消息层次都分为两层:消息集合(message set)以及消息(message)。
一个消息集合中包含若干条日志项(record item),而日志项才是真正封装消息的地方。
Kafka 底层的消息日志由一系列消息集合日志项组成。
Kafka 通常不会直接操作具体的一条条消息,它总是在消息集合这个层面上进行写入操作。
Kafka的两个消息格式版本
V1
每条消息都需要执行 CRC 校验,但有些情况下消息的 CRC 值是会发生变化的
保存压缩消息的方法是把多条消息进行压缩然后保存到外层消息的消息体字段中
V2
CRC到消息集合校验
对整个消息集合进行压缩。显然后者应该比前者有更好的压缩效果
压缩
compression.type
示例:// 开启 GZIP 压缩
props.put("compression.type", "gzip");
props.put("compression.type", "gzip");
broker端的压缩
其实大部分情况下 Broker 从 Producer 端接收到消息后仅仅是原封不动地保存而不会对其进行任何修改,但这里的“大部分情况”也是要满足一定条件的。有两种例外情况就可能让 Broker 重新压缩消息。
Broker 端指定了和 Producer 端不同的压缩算法。
Broker 端发生了消息格式转换。
为了兼容老版本的消费者程序
一般情况下这种消息格式转换对性能是有很大影响的,除了这里的压缩之外,它还让 Kafka 丧失了引以为豪的 Zero Copy 特性。
不过,broker端做的校验实际上意义不大,京东提出一个bugfix去除了为做消息校验引入的解压缩
副本机制
一般副本机制的优势
提供数据冗余
即使系统部分组件失效,系统依然能够继续运转,因而增加了整体可用性以及数据持久性。
Kafka能提供
提供高伸缩性
支持横向扩展,能够通过增加机器的方式来提升读性能,进而提高读操作吞吐量。
Kafka不能提供
改善数据局部性
允许将数据放入与用户地理位置相近的地方,从而降低系统延时。
Kafka不能提供
Kafka副本机制的优势
不能够提供伸缩性和数据局部性
因为Kafka的Follower副本不对外提供服务
对于客户端用户而言,Kafka 的追随者副本没有任何作用,它既不能像 MySQL 那样帮助领导者副本“抗读”
也不能实现将某些副本放到离客户端近的地方来改善数据局部性。
副本定义
所在概念层级
主题 - 分区 - 副本
所谓副本(Replica),本质就是一个只能追加写消息的提交日志。
同一个分区下的所有副本保存有相同的消息序列,这些副本分散保存在不同的 Broker 上,从而能够对抗部分 Broker 宕机带来的数据不可用。
在实际生产环境中,每台 Broker 都可能保存有各个主题下不同分区的不同副本
副本角色
如何确保副本中所有的数据都是一致的呢?
对 Kafka 而言,当生产者发送消息到某个主题后,消息是如何同步到对应的所有副本中的呢?
采用基于领导者(Leader-based)的副本机制
一言以蔽之
工作原理
副本分类
领导者副本(Leader Replica)
追随者副本(Follower Replica)
每个分区在创建时都要选举一个副本,称为领导者副本,其余的副本自动称为追随者副本。
Kafka 的副本机制比其他分布式系统要更严格一些。在 Kafka 中,追随者副本是不对外提供服务的。这就是说,任何一个追随者副本都不能响应消费者和生产者的读写请求。所有的请求都必须由领导者副本来处理,或者说,所有的读写请求都必须发往领导者副本所在的 Broker,由该 Broker 负责处理。追随者副本不处理客户端请求,它唯一的任务就是从领导者副本异步拉取消息,并写入到自己的提交日志中,从而实现与领导者副本的同步。
当领导者副本挂掉了,或者说领导者副本所在的 Broker 宕机时,Kafka 依托于 ZooKeeper 提供的监控功能能够实时感知到,并立即开启新一轮的领导者选举,从追随者副本中选一个作为新的领导者。老 Leader 副本重启回来后,只能作为追随者副本加入到集群中。
设计思想
方便实现“Read-your-writes”
顾名思义就是,当你使用生产者 API 向 Kafka 成功写入消息后,马上使用消费者 API 去读取刚才生产的消息。
方便实现单调读(Monotonic Reads)
什么是单调读呢?就是对于一个消费者用户而言,在多次消费消息时,它不会看到某条消息一会儿存在一会儿不存在。
In-sync Replicas(ISR)
背景
追随者副本不提供服务,只是定期地异步拉取领导者副本中的数据而已。
既然是异步的,就存在着不可能与 Leader 实时同步的风险
在探讨如何正确应对这种风险之前,我们必须要精确地知道同步的含义是什么。或者说,Kafka 要明确地告诉我们,追随者副本到底在什么条件下才算与 Leader 同步。
leader副本与ISR副本集合
Leader 副本天然就在 ISR 中。也就是说,ISR 不只是追随者副本集合,它必然包括 Leader 副本。甚至在某些情况下,ISR 只有 Leader 这一个副本。
什么情况下算同步的副本
这张图中,哪个follower副本算是同步的呢?
取决于Broker 端参数 replica.lag.time.max.ms 参数值
Follower 副本能够落后 Leader 副本的最长时间间隔,当前默认值是 10 秒
只要一个 Follower 副本落后 Leader 副本的时间不连续超过 10 秒,那么 Kafka 就认为该 Follower 副本与 Leader 是同步的,即使此时 Follower 副本中保存的消息明显少于 Leader 副本中的消息。
如果超过则被踢出ISR集合,同步速度跟上来了,延时变小后,也会重新加入ISR
Unclean 领导者选举(Unclean Leader Election)
ISR 为空
说明leader副本也挂掉了
Kafka 把所有不在 ISR 中的存活副本都称为非同步副本。
如果选择这些副本作为新 Leader,就可能出现数据的丢失
Broker 端参数 unclean.leader.election.enable 控制是否允许 Unclean 领导者选举。
权衡
开启 Unclean 领导者选举可能会造成数据丢失,但好处是,它使得分区 Leader 副本一直存在,不至于停止对外提供服务,因此提升了高可用性。
可用性(Availability)
反之,禁止 Unclean 领导者选举的好处在于维护了数据的一致性,避免了消息丢失,但牺牲了高可用性。
一致性(Consistency)
建议
不要开启它,毕竟我们还可以通过其他的方式来提升高可用性。如果为了这点儿高可用性的改善,牺牲了数据一致性,那就非常不值当了。
请求处理过程
交互都是通过“请求 / 响应”的方式完成
Apache Kafka 自己定义了一组请求协议,用于实现各种各样的交互操作
常见的 PRODUCE 请求是用于生产消息的
FETCH 请求是用于消费消息的
METADATA 请求是用于请求 Kafka 集群元数据信息的
消费者组重平衡
重平衡触发条件
组成员数量发生变化。
最常见
每次消费者启动都会伴随重平衡
订阅主题数量发生变化。
订阅主题的分区数发生变化
心跳线程
重平衡过程是如何通知到其他消费者实例的?答案就是,靠消费者端的心跳线程(Heartbeat Thread)
Kafka Java 消费者需要定期地发送心跳请求(Heartbeat Request)到 Broker 端的协调者,以表明它还存活着
在 Kafka 0.10.1.0 版本之前,发送心跳请求是在消费者主线程完成的,也就是你写代码调用 KafkaConsumer.poll 方法的那个线程。
最大的问题在于,消息处理逻辑也是在这个线程中完成的
一旦消息处理消耗了过长的时间,心跳请求将无法及时发到协调者那里,导致协调者“错误地”认为该消费者已“死”
自 0.10.1.0 版本开始,社区引入了一个单独的心跳线程来专门执行心跳请求发送,避免了这个问题。
其实,重平衡的通知机制正是通过心跳线程来完成的。
当协调者决定开启新一轮重平衡后,它会将“REBALANCE_IN_PROGRESS”封装进心跳请求的响应中,发还给消费者实例。
当消费者实例发现心跳响应中包含了“REBALANCE_IN_PROGRESS”,就能立马知道重平衡又开始了,这就是重平衡的通知机制。
heartbeat.interval.ms
从字面上看,它就是设置了心跳的间隔时间,但这个参数的真正作用是控制重平衡通知的频率。
如果你想要消费者实例更迅速地得到通知,那么就可以给这个参数设置一个非常小的值,这样消费者就能更快地感知到重平衡已经开启了。
消费者组状态机
Kafka 设计了一套消费者组状态机(State Machine),来帮助协调者完成整个重平衡流程。
状态定义
Kafka 为消费者组定义了 5 种状态,它们分别是:Empty、Dead、PreparingRebalance、CompletingRebalance 和 Stable
状态流转
Kafka 定期自动删除过期位移的条件就是,组要处于 Empty 状态。因此,如果你的消费者组停掉了很长时间(超过 7 天),那么 Kafka 很可能就把该组的位移数据删除了。我相信,你在 Kafka 的日志中一定经常看到下面这个输出:
Removed ✘✘✘ expired offsets in ✘✘✘ milliseconds.
这就是 Kafka 在尝试定期删除过期位移。现在你知道了,只有 Empty 状态下的组,才会执行过期位移删除的操作。
Removed ✘✘✘ expired offsets in ✘✘✘ milliseconds.
这就是 Kafka 在尝试定期删除过期位移。现在你知道了,只有 Empty 状态下的组,才会执行过期位移删除的操作。
哈啰pro环境应该更快
0 条评论
下一页