kafka生产环境使用知识点总结
2024-01-18 16:38:44 11 举报
AI智能生成
Kafka生产环境使用知识点总结:首先,确保Kafka集群的稳定性和高可用性,通过副本机制实现数据冗余。其次,合理配置生产者和消费者的参数,如batch.size、linger.ms等,以提高吞吐量和降低延迟。再次,使用分区策略将消息均匀分布到不同的主题和分区,避免单点瓶颈。此外,监控Kafka集群的运行状态,如CPU、内存、磁盘使用情况,以及生产者和消费者的吞吐量、延迟等指标,及时发现并解决问题。最后,根据业务需求选择合适的消息存储策略,如日志压缩、索引等,以优化存储空间和查询性能。
作者其他创作
大纲/内容
kafka介绍
kafka最初由linkedin使用Scala语言开发,后贡献给Apache基金会。它是一个高吞吐量、多分区、多副本、多订阅,基于Zookeeper协调的分布式发布订阅消息系统。
使用场景
日志收集
kafka可以收集系统中各个服务的日志,再通过kafka已统一接口的形式暴露给消费者
消息服务
消息服务解耦、异步、缓存消息等
运营指标
kafka也用来记录平台运营中的各种指标数据。如:报警、报告等数据
流式处理
spark streaming和storm。
用户追踪
记录用户在web、app等平台上的活动。如:浏览网页、点击事件等
kafka基本概念
producer生产者
消息生产者,向Broker发送消息
broker
kafka节点,一个broker可以看成一个kafka服务
partition分区
消息分区,kafka消息存储在分区上的,一个topic可以关联多个分区,分区内的消息是有序的
topic 主题
kafka发送的消息是按照topic进行分组的。每条消息都需要指定topic
consumer消费者
消息消费者,从broker消费消息
consumer group 消费者组
每个consumer属于一个特定的ConsumerGroup。一条消息可以被多个ConsumerGroup消费,但同一个组中只有一个Consumer能消费这条消息
kafka安装
环境
- 操作系统centos7、jdk8、kafka_2.11‐2.4.1、zookeeper-3.5.8
配置
修改kafka 配置文件 config/server.properties
启动
kafka-server-start.sh [-daemon] server.properties
server.properties核心配置详解
基本使用(控制台命令—了解)
创建主题
bin/kafka-topics.sh --create --zookeeper 192.168.0.2:2181 --replication-factor 1 --partitions 1 --topic test
replication-factor : 副本数
partitions:分区数
partitions:分区数
删除主题
bin/kafka-topics.sh --delete --topic test --zookeeper 192.168.0.2:2181
查看主题
bin/kafka-topics.sh --list --zookeeper 192.168.0.2:2181
生产者发送消息
bin/kafka-console-producer.sh --broker-list 192.168.0.2:9092 --topic test
消费者消费消息
默认是消费最新的消息:
bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.2:9092 --topic test
bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.2:9092 --topic test
消费之前的消息:
bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.2:9092 --from-beginning --topic test
bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.2:9092 --from-beginning --topic test
订阅多主题
bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.2:9092 --whitelist "test|test-2"
单播消费
让所有消费者在同一个消费组里即:
bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.2:9092 --consumer-property group.id=testGroup --topic test
bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.2:9092 --consumer-property group.id=testGroup --topic test
多播消费
保证这些消费者属于不同的消费组即可
bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.2:9092 --consumer-property group.id=testGroup-2 --topic test
bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.2:9092 --consumer-property group.id=testGroup-2 --topic test
查看消费者组
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.0.2:9092 --list
查看消息消费情况
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.0.2:9092 --describe --group testGroup
current-offset:当前消费组的已消费偏移量
log-end-offset:主题对应分区消息的结束偏移量(HW)
lag:当前消费组未消费的消息数
log-end-offset:主题对应分区消息的结束偏移量(HW)
lag:当前消费组未消费的消息数
kafka集群
kafka启动一个broker 本身可以看做一个集群,只是集群中只有一个节点。组成真正意义上的集群只需要多启动几个节点就行。
集群配置:
#broker.id属性在kafka集群中必须要是唯一
broker.id=1
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://192.168.0.3:9092
#kafka连接zookeeper的地址,要把多个kafka实例组成集群,对应连接的zookeeper必须相同
zookeeper.connect=192.168.0.2:2181,192.168.0.3:2181,192.168.0.4:2181,192.168.0.5:2181
#broker.id属性在kafka集群中必须要是唯一
broker.id=1
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://192.168.0.3:9092
#kafka连接zookeeper的地址,要把多个kafka实例组成集群,对应连接的zookeeper必须相同
zookeeper.connect=192.168.0.2:2181,192.168.0.3:2181,192.168.0.4:2181,192.168.0.5:2181
可以通过连接zookeeper查看集群是否注册成功:ls /brokers/ids
Kafka的总控制器Controller
kafka Controller作用
kafka集群中有多个broker,其中某一个broker会被选举为Controller,负责整个集群中的分区和副本的管理。例如:
1. 当分区中的某个leader挂掉后,Kafka Controller负责leader选举
2. 集群中的分区、topic isr列表等元数据发生改变时,Kafka Controller 让其他分区感知道。
1. 当分区中的某个leader挂掉后,Kafka Controller负责leader选举
2. 集群中的分区、topic isr列表等元数据发生改变时,Kafka Controller 让其他分区感知道。
kafka Controller选举机制
Kafka Controller的选举借助于zookeeper实现,在broker启动过程中会再zookeeper中创建/controller 临时节点,有zookeeper保证只有一个broker创建成功。创建成功的broker就成为Kafka Controller
Kafka 的partition 分区leader选举机制
当前leader挂掉后,会被Kafka Controller感知道,有Kafka Controller 从分区中的ISR列表中选出第一个副本作为leader。
ISR列表: 1.副本节点不能产生网络分区,必须能与zookeeper保持会话以及跟leader副本网络连通
2. 副本能复制leader上的所有写操作,并且不能落后太多
2. 副本能复制leader上的所有写操作,并且不能落后太多
Consumer 消费者维护Offset记录
每个consumer会定期将自己消费分区的offset提交给kafka内部topic:__consumer_offsets,提交过去的时候,
key是 consumerGroupId+topic+分区号,value就是当前offset的值,kafka会定期清理topic里的消息,最后就保留最新的那条数据。如上图所示:Kafka默认会给50个分区,应对高并发请求。
key是 consumerGroupId+topic+分区号,value就是当前offset的值,kafka会定期清理topic里的消息,最后就保留最新的那条数据。如上图所示:Kafka默认会给50个分区,应对高并发请求。
Kafka 的Rebalance机制
Rebalance机制指的是如果Kafka的分区数量或者消费者组中的消费者发送了变化。Kafka的ReBalance机制会对消费者进行重新分配。如某个消费者组中的一个消费者宕机了,Kafka会将分配给它的分区重新分配给其他消费者,如果消费者重启了,又会分配一些分区给它消费。
ReBalance策略
RangeAssignor范围分区分配策略
partition.assignment.strategy=]org.apache.kafka.clients.consumer.RangeAssignor
以单个Topic维度来分配分区的,每个Topic的分区尽量均衡的分配给消费者。
以单个Topic维度来分配分区的,每个Topic的分区尽量均衡的分配给消费者。
消费者组中的消费者按照字母顺序排序,主题上的分区按照分区序号排序
先计算出每个消费者最少平均分配的分区数,剩下的分区按顺序分配给消费者
RoundRobinAssignor轮询分区策略
针对所有主题Topic分配分区的。使用轮询策略将分区分配给主题下的消费者
将所有主题下的消费者按照字母顺序排序,Topic下的分区按照分区序号排序
按照轮询策略,依次给消费者分配主题下的分区
StickyAssignor 粘性分区策略
分配给消费者的分区尽可能均衡,分配给消费者的主题分区最多相差一个
分区重新分配时尽可能保证和上传分配的一致
CooperativeStickyAssignor策略
Kafka2.4.0开始引入CooperativeStickyAssignor策略。CooperativeStickyAssignor与之前的StickyAssignor虽然都是维持原来的分区分配方案。最大的区别是:StickyAssignor仍然是基于eager协议,分区重分配时候,都需要consumers先放弃当前持有的分区,重新加入consumer group;
而CooperativeStickyAssignor基于cooperative协议,该协议将原来的一次全局分区重平衡,改成多次小规模分区重平衡。渐进式的重平衡。
而CooperativeStickyAssignor基于cooperative协议,该协议将原来的一次全局分区重平衡,改成多次小规模分区重平衡。渐进式的重平衡。
producer 发送消息流程
producer采用push模式将消息发送到选择的分区中,顺序写入文件
分区选择:producer在发送消息是如果指的了分区,就直接发送到指定的分区;如果发送时没有指定分区而是指定了key的话,通过对key的value值进行hash计算得到目标分区。如果未指定分区又没有指定key,默认采用轮询策略选择分区。
消息写入
producer从zookeeper中获取leader分区,将消息发送给leader。leader收到消息后顺序写入本地文件
其他flower节点从leader pull 拉取消息,并写入到本地log文件,然后向leader返回ACK
Leader收到所有ISR列表中Leader的ACK后增加HW,向producer发送ACK
HW&LEO
HW(High WaterMark)高水位,ISR列表中所有副本的LEO(log end Offset)最后提交数据日志偏移。Consumer 只能消费到高水位的消息
Kafka日志存储结构
Kafka消息日志采用分区分段存储,存储目录为Topic-partition分区号,在该目录下日志文件采用分段存储。每个分段存储不一样的log消息。Kafka规定单个log日志文件大小为1G,方便加载到内存中。
日志存储结构如如上所示。
Kafka生成环境问题
消息丢失
producer生产者端的消息丢失
Producer 端发送配置采用ack=0模式,当producer向broker发送消息后不用等broker进行确认,就可以进行下一步操作。broker没有收到消息导致的丢失
当ack = 1 时,只保证Leader持久化了消息,如果Leader宕机也可能导致消息丢失
当ack = -1/all的情况,在配置了min.insync.replicas配置的备份个数情况下,保证配置副本个数持久化了数据,确认发送成功。如果配置副本为1,就更ack =1 时情况一致了。
Consumer消费者端丢失消息
消费者端采用自动提交模式,消费者拉取到消息后并没有真正处理完消息,被自动提交后。消费者重启时消费不到上次的消息。
解决方案
如果对消息丢失不敏感,可以采用 ack = 0 或者 ack = 1 的模式,这种发送消息的性能最高
如果要保证消息不丢失,多用在金融领域。设置 ack = - 1 或者 all 。配置min.insync.replicas副本数在两个以上,保证ISR列表中有最新的数据,Leader选举后不会丢失数据。
重复消费
producer生产者端
producer在发送消息时由于网络原因,消息到达了broker但没有收到ack,导致的重复发送消息,消费者端消费了同样的消息
Consumer消费者端
Consumer端设置自动提交,由于消费者端处理完数据重启,并没有提交数据,导致重启后重复拉取到了消息,重复消费
解决方案
重复消费需要进行幂等性校验。如在数据库中使用唯一键约束,或者在每条消息上添加唯一id字段,消费者端进行验证。
消息积压
由于生产者端发送消息太快导致消费者端来不及消费消息出现的消息积压
通过修改消费者程序,将消息转发到一个多分区的topic中,启多个消费者消费这个topic内的消息
由于消费者端bug或者消息格式发生改变导致的消费不动出现的消息积压
通过设置异常topic,将程序错误时的消息发送到异常topic中,再针对这个异常topic写消费者进行分析
消息乱序
Kafka保证同一分区的消息保证是有序的,多个分区消息是无序的。
如果要保证消息有序,可以通过只设置一个分区进行消息发送与消费,但这样会导致并发度极大降低,一般不怎么做。
可以采用在消费者端获取到消息后自定义逻辑保证消息的有序性。如:内存队列
可以采用在消费者端获取到消息后自定义逻辑保证消息的有序性。如:内存队列
Kafka高性能机制
读写数据批量传输和数据压缩
消息持久化时使用磁盘顺序读写机制
Kafka数据传输中的零拷贝机制
Kafka利用操作系统的sendFile机制,减少数据的拷贝次数。不用再从内核空间拷贝到jvm用户空间,减少了两次磁盘拷贝、内核和用户上下文切换
0 条评论
下一页