Apache Kafka
2020-09-29 15:43:29 0 举报
AI智能生成
Kafka:消息中间件、日志系统、流处理等
作者其他创作
大纲/内容
核心概念
kafka采用分区(Partition)的方式,使得消费者能够做到并行消费,从而大大提高了自己的吞吐能力。同时为了实现高可用,每个分区又有若干份副本(Replica),这样在某个broker挂掉的情况下,数据不会丢失。
Broker
一个Borker就是Kafka集群中的一个实例,或者说是一个服务单元。连接到同一个zookeeper的多个broker实例组成kafka的集群。在若干个broker中会有一个broker是leader,其余的broker为follower。leader在集群启动时候选举出来,负责和外部的通讯。当leader死掉的时候,follower们会再次通过选举,选择出新的leader,确保集群的正常工作。
Consumer Group
Kafka和其它消息系统有一个不一样的设计,在consumer之上加了一层group。同一个group的consumer可以并行消费同一个topic的消息,但是同group的consumer,不会重复消费。这就好比多个consumer组成了一个团队,一起干活,当然干活的速度就上来了。group中的consumer是如何配合协调的,其实和topic的分区相关联,后面我们会详细论述。如果同一个topic需要被多次消费,可以通过设立多个consumer group来实现。每个group分别消费,互不影响。
Topic
kafka中消息订阅和发送都是基于某个topic。比如有个topic叫做NBA赛事信息,那么producer会把NBA赛事信息的消息发送到此topic下面。所有订阅此topic的consumer将会拉取到此topic下的消息。Topic就像一个特定主题的收件箱,producer往里丢,consumer取走。
分区(Partition)
大多数消息系统,同一个topic下的消息,存储在一个队列。分区的概念就是把这个队列划分为若干个小队列,每一个小队列就是一个分区
分区的设计大大的提升了kafka的吞吐量!!
1、一个partition只能被同组的一个consumer消费2、同一个组里的一个consumer可以消费多个partition3、消费效率最高的情况是partition和consumer数量相同。这样确保每个consumer专职负责一个partition。4、consumer数量不能大于partition数量。由于第一点的限制,当consumer多于partition时,就会有consumer闲置。5、consumer group可以认为是一个订阅者的集群,其中的每个consumer负责自己所消费的分区
副本(Replica)
提到副本,肯定就会想到正本。副本是正本的拷贝。在kafka中,正本和副本都称之为副本(Repalica),但存在leader和follower之分。活跃的称之为leader,其他的是follower。每个分区的数据都会有多份副本,以此来保证Kafka的高可用。
副本保证了kafka的高可用性
消息进来的时候会先存入leader replica,consumer的消费也是从leader replica读取的
kafka通过轮询算法保证leader replica是均匀分布在多个broker上
1、Replica均匀分配在Broker上,同一个partition的replica不会在同一个borker上2、同一个partition的Replica数量不能多于broker数量。多个replica为了数据安全,一台server存多个replica没有意义。server挂掉,上面的副本都要挂掉。3、分区的leader replica均衡分布在broker上。此时集群的负载是均衡的。这就叫做分区平衡
分区平衡
在讲分区平衡前,先讲几个概念:1、AR: assigned replicas,已分配的副本。每个partition都有自己的AR列表,里面存储着这个partition最初分配的所有replica。注意AR列表不会变化,除非增加分区。2、PR(优先replica):AR列表中的第一个replica就是优先replica,而且永远是优先replica。最初,优先replica和leader replica是同一个replica。3、ISR:in sync replicas,同步副本。每个partition都有自己的ISR列表。ISR是会根据同步情况动态变化的。最初ISR列表和AR列表是一致的,但由于某个节点死掉,或者某个节点的follower replica落后leader replica太多,那么该节点就会被从ISR列表中移除。此时,ISR和AR就不再一致
由此可见,分区平衡操作就是使leader副本和优先副本保持一致的操作。可以把优先副本理解为分区的平衡状态位,平衡操作就是让leader副本归位
消息读写
1、每个partition都是有序的不可变的。2、Kafka可以保证partition的消费顺序,但不能保证topic消费顺序。3、无论消费与否,保留周期默认七天(可配置)。4、每个consumer维护的唯一元数据是offset,代表消费的位置,一般线性向后移动。5、consumer也可以重置offset到之前的位置,可以以任何顺序消费,不一定线性后移。
分区写入策略
轮询策略
随机策略
按键保存策略
按键保存策略,就是当生产者发送数据的时候,可以指定一个key,计算这个key的hashCode值,按照hashCode的值对不同消息进行存储
选举
leader broker选举
controller控制器选举
通过分布式协调系统:ZooKeeper实现
controller也负责增删Topic以及Replica的重新分配
leader replica选举
副本选举
从/brokers/topics/[topic]/partitions/[partition]/state读取该Partition当前的ISR
从ISR列表中选举出一个副本作为leader
核心组件
控制器
我们已经知道Kafka的集群由n个的broker所组成,每个broker就是一个kafka的实例或者称之为kafka的服务。其实控制器也是一个broker,控制器也叫leader broker。他除了具有一般broker的功能外,还负责分区leader的选取,也就是负责选举partition的leader replica。
控制器选举
包括集群启动在内,有三种情况触发控制器选举:1、集群启动2、控制器所在代理发生故障3、zookeeper心跳感知,控制器与自己的session过期
控制器选举基于ZooKeeper
协调器
消费者协调器
消费者协调器主要负责如下工作:1、更新消费者缓存的MetaData2、向组协调器申请加入组3、消费者加入组后的相应处理4、请求离开消费组5、向组协调器提交偏移量6、通过心跳,保持组协调器的连接感知。7、被组协调器选为leader的消费者的协调器,负责消费者分区分配。分配结果发送给组协调器。8、非leader的消费者,通过消费者协调器和组协调器同步分配结果。
组协调器
组协调器负责处理消费者协调器发过来的各种请求。它主要提供如下功能:1、在与之连接的消费者中选举出消费者leader。2、下发leader消费者返回的消费者分区分配结果给所有的消费者。3、管理消费者的消费偏移量提交,保存在kafka的内部主题中。4、和消费者心跳保持,知道哪些消费者已经死掉,组中存活的消费者是哪些。
消费者入组过程
消费偏移量管理
下面两种情况分别会造成重复消费和丢消息:如果提交的偏移量小于消费者最后一次消费的偏移量,那么再均衡后,两个offset之间的消息就会被重复消费如果提交的偏移量大于消费者最后一次消费的偏移量,那么再均衡后,两个offset之间的消息就会丢失
偏移量有两种提交方式
自动提交偏移量
设置enable.auto.commit为true,设定好周期,默认5s。消费者每次调用轮询消息的poll()方法时,会检查是否超过了5s没有提交偏移量,如果是,提交上一次轮询返回的偏移量。
这样做很方便,但是会带来重复消费的问题。假如最近一次偏移量提交3s后,触发了再均衡,服务器端存储的还是上次提交的偏移量,那么再均衡结束后,新的消费者会从最后一次提交的偏移量开始拉取消息,此3s内消费的消息会被重复消费。
手动提交偏移量
设置 enable.auto.commit为false。程序中手动调用commitSync()提交偏移量,此时提交的是poll方法返回的最新的偏移量。
commitSync()是同步提交偏移量
主程序会一直阻塞,偏移量提交成功后才往下运行。这样会限制程序的吞吐量。如果降低提交频次,又很容易发生重复消费
commitSync只要没有发生不可恢复错误,会进行重试,直到成功
commitAsync()异步提交偏移量
只管提交,而不会等待broker返回提交结果
commitAsync不会进行重试,失败就是失败了
偏移量提交的最佳实践
采用同步和异步混合的方式提交偏移量。1、正常消费消息时,消费结束提交偏移量,采用异步方式。2、如果程序报错,finally中,提交偏移量,采用同步方式,确保提交成功。3、再均衡前的回调方法中,提交偏移量,采用同步方式,确保提交成功。
日志管理器
日志的存储
Kafka的消息以日志文件的形式进行存储。不同主题下不同分区的消息是分开存储的。同一个分区的不同副本也是以日志的形式,分布在不同的broker上存储。
日志确实是以副本为单位的,每个副本对应一个log对象。但实际在物理上,一个log又划分为多个logSegment进行存储
logSegment代表逻辑上的一组文件,这组文件就是.log、.index、.timeindex这三个不同文件扩展名,但是同文件名的文件。.log存储消息.index存储消息的索引.timeIndex,时间索引文件,通过时间戳做索引。
命名规则为.log文件中第一条消息的前一条消息偏移量,也称为基础偏移量,左边补0,补齐20位。比如说第一个LogSegement的日志文件名为00000000000000000000.log,假如存储了200条消息后,达到了log.segment.bytes配置的阈值(默认1个G),那么将会创建新的logSegment,文件名为00000000000000000200.log。以此类推。另外即使没有达到log.segment.bytes的阈值,而是达到了log.roll.ms或者log.roll.hours设置的时间触发阈值,同样会触发产生新的logSegment。
日志的定位
日志定位也就是消息定位,输入一个消息的offset,kafka如何定位到这条消息呢?
日志定位的过程如下:1、根据offset定位logSegment。(kafka将基础偏移量也就是logsegment的名称作为key存在concurrentSkipListMap中)2、根据logSegment的index文件查找到距离目标offset最近的被索引的offset的position x。3、找到logSegment的.log文件中的x位置,向下逐条查找,找到目标offset的消息。
副本管理器
副本机制使得kafka整个集群中,只要有一个代理存活,就可以保证集群正常运行。这大大提高了Kafka的可靠性和稳定性
Kafka中代理的存活,需要满足以下两个条件:1、存活的节点要维持和zookeeper的session连接,通过zookeeper的心跳机制实现2、Follower副本要与leader副本保持同步,不能落后太多。
副本管理器所承担的职责如下:1、副本过期检查2、追加消息3、拉取消息4、副本同步过程5、副本角色转换6、关闭副本
follower与leader同步数据
Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制
Follower可以批量的从Leader复制数据,而且Leader充分利用磁盘顺序读以及send file(zero copy)机制,这样极大的提高复制性能,内部批量写磁盘,大幅减少了Follower与Leader的消息量差
Apache Kafka
介绍
特性
1、消息持久化
2、高吞吐量
3、可扩展性
应用场景
1、消息系统
2、日志系统
3、流处理
安装和使用
单机环境
1、下载zookeeper,解压
2、创建zookeeper配置文件
3、启动zookeeper
4、下载kafka,解压
5、修改kafka的配置文件
6、启动kafka
集群环境
集群环境的搭建也很简单,在单机环境的基础上,让多个单机连接到同一个zookeeper即可。需要注意两点:1、每个实例设置不同的broker.id。2、如果多个实例部署在同一台服务器,还要注意修改log.dirs为不同目录,确保消息存储时不会有冲突。集群环境的具体搭建,在此精简教程中不再做详细讨论。
使用
1、创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic study
2、启动消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic study --from-beginning
3、开启生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic study
4、发送你的第一条消息
0 条评论
回复 删除
下一页