kafka学习笔记图
2022-03-22 17:30:52 0 举报
AI智能生成
kafka作为比较的消息队列,在实际工作运用都比较广泛,了解其底层原来尤为的重要
作者其他创作
大纲/内容
定义:开源的消息引擎系统和分布式流处理平台
消息编码格式:纯二进制的字节序列
作用
1、削峰填谷
缓冲上下游瞬时突发流量,使其更平滑
2、在不同系统之间传递语义准确的消息,实现松耦合的异步式数据传递
消息引擎模型
点对点模型
发布 / 订阅模型
主题(Topic)
发布者(Publisher)
订阅者(Subscriber)
和点对点模型不同的是,这个模型可能存在多个发布者向相同的主题发送消息,
而订阅者也可能存在多个,它们都能接收到相同主题的消息
而订阅者也可能存在多个,它们都能接收到相同主题的消息
kafka的客户端(Clients)
生产者和消费者
生产者(Producer):向主题发布新消息的应用程序
消费者(Consumer):从主题订阅新消息的应用程序
kafka的服务端(Broker)
负责接收和处理客户端发送过来的请求
对消息进行持久化
消息日志(Log)
只能追加写(Append-only)消息的物理文件
顺序 I/O 写操作
磁盘满了?
日志段(Log Segment)机制
消息被追加写到当前最新的日志段中,当写满了一个日志段后,Kafka 会自动切分出一个新的日志段,并将老的日志段封存起来
定时任务会定期地检查老的日志段是否能够被删除,从而实现回收磁盘空间的目的
一个Broker包含多个主题topic
主题(Topic):主题是承载消息的逻辑容器,在实际使用中多用来区分具体的业务
kafka的三层消息架构
主题层:每个主题可以配置 M 个分区,而每个分区又可以配置 N 个副本
分区层:一个领导者角色,对外提供服务; N-1 个副本是追随者副本,只是提供数据冗余之用
消息层:分区中包含若干条消息,每条消息的位移从 0 开始,依次递增(先进先出)
消息位移(Offset):示分区中每条消息的位置信息,是一个单调递增且不变的值
消费者位移(Consumer Offset):消费者消费进度,每个消费者都有自己的消费者位移
消息(Record):指 Kafka 处理的主要对象
kafka的消费者组(Consumer Group)
描述:多个消费者实例共同组成的一个组,同时消费多个分区以实现高吞吐,每个分区都只会被组内的一个消费者实例消费
目的:提升消费者端的吞吐量
重平衡(Rebalance):当消费组中的正在消费的线程挂掉后,会rebalance将消息分配给消费组中的其他线程消费
kafka如何保证消息不丢失?
持久化条件
1、已提交的消息:kafka 的若干个 Broker 成功地接收到一条消息并写入到日志文件
2、有限度的持久化保证:Broker 中至少有 1 个存活
丢失场景
生产者程序丢失数据
原因
1、网络抖动,导致消息压根就没有发送到 Broker 端
2、消息本身不合格导致 Broker 拒绝接收
生产者端解决
1、使用 producer.send(msg, callback)发送消息,在回调函数对失败的消息进行处理
2、设置acks = all:表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”
spring.kafka.producer.acks=all;springboot配置文件
spring.kafka.producer.acks=all;springboot配置文件
0:生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送
1:leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应
all:leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失
3、设置 retries 为一个较大的值,spring.kafka.producer.retries=0;springboot配置文件
brocker端解决
1、设置 min.insync.replicas > 1:消息至少要被写入到多少个副本才算是“已提交”
2、设置 replication.factor >= 3:将消息多保存几份,不得大于集群中broker的个数
确保 replication.factor > min.insync.replicas,建议设备replication.factor = min.insync.replicas + 1
3、设置 unclean.leader.election.enable = false:关闭竞选leader
消费者程序丢失数据
1、未按照正确的消费步骤消费
解决
维持先消费消息(阅读),再更新位移(书签)的顺序
带来问题
消息的重复处理
解决
2、多线程负责的消息没有被成功处理,但位移已经被更新
解决
Consumer 程序不要开启自动提交位移(enable.auto.commit设置为false),而是要应用程序手动提交位移
kafka的I/O模型
Linux
实现机制是 epoll
零拷贝(Zero Copy):零拷贝避免了将数据从磁盘复制到缓冲区,再将缓冲区的内容发送到socket的性能损耗。这中间有四次数据拷贝,磁盘->页缓存->用户空间->socket缓存->网络
Windows
实现机制是 select
kafka的TCP连接
选择TCP原因
可以多路复用等
HTTP 库在很多编程语言中都略显简陋
何时建立连接?
1、KafkaProducer 实例创建时启动 Sender 线程,从而创建与 bootstrap.servers 中所有 Broker 的 TCP 连接
2、KafkaProducer 实例首次更新元数据信息之后,还会再次创建与集群中所有 Broker 的 TCP 连接
3、Producer 端发送消息到某台 Broker 时发现没有与该 Broker 的 TCP 连接,那么也会立即创建连接
何时关闭连接?
如果设置 Producer 端 connections.max.idle.ms 参数大于 0,则步骤 1 中创建的 TCP 连接会被自动关闭;如果设置该参数 =-1,那么步骤 1 中创建的 TCP 连接将无法被关闭,从而成为“僵尸”连接
用户主动关闭:producer.close()
kafka如何实现高可用?
Broker集群化,部署在不同的机器
参数配置
Broker 端参数
log.dirs:Broker 需要使用的若干个文件目录路径,默认配置为:log.dirs=/tmp/kafka-logs
log.dir:结尾没有 s,说明它只能表示单个路径,它是补充上一个参数用的
kafka的备份机制(Replication):把相同的数据拷贝到多台brocker机器上
副本(Replica)
领导者副本(Leader Replica)
为客户端提供服务:生产者总是向领导者副本写消息;而消费者总是从领导者副本读消息
追随者副本(Follower Replica)
向领导者副本发送请求,请求领导者把最新生产的消息发给它,这样它能保持与领导者的同步
保证数据的持久化或消息不丢失
Kafka 拦截器
生产者拦截器
继承 org.apache.kafka.clients.producer.ProducerInterceptor
消费者拦截器
实现 org.apache.kafka.clients.consumer.ConsumerInterceptor
指定拦截器类时要指定它们的全限定名
kafka消息的解压/压缩
压缩
brocker端压缩
1、Broker 端指定了和 Producer 端不同的压缩算法
2、Broker 端发生了消息格式转换
丧失了引以为豪的 Zero Copy 特性
生产者端压缩
根据compression.type来配置
解压
brocker解压
压缩过的消息集合在 Broker 端写入时都要发生解压缩操作,目的就是为了对消息执行各种验证
consumer解压缩 使用的算法信息放在压缩集合里面
算法
1、吞吐量方面:LZ4 > Snappy > zstd 和 GZIP
2、压缩比方面:zstd > LZ4 > GZIP > Snappy
kafka的分区(Partitioning):一个有序不变的消息序列
描述:将每个主题划分成多个分区(Partition),每个分区是一组有序的消息日志;
出现原因:实现系统的高伸缩性(Scalability),带来的高吞吐量和负载均衡的优势
消息存储:生产者生产的每条消息只会被发送到一个分区中
保存消息到分区的策略
1、轮询策略(Round-robin)
2、随机策略(Randomness)
3、按消息键保序策略(Key-ordering)
设置:创建topic的时候设置topic的分区数量
坏处:如果分区过多,那么日志分段也会很多,写的时候由于是批量写,其实就会变成随机写了,随机 I/O 这个时候对性能影响很大。所以一般来说 Kafka 不能有太多的 Partition
kafka的幂等性和事务
幂等性(指定 Producer 幂等性的方法)
1、props.put(“enable.idempotence”, ture)
2、props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)
事务
出现原因:幂等性只能解决单分区和单会话的数据不出现重复消息
描述:保证将消息原子性地写入到多个分区中,这批消息要么全部写入成功,要么全部失败
设置生效
1、生产中端开启 enable.idempotence = true
2、生产者端设置 Producer 端参数 transactional. id;配置文件spring.kafka.producer.transactionIdPrefix=
3、消费者端设置isolation.level =read_committed
read_uncommitted:这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取
read_committed:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息
kafka的消费者组
定义:Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制
特性
1、Consumer Group 下可以有一个或多个 Consumer 实例
2、Group ID 是一个字符串
3、Consumer Group 下所有实例订阅的主题的单个分区,只能分配给组内的某个 Consumer 实例消费
好处
同时实现点对点模型和发布 / 订阅模型
Group内是队列模型
Group间是发布/订阅模型
Rebalance
定义:质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区
何时触发?
1、组成员数发生变更
2、订阅主题数发生变更
3、订阅主题的分区数发生变更
缺点
STW
在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成
所有 Consumer 实例共同参与,全部重新分配所有分区
非必要Rebalance预防
1、因为未能及时发送心跳,导致 Consumer 被“踢出”Group 而引发的
设置 session.timeout.ms = 6s
设置heartbeat.interval.ms = 2s
2、Consumer 消费时间过长导致的
设置max.poll.interval.ms消费时长
kafka的位移主题
出现的原因
ZooKeeper不适用于高频写的场景
描述
将 Consumer 的位移数据作为一条条普通的 Kafka 消息,提交到 __consumer_offsets 中
消息格式
1、key value的格式,key由三部分组成 groupid+topic+pattition
2、consumer group信息(在新建group会创建该消息)
3、tombstone(墓碑消息),也称 delete mark
Consumer Group 下的所有 Consumer 实例都停止了,而且它们的位移数据都已被删除时,Kafka 会向位移主题的对应分区写入 tombstone 消息,表明要彻底删除这个 Group 的信息
过期消息删除策略
Compact 策略
Compact 的过程就是扫描日志的所有消息,剔除那些过期的消息,然后把剩下的消息整理在一起
Kafka 提供了专门的后台线程定期地巡检待 Compact 的主题,看看是否存在满足条件的可删除数据
线程叫 Log Cleaner
创建时机
自动创建
当 Kafka 集群中的第一个 Consumer 程序启动时,Kafka 会自动创建位移主题
该主题的分区数是 offsets.topic.num.partitions=50,副本数是 offsets.topic.replication.factor=3
手动创建
提交位移
自动提交
enable.auto.commit=true
按照auto.commit.interval.ms定期提交位移
手动提交
enable.auto.commit = false
调用Consumer API 提供的位移提交方法
consumer.commitAsync(Map<TopicPartition, OffsetAndMetadata>)
consumer.commitSync()
consumer.commitSync(Map<TopicPartition, OffsetAndMetadata>)
consumer.commitAsync()
异常处理(CommitFailedException 异常)
1、费者两次从kafka中获取消息的时间间隔超过设置的值时,会抛出该异常
解决
1、缩短单条消息处理的时间;代码优化
2、增加 Consumer 端允许下游系统消费一批消息的最大时长;max.poll.interval.ms 两次poll消息的间隔时间
3、减少下游系统一次性消费的消息总数;max.poll.records:一次poll的消息数量
4、下游系统使用多线程来加速消费
1、消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程
2、消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑
2、Consumer 程序配置的 group.id相同
解决:设置不同的组id
0 条评论
下一页
为你推荐
查看更多