kafka
2020-08-25 10:30:56 5 举报
AI智能生成
kafka架构
作者其他创作
大纲/内容
为什么要使用消息队列
解耦
允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束
冗余
消息队列把消息进行持久化知道他们完全被处理,防止数据处理过程中造成的数据流失
可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理
灵活性&峰值处理
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
异步处理
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们
kafka的特点
高吞吐,低延迟(读写都在leader上)
数据顺序写(直接在磁盘顺序写,不在cache中维护)
Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端,为顺序写。 官网有数据表明,同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。
NIO零拷贝
消息日志分段存储
每个分区的offset日志对应的目录就是Topic-Partition,不同的分区可能存储在不同的服务器上
每个分区里面就是很多个LogSegmentFile,也就是日志段文件
每个分区的数据会被分成多个段,放在多个文件里,每个文件还有自己的索引文件
限定了每个日志段(LogSegmentFile)的大小为1G,存满了就新建另一个日志段
限定了每个日志段(LogSegmentFile)的大小为1G,存满了就新建另一个日志段
00000000000000000000.index
00000000000000000000.log
00000000000000000000.timeindex
00000000000000000000.log
00000000000000000000.timeindex
0000000-5367851
00000000000005367851.index
00000000000005367851.log
00000000000005367851.timeindex
00000000000005367851.log
00000000000005367851.timeindex
5367851-9987654
00000000000009987654.index
00000000000009987654.log
00000000000009987654.timeindex
00000000000009987654.log
00000000000009987654.timeindex
9987654就是这个消息写入到此SegmentFile的起始offset
消息日志二分查找
使用稀疏索引
限定了在日志文件中每写入多少条数据,就要在索引中写入一条索引数据,默认是4KB
由索引文件的物理位置定位到某条数据,再向下遍历。偏移量是有序的
数据可以以batch的形式批量发送
数据压缩
负载均衡(之前由Zookeeper维护,现在有kafak内部的metadata维护)
可以在客户端人为控制将消息发送到不同的分区
每个partition都有多个replica,有一个replica是leader,leader的选举和replica的fail over由Zookeeper维护
通过zookeeper管理broker,以及消费者组中consumer的加入和离开
高并发
允许数千个客户端同时消费数据,consumer的消费offset由自身维护存储在kafak中,而不需要server端来维护
容错性
允许集群中节点失败(若节点总数为n,可允许n-1个节点宕机)
消息拉取
分布式
producer
分配策略
指明 partition 的情况下,直接将指明的值直接作为 partiton 值;
没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值;
既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition值,也就是常说的 round-robin 算法。
发送到broker策略
目的:为保证 producer 发送的数据,能可靠的发送到指定的 topic, topic 的每个 partition 收到producer 发送的数据后,都需要向 producer 发送 ack(acknowledgement 确认收到),如果producer 收到 ack, 就会进行下一轮的发送,否则重新发送数据。
何时发送ack?确保有follower与leader同步完成,leader再发送ack,这样才能保证leader挂掉之后,能在follower中选举出新的leader。
发送策略
半数以上的follower同步完成,即可发送ack继续发送重新发送
延迟低,不用等待所有follower都同步完
但是容错率低,同样为了容忍 n 台节点的故障,需要 2n+1 个副本
全部的follower同步完成,才可以发送ack(kafka选择的这种)
延迟高,但网络延迟对kafka影响小
容错率高,同样为了容忍n台节点的故障,只需要 n+1 个副本
ISR(in-sync replica set)
出现背景: leader 收到数据,所有 follower 都开始同步数据,但有一个 follower,因为某种故障,迟迟不能与 leader 进行同步,那 leader 就要一直等下去,直到它完成同步,才能发送 ack
解决策略:ISR意味和leader保持同步的follow集合,当 ISR 中的 follower 完成数据的同步之后,就会给 leader 发送 ack。
如何选择ISR中的follow:选择与leader通信频繁的,数据相差小的follow
如果follow与leader信息条数相差大于replica.lag.max.message就踢出ISR
batch批量发送数据原因,会导致follow频繁进出ISR,
然后导致频繁更改存储在Zookeeper中的ISR信息
然后导致频繁更改存储在Zookeeper中的ISR信息
如果leader与follow超过replica.lag.time.max.ms时间为同步,就从ISR中踢出(kafka选的这个)
ACK机制
0: producer 不等待 broker 的 ack,这一操作提供了一个最低的延迟, broker 一接收到还没有写入磁盘就已经返回,当 broker 故障时有可能丢失数据;
1:producer 等待 broker 的 ack, partition 的 leader 落盘成功后返回 ack,如果在 follower同步成功之前 leader 故障,那么将会丢失数据
-1(all) : producer 等待 broker 的 ack, partition 的 leader 和 ISR 的follower 全部落盘成功后才返回 ack。但是如果在 follower 同步完成后, broker 发送 ack 之前, leader 发生故障,那么会造成数据重复。
数据一致性问题
LEO:(Log End Offset)每个副本的最后一个offset
HW:(High Watermark)高水位,指的是消费者能见到的最大的 offset, ISR 队列中所有follow中最小的 LEO
follower 故障:follower 发生故障后会被临时踢出 ISR,待该 follower 恢复后, follower 会读取本地磁盘记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 leader 进行同步。等该 follower 的 LEO 大于等于该 Partition 的 HW,即 follower 追上 leader 之后,就可以重新加入 ISR 了。
leader 故障:leader 发生故障之后,会从 ISR 中选出一个新的 leader,之后,为保证多个副本之间的数据一致性, 其余的 follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 leader同步数据。
只保证数据一致性,不保证数据不丢失或者不重复(由ACK保证)
ExactlyOnce
将服务器的 ACK 级别设置为-1(all),可以保证 Producer 到 Server 之间不会丢失数据,即 At Least Once 语义。
相对的,将服务器 ACK 级别设置为 0,可以保证生产者每条消息只会被发送一次,即 At Most Once 语义
At Least Once 可以保证数据不丢失,但是不能保证数据不重复;相对的, At Most Once可以保证数据不重复,但是不能保证数据不丢失。 但是,对于一些非常重要的信息,比如说交易数据,下游数据消费者要求数据既不重复也不丢失,即 Exactly Once 语义。
0.11 版本的 Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指 Producer 不论向 Server 发送多少次重复数据, Server 端都只会持久化一条。
PID:生产者表示
SequenceNumber:消息的序列化号
paratition:分区
如果pid+partition对应的SequenceNumber重复时,则代表这组消息是重复数据,broker只会持久化一条
At Least Once + 幂等性 = Exactly Once,也就是不丢消息,也没有重复消费
producer事务
解决的问题
我们知道At Least Once + 幂等性 = Exactly Once保证了kafka数据不会丢失,也不会重复消费,但还是会出现问题的。幂等性由<Pid,SequenceNumber+partition>来保证,如果producer发送消息给了broker后,broker接收到后再返回ack过程中leader宕机,此时producer就会重新发送消息,这样新选举出来的leader已经同步了之前leader的消息,就会有重复消息,而幂等性就可以消除重复性。但是如果此时producer也宕机了,当他重启后就会生成一个新的Pid,便无法保证幂等性。
解决方式
每个客户端都会给producer生成一个TransactionId,并且TransactionId与Pid进行绑定,TransactionId被维护在Transaction Coordinator中,每次producer都会去coordinator中找相应TransactionId锁对应的自己的Pid,就能保证一个生产者的Pid不会变化
consumer
消费方式
push
push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。
pull(kafka选用的)
pull 模式不足之处是,如果 kafka 没有数据,消费者可能会陷入循环中, 一直返回空数据。 针对这一点, Kafka 的消费者在消费数据时会传入一个时长参数 timeout,如果当前没有数据可供消费, consumer 会等待一段时间之后再返回,这段时长即为 timeout。
再平衡策略的触发事件
consumer group中的新增或删除某个consumer,导致其所消费的分区需要分配到组内其他的consumer上
consumer订阅的topic发生变化,比如订阅的topic采用的是正则表达式的形式,如test-*此时如果有一个新建了一个topic test-user,那么这个topic的所有分区也是会自动分配给当前的consumer的,此时就会发生再平衡
consumer所订阅的topic发生了新增分区的行为,那么新增的分区就会分配给当前的consumer,此时就会触发再平衡
分区分配策略(再平衡策略)
round-robin
round-robin是轮询,它是按照消费者组为主要目标的,也就是把消费者组中所有consumer订阅的所有topic-partition都组合起来,根据这些topic-partition的hash值进行排序,然后再从头依次循环发送给消费者组中的每个消费者,如果分配给这个consumer的topic是它没有订阅的,那么就轮询下一个,指导将这个topic-partition分配给最近的一个订阅了它的consumer。可以看出轮询方式会导致消费者组中,每个cosumer所承载的分区数量不一致,从而导致消费者组中consumer的压力不一致
- 尝试将t0-0分配给C0,由于C0订阅了t0,因而可以分配成功;
- 尝试将t1-0分配给C1,由于C1订阅了t1,因而可以分配成功;
- 尝试将t1-1分配给C2,由于C2订阅了t1,因而可以分配成功;
- 尝试将t2-0分配给C0,由于C0没有订阅t2,因而会轮询下一个consumer;
- 尝试将t2-0分配给C1,由于C1没有订阅t2,因而会轮询下一个consumer;
- 尝试将t2-0分配给C2,由于C2订阅了t2,因而可以分配成功;
- 同理由于t2-1和t2-2所在的topic都没有被C0和C1所订阅,因而都不会分配成功,最终都会分配给C2
range
range是重分配策略,它是按照topic为主题的,也就是把订阅了这个topic的消费者组的所有消费者都组合起来,将topic的分区进行均匀分布(但是当topic的partition数量/消费者组的consumer数量!=整数时,分配还是不均匀的,还会导致消费者组中每个consumer的压力不一致)
假设有两个consumer:C0和C1,两个topic:t0和t1,这两个topic分别都有三个分区,那么总共的分区有六个:t0-0、t0-1、t0-2、t1-0、t1-1和t1-2。那么Range分配策略将会按照如下步骤进行分区的分配:
sticky
Sticky:这种分区策略是最新版本中新增的一种策略,其主要实现了两个目的
目的
将现有的分区尽可能均衡的分配给各个consumer,存在此目的的原因在于Round Robin和Range分配策略实际上都会导致某几个consumer承载过多的分区,从而导致消费压力不均衡
如果发生再平衡,那么重新分配之后在前一点的基础上会尽力保证当前未宕机的consumer所消费的分区不会被分配给其他的consumer上
实现步骤
初始状态分配的特点是,所有的分区都还未分配到任意一个consumer上。
这里我们假设有三个consumer:C0、C1和C2,三个topic:t0、t1和t2,
这三个topic分别有1、2和3个分区,那么总共的分区为:t0-0、t1-0、t1-1、t2-0、t2-1和t2-2。
关于订阅情况,这里C0订阅了t0,C1订阅了t0和1,C2则订阅了t0、t1和t2。
这里我们假设有三个consumer:C0、C1和C2,三个topic:t0、t1和t2,
这三个topic分别有1、2和3个分区,那么总共的分区为:t0-0、t1-0、t1-1、t2-0、t2-1和t2-2。
关于订阅情况,这里C0订阅了t0,C1订阅了t0和1,C2则订阅了t0、t1和t2。
首先将所有的分区进行排序,排序方式为:首先按照当前分区所分配的consumer数量从低到高进行排序,
如果consumer数量相同,则按照分区的字典序进行排序。这里六个分区由于所在的topic的订阅情况各不相同,因而其排序结果如下
如果consumer数量相同,则按照分区的字典序进行排序。这里六个分区由于所在的topic的订阅情况各不相同,因而其排序结果如下
然后将所有的consumer进行排序,其排序方式为:首先按照当前consumer已经分配的分区数量有小到大排序,
如果两个consumer分配的分区数量相同,则会按照其名称的字典序进行排序。
由于初始时,这三个consumer都没有分配任何分区,因而其排序结果即为其按照字典序进行排序的结果:
如果两个consumer分配的分区数量相同,则会按照其名称的字典序进行排序。
由于初始时,这三个consumer都没有分配任何分区,因而其排序结果即为其按照字典序进行排序的结果:
然后将各个分区依次遍历分配给各个consumer,首先需要注意的是,这里的遍历并不是C0分配完了再分配给C1,而是每次分配分区的时候都整个的对所有的consumer从头开始遍历分配,如果当前consumer没有订阅当前分区,则会遍历下一个consumer。然后需要注意的是,在整个分配的过程中,各个consumer所分配的分区数是动态变化的,而这种变化是会体现在各个consumer的排序上的,比如初始时C0是排在第一个的,此时如果分配了一个分区给C0,那么C0就会排到最后,因为其拥有的分区数是最多的
首先将t2-0尝试分配给C0,由于C0没有订阅t2,因而分配不成功,继续轮询下一个consumer;
然后将t2-0尝试分配给C1,由于C1没有订阅t2,因而分配不成功,继续轮询下一个consumer;
接着将t2-0尝试分配给C2,由于C2订阅了t2,因而分配成功,此时由于C2分配的分区数发生变化,各个consumer变更后的排序结果为:
然后将t2-0尝试分配给C1,由于C1没有订阅t2,因而分配不成功,继续轮询下一个consumer;
接着将t2-0尝试分配给C2,由于C2订阅了t2,因而分配成功,此时由于C2分配的分区数发生变化,各个consumer变更后的排序结果为:
接下来的t2-1和t2-2,由于也只有C2订阅了t2,因而其最终还是会分配给C2,
最终在t2-0、t2-1和t2-2分配完之后,各个consumer的排序以及其分区分配情况如下:
最终在t2-0、t2-1和t2-2分配完之后,各个consumer的排序以及其分区分配情况如下:
接着继续分配t1-0,首先尝试将其分配给C0,由于C0没有订阅t1,因而分配不成功,继续轮询下一个consumer;
然后尝试将t1-0分配给C1,由于C1订阅了t1,因而分配成功
同理,接下来会分配t1-1,虽然C1和C2都订阅了t1,但是由于C1排在C2前面,因而该分区会分配给C1
然后尝试将t1-0分配给C1,由于C1订阅了t1,因而分配成功
同理,接下来会分配t1-1,虽然C1和C2都订阅了t1,但是由于C1排在C2前面,因而该分区会分配给C1
尝试将t0-0分配给C0,由于C0订阅了t0,因而分配成功,最终的分配结果为
与前面讲解的Round Robin进行对比,可以很明显的发现,Sticky重分配策略分配得更加均匀一些
consumer的offset的存储
Kafka 0.9 版本之前, consumer 默认将 offset 保存在 Zookeeper 中,从 0.9 版本开始,consumer 默认将 offset 保存在 Kafka 一个内置的 topic 中,该 topic 为__consumer_offsets
offset的是以ConsumerGroup+Topic+Partition来分类存储的,因为消费者可能从消费者组中加入或者宕机,具有不确定性,所以需要通过消费者组来作为唯一标识
Consumer事务(保证相对较弱)
由于每个Topic-Partition对应的存储offset的segment有多个,而kafka消费者可以批量消费数据,如果这批数据是跨了2个segment的,而刚好消费者消费消息失败,需要再次消费。但此时第一个segment过期了,就无法消费到第一个segment中的消息了。这是事务所无法保证的。
分支主题
0 条评论
下一页