Kafka
2024-10-21 16:31:02 0 举报
AI智能生成
kafka基础知识
作者其他创作
大纲/内容
基础架构
生产者
消息分层
消息集合(message set)
消息(message)
消息丢失
producer.send(msg)
“fire and forget” 发送过后Broker是否收到消息我们并不清楚。会导致消息丢失,发送过后续情况不清楚
producer.send(msg, callback)
callback回调函数能够清楚的告诉我们发送消息后的情况
retries
设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失
acks
设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
拦截器
生产者拦截器
消费者拦截器
消费者(Standalone Consumer)
消费者组(Consumer Group)
Consumer Group 下可以有一个或多个 Consumer 实例
Group ID 是一个字符串,在一个 Kafka 集群中,它标识唯一的一个 Consumer Group
Consumer Group 下所有实例订阅的主题的单个分区,只能分配给组内的某个 Consumer 实例消费。这个分区当然也可以被其他的 Group 消费。
Rebalance
组成员数发生变更
订阅主题数发生变更
订阅主题的分区数发生变更
Broker
一台服务器就是一个broker
controller
1.主题管理
2.分区重分配
3.Preferred 领导者选举
4.集群成员管理(新增 Broker、Broker 主动关闭、Broker 宕机)
5.数据服务
Broker Configs
read-only
per-broker
cluster-wide
控制器故障转移(Failover)
Topic
位移主题(__consumer_offsets)
老版本
新版本
Paration
每个分区是一个独立的、有序的消息日志。每条消息都带有一个偏移量(offset),表示它在分区中的位置
作用
提高吞吐量
提高可扩展性
消息有序性
分配策略
轮询法 (Round-Robin)
基于键的分区 (Keyed Partitioning)
Partition 和 Consumer的关系
一个分区只能被同一个消费组中的一个消费者消费,即每个分区与一个消费者一一对应
多个消费者可以同时消费多个分区,当消费者数量小于分区数时,一个消费者可以同时消费多个分区
动态扩展:如果消费者数量增加,Kafka 会自动重新分配分区给消费者,达到负载均衡的效果
分区数量
吞吐量
并发消费者数量
磁盘使用
延迟
Replica
副本的优势
提供数据冗余
提供高伸缩性
改善数据局部性
领导者(Leader-based)的副本机制(保障副本数据一致性)
kafka生产者
首先创建生产者,通过序列化器一般使用自带的序列化器,然后加上分区器,发送到缓存队列里面,这个队列大小是32m,每批次大小是16k。这个缓存队列叫双端队列,(这个队列里面其实还有一个内存池,发送批次数据的时候会创建批次大小并且从内存池中取出内存,后续数据发送到kafka集群时候就把内存再释放到内存池当中。)数据到队列中,Sender线程会拉取数据。拉取数据有两种情况:1. 数据积累到16k会发送数据 2. 数据如果迟迟没有达到16k,但是达到linger时间会发送数据。发送数据的时候会把每一个分区的数据每一个broker节点一个队列往外发送,如果kafka中没有及时应答,最多缓存五个请求。会有一个Selector将输入和输出打通,开始发送数据,集群收到会进行一个副本同步,会进行应答,应答有三种方式,ack=0,1,-1,应答成功会清理对应的请求,清理掉分区的数据,如果失败会重试,重试会一直重试,一直到发送成功为止
带回调函数的异步发送
// 2 发送数据
for (int i = 0; i < 500; i++) {
kafkaProducer.send(new ProducerRecord<>("first", "atguigu" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null){
System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());
}
}
});
Thread.sleep(2);
}
kafka分区策略
分支主题
三种分区方式
1. 指定了分区就直接写入到这个分区下
2. 没有指定分区但是有key,就key的hash值和topic的分区数取余得到分区值
3. 如果既没有分区也没有key就使用粘性分区,随机选择一个分区,然后一直使用这个分区,等待这个分区满了之后再随机另一个分区使用
自定义分区
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 获取数据 atguigu hello
String msgValues = value.toString();
int partition;
if (msgValues.contains("atguigu")){
partition = 0;
}else {
partition = 1;
}
return partition;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
kafka生产者如何提高吞吐量
1. 设置缓冲区大小
2. 设置批次大小
3. 减少linger时间
4. 设置压缩
// 配置
Properties properties = new Properties();
// 连接kafka集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");
// 序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 缓冲区大小
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
// 批次大小
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
// linger.ms
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// 压缩
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
// 1 创建生产者
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
Properties properties = new Properties();
// 连接kafka集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");
// 序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 缓冲区大小
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
// 批次大小
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
// linger.ms
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// 压缩
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
// 1 创建生产者
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
ack应答原理
开启幂等性
幂等性就是保证不管发送多少次重复数据,broker端都只会持久化一条数据,不会重复
enable.idempotence 开启true
事务
开启事务必须开启幂等性
必须自定义一个唯一的transactional.id
数据乱序处理
如果未启动幂等性:max.in.fight.requests.per.connection设置为1
如果启动幂等性:max.in.fight.requests.per.connection需要小于等于5
broker
zk中的kafka信息
broker工作流程
1. 首先启动的时候将broker启动信息在zk里面注册
2. 然后抢占,看谁抢的快,谁就是监控的controller
3. 监控的controller监控brokers的节点变化,然后选举leader
选举规则是:按照isr中存活前提,然后按照在ar中的顺序
(isr是能互相连通的所有节点,ar是kafka中所有的副本,or是follower和leader副本同步延迟过多的副本
ar=isr+or)
排在ar前面的优先。
leader负责处理读写操作,而follower只负责副本数据的同步
4. 选举完,controller会将选举的信息上传到zk中
5. 其他controller会从zk同步信息
6. 如果有一个节点挂了,监听的controller监听到了变化会去获取isr,然后重新选举新的leader
再更新leader和isr
leader和follower故障处理
follower故障处理细节
1. 当出现follower故障的时候首先会临时踢出isr
2. 在这期间的leader和follower会继续接收数据
3. follower恢复后,follower会读取本地磁盘中在这个follower故障时候的HW,然后将自己高于上一次HW的数据都切掉,因为这些数据是没有验证的数据,然后去从这个上一次的HW开始,同步leader,当follower追上当前的hw就可以加入isr了
leo是每个副本的最后一个offset也就是每个副本的最新的那个数据
hw是所有副本中最小的那个数据,是全局的
leader故障处理细节
生产经验
手动调整分区副本,创建副本存储计划
手动调整分区副本存储的步骤如下:
(1)创建一个新的topic,名称为three。
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 4 --replication-factor 2 --topic three
(2)查看分区副本存储情况。
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic three
(3)创建副本存储计划(所有副本都指定存储在broker0、broker1中)。
vim increase-replication-factor.json
输入如下内容:
{
"version":1,
"partitions":[{"topic":"three","partition":0,"replicas":[0,1]},
{"topic":"three","partition":1,"replicas":[0,1]},
{"topic":"three","partition":2,"replicas":[1,0]},
{"topic":"three","partition":3,"replicas":[1,0]}]
}
(4)执行副本存储计划。
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
(5)验证副本存储计划。
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify
(6)查看分区副本存储情况。
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic three
(1)创建一个新的topic,名称为three。
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 4 --replication-factor 2 --topic three
(2)查看分区副本存储情况。
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic three
(3)创建副本存储计划(所有副本都指定存储在broker0、broker1中)。
vim increase-replication-factor.json
输入如下内容:
{
"version":1,
"partitions":[{"topic":"three","partition":0,"replicas":[0,1]},
{"topic":"three","partition":1,"replicas":[0,1]},
{"topic":"three","partition":2,"replicas":[1,0]},
{"topic":"three","partition":3,"replicas":[1,0]}]
}
(4)执行副本存储计划。
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
(5)验证副本存储计划。
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify
(6)查看分区副本存储情况。
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic three
leader Partition自动平衡
auto.leader.rebalance.enable 建议设置false
增加副本因子
由于某个主题很重要,所以我们要对这个重要的主题数据增加副本
1)创建topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 3 --replication-factor 1 --topic four
2)手动增加副本存储
(1)创建副本存储计划(所有副本都指定存储在broker0、broker1、broker2中)。
[atguigu@hadoop102 kafka]$ vim increase-replication-factor.json
输入如下内容:
{"version":1,"partitions":[{"topic":"four","partition":0,"replicas":[0,1,2]},{"topic":"four","partition":1,"replicas":[0,1,2]},{"topic":"four","partition":2,"replicas":[0,1,2]}]}
(2)执行副本存储计划。
[atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
文件存储机制
首先kafka中topic是逻辑上的概念,partition是物理上的概念,每个partition对应一个log文件,数据是不断追加到log文件的末尾的。为了防止文件过大定位太慢,kafka使用分片和稀疏索引机制,每个log分为多个segment,segment中包括index文件,log文件和timeindex文件,文件命名规则是:topic名称+分区序号
log文件和index文件详解
如何在log文件中找到offset为600的数据
1. 首先根据segment文件的名字找打segment文件。这个文件里面存储的是相对offset,通过相对offset+segment文件名获取绝对offset
2. 找到小于等于目标pffset的最大的offset对应的索引
3. 然后定位到log文件
4. 向下遍历找到想要的record
注意:
1. kafka的index是稀疏索引,每向log文件中写入4kb数据,就会向index文件写入一条索引。
2. index文件中保存的offset是相对offset
文件清理策略
日志保存时间默认为7天,一旦超过了设置的时间有两种清理策略
1. delete
2. compact
delete是将过期数据删除
compact是将相同key的不同value值只保留最后一个版本
kafka如何高效读写数据
1. kafka本身是分布式的集群,使用分区技术,并行度高
2. 读数据使用稀疏索引,快速定位
3. 顺序写磁盘,kafka写文件一直是追加写,而不是更新
4. 页缓存和零拷贝技术
kafka生产者的数据会直接写入到linux系统内核的页缓存上,页缓存上数据什么时候落盘到file文件由linux系统确定,然后kafka消费者消费数据的时候先去页缓存上拿取数据,再去file的segment中读取数据,然后直接走网卡到消费者,不做任何处理。因为kafka拿取数据的时候不进行处理,就不走应用层,而是直接将数据网卡给到消费者
kafka消费者
总体工作流程
1. 生产者向每一个分区的leader发送数据,每一个分区的follower去拉取自己分区的leader的数据作为副本
2. 消费者消费数据,每个分区的数据只能由一个消费者组中的一个消费者消费
每个消费者的offset存放在kafka的_consumer_offsets主题下
消费者组工作原理
每个消费者负责消费不同分区的数据,一个分组只能由一个组内的消费者消费
消费者组初始化流程
1. 首先选择coordinator节点,用这个消费者组的groupid%50获取哪号分区作为这个消费者组的老大
2. coordinator把要消费的topic情况发送给消费者组中的leader消费者,这个leader消费者是随机选出来的
3. 消费者leader会指定消费方案
4. leader将消费方案发送给coordinaor,然后coordinator将消费方案下发给哦各个consumer
5. 注意注意注意!!!!
这里的每个消费者都会跟coordinator保持心跳,一旦超过45s这个消费者就会被移除,会触发再平衡
这里如果消费者处理数据的时间过长大于5分组,也会触发再平衡
注意!这个再平衡特别影响kafka性能,很重要,一定不要触发再平衡
消费者组详细消费流程
首先消费者发送消费请求给NetWork客户端,发送请求导对应分区,请求返回一个回调函数OnSuccess,放到一个completeFetches队列中,队列中会有很多返回的数据,然后消费者向这个队列中拉取数据,默认500条拉取一次。
这里注意:每批次抓取的大小默认1字节,也就是当分区内有一个字节的数据就直接拉取,数据最小值超时时间默认是500ms,也就是500ms没有拉取导小于设置的抓取大小的数据就直接都拉取到对列。每批次拉取的最大大小是50m
分区的分配和再平衡
有四种分区,Range,RoundRobin,Sticky,CooperativeSticky
kafka默认的是Range+CooperativeStricky
Range
分区按照序号排序,分区数/消费者数获取每个消费者可以消费多少个分区然后分配
再分配:
(1)停止掉0号消费者,快速重新发送消息观看结果(45s以内,越快越好)。
1号消费者:消费到3、4号分区数据。
2号消费者:消费到5、6号分区数据。
0号消费者的任务会整体被分配到1号消费者或者2号消费者。
说明:0号消费者挂掉后,消费者组需要按照超时时间45s来判断它是否退出,所以需要等待,时间到了45s后,判断它真的退出就会把任务分配给其他broker执行。
(2)再次重新发送消息观看结果(45s以后)。
1号消费者:消费到0、1、2、3号分区数据。
2号消费者:消费到4、5、6号分区数据。
说明:消费者0已经被踢出消费者组,所以重新按照range方式分配。
RoundRobin
轮询分配
再分配:
(1)停止掉0号消费者,快速重新发送消息观看结果(45s以内,越快越好)。
1号消费者:消费到2、5号分区数据
2号消费者:消费到4、1号分区数据
0号消费者的任务会按照RoundRobin的方式,把数据轮询分成0 、6和3号分区数据,分别由1号消费者或者2号消费者消费。
说明:0号消费者挂掉后,消费者组需要按照超时时间45s来判断它是否退出,所以需要等待,时间到了45s后,判断它真的退出就会把任务分配给其他broker执行。
(2)再次重新发送消息观看结果(45s以后)。
1号消费者:消费到0、2、4、6号分区数据
2号消费者:消费到1、3、5号分区数据
说明:消费者0已经被踢出消费者组,所以重新按照RoundRobin方式分配。
Sticky
粘性分区,尽量均衡的防止分区
再分配:
(1)停止掉0号消费者,快速重新发送消息观看结果(45s以内,越快越好)。
1号消费者:消费到2、5、3号分区数据。
2号消费者:消费到4、6号分区数据。
0号消费者的任务会按照粘性规则,尽可能均衡的随机分成0和1号分区数据,分别由1号消费者或者2号消费者消费。
说明:0号消费者挂掉后,消费者组需要按照超时时间45s来判断它是否退出,所以需要等待,时间到了45s后,判断它真的退出就会把任务分配给其他broker执行。
(2)再次重新发送消息观看结果(45s以后)。
1号消费者:消费到2、3、5号分区数据。
2号消费者:消费到0、1、4、6号分区数据。
说明:消费者0已经被踢出消费者组,所以重新按照粘性方式分配。
offset位移
默认维护位置
维护在_consumer_offsets主题里面,kv方式存储,k是groupid+topic+分区号,vaule就是当前的offset值。每隔一段时间就会对这个数进行compact,保存最新数据
自动提交offset
enable.auto.commit 默认值为true,消费者会自动周期性地向服务器提交偏移量。
auto.commit.interval.ms 如果设置了 enable.auto.commit 的值为true, 则该值定义了消费者偏移量向Kafka提交的频率,默认5s。
手动提交offset
同步提交offset
consumer.commitSync();
异步提交offset
consumer.commitAsync();
同步提交offset和异步提交offset的相同点是都会选择一批数据中最高的偏移量提交,不同是同步提交会阻塞线程,一直到提交成功,自动失败重试。异步提交没有失败重试可能会提交失败
同步提交必须等待offset提交完再去消费下一批数据
异步提交是发送完提交offset请求就去消费下一批数据
指定offset
public static void main(String[] args) {
// 0 配置信息
Properties properties = new Properties();
// 连接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");
// 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 组id
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test3");
// 1 创建消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 2 订阅主题
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);
// 指定位置进行消费
Set<TopicPartition> assignment = kafkaConsumer.assignment();
// 保证分区分配方案已经制定完毕
while (assignment.size() == 0){//如果大小等于0,就说明没有获取任何的分区分配方案
kafkaConsumer.poll(Duration.ofSeconds(1));
assignment = kafkaConsumer.assignment();
}
// 指定消费的offset
for (TopicPartition topicPartition : assignment) {
kafkaConsumer.seek(topicPartition,600);
}
// 3 消费数据
while (true){
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}
指定时间消费
public static void main(String[] args) {
// 0 配置信息
Properties properties = new Properties();
// 连接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
// key value反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");
// 1 创建一个消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 2 订阅一个主题
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0) {
kafkaConsumer.poll(Duration.ofSeconds(1));
// 获取消费者分区分配信息(有了分区分配信息才能开始消费)
assignment = kafkaConsumer.assignment();
}
HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();
// 封装集合存储,每个分区对应一天前的数据
for (TopicPartition topicPartition : assignment) {
timestampToSearch.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
}
// 获取从1天前开始消费的每个分区的offset
Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch);
// 遍历每个分区,对每个分区设置消费时间。
for (TopicPartition topicPartition : assignment) {
OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition);
// 根据时间指定开始消费的位置
if (offsetAndTimestamp != null){
kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
}
}
// 3 消费该主题数据
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}
注意:这个指定时间消费是重点,可以用作flink补数
漏消费和重复消费
重复消费
自动提交offset引起
已经消费数据但是offset没有提交
漏消费
手动提交offset,数据未落盘,但是消费者挂了
先提交offset后消费
解决漏消费和重复消费
消费者事务
可以开启自动提交offset然后中间再用redis进行数据去重
0 条评论
下一页