Kafka
2020-09-12 22:38:39 0 举报
AI智能生成
Kafka基础知识体系,希望觉得有用的coder点个赞
作者其他创作
大纲/内容
基础知识
三大角色
消息系统
系统解耦、异步通信、削峰填谷;kafka还提供了其他消息中间件难以实现的 消息顺序性保障 及回溯消费;
存储系统
kafka可以把消息持久化到磁盘;kafka的消息持久化功能和多副本机制,可以使它作为长期的数据存储系统来使用;
流式处理平台
kafka为各个流式处理框架提供了 可靠的数据来源 和 流式处理类库(窗口、连接、聚合等操作);
基本术语
Producer
生产者,负责创建消息,将消息投递到kafka;
Consumer
消费者,连接到kafka上并接收消息,进而进行相应的业务逻辑处理;
Broker
服务节点(服务进程),负责接收和处理客户端发送过来的请求,以及对消息进行持久化;
持久化数据
使用消息日志(Log)来保存数据,一个日志就是磁盘上一个只能追加写(Append-Only)消息的物理文件;
因为只能追加写,所以是写操作是顺序IO(性能好),避免了随机IO(性能差);
(kafka实现高吞吐特性的一个手段)
(kafka实现高吞吐特性的一个手段)
消息日志(Log)被分成多个日志段(Log segment),消息被写到最新的日志段中;
通过定时任务检测旧的日志段是否能删除;实现回收磁盘空间;
通过定时任务检测旧的日志段是否能删除;实现回收磁盘空间;
Topic
发布订阅的对象,kafka中的消息以Topic为单位进行归类;Topic是一个逻辑概念;
Partition
分区,将每个主题划分成多个分区,每个分区是一组有序的消息日志(可看作可追加的日志文件);
生产者生成的消息,只会发给主题的一个分区;(例如,双分区的主题,消息要么在分区0中,要么在分区1中)
offset
消息位移(偏移量),offset是消息在Partition中的唯一标识,kafka通过它来保证消息在Partition内的顺序性;
offset并不跨越Partition;即kafka保证的是分区有序,不是主题有序;
分区消息写入
offset从0开始,依次递增;
Replica
副本基础概念
副本,kafka为 Partition引入了多副本的机制,同一Partition的不同副本保存的是相同的消息;(副本与日志文件一一对应)
主题、分区、副本和LOG(日志)的关系图
副本之间是一主多从(leader-followers)是关系;leader副本负责处理读写请求,follower副本只负责与leader副本的消息同步;
(副本的同步是异步消息拉取)
(副本的同步是异步消息拉取)
为什么要设计follower副本不处理请求
方便实现 “read-your-writes”
用生产者API向kafka写入消息后,马上使用消费者API去读取刚才产生的消息;
如果允许follower副本对外提供服务,由于消息同步的异步的;就可能出现follower副本并未拉取到消息;
因此客户端可能看不到最新写入的消息;
方便实现“单调读”
如果允许follower副本对外提供服务,假设有两个follower F1和F2;
F1拉取到了最新消息,F2没有拉取到最新消息,此时如果有一个消费者先从F1消费,又从F2消费;
就会出现,第一次消费看到的消息,在第二次消费时不见了;这就不是单调一致性读;
这些副本分散在不同的Broker中,当leader副本出现故障时,从follower副本中重新选举(基于zookeeper提高的监控功能)
新的leader副本;(实现故障自动转移)
新的leader副本;(实现故障自动转移)
副本工作机制
当前kafka集群中有4个Broker;
某个Topic有3个Partition(P1、P2、P3);
每个Partition有3个副本(1个leader和2个follower);
kafka的分区可以分布在不同的 Broker上,即一个Topic可以横跨多个Broker;
每条消息会根据分区规则选择存储到哪个具体的分区,如果分区规划合理,消息会均匀的分布到各个分区;
分布式中副本机制的好处
提供数据冗余:(kafka中只能提供这种)
即使部分Broker故障,系统仍然能继续运转,增加了整体可用性及数据持久性;
即使部分Broker故障,系统仍然能继续运转,增加了整体可用性及数据持久性;
提供高伸缩性:
支持横向扩展,能够通过增加机器的方式来提升读性能,进而提高吞吐量;
支持横向扩展,能够通过增加机器的方式来提升读性能,进而提高吞吐量;
改善数据局部性:
可实现将数据放入与用户地理位置相近的地方,降低系统延时;
可实现将数据放入与用户地理位置相近的地方,降低系统延时;
AR、ISR、OSR
AR(Assigned Replicas)
分区中所有副本统称为 AR;
ISR(In-Sync Replicas)
所有与leader副本保持一定程度的同步的副本(包括leader)组成ISR,ISR是AR的一个子集;
一定程度的同步
replica.lag.time.max.ms(默认10秒):follower副本能落后leader 副本的最长时间间隔;
只要follower副本落后leader副本的时间不连续超过10秒,就认为是同步的(即使消息数相差很多);
因为消息会先发送到leader副本,然后follower副本拉取消息进行同步,这期间follower副本会有一定程度滞后;
OSR(Out-of-Sync Replicas)
与leader副本同步滞后太多的副本(不包括leader)组成OSR;(如果后面追上leader进度的话,会被重新加回ISR)
正常情况下,所有的follower副本都应该与leader副本保持一定程度的同步,即AR=ISR,OSR为空;
AR = ISR + OSR;
HW、LEO
HW
HW是 High Watermark的缩写,也叫高水位,标识了一个特定的消息偏移量(offset);
已提交消息
消息位移保存起来(持久化)的消息;
分区高水位以下的消息被认为是已提交消息;
消费者只能消费已提交消息,即图中位移小于8的所有消息;
LEO
LEO是 Log End Offset 的缩写,标识日志末端位移,也标识副本写入下一条消息的位移;
数字15的方框是虚线,说明当前副本只有15条消息,位移值是0到14,下一条新消息的位移是15;
同一个副本对象,HW值不会大于LEO值;
一个分区中,ISR中最小的LEO为分区的HW;(也是leader副本的HW,由leader副本去比较各个副本的LEO之后更新自己的HW)
三层消息架构
主题层:
每个主题可以配置M个分区,每个分区可以配置N个副本;
每个主题可以配置M个分区,每个分区可以配置N个副本;
分区层:
每个分区的N个副本,可以分布在不同的Broker上;只能有一个充当leader,对外提供服务;
其他的 N-1 个都是follower,只是提供数据冗余;
每个分区的N个副本,可以分布在不同的Broker上;只能有一个充当leader,对外提供服务;
其他的 N-1 个都是follower,只是提供数据冗余;
消息层:
分区中包含若干条消息,每条消息的位移从0开始,依次递增;
分区中包含若干条消息,每条消息的位移从0开始,依次递增;
示意图
基本操作
环境搭建及主题管理
消息
消息格式
消息集合(message set):
等价于消息批次(RecordBatch),在外层,里面包含若干条消息;Producer以recordbatch为单位发送消息;
等价于消息批次(RecordBatch),在外层,里面包含若干条消息;Producer以recordbatch为单位发送消息;
消息(record):
等价于日志项,在里层;
等价于日志项,在里层;
消息压缩与解压缩
消息压缩
生产者端
代码:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 开启 GZIP 压缩
props.put("compression.type", "gzip");
Producer<String, String> producer = new KafkaProducer<>(props);
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 开启 GZIP 压缩
props.put("compression.type", "gzip");
Producer<String, String> producer = new KafkaProducer<>(props);
这个Producer的压缩算法使用的是GZIP;
压缩算法存储:
kafka会将启用了哪种压缩算法封装进消息集合中;
kafka会将启用了哪种压缩算法封装进消息集合中;
生产的每个消息集合都是经过GZIP压缩过的,
能够很好的节省网络传输带宽及Broker端的磁盘占用;
能够很好的节省网络传输带宽及Broker端的磁盘占用;
Broker端
正常情况下,Broker从Producer端接收到消息后不会再做压缩处理;
Broker重新压缩的两种情况
Broker端指定了和Producer端不同的压缩算法;
Broker 端也有一个参数叫 compression.type,这个参数的默认值是producer,
默认使用Producer端的压缩算法;
默认使用Producer端的压缩算法;
但是如果在Broker端设置了不同的compression.type 值,就会发生Broker端重新压缩;
也会导致Broker端CPU使用率飙升;
也会导致Broker端CPU使用率飙升;
Broker端发生了消息格式转换;
为了兼容老版本的消息格式,Broker端会对新版本消息执行向老版本格式的转换;
这个过程会涉及到消息的解压缩和重新压缩;这种转换对性能的影响很大;(尽量避免)
消息解压缩
消费者端
当消息到达Consumer端后,由Consumer自行解压缩还原成之前的消息;
正常情况为:
Producer端压缩,Broker端保持,Consumer端解压缩;
Producer端压缩,Broker端保持,Consumer端解压缩;
Broker端
跟上面消息格式转换时发生的解压缩不是同一场景;
每个压缩过的消息集合在Broker端写入时,都要发生解压缩,目的是为了对消息进行逐条校验(CRC);
这种解压缩对Broker端性能有一定影响;(kafka可能后续会规避这一操作)
这种解压缩对Broker端性能有一定影响;(kafka可能后续会规避这一操作)
压缩算法
分类
GZIP
Snappy
LZ4
Zstandard(2.1.0版本后)
评价指标
压缩比:
原先占100份空间的,压缩后占20份,压缩比就是5;
原先占100份空间的,压缩后占20份,压缩比就是5;
压缩/解压缩吞吐量:
每秒能够压缩或解压缩多少MB的数据;
每秒能够压缩或解压缩多少MB的数据;
kafka性能结果
压缩比:
zstd > LZ4 > GZIP > Snappy
zstd > LZ4 > GZIP > Snappy
吞吐量:
LZ4 > Snappy > zstd 和 GZIP
LZ4 > Snappy > zstd 和 GZIP
最佳实践
CPU资源充足:
Producer端完成的压缩,需要Producer运行机器上的CPU资源很充足(因为压缩需要消耗CPU);
Producer端完成的压缩,需要Producer运行机器上的CPU资源很充足(因为压缩需要消耗CPU);
带宽资源有限:
消息压缩后,能够很好的节省网络传输带宽(很多生产环境都可能遭遇带宽被打满的情况);
消息压缩后,能够很好的节省网络传输带宽(很多生产环境都可能遭遇带宽被打满的情况);
消息发送
生产者
作用:
生产者(Producer)负责向Kafka发送消息;
生产者(Producer)负责向Kafka发送消息;
实现:
KafkaProducer
KafkaProducer
生产者拦截器
作用:
既可以用来在消息发送前做一些准备工作(如按照某个规则过滤不符合条件的消息、修改消息内容等),
又可以用来在发送回调逻辑前做一些定制化的需求(如统计类工作);
既可以用来在消息发送前做一些准备工作(如按照某个规则过滤不符合条件的消息、修改消息内容等),
又可以用来在发送回调逻辑前做一些定制化的需求(如统计类工作);
生产者拦截器接口:
org.apache.kafka.clients.producer. ProducerInterceptor;
org.apache.kafka.clients.producer. ProducerInterceptor;
public interface ProducerInterceptor<K, V> extends Configurable {
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
public void onAcknowledgement(RecordMetadata metadata, Exception exception);
public void close();
}
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
public void onAcknowledgement(RecordMetadata metadata, Exception exception);
public void close();
}
KafkaProducer在将消息序列化和计算分区之前,会调用onSend()方法来对消息进行相应的定制化操作;
即在发送之前;
即在发送之前;
KafkaProducer 会在消息被应答(Acknowledgement)之前或消息发送失败时调用onAcknowledgement() 方法;
这个方法先于用户设定的 Callback()方法执行;
onAcknowledgement() 方法运行在Producer的IO线程中,如果逻辑复杂可能会影响消息发送速度;
这个方法先于用户设定的 Callback()方法执行;
onAcknowledgement() 方法运行在Producer的IO线程中,如果逻辑复杂可能会影响消息发送速度;
close() 方法主要用于在关闭拦截器时执行一些资源的清理工作;
自定义拦截器
public class ProducerInterceptorPrefix implements ProducerInterceptor<String,String>{
private volatile long sendSuccess = 0;
private volatile long sendFailure = 0;
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
String modifiedValue = "prefix1-" + record.value();
return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), record.key(), modifiedValue,
record.headers());
}
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
sendSuccess++;
} else {
sendFailure ++;
}
}
@Override
public void close() {
double successRatio = (double)sendSuccess / (sendFailure + sendSuccess);
System.out.println("[INFO] 发送成功率=" + String.format("%f", successRatio * 100) + "%");
}
}
private volatile long sendSuccess = 0;
private volatile long sendFailure = 0;
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
String modifiedValue = "prefix1-" + record.value();
return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), record.key(), modifiedValue,
record.headers());
}
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
sendSuccess++;
} else {
sendFailure ++;
}
}
@Override
public void close() {
double successRatio = (double)sendSuccess / (sendFailure + sendSuccess);
System.out.println("[INFO] 发送成功率=" + String.format("%f", successRatio * 100) + "%");
}
}
通过 onSend() 方法来为每条消息添加一个前缀“prefix1-”;
通过 onAcknowledgement() 方法来计算发送消息的成功率;
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptorPrefix.class.getName());
序列化器
作用:
生产者需要用序列化器把对象转成字节数组才能通过网络发送给kafka;
消费者也需要用反序列化器把从kafka中收到的字节数组转成相应的对象;
生产者需要用序列化器把对象转成字节数组才能通过网络发送给kafka;
消费者也需要用反序列化器把从kafka中收到的字节数组转成相应的对象;
序列化接口:
org.apache.kafka.common.serialization.Serializer;
org.apache.kafka.common.serialization.Serializer;
public interface Serializer<T> extends Closeable {
void configure(Map<String, ?> configs, boolean isKey);
byte[] serialize(String topic, T data);
default byte[] serialize(String topic, Headers headers, T data) {
return serialize(topic, data);
}
@Override
void close();
}
void configure(Map<String, ?> configs, boolean isKey);
byte[] serialize(String topic, T data);
default byte[] serialize(String topic, Headers headers, T data) {
return serialize(topic, data);
}
@Override
void close();
}
configure() 方法,这个方法是在创建 KafkaProducer 实例的时候调用的,主要用来确定编码类型;
一般情况下,客户端都不会对key.serializer.encoding、value.serializer. encoding 和 serializer.encoding
这几个参数进行配置,所以encoding的值为默认的“UTF-8”;
一般情况下,客户端都不会对key.serializer.encoding、value.serializer. encoding 和 serializer.encoding
这几个参数进行配置,所以encoding的值为默认的“UTF-8”;
serialize() 方法是将 String 类型转为 byte[] 类型;
kafka提供了String、ByteArray、ByteBuffer、Bytes、Double、Integer、Long等多种类型的序列化器供使用;
如果kafka客户端提供的序列化器无法满足要求,推荐使用Avro、JSON、Thrift、ProtoBuf 和 Protostuff 等通用的
序列化工具来实现序列化;
也可以自定义序列化器来实现;
序列化工具来实现序列化;
也可以自定义序列化器来实现;
分区器
作用
为消息分配分区;
提供负载均衡能力,可以实现系统的高伸缩性(Scalability),可以通过添加新的节点来增加系统吞吐量;
不同的分区可以被放到不同节点的机器上,数据的读写操作的粒度也是分区,这样每个节点的机器都能
独立执行各自分区的读写请求;
独立执行各自分区的读写请求;
消息在经过序列化之后,就需要确定它发往的分区,如果 ProducerRecord 中指定了 partition字段,
就不需要分区器的作用,因为partition就是要发往的分区编号;
就不需要分区器的作用,因为partition就是要发往的分区编号;
分区器接口:
org.apache.kafka.clients.producer.Partitioner;
org.apache.kafka.clients.producer.Partitioner;
public interface Partitioner extends Configurable, Closeable {
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
void close();
}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
void close();
}
partition() 方法用来计算分区号,返回值为 int 类型;
partition()方法中的参数分别表示:主题、键、序列化后的键、值、序列化后的值、集群的元数据信息;
分区策略
轮询策略
Round-robin 策略,即顺序分配;
示意图
假设一个主题下有3个分区;
第一条消息被发送到分区0,第二条被发送到分区1,第三条被发送到分区2;
当生产第四条消息的时候,又会被发送到分区0,以此类推;
随机策略
Randomness 策略,即将消息随机地任意放置到一个分区;
示意图
本质上看随机策略也是力求将数据均匀分散到各个分区;
但从实际表现来看,要逊于轮询策略;
实现:
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());
按消息键保序策略
kafka允许为每条消息定义消息键,简称Key;
Key的作用很大,它可以是一个有着明确业务含义的字符串,比如客户代码、部门编号、业务ID等;
也可以用来表征消息元数据,比如时间戳;
也可以用来表征消息元数据,比如时间戳;
示意图
如果消息被定义了Key,就能保证同一个Key的所有消息都进入到相同的分区里面,
由于每个分区下的消息处理都是有顺序的,所以这个策略被称为按消息键保序策略;
由于每个分区下的消息处理都是有顺序的,所以这个策略被称为按消息键保序策略;
实现:
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();
其他分区策略
比如跨地理位置的分区策略(针对大规模的kafka集群,跨城市、跨国家等的集群)
实现:
// 根据broker所在的IP地址实现定制化分区策略
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return partitions.stream().filter(p -> isSouth(p.leader().host())).map(PartitionInfo::partition).findAny().get();
// 根据broker所在的IP地址实现定制化分区策略
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return partitions.stream().filter(p -> isSouth(p.leader().host())).map(PartitionInfo::partition).findAny().get();
kafka默认分区器
// org.apache.kafka.clients.producer.internals.DefaultPartitioner
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = this.nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return ((PartitionInfo)availablePartitions.get(part)).partition();
} else {
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = this.nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return ((PartitionInfo)availablePartitions.get(part)).partition();
} else {
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
如果指定了Key,默认使用按消息键保序策略;计算得到的分区号会是所有分区中任意一个;
如果没有指定Key,默认使用轮询策略;计算得到的分区号仅为可用分区中的任意一个;
自定义分区器
public class DemoPartitioner implements Partitioner {
private final AtomicInteger counter = new AtomicInteger(0);
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (null == keyBytes) {
return counter.getAndIncrement() % numPartitions;
} else
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
@Override public void close() {}
@Override public void configure(Map<String, ?> configs) {}
}
private final AtomicInteger counter = new AtomicInteger(0);
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (null == keyBytes) {
return counter.getAndIncrement() % numPartitions;
} else
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
@Override public void close() {}
@Override public void configure(Map<String, ?> configs) {}
}
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DemoPartitioner.class.getName());
消息消费
消费者
定义:
消费者是一个实际的应用实例,可以是一个线程,也可以是一个进程;
消费者是一个实际的应用实例,可以是一个线程,也可以是一个进程;
作用:
消费者(Consumer)负责订阅kafka中的主题(Topic),并且从订阅的主题上拉取消息;
消费者(Consumer)负责订阅kafka中的主题(Topic),并且从订阅的主题上拉取消息;
实现:
KafkaConsumer
KafkaConsumer
KafkaConsumer是非线程安全的(单线程架构),其中定义了一个acquire()方法,用来检测当前是否只有一个线程在操作;
如果有其他线程操作会抛出ConcurrentModifcationException 异常;
如果有其他线程操作会抛出ConcurrentModifcationException 异常;
private void acquire() {
long threadId = Thread.currentThread().getId();
if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
refcount.incrementAndGet();
}
long threadId = Thread.currentThread().getId();
if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
refcount.incrementAndGet();
}
KafkaConsumer中的每个公共方法在执行所要执行的动作之前都会调用这个acquire()方法(wakeup()方法除外);
消息投递模式
点对点(P2P)
如果所有的消费者都属于同一个消费组,那么所有的消息都会被均衡地投递给每一个消费者;
即每条消息只会被一个消费者处理;
发布/订阅(Pub/Sub)
如果所有的消费者不属于不同的消费组,那么所有的消息都会被广播给所有的消费者;
即每条消息会被所有的消费者处理;
消费组
定义
消费组(Consumer Group)是一个逻辑上的概念,它将消费者归为一类,每一个消费者只属于一个消费组;
Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制;
Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制;
每个消费组都会有一个固定的名称,消费者在进行消费前需要指定所属消费组的名称;参数group.id,默认为空字符串;
每个消费者都有一个对应的消费组,当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者;
示意图
主题有4个分区,P0、P1、P2、P3;
有两个消费组A和B,都订阅了这个主题;
消费组A中有4个消费者(C0、C1、C2、C3),消费组B中两个消费者(C4,C5);
按kafka默认规则,消费组A中的每个消费者分配到1个分区,消费组B中的每个消费组分配到2个分区;
每个消费者只能消费到所分配到的分区中的消息;即每一个分区只能被“一个消费组中的一个消费者”所消费;
作用
消费组的模型可以让整体的消费能力具备横向伸缩性
分区分配的演变
一个消费者C0
加入消费者C1,需要将原来C0的部分分区分配给C1
所以可以通过增加(减少)消费者的个数来提高(降低)整体的消费能力;
对于分区数固定的情况,如果消费者数量超出分区数就没有意义了;
消费组下实例个数配置
理想情况下,一个Group中Consumer实例的数量应该等于该Group订阅主题的分区总数;
Consumer订阅了3个主题A、B、C;
分区数分别为1、2、3;
则该为这个Group设置6个Consumer实例较为合适;(可以小于,不能大于)
Rebalance(重平衡)
定义:
本质上是一种协议,规定了一个Group下的所有Consumer如何达成一致,来分配订阅Topic的每个分区;
本质上是一种协议,规定了一个Group下的所有Consumer如何达成一致,来分配订阅Topic的每个分区;
触发条件
(被动/故障导致)组成员数发生变更;有新的Consumer实例加入组或离开组、有Consumer实例崩溃被提出组;
(主动运维操作)订阅主题数发生变更;当使用正则方式订阅主题时,在Consumer Group运行过程中,新创建了满足条件的主题;
(主动运维操作)订阅主题的分区数发生变更;当增加主题的分区数时,会触发订阅这个主题的Group开启Rebalance;
执行过程
让一个Group下的所有Consumer实例,在协调组组件的帮助下,完成订阅主题分区的分配;
协调者(Coordinator)
负责为Group执行Rebalance以及提供位移管理和组成员管理;
Consumer端应用程序在提交位移时,是向Coordinator所在的Broker提交位移;
当Consumer应用启动时,是向Coordinator所在的Broker发送各种请求,然后由Coordinator负责执行各自操作;
所有Broker都有各自的Coordinator组件;Broker在启动时都会创建和开启相应的Coordinator;
如何为某个Group 确定为它服务的Coordinator在哪个Broker上?
首先确定由位移主题的哪个分区来保存该Group的数据:
partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount);
partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount);
然后找出该分区的Leader副本所在的Broker,就是对应的Coordinator;
当Consumer Group出现问题时,可以由此计算并定位到对应的Broker上,不必一台一台的盲查;
分配策略
弊端
影响Consumer端TPS;
在Rebalance过程中,所有Consumer实例都会停止消费,等待Rebalance完成;
在Rebalance过程中,所有Consumer实例都会停止消费,等待Rebalance完成;
为什么?
Rebalance之前会有一个通知,消费者接收到这个通知后,提交对应的消费位移;
如果不停止消费,Rebalance和消费同时进行,在Rebalance过程中的消费位移没法提交;
在Rebalance结束后,新的消费者不知道分配到的分区消费到了哪个位置,只能去查找上一次的提交;
但是Rebalance过程中,已经消费到了后面的位置,再从上一次提交开始消费,就会造成重复消费;
Rebalance过程很慢;
如果Group下Consumer实例过多,Rebalance过程很慢(可能长达几小时);
如果Group下Consumer实例过多,Rebalance过程很慢(可能长达几小时);
Rebalance效率不高;
目前Rebalance的设计是所有Consumer实例共同参与,全部重新分配所有分区;
(没法复用以前负责的分区,不会保留以前的分配方案;而是打乱全部重新分配)
目前Rebalance的设计是所有Consumer实例共同参与,全部重新分配所有分区;
(没法复用以前负责的分区,不会保留以前的分配方案;而是打乱全部重新分配)
重复消费;
消费者消费完某个分区中的一部分消息还没有来得及提交,发生了Rebalance;
这个分区被分配给Group中另一个消费者,原来被消费的消息会被重新消费一遍;
消费者消费完某个分区中的一部分消息还没有来得及提交,发生了Rebalance;
这个分区被分配给Group中另一个消费者,原来被消费的消息会被重新消费一遍;
如何避免
主动运维操作引发的Rebalance大都是不可避免的,也是可以接受的;
增加Consumer实例(或主动停止某些Consumer实例)导致的Rebalance,是计划内的,也是可以接受的;
Consumer实例可能会被Coordinator错误地认为“已停止”而被踢出Group,导致Rebalance;
当Group完成Rebalance之后,每个Consumer实例都会定期地向Coordinator发送心跳请求,表明自己活着;
如果某个Consumer实例不能及时地发送这些心跳请求,Coordinator就会认为这个Consumer已经挂掉,
从而被移除,开启新一轮Rebalance;
从而被移除,开启新一轮Rebalance;
session.timeout.ms(默认10s)
逻辑指标,指定了一个阈值;
表示Coordinator在这个阈值(10秒)内没有收到某个Consumer的心跳,就认为这个Consumer已经挂了;
heartbeat.interval.ms(默认3s)
物理指标,告诉Consumer要每3秒给Coordinator发一个心跳包;(其实真正作用为控制Rebalance通知的频率)
用于控制发送心跳请求频率,值越小,Consumer实例发送心跳请求的频率就越高;(消费者就越快得到通知)
Coordinator将通知各实例开启Rebalance的消息,封装在了心跳请求的响应体中(Rebalance通知时机);
max.poll.interval.ms(默认值5min)
用于控制Consumer实际消费能力对Rebalance的影响,限定了Consumer端两次调用poll方法的最大时间间隔;
如果Consumer程序在5min之内无法消费完poll返回的消息,那么Consumer就会发起“离开组”的请求,导致Rebalance;
非必要的Rebalance及避免
未能及时发送心跳,导致Consumer被“踢出”Group;
heartbeat.interval.ms 被设置来大于了 session.timeout.ms,导致心跳包未发送到;
网络延时等情况,影响了 Consumer发送的心跳包的到达,可能下一个heartbeat就正常了;
避免
设置 session.timeout.ms = 6s;
设置 heartbeat.interval.ms = 2s;
保证Consumer实例在被判断挂掉之前,能够至少发送3轮心跳请求;即session.timeout.ms >= 3 * heartbeat.interval.ms;
Consumer消费时间过长
在消费数据的过程中,如果有很重的消费逻辑,导致消费时长的增加,大于了 max.poll.interval.ms 的值;
避免:
为下游的消费业务处理逻辑留下充足的时间,比如处理业务消耗5分钟,则把max.poll.interval.ms设置到6分钟;
为下游的消费业务处理逻辑留下充足的时间,比如处理业务消耗5分钟,则把max.poll.interval.ms设置到6分钟;
Consumer端的频繁GC
可能在Consumer端出现了频繁的 Full GC,导致长时间的停顿,从而引发Rebalance;
(被动)Rebalance全流程
消费组状态机
(State Machine)
(State Machine)
定义:
kafka设计了一套消费组状态机,来帮助协调者完成整个重平衡流程;
kafka设计了一套消费组状态机,来帮助协调者完成整个重平衡流程;
kafka为消费组定义了5种状态
状态机各个状态的流转过程
消费组启动时,最开始是 Empty状态,当Rebalance过程开启后,它会被置于PreparingRebalance状态等待成员加入;
之后变更到CompletingRebalance状态等待分配方案,分配完成后流转到Stable状态完成重平衡;
之后变更到CompletingRebalance状态等待分配方案,分配完成后流转到Stable状态完成重平衡;
当有新成员加入或者已有成员退出时,消费组的状态从Stable直接跳到PreparingRebalance状态;
此时,所有现存成员就必须全部重新申请加入组;
此时,所有现存成员就必须全部重新申请加入组;
当所有成员都退出组后,消费组状态变更为Empty;
kafka定期自动删除过期位移的条件就是:组要处于Empty状态;
消费者端Rebalance流程
消费者端Rebalance的两个步骤
JoinGroup(加入组)
当组内成员加入组时,每个消费者都会向协调者发送JoinGroup请求;
在这个请求中,每个成员都要将自己的订阅的主题上报,这样协调者能够收集到所有成员的订阅信息;
收集到全部成员的JoinGroup请求后,协调者会从这些成员中选择一个(一般为第一个)担任这个组的领导者;
选出领导者后,协调者会把消费者组的订阅信息封装进JoinGroup的响应体中发回给领导者,由领导者统一作出分配方案;
SyncGroup(等待领导者消费者(Leader Consumer)分配方案)
在领导者作出分配方案后,会向协调者发送SyncGroup请求(把分配方案发给协调者);
除了领导者的其他成员也会发送SyncGroup请求(请求体中没有内容),因为
分配方案是以SyncGroup的响应体返回给所有成员的;
分配方案是以SyncGroup的响应体返回给所有成员的;
消费者接收到SyncGroup的响应之后,就知道自己该消费哪些分区了;
两个步骤的示例说明
JoinGroup作用:将订阅信息发送给协调者,并由协调者选举领导者消费者,并由领导者制定分配方案;
SyncGroup作用:让协调者把领导者制定的分配方案下发给各个消费者成员;
Broker端(协调者端)
Rebalance流程
Rebalance流程
新成员加入组
当协调者收到新的JoinGroup请求后,会通过心跳请求响应的方式,通知现在组内所有成员;
强制他们开启新一轮的Rebalance;
强制他们开启新一轮的Rebalance;
组成员主动离组
当消费者实例主动通知协调者它要退出时,会发送 LeaveGroup 请求;
协调者收到 LeaveGroup请求后,同样以心跳请求响应的方式通知其他成员,开启Rebalance;
组成员崩溃离组
崩溃离组是指消费者实例出现严重故障,突然宕机导致的离组;
主动离组是主动发起的离组,协调者能够马上感知;崩溃离组是被动的,协调者需要等待一段时间才能感知;
在 session.timeout.ms 时间内,如果还没有收到某个消费者的心跳请求,协调者则认为它崩溃离组;
消息消费模式
推模式
服务端主动将消息推送给消费者;
拉模式(kafka默认)
消费者主动向服务端发起请求来拉取消息;
poll()方法
poll()方法返回的是所订阅的主题(分区)上的一组消息(还没有被消费过的消息集);
poll()方法无法精确掌握消费的起始位置;
提供的 auto.offset.reset 参数也只能在找不到消费位移或位移越界的情况下,粗粒度地从开头或末尾开始消费;
其他时候auto.offset.reset 参数并不会起效,如果这个时候想要从头或者末尾开始消费,就需要seek()方法;
代码实现
public ConsumerRecords<K, V> poll(final Duration timeout);
timeout控制poll()方法的阻塞时间,消费者缓冲区没有可用数据时会发生阻塞;
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
records.count():
返回此次接收到的消息总条数;
返回此次接收到的消息总条数;
records.partitions():
返回records中所有分区名称;
返回records中所有分区名称;
records..records(partition):
按照分区维度进行消息消费;
按照分区维度进行消息消费;
records.records(topic):
按照主题维度进行消息消费;
按照主题维度进行消息消费;
kafka中的消息消费是一个不断轮询的过程,消费者所要做的就是重复地调用poll()方法,
seek()方法
提供了一种更细粒度的掌控,可以从特定的位移处开始拉取消息;也可实现追前消费或回溯消费;
代码实现
public void seek(TopicPartition partition, long offset);
partition表示分区,offset表示从分区哪个位置消费;
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0) {
consumer.poll(Duration.ofMillis(100));
assignment = consumer.assignment(); // 获取消费者所分配到的分区信息
}
for (TopicPartition tp : assignment) {
consumer.seek(tp, 10); // 设置每个分区的消费位置为10
}
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
while (assignment.size() == 0) {
consumer.poll(Duration.ofMillis(100));
assignment = consumer.assignment(); // 获取消费者所分配到的分区信息
}
for (TopicPartition tp : assignment) {
consumer.seek(tp, 10); // 设置每个分区的消费位置为10
}
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
从开头或末尾开始消费
Map<TopicPartition, Long> offsets = consumer.beginningOffsets(assignment); // 开头
// Map<TopicPartition, Long> offsets = consumer.endOffsets(assignment); // 末尾
for (TopicPartition tp : assignment) {
consumer.seek(tp, offsets.get(tp));
}
// Map<TopicPartition, Long> offsets = consumer.endOffsets(assignment); // 末尾
for (TopicPartition tp : assignment) {
consumer.seek(tp, offsets.get(tp));
}
// 或者
public void seekToBeginning(Collection<TopicPartition> partitions);
public void seekToEnd(Collection<TopicPartition> partitions);
public void seekToBeginning(Collection<TopicPartition> partitions);
public void seekToEnd(Collection<TopicPartition> partitions);
从某个具体时间开始消费
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) ;
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch,
Duration timeout);
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch,
Duration timeout);
Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
for (TopicPartition tp : assignment) {
timestampToSearch.put(tp, System.currentTimeMillis()-1*24*3600*1000); // 获取一天之前的消息位置
}
Map<TopicPartition, OffsetAndTimestamp> offsets =consumer.offsetsForTimes(timestampToSearch);
for (TopicPartition tp : assignment) {
OffsetAndTimestamp offsetAndTimestamp = offsets.get(tp);
if (offsetAndTimestamp != null) {
consumer.seek(tp, offsetAndTimestamp.offset());
}
}
for (TopicPartition tp : assignment) {
timestampToSearch.put(tp, System.currentTimeMillis()-1*24*3600*1000); // 获取一天之前的消息位置
}
Map<TopicPartition, OffsetAndTimestamp> offsets =consumer.offsetsForTimes(timestampToSearch);
for (TopicPartition tp : assignment) {
OffsetAndTimestamp offsetAndTimestamp = offsets.get(tp);
if (offsetAndTimestamp != null) {
consumer.seek(tp, offsetAndTimestamp.offset());
}
}
可突破__consumer_offsets的限制,把消费位移保存在任意存储系统(数据库、文件系统等)中;
下次消费的时候,读取外部存储系统中的消费位移,并通过seek()方法指向这个具体位置进行消费;
下次消费的时候,读取外部存储系统中的消费位移,并通过seek()方法指向这个具体位置进行消费;
消费位移管理
版本区别
以前版本
老版本的Consumer的位移管理依赖于Zookeeper,它会将位移数据提交到Zookeeper中保存;
当Consumer重启后,它能自动从Zookeeper中读取位移数据,从而在上次消费截止的地方继续消费;
但是Zookeeper并不适合高频的写操作,所以kafka推出了新的位移管理机制;
1.0.9版本以后
将Consumer的位移数据作为一条条普通的kafka消息,提交到__consumer_offset中;
__consumer_offsets就是一个普通的主题,用来保存kafka消费者的位移信息(消费到了分区中某个消息所在的位置);
位移主题
__consumer_offsets
__consumer_offsets
消息格式
1. 正常消息格式
__consumer_offsets由KV对组成,Key表示消息的键值,value表示消息体;
Key
Group ID(唯一标识一个Consumer);
主题名;
分区号;
Value
位移值;
位移提交的元数据(时间戳、用户自定义数据等)
2. 用于保存Consumer group信息的消息;
用于注册 Consumer Group;
3. 用于删除Group过期位移甚至是删除Group的消息;
tombstone 消息,即墓碑消息,也称 delete mark;
当某个Consumer Group下的所有Consumer实例都停止了,而且他们的位移数据都已被删除时,
kafka会向位移主题的对应分区写入 tombstone消息,表明要彻底删除这个Group消息;
kafka会向位移主题的对应分区写入 tombstone消息,表明要彻底删除这个Group消息;
创建时机
自动创建位移主题(默认)
当kafka集群中的第一个Consumer 启动时,kafka会自动创建__consumer_offsets;
分区数由Broker端参数 offsets.topic.num.partitions 控制,默认值50;
副本数由Broker端参数 offsets.topic.replication.factor 控制,默认值3;
手动创建位移主题
在kafka集群尚未启动任何Consumer之前,使用kafka API创建它;
可以自己控制分区个数和副本数;
位移提交
定义:
把消费位移存储起来(持久化)的动作称为“提交”;消费者在消费完消息之后需要执行位移提交;
因为Consumer能够同时消费多个分区的数据,所以位移提交是在分区粒度上进行的;
把消费位移存储起来(持久化)的动作称为“提交”;消费者在消费完消息之后需要执行位移提交;
因为Consumer能够同时消费多个分区的数据,所以位移提交是在分区粒度上进行的;
示意图
x表示某一次拉取操作中此分区的最大偏移量,假设当前消费者已经消费到了x位置,那么消费位移为x(lastConsumedOffset);
但是,当前消费者需要提交的消费位移(commit offset)不是x,而是 x+1;下一条需要拉取消息的位置(position)也是 x+1;
当前 position = commit offset = lastConsumedOffset + 1;
position 和 commit offset不会一直相同;
自动提交
enable.auto.commit(默认true),Consumer自动位移提交,在后台定期提交;
自动位移提交的动作是在poll()方法里的逻辑里完成的,在每次拉取之前会检查是否可以进行位移提交;
如果可以提交,会先提交上一次的位移;
auto.commit.interval.ms(默认值5秒),提交间隔;
只要Consumer一直启动着,就会无限期往位移主题写入消息;即使主题中没有消息可消费,还是会一直往
位移主题中写入最后的位移信息;(所以必须要有针对位移主题消息特点的消息删除策略,否则会撑爆自盘)
位移主题中写入最后的位移信息;(所以必须要有针对位移主题消息特点的消息删除策略,否则会撑爆自盘)
问题
消息重复
假设刚提交完一次消费位移,拉取一批(或几批)消息消费(已经消费),
在下一次自动提交之前,消费者崩溃了(或者Rebalance);
在下一次自动提交之前,消费者崩溃了(或者Rebalance);
就又得从上一次位移提交的地方重新开始消费,造成重复消费;(对于再均衡同样适用)
消息丢失
假设消费线程消费消息放在本地缓存中,比如 BlockingQueue中;
目前已经消费了10次,且已经提交(不能再消费);
处理线程当前只处理到了BlockingQueue中的第6次,此时处理线程发生异常;
待处理线程恢复的时候,本地缓存中没有了数据,消费线程从第11次开始消费消息再放入本地缓存;
那么第6-10次中间的消息就没有处理,造成消息丢失;
那么第6-10次中间的消息就没有处理,造成消息丢失;
手动提交
设置 enable.auto.commit = false;
在Consumer端需要自己手动提交位移,Consumer API提供了位移提交的方法;
同步提交
commitSync():
只能提交当前批次对应的position值(poll()方法返回的最新值);
只能提交当前批次对应的position值(poll()方法返回的最新值);
commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets):
可以提交一个中间值,比如业务每消费N条消息就提交一次位移;
可以提交一个中间值,比如业务每消费N条消息就提交一次位移;
会阻塞消费者线程,直到处理完poll()方法返回的所有消息之后,才能提交;
同步提交支持自动重试;
异步提交
commitAsync();
commitAsync(OffsetCommitCallback callback);
提供了回调函数,可以实现提交之后的逻辑(记录日志、处理异常等);
commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);
执行的时候消费者线程不会被阻塞,可能在提交消费位移的结果还未返回之前就开始了新一次的拉取操作;
异步提交可以使消费者的性能得到一定的增强;
异步提交出现问题时不会自动重试;
因为在当前一次提交的时候,消费者线程可能往后消费了更多的消息;
如果开启了重试,此次提交失败,就会重新提交这个offset;
此时如果发生了崩溃或者Rebalance,都会导致重复消费;
删除过期消息
kafka使用Compact策略来删除位移主题中的过期消息,避免该主题无限期膨胀;
定义过期消息:
对于同一个Key的两条消息M1和M2,如果M1的发送时间早于M2,则M1就是过期消息;
对于同一个Key的两条消息M1和M2,如果M1的发送时间早于M2,则M1就是过期消息;
示意图
位移0、2、3的消息的Key都是K1,在Compact之后,
分区只需要保存位移为3的消息,因为它是最新的;
分区只需要保存位移为3的消息,因为它是最新的;
kafka提供了专门的后台线程(Log Cleaner)定期巡检待Compact的主题,查看是否存在满足条件的可删除数据;
找不到消费位移
情况分类
新的消费组建立,没有可以查找的消费位移;
消费组内一个新的消费者订阅了一个新的主题,没有可以查找的消费位移;
__consumer_offsets主题中有关这个消费组的位移信息过期被删除后,也没有可以查找的消费位移;
处理方法
kafka的消费者找不到记录的消费位移时,会根据消费者客户端参数 auto.offset.reset 的配置来决定从何处进行消费;
auto.offset.reset
重置消费位移
重置消费位移
latest(默认值):
表示将位移调整到最新末端位移(图中的9);
表示将位移调整到最新末端位移(图中的9);
earliest:
表示将位移调整到主题当前最早位移处,这个最早位移不一定是0,因为kafka会自动删除过期位移,所以earliest很可能大于0;
表示将位移调整到主题当前最早位移处,这个最早位移不一定是0,因为kafka会自动删除过期位移,所以earliest很可能大于0;
none:
查找不到消费位移时,直接抛出 NoOffsetForPartitionException 异常;
查找不到消费位移时,直接抛出 NoOffsetForPartitionException 异常;
控制消费
定义
在某些应用场景下可能需要暂停某些分区的消费而先消费其他分区,当达到一定条件时再恢复消费这些分区;
KafkaConsumer提供了对消费速度进行控制的方法;
实现
public void pause(Collection<TopicPartition> partitions);(暂停分区消费)
public void resume(Collection<TopicPartition> partitions);(恢复分区消费)
public Set<TopicPartition> paused();(返回暂停的分区集合)
关闭消费
在跳出while循环的消费逻辑之后,一定要显式地执行关闭动作来释放运行过程中占用的各种系统资源(内存资源、socket连接等);
实现
public void close();(内部设定了最长等待时间 30s)
public void close(Duration timeout);(设定关闭方法的最长执行时间)
因为有些关闭逻辑会耗费一定的时间,比如自动提交位移,关闭时还会执行一次位移提交操作;
因为有些关闭逻辑会耗费一定的时间,比如自动提交位移,关闭时还会执行一次位移提交操作;
消费者拦截器
作用:
在消费到消息或者提交消费位移时进行一些定制化的操作;(如按照某个规则过滤不符合条件的消息、修改消息内容等),
在消费到消息或者提交消费位移时进行一些定制化的操作;(如按照某个规则过滤不符合条件的消息、修改消息内容等),
消费者拦截器接口:
org.apache.kafka.clients.consumer. ConsumerInterceptor;
org.apache.kafka.clients.consumer. ConsumerInterceptor;
public interface ConsumerInterceptor<K, V> extends Configurable {
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
public void close();
}
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
public void close();
}
KafkaConsumer 会在poll()方法返回之前调用 onConsume() 方法来对消息进行相应的定制化操作;
如果onConsume()方法中抛出异常,会被捕获并记录到日志中,异常不会再向上传递;
如果onConsume()方法中抛出异常,会被捕获并记录到日志中,异常不会再向上传递;
KafkaConsumer 会在提交完消费位移之后调用 onCommit()方法,可以使用这个方法来记录根据所提交的位移信息;
close() 方法主要用于在关闭拦截器时执行一些资源的清理工作;
自定义拦截器
public class ConsumerInterceptorTTL implements ConsumerInterceptor<String, String> {
private static final long EXPIRE_INTERVAL = 10000;
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
long now = System.currentTimeMillis();
Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>();
for (TopicPartition tp : records.partitions()) {
List<ConsumerRecord<String, String>> newTpRecords = new ArrayList<>();
List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
for (ConsumerRecord<String, String> record : tpRecords) {
if (now - record.timestamp() < EXPIRE_INTERVAL) {
newTpRecords.add(record);
}
}
newRecords.put(tp, newTpRecords);
}
return new ConsumerRecords<>(newRecords);
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
offsets.forEach((tp, offset) -> System.out.println(tp + ":" + offset.offset()));
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}
private static final long EXPIRE_INTERVAL = 10000;
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
long now = System.currentTimeMillis();
Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>();
for (TopicPartition tp : records.partitions()) {
List<ConsumerRecord<String, String>> newTpRecords = new ArrayList<>();
List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
for (ConsumerRecord<String, String> record : tpRecords) {
if (now - record.timestamp() < EXPIRE_INTERVAL) {
newTpRecords.add(record);
}
}
newRecords.put(tp, newTpRecords);
}
return new ConsumerRecords<>(newRecords);
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
offsets.forEach((tp, offset) -> System.out.println(tp + ":" + offset.offset()));
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}
使用消息的 timestamp 字段来判定是否过期,如果消息的时间戳与当前的时间戳相差超过10秒则判定为过期;
过期消息将被过滤而不投递给具体的消费者;
消息保障
消息可靠性保障
消息交互可靠性保障
定义:
对 Producer和Consumer 要处理的消息提供什么样的承诺;
对 Producer和Consumer 要处理的消息提供什么样的承诺;
常见的承诺
最多一次(at most once):
消息可能丢失,但不会重复发送;
消息可能丢失,但不会重复发送;
至少一次(at least once):
消息不会丢失,但可能重复发送;
消息不会丢失,但可能重复发送;
精确一次(exactly once):
消息不会丢失,也不会重复发送;
消息不会丢失,也不会重复发送;
kafka的默认实现
kafka默认提供的交付可靠性保障是至少一次;
当Producer无法确定消息是否成功提交时,会选择重试(关闭Producer重试即可实现最多一次),
也就是再次发送相同的消息,即至少一次;(可能导致重复发送)
也就是再次发送相同的消息,即至少一次;(可能导致重复发送)
无丢失消息配置
kafka只对已提交的消息做有限度的持久化保证;
已提交的定义
若干个Broker都成功接收到消息并保存到日志文件,并且Producer接到Broker的应答,才会认为该消息已成功发送;
可以选择只要一个Broker成功保存就算消息已提交,也可以让所有Broker成功保存才算已提交;参数设定;
如果Producer已提交消息,但Broker的应答没有发送回来(比如网络抖动),那么Producer就无法确定消息是否提交成功;
有限度的持久化:
kafka不丢消息是有前提的,假设消息保存在N个Broker上,这个前提条件就是这N个Broker中至少有一个存活;
kafka不丢消息是有前提的,假设消息保存在N个Broker上,这个前提条件就是这N个Broker中至少有一个存活;
生产者(KafkaProducer )
消息发送模式
消息发送模式
发后即忘(fire-and-forget)
方式:
只管往kafka中发送消息而不关心消息是否正确到达;性能最高,可靠性最低;
只管往kafka中发送消息而不关心消息是否正确到达;性能最高,可靠性最低;
问题:
在发生不可重试异常时,会造成消息丢失;
在发生不可重试异常时,会造成消息丢失;
代码:
producer.send(record);
producer.send(record);
同步(sync)
方式:
send()方法并非是 void 类型,而是 Future类型,所以可以通过get()方法来阻塞等待响应,实现同步;
send()方法并非是 void 类型,而是 Future类型,所以可以通过get()方法来阻塞等待响应,实现同步;
问题:
性能很差,需要等待一条消息发送完之后才能发送下一条;
性能很差,需要等待一条消息发送完之后才能发送下一条;
代码:
producer.send(record).get();
producer.send(record).get();
也可以在执行完 send() 方法之后不直接调用 get() 方法:
try {
Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get();
System.out.println(metadata.topic() + "-" + metadata.partition() + ":" + metadata.offset());
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get();
System.out.println(metadata.topic() + "-" + metadata.partition() + ":" + metadata.offset());
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
这样可以获取一个RecordMetadata对象,在RecordMetadata对象中包含了一些消息的元数据信息;
比如:当前消息存储的主题、分区号、偏移量、时间戳等,如果在应用代码中需要这些信息,可以使用这种方式;
比如:当前消息存储的主题、分区号、偏移量、时间戳等,如果在应用代码中需要这些信息,可以使用这种方式;
异步(async)
代码:
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
exception.printStackTrace();
} else {
System.out.println(metadata.topic() + "-" + metadata.partition() + ":" + metadata.offset());
}
}
});
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
exception.printStackTrace();
} else {
System.out.println(metadata.topic() + "-" + metadata.partition() + ":" + metadata.offset());
}
}
});
onCompletion()方法中两个参数是互斥的;
消息发送成功时,metadata不为null,exception为null;
消息发送异常时,metadata为null,exception不为null;
消息发送成功时,metadata不为null,exception为null;
消息发送异常时,metadata为null,exception不为null;
实际应用中,(exception != null)时,应该做某种重试或者日志记录;
生产环境都应该采用异步方式发送消息;
生产者(KafkaProducer )
中的两种异常类型
中的两种异常类型
可重试的异常
NetworkException、LeaderNotAvailableException、UnknownTopicOrPartitionException、
NotEnoughReplicasException、NotCoordinatorException 等
NotEnoughReplicasException、NotCoordinatorException 等
NetworkException 表示网络异常,有可能是由于网络瞬时故障而导致的异常,可以通过重试解决;
LeaderNotAvailableException 表示分区的 leader 副本不可用,这个异常通常发生在 leader 副本
下线而新的 leader 副本选举完成之前,重试之后可以重新恢复;
下线而新的 leader 副本选举完成之前,重试之后可以重新恢复;
对于可重试的异常,如果配置了retries参数,只要在规定的重试次数内自行恢复了,就不会抛出异常;
props.put(ProducerConfig.RETRIES_CONFIG, 10);
props.put(ProducerConfig.RETRIES_CONFIG, 10);
不可重试的异常
RecordTooLargeException等;
对于不可重试的异常,kafka不会对此进行任何重试,直接抛出异常;
最佳实践
Producer端
不要使用 producer.send(msg),而要使用 producer.send(msg, callback);
设置 acks = all
acks代表对“已提交”消息的定义;
kafka默认 acks=1,表示 leader副本接收到消息并写入磁盘,就表示发送成功,而不管其他follower副本;
如果设置成all,表示所有副本Broker(ISR中)都要接受到消息,该消息才算是“已提交”;这是最高等级的“已提交”定义;
设置 retries 为一个较大的值
控制可重试的异常的重试次数;
retries在kafka 2.4版本中,默认设置为Integer.MAX_VALUE;以前版本默认为0;
Broker端
设置 unclean.leader.election.enable = false;
Unclean领导者选举
Unclean领导者选举
控制哪些Broker有资格竞选分区的leader;(非ISR中的副本不能够参与选举)
如果一个Broker落后原先的Leader太多,如果它成为新的Leader,则会造成消息的丢失;
将这个参数设置为false,则不允许这种Broker参与竞选;提高可靠性;
但是可用性会降低,如果ISR中的副本都挂掉,分区会处于不可用状态(停止对外提供服务);(CAP理论)
设置 replication.factor >= 3
最好将消息多保存几份,目前防止消息丢失的主要机制就是冗余;
设置 min.insync.replicas > 1
控制消息至少要被写入到多少个副本才算是“已提交”;
设置成大于1可以提升消息持久性,生产环境不要使用默认值1;
做一个下限限制,对应于 acks = all;不能只满足ISR中全部写入,还要保证ISR中写入个数不少于 min.insync.replicas;
确保 replication.factor > min.insync.replicas
如果两者相等,只要有一个副本挂了,整个分区就无法正常工作了;
推荐设置为 确保 replication.factor = min.insync.replicas + 1;
Consumer端
设置 enable.auto.commit=false;
确保消息消费完成再提交;最好把它设置为false,并采用手动提交位置的方式;
对于单Consumer多线程处理消息的场景至关重要;
控制器
基本概念
控制器(Controller)是kafka的核心组件,主要作用是在zookeeper的帮助下管理和协调整个kafka集群;
集群中任意一台Broker都能作为控制器,正常运行的集群中运行过程中只能有一个控制器;
选举策略
当Broker在启动时,都会尝试去zookeeper上创建/ controller 节点;
kafka选举控制器的规则为:第一个成功创建 /controller 节点的Broker就是当前集群的控制器;
作用(协调作用)
主题管理:
控制器帮助完成对kafka主题的创建、删除以及增加分区的操作;(执行kafka-topics.sh脚本时大部分工作都是由控制器完成)
控制器帮助完成对kafka主题的创建、删除以及增加分区的操作;(执行kafka-topics.sh脚本时大部分工作都是由控制器完成)
分区重分配:
主要指使用 kafka-reassign-partitions.sh 脚本提供对已有分区进行细粒度的分配功能;
主要指使用 kafka-reassign-partitions.sh 脚本提供对已有分区进行细粒度的分配功能;
Preferrd领导者选举:
kafka为了避免部分Broker负载过重而提供的一种换Leader的方案;
kafka为了避免部分Broker负载过重而提供的一种换Leader的方案;
集群成员管理:
自动检测新增Broker、Broker主动关闭及Broker宕机等(依赖zookeeper的Watch功能和临时节点组合实现);
自动检测新增Broker、Broker主动关闭及Broker宕机等(依赖zookeeper的Watch功能和临时节点组合实现);
示例
新增Broker:
当有新Broker启动时,会在zookeeper的/brokers/ids 下创建专属的znode节点,zookeeper会通过Watch机制
将消息推送给控制器,控制器就能自动感知到创建节点的变化,进行后续的新增Broker操作;
当有新Broker启动时,会在zookeeper的/brokers/ids 下创建专属的znode节点,zookeeper会通过Watch机制
将消息推送给控制器,控制器就能自动感知到创建节点的变化,进行后续的新增Broker操作;
检测Broker存活性:
Broker启动时在 /brokers/ids 下创建的是 临时znode,当Broker宕机或者主动关闭后,与zookeeper的回话结束,
这个znode会被自动删除,同理zookeeper的Watch机制会将这个变更推送给控制器,控制器进行后续处理;
Broker启动时在 /brokers/ids 下创建的是 临时znode,当Broker宕机或者主动关闭后,与zookeeper的回话结束,
这个znode会被自动删除,同理zookeeper的Watch机制会将这个变更推送给控制器,控制器进行后续处理;
数据服务:
向其他Broker提供数据服务;控制器上保存了最全的集群元数据,控制器会定期往其他Broker发送请求,从而更新自己内存中的元数据;
向其他Broker提供数据服务;控制器上保存了最全的集群元数据,控制器会定期往其他Broker发送请求,从而更新自己内存中的元数据;
保存什么数据
这些数据在zookeeper上也保存了一份,控制器初始化时,从zookeeper读取数据保存到自己的内存中
所有主题信息:
topic列表,具体的分区信息,有哪些副本,领导者副本是谁,ISR中有哪些副本等;
topic列表,具体的分区信息,有哪些副本,领导者副本是谁,ISR中有哪些副本等;
所有Broker信息:
当前有哪些运行的Broker,正在关闭中的Broker等;
当前有哪些运行的Broker,正在关闭中的Broker等;
所有涉及运维任务的分区:
当前正在进行Preferred的领导者选举以及分区重分配的分区列表;
当前正在进行Preferred的领导者选举以及分区重分配的分区列表;
故障转移(Failover)
当运行中的控制器突然宕机或者意外终止时,kafka能够快速感知到,并立即启用备用控制器(选举)来代替失效的控制器;
zookeeper通过Watch机制快速感知到控制器所在的Broker宕机,并马上删除 /controller 节点;
存活的Broker开始竞选新的控制器,第一个在zookeeper上创建 /controller节点的Broker当选控制器;
新的控制器从zookeeper中读取集群元数据信息,并初始化到自己的内存中;Failover完成;
集群运维
优先副本的选举
前提:
在创建主题的时候,该主题的分区及副本会尽可能均匀的分布到集群中的各个Broker节点上,对应的leader副本的分配也比较均匀;
在创建主题的时候,该主题的分区及副本会尽可能均匀的分布到集群中的各个Broker节点上,对应的leader副本的分配也比较均匀;
示例
例如使用一个kafka-topics.sh脚本创建一个分区数为3,副本数为3的主题 topic-partitions:
可以看到 leader副本均匀分布在brokerId为0、1、2的节点中;
一个Broker中只可能有同一个分区的一个副本;leader副本所在的节点称为分区的leader节点;
kafka集群中的Broker节点不可避免的会发生宕机或者崩溃,当分区的leader节点发生故障时,
ISR中的一个follower节点就会被选举成为新的leader节点;
ISR中的一个follower节点就会被选举成为新的leader节点;
例如将brokerId为2的节点重启,主题 topic-partitions 新的分布信息:
可以看到,原本分区1的leader节点为2,现在变成了0,这样的话原本均衡的负载就失衡了;
这个分区所在的节点中,节点0的负载最高,节点2的负载最低;
概念
为了能有效地治理负载失衡的情况,kafka引入了优先副本机制(preferred replica);
优先副本:
指 在AR集合列表中的第一个副本;比如上面分区0的AR列表为[1,2,0],那么分区0的优先副本即为1;
指 在AR集合列表中的第一个副本;比如上面分区0的AR列表为[1,2,0],那么分区0的优先副本即为1;
理想情况下,优先副本就是该分区的leader副本,所以也可以称为 preferred leader;
优先副本选举:
指 通过一定的方式促使优先副本选举为leader副本,以此来促进集群的负载均衡(也称为“分区平衡”);
指 通过一定的方式促使优先副本选举为leader副本,以此来促进集群的负载均衡(也称为“分区平衡”);
操作
自动
kafka提供了分区自动平衡的功能,对应的Broker端参数为 auto.leader. rebalance.enable(默认为true);
当开启时,kafka的控制器会启动一个定时任务,去轮询所有的Broker节点,计算每个节点的分区不平衡率;
不平衡率 = 非优先副本的leader个数 / 分区总数
判断是否超过 leader.imbalance.per.broker.percentage(默认10%),如果超过了则会自动执行优先副本的选举;
定时执行的周期由 leader.imbalance.check.interval.seconds(默认300秒) 控制;
不过在生产环境不建议默认开启,因为执行时间无法自主掌控,可能引起客户端阻塞;
(比如发生在大促时候,将造成业务阻塞,频繁超时等);
(比如发生在大促时候,将造成业务阻塞,频繁超时等);
手动
将 auto.leader.rebalance.enable 设置为 false,由管理者在合适的时机执行合适的操作(手动执行分区平衡);
Kafka 中 kafka-perferred-replica-election.sh 脚本提供了对分区 leader 副本进行重新平衡的功能;
示例
kafka-perferred-replica-election.sh 脚本的具体用法:
可以看到,在执行脚本之后,主题 topic-partitions 中的所有leader副本都已经是优先副本了;
但是这种方式会将集群上的所有分区都执行一遍优先副本的选举操作,leader副本的转移
也是一项高成本的工作;如果要执行的分区数很多,必然会对客户端造成一定的影响;
也是一项高成本的工作;如果要执行的分区数很多,必然会对客户端造成一定的影响;
path-to-json-file 参数
kafka-perferred-replica-election.sh 脚本中还提供了 path-to-json-file 参数来
小批量地对部分分区执行优先副本的选举操作;(集群中其他分区不会执行选举操作)
小批量地对部分分区执行优先副本的选举操作;(集群中其他分区不会执行选举操作)
通过 path-to-json-file 参数来指定一个JSON文件,这个JSON文件里保存需要执行优先副本选举的分区;
具体用法
election.json:
执行脚本:
分区重分配
背景
在kafka中,当集群中的一个节点突然宕机时;如果节点上的分区是单副本的,则这些分区就不可用了;
如果节点上的分区是多副本的,并且这个节点还是leader节点,则这个节点的leader副本角色会转交到其他follower副本中;
如果节点上的分区是多副本的,并且这个节点还是leader节点,则这个节点的leader副本角色会转交到其他follower副本中;
不管怎样,这个节点上的分区副本都已经处于不可用的情况了,kafka并不会将这些失效的分区副本自动转移到其他节点;
这样的话,不仅会影响集群的负载均衡,还会影响整体服务的可靠性和可用性;
这样的话,不仅会影响集群的负载均衡,还会影响整体服务的可靠性和可用性;
诉求
我们希望如果有节点突然故障,或者需要对节点进行有计划的下线操作时,能够将该节点的分区副本迁移到其他的可用节点;
当集群中新增节点时,只有新创建的主题分区才可能被分配到这个节点上,而之前的主题分区并不会自动分配到新增节点;
这样的话,新节点与原节点的负载会出现严重不均衡,所以也希望能够进行分区的重新分配;
这样的话,新节点与原节点的负载会出现严重不均衡,所以也希望能够进行分区的重新分配;
实现
kafka提供了 kafka-reassign-partitions.sh 脚本来执行分区重分配的工作,可以在集群扩容、节点失效的情况下对分区进行迁移;
示例
在一个由3个节点(0、1、2)组成的集群中创建一个主题 topic-reassign,主题中包含4个分区和2个副本:
可以看到,主题 topic-reassign 在3个节点中都有相应的分区副本分布;现在需要下线 brokerId为1的节点;(需要迁移分区)
操作
第一步:
创建一个JSON文件(reassign.json),文件内容为要进行分区重分配的主题:
创建一个JSON文件(reassign.json),文件内容为要进行分区重分配的主题:
第二步:
根据这个JSON文件和指定所要分配的Broker节点,来生成一份候选的重分配方案:
根据这个JSON文件和指定所要分配的Broker节点,来生成一份候选的重分配方案:
Current partition replica assignment:当前分区的副本分配情况,最好保存起来,以备后续的回滚操作;
Proposed partition reassignment configuration:为分区重分配生成的候选方案
(也可以直接自定义这个方案,就不需要执行第一、第二个步骤了);
(也可以直接自定义这个方案,就不需要执行第一、第二个步骤了);
第三步:
执行重分配的动作;将生成的方案保存在一个JSON文件中(project.json),再执行分配:
执行重分配的动作;将生成的方案保存在一个JSON文件中(project.json),再执行分配:
再次查看主题 topic-reassign 的具体信息,可以看出所有分区副本都在0和2节点上了:
第四步(可选):
查看分区重分配的进度,只需要将第三步中的 execute 换成 verify 即可:
查看分区重分配的进度,只需要将第三步中的 execute 换成 verify 即可:
Tips
分区重分配对集群的性能影响很大,并且很占用额外的资源(网络、磁盘等);
所以在实际操作中,需要降低重分配的粒度,分成多个小批次来执行;
如果要执行Broker下线,在执行重分配之前,最好将这个Broker 关闭或重启,这样它就不会是任何分区的leader节点,
减少broker间的流量复制;以此提升重分配的性能,减少对整个集群的影响;
减少broker间的流量复制;以此提升重分配的性能,减少对整个集群的影响;
如何选择分区数
性能测试工具
生产者性能测试
消费者性能测试
性能调优
调优目标
对于kafka而言,调优的目标主要是性能,主要指 高吞吐量和低延时;
吞吐量:
即TPS,是指Broker端进程或Client端应用程序每秒能处理的字节数或消息数,越大越好;
即TPS,是指Broker端进程或Client端应用程序每秒能处理的字节数或消息数,越大越好;
延时:
表示从Producer端发送消息到Broker端持久化完成的时间间隔;也可以表示端到端
(从Producer发送到Consumer成功消费)的延时;时间越短越好;
表示从Producer端发送消息到Broker端持久化完成的时间间隔;也可以表示端到端
(从Producer发送到Consumer成功消费)的延时;时间越短越好;
0 条评论
下一页