Kafka快速入门
2021-08-13 16:51:24 0 举报
AI智能生成
kafka快速入门导图
作者其他创作
大纲/内容
概述
定义
基于发布/订阅模式的分布式消息队列
主要用于大数据实时处理领域
消息队列
传统消息队列使用场景
以注册用户为例
以注册用户为例
同步处理
用户注册页面填写注册信息
注册信息写入数据库
调用发送短信接口
发送短信
页面相应注册成功
异步处理
用户注册页面填写注册信息
注册信息写入数据库
发送短信的请求写入消息队列
发送短信
页面相应注册成功
使用消息队列好处
解耦
允许你独立的扩展或修改两边的处理过程
可恢复性
部分组件失效不会影响整个系统
主要针对处理消息的进程挂掉
缓冲/削峰
控制优化数据流过系统的速度,解决生产消费处理速度不一致的情况
灵活/削峰
消息队列可根据需要增加服务器
在访问剧增是顶住压力,高峰过后释放资源
异步通信
允许延时处理
消息队列两种模式
点对点
消费者主动拉取数据,消息收到后消息清除
流程
生产者产生消息放入队列Queue
消费者从Queue去除消息消费
消息被消费后Queue删除消息,不保留
Queue支持多个消费者,但是一个消息只能被一个消费者消费
发布/订阅模式
一对多,消费者消费后数据不会删除
流程
生产者将消息发布到topic
同时有一个或多个消费者订阅改topic
发布到topic的消息会被所有订阅该topic的消费者消费
消费者消费数据模式
拉取
能处理多少拉多少
可能存在消息堆积
推送
不管你能不能处理,我就推给你
可能超过消费着消费能力
kafka采用消费者拉取模式
Kafka基础架构
术语/组件
Producer
消息生产者,向 kafka broker 发消息的客户端
Consumer
消息消费者,向 kafka broker 取消息的客户端
Consumer Group (CG)
消费者组,由多个 consumer 组成
消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费
消费者组之间互不影响
所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
Broker
一台 kafka 服务器就是一个 broker
一个集群由多个 broker 组成
一个 broker可以容纳多个 topic
Topic
可以理解为一个队列,队列的名字就是topic的名字
生产者和消费者面向的都是一个 topic;
Partition
为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上
一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列
Replica
副本,保障某个节点故障时,该节点上的 partition 数据不丢失,且 kafka 仍然能够继续工作
一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower
leader
每个分区多个副本的“主”
生产者发送数据的对象,以及消费者消费数据的对象都是 leader
follower
每个分区多个副本中的“从”
实时从 leader 中同步数据,保持和 leader 数据的同步
leader 发生故障时,某个 follower 会成为新的leader
入门篇
安装部署
教程
百度/官网一堆
命令行操作
查看当前kafka集群所有Topic
bin/kafka-topics.sh --zookeeper hadoop102:2181 --list
创建 topic
bin/kafka-topics.sh --zookeeper hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic first
--topic
定义 topic 名
--replication-factor
定义副本数
--partitions
定义分区数
删除 topic
bin/kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic first
server.properties 中设置 delete.topic.enable=true 否则只是标记删除
发送消息/生产消息
bin/kafka-console-producer.sh --brokerlist hadoop102:9092 --topic first
输入消息,回车即可
消费消息
bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic first
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first
新旧版本命令支持程度不一样
--from-beginning
把主题中以往所有的数据都读取出来。
当然是指还没过期删除的数据
查看Topic详情
bin/kafka-topics.sh --zookeeper hadoop102:2181 --describe --topic first
返回分区数量、副本数量、副本分布、isr等
修改分区数
bin/kafka-topics.sh --zookeeper hadoop102:2181 --alter --topic first --partitions 6
Kafka不支持减少分区数,可以通过删除原先的Topic,然后创建新Topic,重新设置分区数。
进阶
Kafka 工作流程及文件存储机制
topic/partition
topic
Kafka 中消息是以 topic 进行分类的,是逻辑上的概念
生产者生产消息,消费者消费消息,都是面向 topic的
partition
partition 是物理上的概念
每个 partition 对应于一个或多个 log 文件,该 log 文件中存储的就是 producer 生产的数据
Producer 生产的数据会被不断追加到该log 文件末端,且每条数据都有自己的 offset
消费者组中的每个消费者,都会实时记录自己消费到了哪个 offset,以便出错恢复时,从上次的位置继续消费。
分片/索引
于生产者生产的消息会不断追加到 log 文件末尾,
为防止 log 文件过大导致数据定位效率低下,
Kafka 采取了分片和索引机制
为防止 log 文件过大导致数据定位效率低下,
Kafka 采取了分片和索引机制
将每个 partition 分为多个 segment。
每个 segment对应两个文件——“.index”文件和“.log”文件
每个 segment对应两个文件——“.index”文件和“.log”文件
这些文件位于一个文件夹下
该文件夹的命名规则为:topic 名称+分区序号。
例如,first 这个 topic 有三个分区,则其对应的文件夹为 first-0,first-1,first-2。
index 和 log 文件以当前 segment 的第一条消息的 offset 命名。
00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log
“.index”文件存储大量的索引信息,
“.log”文件存储大量的数据
索引文件中的元数据指向对应数据文件中 message 的物理偏移地址
Kafka 生产者
分区策略
分区原因
方便在集群中扩展
每个 Partition 可以通过调整以适应它所在的机器
一个 topic又可以有多个 Partition 组成
所有整个集群就可以适应任意大小的数据
可以提高并发
可以以 Partition 为单位读写
分区的原则
发送的数据封装成ProducerRecord对象
分区原则如下
指明 partition
直接将指明的值直接作为 partiton 值
没有指明 partition 值但有 key
将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值
既没有 partition 值又没有 key
第一次调用时随机生成一个整数,(后面每次调用在这个整数上自增)
将这个值与 topic 可用的 partition 总数取余得到 partition值,也就是常说的 round-robin 算法。
数据可靠性保证
实现机制
topic 的每个 partition 收到producer 发送的数据后,要向 producer 发送 ack(acknowledgement 确认收到)
如果producer 收到 ack,就会进行下一轮的发送,否则重新发送数据。
何时发送ack?
确保有follower与leader同步完成leader再发送ack,
这样才能保证leader挂掉之后,能在follower中选举出新的leader
多少个follower同步完成之后发送ack?
方案1
半数以上的follower同步完成,即可发送ack
优点
延迟低
缺点
选举新的 leader 时,容忍 n 台节点的故障,
需要 2n+1 个副本
需要 2n+1 个副本
方案2
全部的follower同步完成,才可以发送ack
优点
选举新的 leader 时,容忍 n 台节点的故障,需要 n+1 个副本
缺点
延迟高
kafka选择方案二
原因
同样为了容忍 n 台节点的故障,
第一种方案要 2n+1 个副本,
第二种方案只要 n+1个副本,
而 Kafka 的每个分区都有大量的数据,第一种方案会造成大量数据的冗余。
第一种方案要 2n+1 个副本,
第二种方案只要 n+1个副本,
而 Kafka 的每个分区都有大量的数据,第一种方案会造成大量数据的冗余。
虽然第二种方案的网络延迟会比较高,但网络延迟对 Kafka 的影响较小。
kafka通过ISR进行了优化
ISR
Leader 维护了一个动态的 in-sync replica set (ISR),意为和 leader 保持同步的 follower 集合
当 ISR 中的 follower 完成数据的同步之后,leader 就会给 follower 发送 ack
如果 follower长时间 未 向 leader 同 步 数 据 , 则 该 follower 将 被 踢 出 ISR
该 时 间 阈 值 由replica.lag.time.max.ms 参数设定。Leader 发生故障之后,就会从 ISR 中选举新的 leader。
旧版本还有个最大数据量的参数,已舍弃
解决一个flower故障,导致leader一直收不到同步信息,长时间等待同步而无法发送ack的问题
ack 应答机制
机制
对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等 ISR 中的 follower 全部接收成功。
根据用户对可靠性和延迟的要求进行权衡,Kafka 为用户提供了三种可靠性级别
acks 参数配置
0
producer 不等待 broker 的 ack
broker 一接收到还没有写入磁盘就已经返回
当 broker 故障时有可能丢失数据
这一操作提供了一个最低的延迟
1
producer 等待 broker 的 ack
partition 的 leader 落盘成功后返回 ack
如果在 follower同步成功之前 leader 故障,那么将会丢失数据
-1(all)
producer 等待 broker 的 ack
partition 的 leader 和 follower 全部落盘成功后才返回 ack
注意此处flower为ISR中的flower
但是如果在 follower 同步完成后,broker 发送 ack 之前,leader 发生故障,那么会造成数据重复。
故障处理细节
术语
LEO(Log End Offset)
指的是每个副本最大的 offset
HW(High Watermark)
指的是消费者能见到的最大的 offset,ISR 队列中最小的 LEO
follower 故障
follower 发生故障后会被临时踢出 ISR
待该 follower 恢复后,follower 会读取本地磁盘上次记录的 HW
将 log 文件高于 HW 的部分截取掉,从 HW 开始向 leader 进行同步
等该 follower 的 LEO 大于等于该 Partition 的 HW,即 follower 追上 leader 之后,可以重新加入 ISR
leader 故障
leader 故障
leader 发生故障之后,会从 ISR 中选出一个新的 leader
注意,这个新的leader的LED不一定是所有flower中最大的
为保证多个副本之间的数据一致性
其余的 follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 leader同步数据。
其余的 follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 leader同步数据。
新的leader的HW1如果小于挂掉的leader的HW0,那么HW0-HW1之间的数据可能会重复发送
可以通过幂等性解决
注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。
Exactly Once 语义
ack存在的问题
ACK=-1
可以保证 Producer 到 Server 之间不会丢失数据,即 At Least Once 语义
可以保证数据不丢失,但是不能保证数据不重复
ACK=0
可以保证生产者每条消息只会被发送一次,即 At Most Once 语义。
可以保证数据不重复,但是不能保证数据不丢失
但是在某些场合,针对重要的信息,比如交易数据,要求数据既不重复也不丢失,即 Exactly Once 语义
0.11 版本以前的 Kafka,对此是无能为力的
幂等性
0.11 版本的 Kafka,引入了一项重大特性:幂等性
幂等性就是指 Producer 不论向 Server 发送多少次重复数据,Server 端都只会持久化一条
幂等性结合 At Least Once 语义,就构成了 Kafka 的 Exactly Once 语义:At Least Once + 幂等性 = Exactly Once
启用幂等性,只需要将 Producer 的参数中 enable.idompotence 设置为 true 即可
幂等性的实现原理
Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游
开启幂等性的 Producer 在初始化的时候会被分配一个 PID,发往同一 Partition 的消息会附带 Sequence Number
Broker 端会对<PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker 只会持久化一条。
但是 PID 重启就会变化,同时不同的 Partition 也具有不同主键,所以幂等性无法保证跨分区跨会话的 Exactly Once。
子主题
Kafka 消费者
消费 方式
consumer 采用 pull(拉)模式
consumer 根据 consumer 的消费能力以适当的速率从 broker 中读取数据。
pull 模式不足之处是,如果 kafka 没有数据,消费者可能会陷入循环中,一直返回空数据
针对上述不足,Kafka 的消费者在消费数据时会传入一个时长参数 timeout,如果当前没有数据可供消费,
consumer 会等待一段时间之后再返回,这段时长即为 timeout。
consumer 会等待一段时间之后再返回,这段时长即为 timeout。
对比另一种push (推)模式
消息发送速率是由 broker 决定的,很难适应消费速率不同的消费者
它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息
分区分配策略
一个 consumer group 中有多个 consumer,
一个 topic 有多个 partition,
所以必然会涉及到 partition 的分配问题,即确定那个 partition 由哪个 consumer 来消费。
一个 topic 有多个 partition,
所以必然会涉及到 partition 的分配问题,即确定那个 partition 由哪个 consumer 来消费。
基本概念
当外部程序消费topic数据时,kafka将其视为消费组(ConsumerGroup),每个消费组包含1个或多个消费者(Consumer)
(有效)消费者数量最多可以为分区总数量,并不是可以无限量。
当消费组中的任意一个消费者挂掉时,kafka会对消费组进行平衡(Rebalance),再根据存活消费数和消费者分配策略重新分配消费者。
三种分配策略
Range
以主题为单位,以数据顺序排列可用分区,以字典顺序排列消费者
将topic分区数除以消费者总数,以确定分配给每个消费者的分区数
如果没有平均分配,那么前几个消费者将拥有一个额外的分区。
案例
RoundRobin(默认)
将消费组内所有消费者及消费者订阅的所有主题的分区按照字典序排序
通过轮询方式逐个将分区依次分配给每个消费者(或判断消费者是否订阅了主题)
如果同一个消费组内所有的消费者的订阅信息都是相同的,那么 RoundRobinAssignor 分配策略的分区分配会是均匀的。
如果同一个消费组内的消费者订阅的信息是不相同的,那么在执行分区分配的时候就不是完全的轮询分配,有可能导致分区分配得不均匀。
案例
Sticky (0.11.x 版本新增策略)
“sticky”这个单词可以翻译为“黏性的”,Kafka 从 0.11.x 版本开始引入这种分配策略
主要目的
分区的分配要尽可能均匀
分区的分配尽可能与上次分配的保持相同。
当两者发生冲突时,第一个目标优先于第二个目标。
鉴于这两个目标,StickyAssignor 分配策略的具体实现要比 RangeAssignor 和 RoundRobinAssignor 这两种分配策略要复杂得多。
鉴于这两个目标,StickyAssignor 分配策略的具体实现要比 RangeAssignor 和 RoundRobinAssignor 这两种分配策略要复杂得多。
如果发生分区重分配,那么对于同一个分区而言
有可能之前的消费者和新指派的消费者不是同一个
之前消费者进行到一半的处理还要在新指派的消费者中再次复现一遍,
这显然很浪费系统资源。
这显然很浪费系统资源。
StickyAssignor 分配策略如同其名称中的“sticky”一样,让分配策略具备一定的“黏性”,
尽可能地让前后两次分配相同,进而减少系统资源的损耗及其他异常情况的发生。
尽可能地让前后两次分配相同,进而减少系统资源的损耗及其他异常情况的发生。
案例
StickyAssignor 分配策略比另外两者分配策略而言显得更加优异,这个策略的代码实现也异常复杂
StickyAssignor 分配策略比另外两者分配策略而言显得更加优异,
这个策略的代码实现也异常复杂,同时在相对早的版本陆续发现些bug,谨慎使用
这个策略的代码实现也异常复杂,同时在相对早的版本陆续发现些bug,谨慎使用
自定义分区策略
知道就行
Kafka 高效 读写
实现机制
顺序写磁盘
Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端,为顺序写
顺序写之所以快,是因为其省去了大量磁头寻址的时间。
零复制技术
正常复制文件步骤
File上传Kernel Space
从Kernel Space上传到User Spcae
从User Spcae上传到Kernel Space
从Kernel Space到NIC
零拷贝
File上传Kernel Space
从Kernel Space到NIC
Zookeeper 在 Kafka 的作用
Kafka 集群中有一个 broker 会被选举为 Controller
负责管理集群 broker 的上下线,所有 topic 的分区副本分配和 leader 选举等工作
Controller 的管理工作都是依赖于 Zookeeper 的
Kafka 事务
Kafka 从 0.11 版本开始引入了事务支持。
事务可以保证 Kafka 在 Exactly Once 语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败。
事务可以保证 Kafka 在 Exactly Once 语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败。
Producer 事务
实现原理
为了实现跨分区跨会话的事务,引入一个全局唯一的 Transaction ID,并将 Producer获得的PID和Transaction ID绑定
当Producer重启后就可以通过正在进行的TransactionID 获得原来的 PID
Kafka 引入了一个新的组件 Transaction Coordinator 管理 Transaction
Producer 就是通过和 Transaction Coordinator 交互获得 Transaction ID 对应的任务状态
TransactionCoordinator 还负责将事务所有写入 Kafka 的一个内部 Topic
这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。
Consumer 事务
对于 Consumer 而言,事务的保证就会相对较弱,尤其时无法保证 Commit 的信息被精确消费。
这是由于 Consumer 可以通过 offset 访问任意信息,
而且不同的 Segment File 生命周期不同,同一事务的消息可能会出现重启后被删除的情况。
而且不同的 Segment File 生命周期不同,同一事务的消息可能会出现重启后被删除的情况。
Kafka API
Producer API
消息发送流程
main 线程
Sender 线程
线程共享变量 ——RecordAccumulator
异步送 发送 API
KafkaProducer:需要创建一个生产者对象,用来发送数据
ProducerConfig:获取所需的一系列配置参数
ProducerRecord:每条数据都要封装成一个 ProducerRecord 对象
同步发送 API
同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回 ack。
由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,
我们也可以实现同步发送的效果,只需在调用 Future 对象的 get 方法即可
Consumer API
Consumer 消费数据时的可靠性是很容易保证的,因为数据在 Kafka 中是持久化的,故不用担心数据丢失问题。
Consumer 需要考虑offset 的维护
自动提交 offset
KafkaConsumer:需要创建一个消费者对象,用来消费数据
ConsumerConfig:获取所需的一系列配置参数
ConsuemrRecord:每条数据都要封装成一个 ConsumerRecord 对象
enable.auto.commit :是否开启自动提交 offset 功能
auto.commit.interval.ms :自动提交 offset 的时间间隔
手动交 提交 offset
commitSync(同步提交)
将次 本次 poll 的一批数据最高的偏移量提交
阻塞当前线程,一直到提交成功,并且会自动失败重试
commitAsync(异步提交)
将次 本次 poll 的一批数据最高的偏移量提交
没有失败重试机制,故有可能提交失败
一般采用异步,因为消费过程中不需要获取异步提交offset,并且失败概率较小
重复消费或漏消费
无论是同步提交还是异步提交 offset,都有可能会造成数据的漏消费或者重复消费。
先提交 offset 后消费,有可能造成数据的漏消费;
而先消费后提交 offset,有可能会造成数据的重复消费。
上述都是正对消费者不正常挂了的情况,一般情况下除了开始的时候挥霍去偏移量,后续只要不挂机不会有问题
自定义存储 offset
offset 的维护是相当繁琐的,因为需要考虑到消费者的 Rebalace。
可以选择存储到Mysql、Redis、Hbase等
自定义 Interceptor
拦截器原理
Producer 拦截器(interceptor)是在 Kafka 0.10 版本被引入的,主要用于实现 clients 端的定制化控制逻辑
interceptor 使得用户在消息发送前以及 producer 回调逻辑前有机会
对消息做一些定制化需求,比如修改消息、过滤等
对消息做一些定制化需求,比如修改消息、过滤等
producer 允许用户指定多个 interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)
实现方法
ProducerInterceptor
configure(configs)
获取配置信息和初始化数据时调用
onSend(ProducerRecord)
该方法封装进 KafkaProducer.send 方法中,即它运行在用户主线程中。
消息被序列化以及计算分区前调用该方法
用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的 topic 和分区,否则会影响目标分区的计算。
onAcknowledgement(RecordMetadata, Exception)
该方法会在消息从 RecordAccumulator 成功发送到 Kafka Broker 之后,或者在发送过程中失败时调用。
onAcknowledgement 运行在producer 的 IO 线程中
不要在该方法中放入很重的逻辑,否则会拖慢 producer 的消息发送效率
不要在该方法中放入很重的逻辑,否则会拖慢 producer 的消息发送效率
close
关闭 interceptor,主要用于执行一些资源清理工作
案例
拦截器原理
Kafka监控
Kafka Eagle
Flume 对接 Kafka
自己找案例
0 条评论
下一页