Apache Kafka
2020-09-29 15:43:29 0 举报
AI智能生成
Kafka:消息中间件、日志系统、流处理等
作者其他创作
大纲/内容
核心概念
kafka采用分区(Partition)的方式,使得消费者能够做到并行消费,从而大大提高了自己的吞吐能力。
同时为了实现高可用,每个分区又有若干份副本(Replica),这样在某个broker挂掉的情况下,数据不会丢失。
Broker
一个Borker就是Kafka集群中的一个实例,或者说是一个服务单元。
连接到同一个zookeeper的多个broker实例组成kafka的集群。
在若干个broker中会有一个broker是leader,其余的broker为follower。
leader在集群启动时候选举出来,负责和外部的通讯。
当leader死掉的时候,follower们会再次通过选举,选择出新的leader,确保集群的正常工作。
Consumer Group
Kafka和其它消息系统有一个不一样的设计,在consumer之上加了一层group。
同一个group的consumer可以并行消费同一个topic的消息,但是同group的consumer,不会重复消费。
这就好比多个consumer组成了一个团队,一起干活,当然干活的速度就上来了。
group中的consumer是如何配合协调的,其实和topic的分区相关联,后面我们会详细论述。
如果同一个topic需要被多次消费,可以通过设立多个consumer group来实现。
每个group分别消费,互不影响。
Topic
kafka中消息订阅和发送都是基于某个topic。
比如有个topic叫做NBA赛事信息,那么producer会把NBA赛事信息的消息发送到此topic下面。
所有订阅此topic的consumer将会拉取到此topic下的消息。
Topic就像一个特定主题的收件箱,producer往里丢,consumer取走。
分区(Partition)
大多数消息系统,同一个topic下的消息,存储在一个队列。分区的概念就是把这个队列划分为若干个小队列,每一个小队列就是一个分区
分区的设计大大的提升了kafka的吞吐量!!
1、一个partition只能被同组的一个consumer消费
2、同一个组里的一个consumer可以消费多个partition
3、消费效率最高的情况是partition和consumer数量相同。这样确保每个consumer专职负责一个partition。
4、consumer数量不能大于partition数量。由于第一点的限制,当consumer多于partition时,就会有consumer闲置。
5、consumer group可以认为是一个订阅者的集群,其中的每个consumer负责自己所消费的分区
副本(Replica)
提到副本,肯定就会想到正本。副本是正本的拷贝。
在kafka中,正本和副本都称之为副本(Repalica),但存在leader和follower之分。
活跃的称之为leader,其他的是follower。
每个分区的数据都会有多份副本,以此来保证Kafka的高可用。
副本保证了kafka的高可用性
消息进来的时候会先存入leader replica,consumer的消费也是从leader replica读取的
kafka通过轮询算法保证leader replica是均匀分布在多个broker上
1、Replica均匀分配在Broker上,同一个partition的replica不会在同一个borker上
2、同一个partition的Replica数量不能多于broker数量。多个replica为了数据安全,一台server存多个replica没有意义。server挂掉,上面的副本都要挂掉。
3、分区的leader replica均衡分布在broker上。此时集群的负载是均衡的。这就叫做分区平衡
分区平衡
在讲分区平衡前,先讲几个概念:
1、AR: assigned replicas,已分配的副本。每个partition都有自己的AR列表,里面存储着这个partition最初分配的所有replica。注意AR列表不会变化,除非增加分区。
2、PR(优先replica):AR列表中的第一个replica就是优先replica,而且永远是优先replica。最初,优先replica和leader replica是同一个replica。
3、ISR:in sync replicas,同步副本。每个partition都有自己的ISR列表。ISR是会根据同步情况动态变化的。
最初ISR列表和AR列表是一致的,但由于某个节点死掉,或者某个节点的follower replica落后leader replica太多,那么该节点就会被从ISR列表中移除。此时,ISR和AR就不再一致
由此可见,分区平衡操作就是使leader副本和优先副本保持一致的操作。可以把优先副本理解为分区的平衡状态位,平衡操作就是让leader副本归位
消息读写
1、每个partition都是有序的不可变的。
2、Kafka可以保证partition的消费顺序,但不能保证topic消费顺序。
3、无论消费与否,保留周期默认七天(可配置)。
4、每个consumer维护的唯一元数据是offset,代表消费的位置,一般线性向后移动。
5、consumer也可以重置offset到之前的位置,可以以任何顺序消费,不一定线性后移。
分区写入策略
轮询策略
随机策略
按键保存策略
按键保存策略,就是当生产者发送数据的时候,可以指定一个key,计算这个key的hashCode值,按照hashCode的值对不同消息进行存储
选举
leader broker选举
controller控制器选举
通过分布式协调系统:ZooKeeper实现
controller也负责增删Topic以及Replica的重新分配
leader replica选举
副本选举
从/brokers/topics/[topic]/partitions/[partition]/state读取该Partition当前的ISR
从ISR列表中选举出一个副本作为leader
核心组件
控制器
我们已经知道Kafka的集群由n个的broker所组成,每个broker就是一个kafka的实例或者称之为kafka的服务。
其实控制器也是一个broker,控制器也叫leader broker。
他除了具有一般broker的功能外,还负责分区leader的选取,也就是负责选举partition的leader replica。
控制器选举
包括集群启动在内,有三种情况触发控制器选举:
1、集群启动
2、控制器所在代理发生故障
3、zookeeper心跳感知,控制器与自己的session过期
控制器选举基于ZooKeeper
协调器
消费者协调器
消费者协调器主要负责如下工作:
1、更新消费者缓存的MetaData
2、向组协调器申请加入组
3、消费者加入组后的相应处理
4、请求离开消费组
5、向组协调器提交偏移量
6、通过心跳,保持组协调器的连接感知。
7、被组协调器选为leader的消费者的协调器,负责消费者分区分配。分配结果发送给组协调器。
8、非leader的消费者,通过消费者协调器和组协调器同步分配结果。
组协调器
组协调器负责处理消费者协调器发过来的各种请求。它主要提供如下功能:
1、在与之连接的消费者中选举出消费者leader。
2、下发leader消费者返回的消费者分区分配结果给所有的消费者。
3、管理消费者的消费偏移量提交,保存在kafka的内部主题中。
4、和消费者心跳保持,知道哪些消费者已经死掉,组中存活的消费者是哪些。
消费者入组过程
消费偏移量管理
下面两种情况分别会造成重复消费和丢消息:
如果提交的偏移量小于消费者最后一次消费的偏移量,那么再均衡后,两个offset之间的消息就会被重复消费
如果提交的偏移量大于消费者最后一次消费的偏移量,那么再均衡后,两个offset之间的消息就会丢失
偏移量有两种提交方式
自动提交偏移量
设置enable.auto.commit为true,设定好周期,默认5s。
消费者每次调用轮询消息的poll()方法时,会检查是否超过了5s没有提交偏移量,
如果是,提交上一次轮询返回的偏移量。
这样做很方便,但是会带来重复消费的问题。
假如最近一次偏移量提交3s后,触发了再均衡,
服务器端存储的还是上次提交的偏移量,那么再均衡结束后,
新的消费者会从最后一次提交的偏移量开始拉取消息,
此3s内消费的消息会被重复消费。
手动提交偏移量
设置 enable.auto.commit为false。程序中手动调用commitSync()提交偏移量,此时提交的是poll方法返回的最新的偏移量。
commitSync()是同步提交偏移量
主程序会一直阻塞,偏移量提交成功后才往下运行。这样会限制程序的吞吐量。如果降低提交频次,又很容易发生重复消费
commitSync只要没有发生不可恢复错误,会进行重试,直到成功
commitAsync()异步提交偏移量
只管提交,而不会等待broker返回提交结果
commitAsync不会进行重试,失败就是失败了
偏移量提交的最佳实践
采用同步和异步混合的方式提交偏移量。
1、正常消费消息时,消费结束提交偏移量,采用异步方式。
2、如果程序报错,finally中,提交偏移量,采用同步方式,确保提交成功。
3、再均衡前的回调方法中,提交偏移量,采用同步方式,确保提交成功。
日志管理器
日志的存储
Kafka的消息以日志文件的形式进行存储。不同主题下不同分区的消息是分开存储的。同一个分区的不同副本也是以日志的形式,分布在不同的broker上存储。
日志确实是以副本为单位的,每个副本对应一个log对象。但实际在物理上,一个log又划分为多个logSegment进行存储
logSegment代表逻辑上的一组文件,这组文件就是.log、.index、.timeindex这三个不同文件扩展名,但是同文件名的文件。
.log存储消息
.index存储消息的索引
.timeIndex,时间索引文件,通过时间戳做索引。
命名规则为.log文件中第一条消息的前一条消息偏移量,也称为基础偏移量,左边补0,补齐20位。
比如说第一个LogSegement的日志文件名为00000000000000000000.log,假如存储了200条消息后,
达到了log.segment.bytes配置的阈值(默认1个G),那么将会创建新的logSegment,
文件名为00000000000000000200.log。以此类推。另外即使没有达到log.segment.bytes的阈值,
而是达到了log.roll.ms或者log.roll.hours设置的时间触发阈值,同样会触发产生新的logSegment。
日志的定位
日志定位也就是消息定位,输入一个消息的offset,kafka如何定位到这条消息呢?
日志定位的过程如下:
1、根据offset定位logSegment。(kafka将基础偏移量也就是logsegment的名称作为key存在concurrentSkipListMap中)
2、根据logSegment的index文件查找到距离目标offset最近的被索引的offset的position x。
3、找到logSegment的.log文件中的x位置,向下逐条查找,找到目标offset的消息。
副本管理器
副本机制使得kafka整个集群中,只要有一个代理存活,就可以保证集群正常运行。这大大提高了Kafka的可靠性和稳定性
Kafka中代理的存活,需要满足以下两个条件:
1、存活的节点要维持和zookeeper的session连接,通过zookeeper的心跳机制实现
2、Follower副本要与leader副本保持同步,不能落后太多。
副本管理器所承担的职责如下:
1、副本过期检查
2、追加消息
3、拉取消息
4、副本同步过程
5、副本角色转换
6、关闭副本
follower与leader同步数据
Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制
Follower可以批量的从Leader复制数据,而且Leader充分利用磁盘顺序读以及send file(zero copy)机制,这样极大的提高复制性能,内部批量写磁盘,大幅减少了Follower与Leader的消息量差
介绍
特性
1、消息持久化
2、高吞吐量
3、可扩展性
应用场景
1、消息系统
2、日志系统
3、流处理
安装和使用
单机环境
1、下载zookeeper,解压
2、创建zookeeper配置文件
3、启动zookeeper
4、下载kafka,解压
5、修改kafka的配置文件
进入kafka根目录下的config文件夹下,打开server.properties,修改如下配置项
zookeeper.connect=localhost:2181
broker.id=0
log.dirs=/tmp/kafka-logs
zookeeper.connect是zookeeper的链接信息,broker.id是当前kafka实例的id,log.dirs是kafka存储消息内容的路径
6、启动kafka
集群环境
集群环境的搭建也很简单,在单机环境的基础上,让多个单机连接到同一个zookeeper即可。需要注意两点:
1、每个实例设置不同的broker.id。
2、如果多个实例部署在同一台服务器,还要注意修改log.dirs为不同目录,确保消息存储时不会有冲突。
集群环境的具体搭建,在此精简教程中不再做详细讨论。
使用
1、创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic study
2、启动消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic study --from-beginning
3、开启生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic study
4、发送你的第一条消息
0 条评论
下一页