kafka学习笔记图
2022-03-22 17:30:52 0 举报
AI智能生成
kafka作为比较的消息队列,在实际工作运用都比较广泛,了解其底层原来尤为的重要
作者其他创作
大纲/内容
消息编码格式:纯二进制的字节序列
缓冲上下游瞬时突发流量,使其更平滑
1、削峰填谷
2、在不同系统之间传递语义准确的消息,实现松耦合的异步式数据传递
作用
点对点模型
主题(Topic)
发布者(Publisher)
订阅者(Subscriber)
和点对点模型不同的是,这个模型可能存在多个发布者向相同的主题发送消息,而订阅者也可能存在多个,它们都能接收到相同主题的消息
发布 / 订阅模型
消息引擎模型
定义:开源的消息引擎系统和分布式流处理平台
生产者(Producer):向主题发布新消息的应用程序
消费者(Consumer):从主题订阅新消息的应用程序
生产者和消费者
kafka的客户端(Clients)
负责接收和处理客户端发送过来的请求
只能追加写(Append-only)消息的物理文件
顺序 I/O 写操作
消息日志(Log)
消息被追加写到当前最新的日志段中,当写满了一个日志段后,Kafka 会自动切分出一个新的日志段,并将老的日志段封存起来
定时任务会定期地检查老的日志段是否能够被删除,从而实现回收磁盘空间的目的
日志段(Log Segment)机制
磁盘满了?
对消息进行持久化
主题(Topic):主题是承载消息的逻辑容器,在实际使用中多用来区分具体的业务
一个Broker包含多个主题topic
kafka的服务端(Broker)
主题层:每个主题可以配置 M 个分区,而每个分区又可以配置 N 个副本
分区层:一个领导者角色,对外提供服务; N-1 个副本是追随者副本,只是提供数据冗余之用
消息位移(Offset):示分区中每条消息的位置信息,是一个单调递增且不变的值
消费者位移(Consumer Offset):消费者消费进度,每个消费者都有自己的消费者位移
消息(Record):指 Kafka 处理的主要对象
消息层:分区中包含若干条消息,每条消息的位移从 0 开始,依次递增(先进先出)
kafka的三层消息架构
描述:多个消费者实例共同组成的一个组,同时消费多个分区以实现高吞吐,每个分区都只会被组内的一个消费者实例消费
目的:提升消费者端的吞吐量
重平衡(Rebalance):当消费组中的正在消费的线程挂掉后,会rebalance将消息分配给消费组中的其他线程消费
kafka的消费者组(Consumer Group)
1、已提交的消息:kafka 的若干个 Broker 成功地接收到一条消息并写入到日志文件
2、有限度的持久化保证:Broker 中至少有 1 个存活
持久化条件
1、网络抖动,导致消息压根就没有发送到 Broker 端
2、消息本身不合格导致 Broker 拒绝接收
原因
0:生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送
1:leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应
all:leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失
2、设置acks = all:表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”spring.kafka.producer.acks=all;springboot配置文件
3、设置 retries 为一个较大的值,spring.kafka.producer.retries=0;springboot配置文件
生产者端解决
1、设置 min.insync.replicas > 1:消息至少要被写入到多少个副本才算是“已提交”
确保 replication.factor > min.insync.replicas,建议设备replication.factor = min.insync.replicas + 1
2、设置 replication.factor >= 3:将消息多保存几份,不得大于集群中broker的个数
3、设置 unclean.leader.election.enable = false:关闭竞选leader
brocker端解决
生产者程序丢失数据
消息的重复处理
解决
带来问题
维持先消费消息(阅读),再更新位移(书签)的顺序
1、未按照正确的消费步骤消费
Consumer 程序不要开启自动提交位移(enable.auto.commit设置为false),而是要应用程序手动提交位移
2、多线程负责的消息没有被成功处理,但位移已经被更新
消费者程序丢失数据
丢失场景
kafka如何保证消息不丢失?
实现机制是 epoll
零拷贝(Zero Copy):零拷贝避免了将数据从磁盘复制到缓冲区,再将缓冲区的内容发送到socket的性能损耗。这中间有四次数据拷贝,磁盘->页缓存->用户空间->socket缓存->网络
Linux
实现机制是 select
Windows
kafka的I/O模型
可以多路复用等
HTTP 库在很多编程语言中都略显简陋
选择TCP原因
1、KafkaProducer 实例创建时启动 Sender 线程,从而创建与 bootstrap.servers 中所有 Broker 的 TCP 连接
2、KafkaProducer 实例首次更新元数据信息之后,还会再次创建与集群中所有 Broker 的 TCP 连接
3、Producer 端发送消息到某台 Broker 时发现没有与该 Broker 的 TCP 连接,那么也会立即创建连接
何时建立连接?
如果设置 Producer 端 connections.max.idle.ms 参数大于 0,则步骤 1 中创建的 TCP 连接会被自动关闭;如果设置该参数 =-1,那么步骤 1 中创建的 TCP 连接将无法被关闭,从而成为“僵尸”连接
用户主动关闭:producer.close()
何时关闭连接?
kafka的TCP连接
log.dirs:Broker 需要使用的若干个文件目录路径,默认配置为:log.dirs=/tmp/kafka-logs
log.dir:结尾没有 s,说明它只能表示单个路径,它是补充上一个参数用的
Broker 端参数
参数配置
Broker集群化,部署在不同的机器
为客户端提供服务:生产者总是向领导者副本写消息;而消费者总是从领导者副本读消息
领导者副本(Leader Replica)
向领导者副本发送请求,请求领导者把最新生产的消息发给它,这样它能保持与领导者的同步
追随者副本(Follower Replica)
副本(Replica)
保证数据的持久化或消息不丢失
kafka的备份机制(Replication):把相同的数据拷贝到多台brocker机器上
kafka如何实现高可用?
继承 org.apache.kafka.clients.producer.ProducerInterceptor
生产者拦截器
实现 org.apache.kafka.clients.consumer.ConsumerInterceptor
消费者拦截器
指定拦截器类时要指定它们的全限定名
Kafka 拦截器
1、Broker 端指定了和 Producer 端不同的压缩算法
丧失了引以为豪的 Zero Copy 特性
2、Broker 端发生了消息格式转换
brocker端压缩
根据compression.type来配置
生产者端压缩
压缩
压缩过的消息集合在 Broker 端写入时都要发生解压缩操作,目的就是为了对消息执行各种验证
brocker解压
consumer解压缩 使用的算法信息放在压缩集合里面
解压
1、吞吐量方面:LZ4 > Snappy > zstd 和 GZIP
2、压缩比方面:zstd > LZ4 > GZIP > Snappy
算法
kafka消息的解压/压缩
描述:将每个主题划分成多个分区(Partition),每个分区是一组有序的消息日志;
出现原因:实现系统的高伸缩性(Scalability),带来的高吞吐量和负载均衡的优势
消息存储:生产者生产的每条消息只会被发送到一个分区中
1、轮询策略(Round-robin)
2、随机策略(Randomness)
3、按消息键保序策略(Key-ordering)
保存消息到分区的策略
设置:创建topic的时候设置topic的分区数量
坏处:如果分区过多,那么日志分段也会很多,写的时候由于是批量写,其实就会变成随机写了,随机 I/O 这个时候对性能影响很大。所以一般来说 Kafka 不能有太多的 Partition
kafka的分区(Partitioning):一个有序不变的消息序列
2、props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)
幂等性(指定 Producer 幂等性的方法)
出现原因:幂等性只能解决单分区和单会话的数据不出现重复消息
描述:保证将消息原子性地写入到多个分区中,这批消息要么全部写入成功,要么全部失败
1、生产中端开启 enable.idempotence = true
2、生产者端设置 Producer 端参数 transactional. id;配置文件spring.kafka.producer.transactionIdPrefix=
read_uncommitted:这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取
read_committed:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息
3、消费者端设置isolation.level =read_committed
设置生效
事务
kafka的幂等性和事务
定义:Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制
1、Consumer Group 下可以有一个或多个 Consumer 实例
2、Group ID 是一个字符串
3、Consumer Group 下所有实例订阅的主题的单个分区,只能分配给组内的某个 Consumer 实例消费
特性
Group内是队列模型
Group间是发布/订阅模型
同时实现点对点模型和发布 / 订阅模型
好处
定义:质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区
1、组成员数发生变更
2、订阅主题数发生变更
3、订阅主题的分区数发生变更
何时触发?
在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成
STW
所有 Consumer 实例共同参与,全部重新分配所有分区
缺点
设置 session.timeout.ms = 6s
设置heartbeat.interval.ms = 2s
1、因为未能及时发送心跳,导致 Consumer 被“踢出”Group 而引发的
设置max.poll.interval.ms消费时长
2、Consumer 消费时间过长导致的
非必要Rebalance预防
Rebalance
kafka的消费者组
ZooKeeper不适用于高频写的场景
出现的原因
将 Consumer 的位移数据作为一条条普通的 Kafka 消息,提交到 __consumer_offsets 中
描述
1、key value的格式,key由三部分组成 groupid+topic+pattition
2、consumer group信息(在新建group会创建该消息)
Consumer Group 下的所有 Consumer 实例都停止了,而且它们的位移数据都已被删除时,Kafka 会向位移主题的对应分区写入 tombstone 消息,表明要彻底删除这个 Group 的信息
3、tombstone(墓碑消息),也称 delete mark
消息格式
Compact 的过程就是扫描日志的所有消息,剔除那些过期的消息,然后把剩下的消息整理在一起
线程叫 Log Cleaner
Kafka 提供了专门的后台线程定期地巡检待 Compact 的主题,看看是否存在满足条件的可删除数据
Compact 策略
过期消息删除策略
当 Kafka 集群中的第一个 Consumer 程序启动时,Kafka 会自动创建位移主题
该主题的分区数是 offsets.topic.num.partitions=50,副本数是 offsets.topic.replication.factor=3
自动创建
手动创建
创建时机
enable.auto.commit=true
按照auto.commit.interval.ms定期提交位移
自动提交
enable.auto.commit = false
consumer.commitSync()
consumer.commitAsync()
调用Consumer API 提供的位移提交方法
手动提交
1、缩短单条消息处理的时间;代码优化
2、增加 Consumer 端允许下游系统消费一批消息的最大时长;max.poll.interval.ms 两次poll消息的间隔时间
3、减少下游系统一次性消费的消息总数;max.poll.records:一次poll的消息数量
1、消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程
2、消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑
4、下游系统使用多线程来加速消费
1、费者两次从kafka中获取消息的时间间隔超过设置的值时,会抛出该异常
解决:设置不同的组id
2、Consumer 程序配置的 group.id相同
异常处理(CommitFailedException 异常)
提交位移
kafka的位移主题
kafka学习笔记
0 条评论
下一页