Kafka核心技术与实战
2021-01-26 13:54:44 22 举报
AI智能生成
Kafka相关知识点总结,学习必备
作者其他创作
大纲/内容
高级Kafka应用
Kafka Streams
Kafka DSL开发者
应用实例
客户端
生产者
分区机制
消息结构
Kafka 的消息组织方式实际上是三级结构:主题 - 分区 - 消息
分区的作用
为了实现系统的高伸缩性(Scalability),是实现负载均衡以及高吞吐量的关键
不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,
这样每个节点的机器都能独立地执行各自分区的读写请求处理
这样每个节点的机器都能独立地执行各自分区的读写请求处理
还可以通过添加新的节点机器来增加整体系统的吞吐量
扩展
分区思想
应用
Kafka 中叫分区
MongoDB 和 Elasticsearch 中就叫分片 Shard
HBase 中则叫 Region,
Cassandra 中又被称作 vnode
MongoDB 和 Elasticsearch 中就叫分片 Shard
HBase 中则叫 Region,
Cassandra 中又被称作 vnode
作用
提供负载均衡这种最核心的功能
实现业务级别的消息顺序的问题
分区策略
含义
决定生产者将消息发送到那个分区的算法 (Kafka提供默认的分区策略,同时支持自定义分区策略)
常见的分区策略
轮询策略,也称Round-robin策略,及顺序分配
轮询策略是Kafka Java生产者API默认提供的分区策略
(如果主题有3个分区,第一条消息会被发送到分区0,第二条会被发送到分区1 等等)
(如果主题有3个分区,第一条消息会被发送到分区0,第二条会被发送到分区1 等等)
特点:轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一
随机策略,也称Randomness策略
所谓随机就是我们随意地将消息放置到任意一个分区上
特点:力求将数据均匀分散,但逊于轮询策略
按消息键排序策略,也称Key-ordering 策略
消息键排序策略,kafka允许每条消息定义消息键,简称为Key,
该Key可以是一个明确业务含义的字符串,如客户代码,部分编号ID等;
也可以用来表示消息元数据
一旦消息被定义了Key,那么可以保证同一个Key消息进入到相同的分区
该Key可以是一个明确业务含义的字符串,如客户代码,部分编号ID等;
也可以用来表示消息元数据
一旦消息被定义了Key,那么可以保证同一个Key消息进入到相同的分区
特点:可以自定义主键策略
压缩算法(compression)
它秉承了用时间去换空间的经典 trade-off 思想,具体来说就是用 CPU 时间去换磁盘空间或网络 I/O 传输量,
希望以较小的 CPU 开销带来更少的磁盘占用或更少的网络 I/O 传输。
希望以较小的 CPU 开销带来更少的磁盘占用或更少的网络 I/O 传输。
Kafka是如何压缩消息?
Kafka消息格式
Kafka 共有两大类消息格式,社区分别称之为 V1 版本和 V2 版本。V2 版本是 Kafka 0.11.0.0 中正式引入的。
Kafka消息层次
Kafka消息层次分为两层:消息集合(message set)和消息(message)
Kafka底层的消息日志由一些列消息集合日志项组成,写入操作是在这个层面操作
一个消息集合中包含若干条日志项(record item),而日志项才是真正封装消息的地方
V1(kafka 0.11.0之前)集合:message set 消息为message
V2 (kafka 0.11.0之后) 集合: record batch 消息为 record
V2 (kafka 0.11.0之后) 集合: record batch 消息为 record
改进1
V1版本中,每条消息都需要执行CRC校验,
V2版本中,消息的CRC检验工作在消息集合一层
V2版本中,消息的CRC检验工作在消息集合一层
改进2
V1版本中,多条消息进行压缩然后保存到外层消息的消息体字段中,
V2版本中,对整个消息集合进行压缩
V2版本中,对整个消息集合进行压缩
Kafka是何时压缩
Kafka可能发生压缩的地方:生产者和Broker端
Broker端产生压缩的原因
Broker 端指定了和 Producer 端不同的压缩算法
Broker 端发生了消息格式转换
Kafka是何时解压缩
Kafka可能发生压缩的地方:生产者和Broker端
Producer 端压缩、Broker 端保持、Consumer 端解压缩
Kafka各种压缩算法对比
Kafka 2.1.0 版本之前,Kafka 支持 3 种压缩算法:GZIP、Snappy 和 LZ4
从 2.1.0 开始,Kafka 正式支持 Zstandard 算法(简写为 zstd)。
它是 Facebook 开源的一个压缩算法,能够提供超高的压缩比(compression ratio)
它是 Facebook 开源的一个压缩算法,能够提供超高的压缩比(compression ratio)
压缩算法的优劣
压缩比
原先占 100 份空间的东西经压缩之后变成了占 20 份空间,那么压缩比就是 5,显然压缩比越高越好
压缩 / 解压缩吞吐量
比如每秒能压缩或解压缩多少 MB 的数据,吞吐量也是越高越好
对于 Kafka 吞吐量:LZ4 > Snappy > zstd 和 GZIP;压缩比:zstd > LZ4 > GZIP > Snappy。
无消息丢失配置
Kafka在什么情况下保证消息不丢失?
Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证
已提交的消息:Kafka的若干个broker成功地接收到一条消息,并写入的日记文件
有限度的持久化保证:Kafka不可能保证任何情况下都做到不丢失消息
“消息丢失”案例
生产者程序丢失数据
Producer 永远要使用带有回调通知的发送 API,也就是说不要使用 producer.send(msg)
而要使用 producer.send(msg, callback)
而要使用 producer.send(msg, callback)
消费者程序丢失数据
消费者消费消息时,先更新位移,如果终止操作,
下次消费时,会从最新位移位置开始,导致终止操作的数据获取不到
下次消费时,会从最新位移位置开始,导致终止操作的数据获取不到
Consumer 端的位移数据
解决方案
维持先消费消息,再更新位移的顺序
引入新问题:如果消费信息成功,更新位移顺序失败,导致重复消费
如果是多线程异步处理消费消息,Consumer 程序不要开启自动提交位移,而是要应用程序手动提交位移
最佳实践
Producer配置
不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法
设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。
如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义
如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义
设置 retries 为一个较大的值。对应前面提到的 Producer 自动重试。
当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失
当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失
Broker配置
设置 unclean.leader.election.enable = false。它控制的是哪些 Broker 有资格竞选分区的 Leader。
如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。
故一般都要将该参数设置成 false,即不允许这种情况的发生。
如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。
故一般都要将该参数设置成 false,即不允许这种情况的发生。
设置 replication.factor >= 3 副本保存份数
设置 min.insync.replicas > 1 最小副本提交数,控制的是消息至少要被写入到多少个副本才算是“已提交”。
设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。
设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。
确保 replication.factor > min.insync.replicas,推荐设置成 replication.factor = min.insync.replicas + 1
如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了
如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了
Consumer配置
确保消息消费完成再提交。
Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。
Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。
高级功能
拦截器
等同Spring Interceptor 或是 Apache Flume实现原理
Kafka 拦截器分为生产者拦截器和消费者拦截器
生产者拦截器允许你在发送消息前以及消息提交成功后植入你的拦截器逻辑
消费者拦截器支持在消费消息前以及提交位移后编写特定逻辑
拦截器都支持链的方式,即你可以将一组拦截器串连成一个大的拦截器, 会按照添加顺序依次执行拦截器逻辑
拦截器实现
Producer端
配置Kafka 拦截器是通过参数配置完成的,指定拦截器类时要指定它们的全限定名
Producer自定义拦截器需要实现 org.apache.kafka.clients.producer.ProducerInterceptor 接口
onSend:该方法会在消息发送之前被调用。如果你想在发送之前对消息“美美容”,这个方法是你唯一的机会
onAcknowledgement:该方法会在消息成功提交或发送失败之后被调用。
onAcknowledgement 的调用要早于发送回调通知callback的调用。
该方法和 onSend不是在同一个线程中被调用的,如果你在这两个方法中调用了某个共享可变对象,一定要保证线程安全。
该方法处在 Producer 发送的主路径中,所以最好别放一些太重的逻辑进去,否则你会发现你的 Producer TPS 直线下降。
onAcknowledgement 的调用要早于发送回调通知callback的调用。
该方法和 onSend不是在同一个线程中被调用的,如果你在这两个方法中调用了某个共享可变对象,一定要保证线程安全。
该方法处在 Producer 发送的主路径中,所以最好别放一些太重的逻辑进去,否则你会发现你的 Producer TPS 直线下降。
Consumer端
配置Kafka 拦截器是通过参数配置完成的,指定拦截器类时要指定它们的全限定名
Consumer自定义拦截器需要实现 oorg.apache.kafka.clients.consumer.ConsumerInterceptor 接口
onConsume:该方法在消息返回给 Consumer 程序之前调用。也就是说在开始正式处理消息之前
onCommit:Consumer 在提交位移之后调用该方法。
通常你可以在该方法中做一些记账类的动作,比如打日志等
通常你可以在该方法中做一些记账类的动作,比如打日志等
Kafka拦截器应用的场景
Kafka 拦截器可以应用于包括客户端监控、端到端系统性能检测、消息审计等多种功能在内的场景
端到端性能检测:通过编写拦截器类来统计消息端到端处理的延时
TCP连接管理
为何采用TCP通信
TCP拥有一些高级功能,如多路复用请求和同时轮询多个连接的能力
多路复用请求:multiplexing request,是将两个或多个数据合并到底层—物理连接中的过程。
TCP的多路复用请求会在一条物理连接上创建若干个虚拟连接,每个虚拟连接负责流转各自对应的数据流。
严格讲:TCP并不能多路复用,只是提供可靠的消息交付语义保证,如自动重传丢失的报文
TCP的多路复用请求会在一条物理连接上创建若干个虚拟连接,每个虚拟连接负责流转各自对应的数据流。
严格讲:TCP并不能多路复用,只是提供可靠的消息交付语义保证,如自动重传丢失的报文
很多编程语言的HTTP库功能相对的比较简陋
Kafka生产者过程(以Java生产者API KafkaProducer)
1.构造生产者对象所需的参数对象
2.创建KafkaProducer对象实例
3.使用KafkaProducer的send方法发送消息
4.KafkaProducer的close方法关闭生产者并释放各种资源
何时创建TCP连接
主:创建 KafkaProducer 实例时,生产者应用会在后台创建并启动一个名为 Sender 的线程。
该 Sender 线程开始运行时首先会创建与 Broker 的连接
该 Sender 线程开始运行时首先会创建与 Broker 的连接
bootstrap.servers,Producer核心参数,表示Producer启动时要连接Broker地址
集群中所有的 Broker信息都配置到 bootstrap.servers 中,通常你指定 3~4 台
因为 Producer 一旦连接到集群中的任一台 Broker,就能拿到整个集群的 Broker 信息
Producer 向某一台 Broker 发送了 METADATA 请求,尝试获取集群的元数据信息
因为 Producer 一旦连接到集群中的任一台 Broker,就能拿到整个集群的 Broker 信息
Producer 向某一台 Broker 发送了 METADATA 请求,尝试获取集群的元数据信息
可能被创建连接的地方:1.更新元数据后,2.在消息发送时(并不是总是创建TCP连接)
Producer更新集群的元数据,发现某些Broker当前没有连接,就会创建TCP连接
当要发送消息时,Producer 发现尚不存在与目标 Broker 的连接,也会创建TCP连接
何时关闭TCP连接
用户主动关闭:包含kill -9 杀死进程和调用producer.close()方法来关闭等
Kafka自动关闭:Pconnections.max.idle.ms 默认情况下该参数值是 9 分钟,
9分钟内没有请求通过,那么 Kafka 会自动关闭TCP 连接
一旦被设置成 -1,TCP 连接将成为永久长连接
9分钟内没有请求通过,那么 Kafka 会自动关闭TCP 连接
一旦被设置成 -1,TCP 连接将成为永久长连接
该TCP连接是在Broker端关闭的,是被动关闭场景,造成大量CLOSE_WAIT连接,
Producer端或Client端没有机会显式的观测连接被中断
Producer端或Client端没有机会显式的观测连接被中断
幂等生产者与事务生产者
Kafka 消息交付可靠性保障
Kafka 对 Producer 和 Consumer 要处理的消息提供什么样的承诺
1.最多一次(at most once):消息可能会丢失,但绝不会被重复发送
2.至少一次(at least once):消息不会丢失,但有可能被重复发送。
3.精确一次(exactly once):消息不会丢失,也不会被重复发送
1.最多一次(at most once):消息可能会丢失,但绝不会被重复发送
2.至少一次(at least once):消息不会丢失,但有可能被重复发送。
3.精确一次(exactly once):消息不会丢失,也不会被重复发送
Kafka 默认提供的交付可靠性保障是第二种,即至少一次
幂等性(Idempotence)
Producer 默认不是幂等性,在 0.11 之后,可以指定 Producer 幂等性。
通过props.put(“enable.idempotence”, ture)
或 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)配置
通过props.put(“enable.idempotence”, ture)
或 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)配置
幂等性 Producer 的作用范围
保证单分区上的幂等性,
单个幂等性Producer能够保证某个主题的某个分区不出现重复消息
无法现实多个分区幂等性
单个幂等性Producer能够保证某个主题的某个分区不出现重复消息
无法现实多个分区幂等性
只能出现单会话上的幂等性,不会实现跨会话的幂等性(重启幂等姓就无法保证)
事务(Transaction)
Kafka自0.11版本之后提供了对事务的支持,主要在read committed隔离级别。
保证多条消息原子性的写入目标分区,保证Consumer只能看到事务成功提交的消息
保证多条消息原子性的写入目标分区,保证Consumer只能看到事务成功提交的消息
事务型 Producer
设置事务型Producer
1.开启 enable.idempotence = true
2.设置 Producer 端参数 transactional. id,设置名字
2.设置 Producer 端参数 transactional. id,设置名字
事务型Producer调用
事务型 Producer 的显著特点是调用了一些事务 API,
如 initTransaction、beginTransaction、commitTransaction 和 abortTransaction,
它们分别对应事务的初始化、事务开始、事务提交以及事务终止。
如 initTransaction、beginTransaction、commitTransaction 和 abortTransaction,
它们分别对应事务的初始化、事务开始、事务提交以及事务终止。
消费者
消费者组(Consumer Group)
Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制
组内存在多个消费者或者消费者实例(Consumer Instance),共享一个公共ID(Group ID )
组内的所有消费者协调在一起在消费订阅主题(Subscribed Topics)的所有分区
每个分区只能由同一个消费者组内的Consumer实例来消费
组内存在多个消费者或者消费者实例(Consumer Instance),共享一个公共ID(Group ID )
组内的所有消费者协调在一起在消费订阅主题(Subscribed Topics)的所有分区
每个分区只能由同一个消费者组内的Consumer实例来消费
特性
1.Consumer Group 下可以有一个或多个 Consumer 实例,这里实例是一个单独的进程或者统一进程下的线程
2. Group ID 是一个字符串,在Kafka集群中,标识唯一的Consumer Group
3.Consumer Group 下所有实例订阅的主题的单个分区,只能分配给组内的某个 Consumer 实例消费
_consumer_offsets(消息位移Topic)
位移主题,这是一个内部主题,即Offsets Topic
背景:老版本Consumer Group位移保存在Zookeeper中
优点:减少Kafka Broker端的状态保存开销。
将服务器节点做成无状态,可以自由地扩容,超强的伸缩性
将服务器节点做成无状态,可以自由地扩容,超强的伸缩性
缺点:Zookeeper并不适合进行频繁的写更新,
Consumer Group的位移是个非常频繁的操作,大吞吐量的写操作会拖慢ZooKeeper集群性能
Consumer Group的位移是个非常频繁的操作,大吞吐量的写操作会拖慢ZooKeeper集群性能
概念
使用_consumer_offsets topic用来记录Kafka 消费者的位移信息:
将Consumer的位移数据作为普通的Kafka消息,提交到_consumer_offsets中
将Consumer的位移数据作为普通的Kafka消息,提交到_consumer_offsets中
消息格式
_consumer_offsets的消息格式是Kafka自定义的,用户不能修改
<Key,Value>格式
Key: <Group,主题名,分区号>
Value
位移值
保存Consumer Group信息的消息
用于删除Group过期位移甚至是删除Group的消息(tombstone 消息,即墓碑消息,也称 delete mark)
初始化
当 Kafka 集群中的第一个 Consumer 程序启动时,Kafka 会自动创建位移主题
自动创建分区
Broker端 offsets.topic.num.partitions 默认50,创建_consumer_offsets-xxx日志等目录
自动创建副本
Broker端 offsets.topic.replication.factor 默认3
Kafka Consumer提交位移方式
自动提交位移
enable.auto.commit 设置为true
auto.commit.interval.ms 来控制间隔
auto.commit.interval.ms 来控制间隔
问题
当Consumer无消息消费时,位移会保持在之前那次位移的位置由于自动提交,会不停的写入位移数据,
因此需要保留最新一条,需要删除之前的多余信息
因此需要保留最新一条,需要删除之前的多余信息
Kafka是怎样删除位移主题中过期信息?
Compact策略过程
如何定义Compact 策略中的过期?
对于同一个 Key 的两条消息 M1 和 M2,如果 M1 的发送时间早于 M2,那么 M1 就是过期消息
Compact 策略如何删除过期数据?
Kafka 提供了专门的后台线程定期地巡检待 Compact 的主题,看看是否存在满足条件的可删除数据。
这个后台线程叫 Log Cleaner
这个后台线程叫 Log Cleaner
手动提交位移
enable.auto.commit = false
Consumer API提供了consumer.commitSync方法调用
Consumer API提供了consumer.commitSync方法调用
Rebalance
概念:Rebalance 就是让一个 Consumer Group 下所有的 Consumer 实例就如何消费订阅主题的所有分区达成共识的过程
触发Rebalance方式
Coordinator 通知各个 Consumer 实例开启 Rebalance 的方法,就是将 REBALANCE_NEEDED 标志封装进心跳请求的响应体中。
协调者(Coordinator)
它专门为 Consumer Group 服务,负责为 Group 执行 Rebalance 以及提供位移管理和组成员管理等
Broker 在启动时,都会创建和开启相应的 Coordinator 组件
Consumer 端应用程序在提交位移时,其实是向 Coordinator 所在的 Broker 提交位移
当 Consumer 应用启动时,也是向 Coordinator 所在的 Broker 发送各种请求,
然后由 Coordinator 负责执行消费者组的注册、成员管理记录等元数据管理操作。
然后由 Coordinator 负责执行消费者组的注册、成员管理记录等元数据管理操作。
Consumer Group 如何确定为它服务的 Coordinator 在哪台 Broker 上呢?
有助于帮助我们定位Broker
有助于帮助我们定位Broker
第 1 步:确定由位移主题的哪个分区来保存该 Group 数据:
partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)。
partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)。
第 2 步:找出该分区 Leader 副本所在的 Broker,该 Broker 即为对应的 Coordinator。
Rebalance触发条件
1.组成员数发生变更。
比如有新的 Consumer 实例加入组或者离开组,异或是有 Consumer 实例崩溃被“踢出”组。
比如有新的 Consumer 实例加入组或者离开组,异或是有 Consumer 实例崩溃被“踢出”组。
2.订阅主题数发生变更。
Consumer Group 可以使用正则表达式的方式订阅主题,比如 consumer.subscribe(Pattern.compile(“t.*c”)) 就表明该 Group 订阅所有以字母 t 开头、字母 c 结尾的主题。在 Consumer Group 的运行过程中,你新创建了一个满足这样条件的主题,那么该 Group 就会发生 Rebalance。
Consumer Group 可以使用正则表达式的方式订阅主题,比如 consumer.subscribe(Pattern.compile(“t.*c”)) 就表明该 Group 订阅所有以字母 t 开头、字母 c 结尾的主题。在 Consumer Group 的运行过程中,你新创建了一个满足这样条件的主题,那么该 Group 就会发生 Rebalance。
3.订阅主题的分区数发生变更。
Kafka 当前只能允许增加一个主题的分区数。当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance。
Kafka 当前只能允许增加一个主题的分区数。当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance。
Rebalance的弊端
1.Rebalance 影响 Consumer 端 TPS。 Rebalance期间,Consumer会停止工作
2.Rebalance很慢。
3.Rebalance效率不高。Kafka的设计机制决定每次Rebalance时,Group所有成员都要参与
(改进原则:使用局部型原理提高性能)
(改进原则:使用局部型原理提高性能)
避免Rebalance
订阅主题数量发生变化&订阅主题的分区数发生变化,无法避免
组成员数量变化而引发的 Rebalance 该如何避免
增加 Consumer 实例的操作都是计划内的,可能是出于增加 TPS 或提高伸缩性的需要
避免Group 下实例数减少
Conumser实例存活(通过Consumer实例定期向Coordinator发送心跳请求)
心跳参数:
session.timeout.ms 默认10s 发送心跳超时时间
heartbeat.interval.ms 控制发送心跳请求频率参数
max.poll.interval.ms 默认5分钟 表示5分钟内Consumer
无法消费完poll方法返回的消息,Consumer会主动发起“离开组”请求
session.timeout.ms 默认10s 发送心跳超时时间
heartbeat.interval.ms 控制发送心跳请求频率参数
max.poll.interval.ms 默认5分钟 表示5分钟内Consumer
无法消费完poll方法返回的消息,Consumer会主动发起“离开组”请求
避免Rebalance场景
第一类非必要 Rebalance : 因为未能及时发送心跳,
导致 Consumer 被“踢出”Group 而引发的
导致 Consumer 被“踢出”Group 而引发的
设置 session.timeout.ms = 6s
设置 heartbeat.interval.ms = 2s
要保证 Consumer 实例在被判定为“dead”之前,能够发送至少 3 轮的心跳请求,
即 session.timeout.ms >= 3 * heartbeat.interval.ms
即 session.timeout.ms >= 3 * heartbeat.interval.ms
第二类非必要 Rebalance : Consumer 消费时间过长导致的
第三类非必要 Rebalance : 排查Consumer端GC表现,避免Full GC (GC参数配置不合理)
消费者组重平衡全流程解析
位移提交
概念
Consumer的消费位移
记录了 Consumer 要消费的下一条消息的位移
Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程称为提交位移(Committing Offsets)
位移提交的语义保障是由你来负责的,Kafka 只会“无脑”地接受你提交的位移。
你对位移提交的管理直接影响了你的 Consumer 所能提供的消息语义保障。
你对位移提交的管理直接影响了你的 Consumer 所能提供的消息语义保障。
粒度
分区粒度(各自分区提交各自位移数据)
位移提交方式
用户角度
自动提交
手动提交
Consumer端角度
同步提交
异步提交
异常处理
消息堆积原因
生产速度大于消费速度,这样可以适当增加分区,增加Consumer数量,提升消费TPS
Consumer消费性能低,查一下是否有很重的消费逻辑(比如拿到消息后写HDFS或HBASE这种逻辑就挺重的),看看是否可以优化Consumer TPS
确保Consumer端没有因为异常而导致消费hang住
如果你使用的是消费者组,确保没有频繁地发生rebalance
CommitFailedException
多线程开发实例
TCP连接管理
消费者组监控
Kafka原理
副本机制(Replication,备份机制)
副本机制好处
提供数据冗余
即使系统部分组件失效,系统依然能够继续运转,因而增加了整体可用性以及数据持久性。
提供高伸缩性
支持横向扩展,能够通过增加机器的方式来提升读性能,进而提高读操作吞吐量。
改善数据局部性
允许将数据放入与用户地理位置相近的地方,从而降低系统延时。
Apache Kafka提供了副本机制带来的第一个好处
即使系统部分组件失效,系统依然能够继续运转,因而增加了整体可用性以及数据持久性。
副本机制依然是 Kafka 设计架构的核心所在,它也是 Kafka 确保系统高可用和消息高持久性的重要基石。
副本定义(Replica)
一个只能追加写消息的提交日志
同一个分区下的所有副本保存有相同的消息序列,这些副本分散保存在不同的 Broker 上,
从而能够对抗部分 Broker 宕机带来的数据不可用。
从而能够对抗部分 Broker 宕机带来的数据不可用。
副本角色
如何确保副本中所有的数据都是一致?
(对Kafka而言 生产者发送消息到某个主题后,
消息是如何同步到所有副本中呢?)
(对Kafka而言 生产者发送消息到某个主题后,
消息是如何同步到所有副本中呢?)
基于领导者(Leader-based)的副本机制
1. Kafka中,副本分成两类:领导这副本(Leader Replica)和追随者副本(Follwer Replica)
每个分区在创建时都要选举一个副本,称为领导者副本,其余的副本自动称为追随者副本。
每个分区在创建时都要选举一个副本,称为领导者副本,其余的副本自动称为追随者副本。
2. 在 Kafka 中,追随者副本是不对外提供服务的。
追随者副本不处理客户端请求,它唯一的任务就是从领导者副本异步拉取消息,
并写入到自己的提交日志中,从而实现与领导者副本的同步
追随者副本不处理客户端请求,它唯一的任务就是从领导者副本异步拉取消息,
并写入到自己的提交日志中,从而实现与领导者副本的同步
3.当领导者副本挂掉了,或者说领导者副本所在的 Broker 宕机时,
Kafka 依托于 ZooKeeper 提供的监控功能能够实时感知到,
并立即开启新一轮的领导者选举,从追随者副本中选一个作为新的领导者。
老 Leader 副本重启回来后,只能作为追随者副本加入到集群中。
Kafka 依托于 ZooKeeper 提供的监控功能能够实时感知到,
并立即开启新一轮的领导者选举,从追随者副本中选一个作为新的领导者。
老 Leader 副本重启回来后,只能作为追随者副本加入到集群中。
基于领导者(Leader-based)的副本机制的好处
1.方便实现“Read-your-writes”:使用领导者副本提供写入和读取操作,可以保证消息的实时性
2.方便实现单调读(Monotonic Reads)
追随者副本同步条件
In-sync Replicas(ISR)
定义
可伸缩的副本集合,包含Leader和Follower副本的集合
控制可伸缩的参数
Broker端参数:replica.lag.time.max.ms
Follower副本落后Leader副本的最长时间间隔,
如果Follower副本超过这个时间未与Leader同步,
那么会被排除ISR集合之中;
如果Follwer后面追上Leader进度,
会被加入到ISR集合中
Follower副本落后Leader副本的最长时间间隔,
如果Follower副本超过这个时间未与Leader同步,
那么会被排除ISR集合之中;
如果Follwer后面追上Leader进度,
会被加入到ISR集合中
领导者选举(Leader Election)
Leader Election
Unclean Leader Election
定义
ISR集合为空的极端情况
Kafka请求处理
所有的请求都是通过 TCP 网络以 Socket 的方式进行通讯的。
Apache Kafka 自己定义了一组请求协议,用于实现各种各样的交互操作,截止至2.3版本,多达45种格式。
比如常见的 PRODUCE 请求是用于生产消息的,FETCH 请求是用于消费消息的,METADATA 请求是用于请求 Kafka 集群元数据信息的。
比如常见的 PRODUCE 请求是用于生产消息的,FETCH 请求是用于消费消息的,METADATA 请求是用于请求 Kafka 集群元数据信息的。
如何处理请求
Reactor 模式
Reactor 模式是事件驱动架构的一种实现方式,特别适合应用于处理多个客户端并发向服务器端发送请求的场景
Controller
控制器组件(Controller)是 Apache Kafka 的核心组件。它的主要作用是在 Apache ZooKeeper 的帮助下管理和协调整个 Kafka 集群
高水位
基本使用
线上集群部署方案
操作系统
I/O模型
主流的 I/O 模型通常有 5 种类型:
阻塞式 I/O
非阻塞式 I/O
I/O 多路复用
信号驱动 I/O
异步 I/O
阻塞式 I/O
非阻塞式 I/O
I/O 多路复用
信号驱动 I/O
异步 I/O
Java 中 Socket 对象的阻塞模式和非阻塞模式就对应于前两种模型
Linux 中的系统调用 select 函数就属于 I/O 多路复用模型
大名鼎鼎的 epoll 系统调用则介于第三种和第四种模型之间
至于第五种模型,其实很少有 Linux 系统支持,反而是 Windows 系统提供了一个叫 IOCP 线程模型属于这一种。
Kafka 客户端底层使用了 Java 的 selector,selector 在 Linux 上的实现机制是 epoll,而在 Windows 平台上的实现机制是 select。因此在这一点上将 Kafka 部署在 Linux 上是有优势的,因为能够获得更高效的 I/O 性能。
数据网络传输效率
在 Linux 部署 Kafka 能够享受到零拷贝技术所带来的快速数据传输特性。(Windwos需要JDK 8.20版本之后支持)
社区支持度
Windows 平台上部署 Kafka 只适合于个人测试或用于功能验证,千万不要应用于生产环境。
磁盘
磁盘I/O性能
磁盘的容量
根据消息数和预留时间数,预估磁盘容量,实际的使用情况,预留20%~30%的磁盘空间
带宽
根据实际带宽资源和业务SLA 预估服务器数量
集群配置参数
Broker 端参数
主题级别的参数
JVM 端参数
操作系统级别的参数
Kafka
消息引擎基础
消息引擎(Message System)
消息格式
序列化/反序列化
消息格式:CSV,XML,JSON
消息传输协议
点对点模型(消息队列模型)
发布订阅模型(广播模型 publisher/subscriber)
特点
“削峰填谷”
缓冲上下游瞬间突发流量,使其平滑
解耦
异步
Kafka基本术语
主题(Topic)
发布订阅的对象
客户端(Clients)
生产者(Producer)
向主题发布消息的客户端应用程序,可以发布一到多个主题
消费者(Consumer)
订阅这些主题消息的客户端应用程序,可以订阅一到多个主题
消费者组(Consumer Group)
多个消费者实例共同组成的一个组,同时消费多个分区以实现高吞吐
重平衡(Rebalance)
消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程
Rebalance 是 Kafka 消费者端实现高可用的重要手段
消费者位移(Consumer Offset)
表征消费者消费标志位,每个消费者都有自己的消费者位移
获取消息方式-消息拉取(Consumer Pull)
服务端
服务端
Kafka的服务器端由被称为Broker等服务进程构成,一个Kafka集群由多个Broker组成
Broker
负责接收和处理客户端发送过来的请求,以及对消息进行持久化
同一个集群,将不同Broker分散进行在不同的机器上,提高高可用能力
备份机制(Replication)
备份
相同的数据拷贝到多台机器上
副本(Replica)
拷贝的相同数据
领导者副本(Leader Replica)
与客户端程序进行交互,提供服务,
工作机制
生产者总是向领导者副本写消息;消费者总是从领导这副本读消息
追随者副本(Follower Replica)
领导者副本的追随副本
工作机制
向领导者副本发送请求,请求领导者副本把最新生产的消息进行同步
分区(Partitioning)
伸缩性
Kafka分区机制
将每个主题划分成多个分区(Partition),每个分区是一组有序的消息日志
消息架构(3层)
主题层
每个主题可以配置M个分区
分区层
每个分区的N个副本中,只有一个充当领导这角色,对外提供服务;其它N-1副本是追随者副本
消息层
分区中若干条消息,每条消息的位移从0开始,依次递增
消息日志(Log)
Kafka Broker使用消息日志(Log)来持久化数据
写消息日志方式
追加写入内容(append-only)避免了缓慢的随机I/O操作,改为性能较好的顺序I/O写操作
实现Kafka高吞吐量特性手段
日志段(Log Segment)
消息日志进一步细分为多个日志段
当每个日志段写满时,Kafka会自动切分出一个新的日志段,并将老的日志段保存
Kafka使用定时任务定期检查老的日志段能否被删除,实现回收磁盘空间
消息位移(Offset)
表示分区中每条消息的位置消息,是一个单调递增且不变的值
结构图
Kafka角色定位
Apache Kafka 是消息引擎系统,也是一个分布式流处理平台(Distributed Streaming Platform)
Kafka设计初衷
提供一套 API 实现生产者和消费者
降低网络传输和磁盘存储开销
实现高伸缩性架构
Kafka版本选择
Kafka Streams 组件
它提供了 Kafka 实时处理流数据的能力
Kafka Connect 组件
具体的连接器(Connector),串联起上下游的外部系统。
Kafka版本号
kafka-2.11-2.1.1
2.11 表示前面的版本号是编译 Kafka 源代码的 Scala 编译器版本
2.1.1 表示Kafka 实际的版本号 大版本号 - 小版本号 - Patch 号
0.7版本:
只有基础消息队列功能,无副本;打死也不使用
只有基础消息队列功能,无副本;打死也不使用
0.8版本:
增加了副本机制,新的producer API;建议使用0.8.2.2版本;不建议使用0.8.2.0之后的producer API
0.9版本:
增加权限和认证,新的consumer API,Kafka Connect功能;不建议使用consumer API;
0.10版本:
引入Kafka Streams功能,bug修复;建议版本0.10.2.2;建议使用新版consumer API
0.11版本:
producer API幂等,事物API,消息格式重构;建议版本0.11.0.3;谨慎对待消息格式变化
1.0和2.0版本:
Kafka Streams改进;建议版本2.0;
运维与监控
主题管理
动态配置
消费者组位移管理
KafkaAdminClient
认证机制
MirrorMaker
监控框架
授权管理
Kafka调优
流处理应用搭建实例
0 条评论
下一页