Kafka消息引擎
2021-02-20 15:37:44 3 举报
AI智能生成
全面介绍Kafka消息引擎,主体大纲从技术简介、发展历程、专业术语、版本选型、部署方案、核心参数、技术原理、高级功能、集群监控、性能调优、最佳实践、Kafka Streams、Apache Kafka等方面解析Kafka。
作者其他创作
大纲/内容
技术简介
Apache Kafka是消息引擎系统,也是一个分布式流处理平台(Distributed Streaming Platform)
发展历程
LinkedIn最初碰到的历史问题?
数据正确性不足。因为数据的收集主要采用轮询的方式,如何确定轮询的间隔时间就变成一个高度经验化的事情,虽然可以采用一些类似启发式算法(Heuristic)来帮助评估间隔时间值,但一旦指定不当,必然会造成较大的数据偏差。
系统高度定制化,维护成本高。各个业务子系统都需要对接数据收集模块,引入了大量的定制开销和人工成本。
Kafka名字的由来?
因为Kafka系统的写性能很强,所以找了个作家的名字来命名似乎是个好主意。大学期间我上了很多文学课,非常喜欢Franz Kafka这个作家,另外开源软件起这个名字听上去很酷。
提供三个方面的特性
提供一套API实现生产者和消费者
降低网络传输和磁盘存储开销
实现高伸缩架构
正式开源
2011年Kafka正式进入到Apache基金会孵化并于次年10月顺利毕业成为Apache顶级项目
专业术语
消息:Record
Kafka处理的主要对象
主题:Topic
主题是承载消息的逻辑容器,实际使用中多用来区分具体业务
分区:Partition
一个有序不变的消息序列,每个主题下可以有多个分区
消息位移:Offset
表示分区中每条消息的位置信息,是一个单调递增且不变的值
副本:Replica
Kafka中同一条消息能够被拷贝到多个地方以提供数据冗余,这些地方就是所谓的副本。副本还分为领导者副本和追随副本,各自有不同的角色划分。副本是在分区层级下的,即每个分区可以配置多个副本实现高可用。
生产者:Producer
向主题发布新消息的应用程序
消费者:Consumer
从主题订阅新消息的应用程序
消费者位移:Consumer Offset
表征消费者消费进度,每个消费者都有自己的消费者位移
消费者组:Consumer Group
多个消费者实例共同组成一个组,同时消费多个分区以实现高吞吐
重平衡:Rebalance
消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程,Rebalance是Kafka消费者端实现高可用的重要手段
版本选型
Apache Kafka
简介:社区版Kafka,开发人数最多,版本迭代速度最快,仅提供最基础的组件,Kafka Connect只提供一种连接器,即读写磁盘文件的连接器,没提供任何监控框架和工具(只能借助第三方监控Kafka Manager)
优势:迭代速度快,社区响应度高,有更高的把控度
劣势:仅提供核心基础组件,缺失一些高级特性
建议:如果仅仅只需要一个消息引擎系统亦或是简单的流处理应用场景,同时需要对系统有较大把控度,推荐使用此版本
Confluent Kafka
简介:分免费版和企业版两种。前者和Apache Kafka非常相像,除常规组件外,免费版还包括Schema注册中心和REST proxy两大功能。除此以外,免费版还包含了更多的连接器,都是Confluent公司开发并认证过的。缺陷在于没有国内业务发展计划
优势:集成了很多高级特性且由Kafka原班人马打造,质量上有保证。
劣势:相关文档资料不全,国内普及效率低,没有太多可供参考的范例
建议:如果需要用到Kafka的一些高级特性,推荐使用此版本
CDH/HDP Kafka
简介:大数据云公司发布的Kafka,天然集成Apache Kafka,有便捷的UI界面进行安装、运维、管理、监控
优势:操作简单,节省运维成本
劣势:把控度低,演进速度较慢
建议:如果需要快速搭建消息引擎系统,或者需要搭建的是多框架构成的数据平台且Kafka只是其中一个组件,那么推荐使用此版本。
部署方案
操作系统
建议线上环境将Kafka部署在Linux操作系统,在以下3个方面,Linux的表现更胜一筹:
1、I/O模型的使用
2、数据网络传输效率
3、社区支持度
1、I/O模型的使用
Kafka客户端底层使用了Java的selecter,selecter在Linux上的实现机制是epoll,而在windows上的实现机制是select,因此在这一点上将Kafka部署在Linux上是有优势的,因为能够获得更高的I/O性能。
2、数据网络传输效率
Kafka需要在磁盘和网络间进行大量数据传输。在Linux部署Kafka能够享受到零拷贝技术所带来的快速数据传输性。
3、社区支持度
社区目前对windows平台上发现的Kafka Bug不做任何承诺,一般不会修复。因此windows平台上部署Kafka只适合于个人测试或用于功能验证,切不要应用于生产环境。
总结
考量点:操作系统I/O模型
建议:将Kafka部署在Linux上
磁盘
1、应该选择普通的机械磁盘还是固态硬盘?
分支主题
使用RAID的优势在于:提供冗余的磁盘存储空间和提供负载均衡
2、到底是否应该使用磁盘阵列(RAID)?
分支主题
总结
考量点:磁盘I/O性能
建议:普通环境使用机械硬盘,不需要搭建RAID
磁盘容量
Kafka集群的存储容量规划?
磁盘容量规划需考虑以下几个元素:
1、新增消息数
2、消息留存时间
3、平均消息大小
4、备份数
5、是否启用压缩
举例说明:假如每天需向Kafka集群发送1亿条消息,每条消息保存两份防止丢失,默认保存两周时间,假设消息平均大小是1KB,那么需预留多少磁盘空间?
每天消息总空间大小等于 1亿 * 1KB * 2 / 1000 / 1000 = 200GB 。Kafka集群除了消息数据还有其他类型比如所索引数据,再为这些数据预留 10% 的空间,总容量为 220 GB,保存两周,整体容量为 220GB * 14 ≈ 3TB 左右,Kafka支持数据压缩,假设压缩比例是0.75,那么最后需要规划的存储空间是 0.75 * 3 = 2.25TB
总结
考量点:根据消息数、存留时间预估磁盘容量
建议:实际使用中建议预留20% ~ 30%的磁盘空间
带宽
如何进行宽带资源的规划?
带宽主要有两种:1Gbps的千兆网络和10Gbps的万兆网络,以千兆网络举例说明
举例说明:假设公司机房是千兆网络,即1Gbps,现在有个业务,其业务目标或SLA是在1小时内处理1TB的业务数据。那么问题来了,到底需要多少台Kafka服务器来完成这个业务?
由于带宽是1Gbps,即每秒处理1Gb的数据,假设每台Kafka服务器都是安装在专属的机器上,也就是说每台Kafka机器上没有混布其它服务,毕竟真实环境中不建议这么做,通常情况下,你只能假设Kafka会用到70%的带宽资源,因为总要为其它应用或进程留一些资源。
根据实际使用经验,超过70%的阈值就有网络丢包的可能性了,所以70%的设定是一个比较合理的值,也就是说单台Kafka服务器最多也就能使用700Mb的资源。这是它实用的最大宽带资源,你不能让Kafka服务器常规性使用这么多资源,故通常要再额外预留2/3的资源,即单台服务器使用带宽700Mb / 3 ≈ 240Mbps。(这里的2/3其实是相当保守的,你可以结合自己机器的使用情况酌情减少比值。)
有了240Mbps,我们就可以计算1小时内处理1TB数据所需的服务器数量了。根据这个目标,我们每秒需要处理2336Mb(1024 * 1024 / 3600 * 8)的数据,除了240,约等于10台服务器。如果消息还需要额外复制2份,那么用的服务器数还需要乘以3,即30台。
总结
考量点:根据实际宽带资源和业务SLA预估服务器数量
对于千兆网络,建议每台服务器按照700Mbps来计算,避免大流量下的丢包
核心参数
Broker端参数
存储信息
log.dirs
指定了 Broker 需要使用的若干个文件目录路径,在线上生产环境中一定要为 log.dirs 配置多个路径,具体格式是一个CSV 格式,也就是用逗号分隔的多个路径,比如/home/kafka1,/home/kafka2,/home/kafka3 这样。如果有条件的话你最好保证这些目录挂载到不同的物理磁盘上。
挂载在不同的物理磁盘好处:
1、提升读写性能:比起单块磁盘,多块物理磁盘同时读写数据有更高的吞吐量。
2、能够实现故障转移:即 Failover 。这是 Kafka 1.1 版本新引入的强大功能。要知道在以前,只要Kafka Broker 使用的任何一块磁盘挂掉了,整个 Broker 进程都会关闭。但是自 1.1 开始,这种情况被修正了,坏掉的磁盘上的数据会自动地转移到其他正常的磁盘上,而且 Broker 还能正常工作。
log.dir
注意这是 dir ,结尾没有 s ,说明它只能表示单个路径,它是补充上一个参数用的
Zookeeper相关
zookeeper.connect
如果我让多个 Kafka 集群使用同一套 ZooKeeper 集群,那么这个参数应该怎么设置呢?这时候chroot 就派上用场了。这个 chroot 是 ZooKeeper 的概念,类似于别名。
Broker 连接相关
listeners
学名叫监听器,其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务
advertised.listeners
和 listeners 相比多了个 advertised 。 Advertised 的含义表示宣称的、公布的,就是说这组监听器是 Broker 用于对外发布的
host.name/port
列出这两个参数就是想说你把它们忘掉吧,压根不要为它们指定值,毕竟都是过期的参数了
Topic 管理
auto.create.topics.enable
是否允许自动创建 Topic
unclean.leader.election.enable
是否允许 Unclean Leader 选举
false
坚持之前的原则,坚决不能让那些落后太多的副本竞选 Leader 。这样做的后果是这个分区就不可用了,因为没有 Leader 了
true
那么 Kafka 允许你从那些 “ 跑得慢 ” 的副本中选一个出来当 Leader 。这样做的后果是数据有可能就丢失了,因为这些副本保存的数据本来就不全,当了 Leader 之后它本人就变得膨胀了,认为自己的数据才是权威的。
auto.leader.rebalance.enable
是否允许定期进行 Leader 选举
如果设置为true表示允许 Kafka 定期地对一些 Topic 分区进行 Leader 重选举,当然这个重选举不是无脑进行的,它要满足一定的条件才会发生。严格来说它与上一个参数中 Leader 选举的最大不同在于,它不是选 Leader ,而是换 Leader !比如 Leader A 一直表现得很好,但若 auto.leader.rebalance.enable=true ,那么有可能一段时间后 Leader A 就要被强行卸任换成
Leader B
数据留存
log.retention.{hour|minutes|ms}
这是个 “ 三兄弟 ” ,都是控制一条消息数据被保存多长时间。从优先级上来说 ms 设置最高、 minutes 次之、 hour 最低
log.retention.bytes
这是指定 Broker 为消息保存的总磁盘容量大小
这个值默认是 -1 ,表明你想在这台 Broker 上保存多少数据都可以,至少在容量方面 Broker 绝对为你开绿灯,不会做任何阻拦。
这个参数真正发挥作用的场景其实是在云上构建多租户的 Kafka 集群:设想你要做一个云上的 Kafka 服务,每个租户只能使用100GB 的磁盘空间,为了避免有个 “ 恶意 ” 租户使用过多的磁盘空间,设置这个参数就显得至关重要了
message.max.bytes
控制 Broker 能够接收的最大消息大小
这个参数默认的977KB太少了,还不到 1MB
实际场景中突破 1MB 的消息都是屡见不鲜的,因此在线上环境中设置一个比较大的值还是比较保险的做法。毕竟它只是一个标尺而已,仅仅衡量 Broker 能够处理的最大消息大小,即使设置大一点也不会耗费什么磁盘空间的
Topic级别参数
保存消息
retention.ms
规定了该 Topic 消息被保存的时长。默认是 7 天,即该 Topic 只保存最近 7 天的消息。一旦设置了这个值,它会覆盖掉 Broker 端的全局参数值
retention.bytes
规定了要为该 Topic 预留多大的磁盘空间。和全局参数作用相似,这个值通常在多租户的 Kafka 集群中会有用武之地。当前默认值是 -1 ,表示可以无限使用磁盘空间
处理消息大小
max.message.bytes
它决定了 Kafka Broker 能够正常接收该 Topic 的最大消息大小
设置Topic级别参数
创建 Topic 时进行设置
需求:设想你的部门需要将交易数据发送到 Kafka 进行处理,需要保存最近半年的交易数据,同时这些数据很大,通常都有几 MB ,但一般不会超过5MB 。现在让我们用以下命令来创建 Topic :
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topictransaction --partitions 1 --replication-factor 1 --configretention.ms=15552000000 --configmax.message.bytes=5242880
下面看看使用另一个自带的命令 kafka-configs 来修改 Topic 级别参数。假设我们现在要发送最大值是 10MB 的消息,该如何修改呢?命令如下:
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-typetopics --entity-nametransaction --alter --add-config max.message.bytes=10485760
修改 Topic 时设置
JVM参数
堆大小(Heap Size)设置
将你的 JVM 堆大小设置成 6GB 吧,这是目前业界比较公认的一个合理值
垃圾回收器(GC)设置
Java7/8
如果 Broker 所在机器的 CPU 资源非常充裕,建议使用 CMS 收集器。启用方法是指定 -XX:+UseCurrentMarkSweepGC
否则,使用吞吐量收集器。开启方法是指定 -XX:+UseParallelGC
>=Java9
用默认的 G1 收集器就好了。在没有任何调优的情况下, G1 表现得要比 CMS 出色,主要体现在更少的 Full GC ,需要调整的参数更少等,所以使用G1 就好了
Kafka相关
KAFKA_HEAP_OPTS:指定堆大小
KAFKA_JVM_PERFORMANCE_OPTS:指定GC参数
比如你可以这样启动 Kafka Broker ,即在启动 Kafka Broker 之前,先设置上这两个环境变量:
$> export KAFKA_HEAP_OPTS=--Xms6g --Xmx6g
$> export KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true
$> bin/kafka-server-start.sh config/server.properties
操作系统参数
文件描述符限制
通常情况下将它设置成一个超大的值是合理的做法,比如ulimit -n 1000000
文件系统类型
这里所说的文件系统指的是如ext3、ext4或XFS这样的日志型文件系统。根据官网的测试报告,XFS的性能要强于ext4,所以生产环境最好还是使用XFS。(ZFS性能更强劲)
Swappiness
建议将swappniess配置成一个接近0但不为0的值,比如1
提交时间(Flush落盘时间)
默认是5秒,太过频繁,可以适当地增加提交间隔
技术原理
生产者消息分区机制
为什么分区?
Kafka消息组织结构实际为三级结构:主题-分区-消息,主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份。
Kafka为何使用分区设计,而不直接使用多个主题?
分区的作用?
就是提供负载均衡的能力
为了实现系统的高伸缩性( Scalability )
实现业务级别的消息顺序的问题
分区策略?
轮询策略( Round-robin)
即顺序分配。比如一个主题下有 3 个分区,那么第一条消息被发送到分区0 ,第二条被发送到分区 1 ,第三条被发送到分区 2 ,以此类推。当生产第 4 条消息时又会重新开始,即将其分配到分区 0
轮询策略是 Kafka Java 生产者 API 默认提供的分区策略。如果你未指定 partitioner.class 参数,那么你的生产者程序会按照轮询的方式在主题的所有分区间均匀地 “ 码放 ” 消息。
总结:轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一。
随机策略(Randomness)
所谓随机就是我们随意地将消息放置到任意一个分区上
实现随机策略版的 partition 伪代码,只需两行代码即可:
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());
先计算出该主题总的分区数,然后随机地返回一个小于它的正整数。
本质上看随机策略也是力求将数据均匀地打散到各个分区,但从实际表现来看,它要逊于轮询策略,所以 如果追求数据的均匀分布,还是使用轮询策略比较好。事实上,随机策略是老版本生产者使用的分区策略,在新版本中已经改为轮询了。
按消息键保序策略( Key-ordering)
Kafka 允许为每条消息定义消息键,简称为 Key 。这个 Key 的作用非常大,它可以是一个有着明确业务含义的字符串,比如客户代码、部门编号或是业务 ID 等;也可以用来表征消息元数据。
特别是在 Kafka 不支持时间戳的年代,在一些场景中,工程师们都是直接将消息创建时间封装进 Key里面的。一旦消息被定义了 Key ,那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略
实现这个策略的 partition 方法同样简单,只需要下面两行代码即可:
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();
Kafka 默认分区策略实际上同时实现了两种策略:如果指定了 Key ,那么默认实现按消息键保序策略;如果没有指定 Key ,则使用轮询策略。
其它分区策略
基于地理位置的分区策略
这种策略一般只针对那些大规模的 Kafka 集群,特别是跨城市、跨国家甚至是跨大洲的集群。
实例:根据 Broker 所在的 IP 地址实现定制化的分区策略
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return partitions.stream().filter(p -> isSouth(p.leader().host())).map(PartitionInfo::partition).findAny().get();
自定义分区策略
需要显式地配置生产者端的参数 partitioner.class
编写生产者程序时,你可以编写一个具体的类实现 org.apache.kafka.clients.producer.Partitioner 接口。这个接口也很简单,只定义了两个方法: partition() 和 close() ,通常你只需要实现最重要的 partition 方法。
方法签名:int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
这里的 topic 、 key 、 keyBytes 、 value 和 valueBytes 都属于消息数据, cluster 则是集群信息(比如当前 Kafka 集群共有多少主题、多少 Broker 等)。 Kafka 给你这么多信息,就是希望让你能够充分地利用这些信息对消息进行分区,计算出它要被发送到哪个分区中。只要你自己的实现类定义好了 partition 方法,同时设置 partitioner.class 参数为你自己实现类的 Full Qualified Name ,那么生产者程序就会按照你的代码逻辑对消息进行分区。虽说可以有无数种分区的可能,但比较常见的分区策略也就那么几种
生产者压缩算法
怎么压缩
Kafka消息格式
V1版本
V2版本
V2 版本是 Kafka 0.11.0.0 中正式引入的
Kafka消息层次
消息集合( message set )
(若干条)日志项( record item )
消息( message )
为什么引入V2版本?
把消息的公共部分抽取出来放到外层消息集合里面,这样就不用每条消息都保存这些信息了
原来在 V1 版本中,每条消息都需要执行 CRC 校验,但有些情况下消息的 CRC 值是会发生变化的
比如在 Broker 端可能会对消息时间戳字段进行更新,那么重新计算之后的 CRC值也会相应更新
再比如 Broker 端在执行消息格式转换时(主要是为了兼容老版本客户端程序),也会带来 CRC 值的变化
保存压缩消息的方法发生了变化
之前 V1 版本中
保存压缩消息的方法是把多条消息进行压缩然后保存到外层消息的消息体字段中
而 V2 版本的
做法是对整个消息集合进行压缩。显然后者应该比前者有更好的压缩效果
何时压缩
在 Kafka 中,压缩可能发生在两个地方:生产者端和 Broker 端
生产者端
生产者程序中配置 compression.type 参数即表示启用指定类型的压缩算法
实例:如何构建一个开启 GZIP 的 Producer 对象:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 开启 GZIP 压缩
props.put("compression.type", "gzip");
Producer<String, String> producer = new KafkaProducer<>(props);
这里比较关键的代码行是 props.put(“compression.type”, “gzip”) ,它表明该 Producer 的压缩算法使用的是 GZIP 。这样 Producer 启动后生产的每个消息集合都是经 GZIP 压缩过的,故而能很好地节省网络传输带宽以及 Kafka Broker 端的磁盘占用
Broker 端
大部分情况下 Broker 从 Producer 端接收到消息后仅仅是原封不动地保存而不会对其进行任何修改
有两种例外情况就可能让 Broker 重新压缩消息
情况一: Broker 端指定了和 Producer 端不同的压缩算法
实例:Producer端使用GZIP压缩,Broker端使用 Snappy 算法进行压缩?
这种情况下 Broker 接收到 GZIP 压缩消息后,只能解压缩然后使用 Snappy 重新压缩一遍。如果你翻开 Kafka 官网,你会发现 Broker 端也有一个参数叫 compression.type ,和上面那个例子中的同名。但是这个参数的默认值是 producer ,这表示 Broker 端会 “ 尊重 ”Producer 端使用的压缩算法。可一旦你在 Broker 端设置了不同的 compression.type 值,就一定要小心了,因为可能会发生预料之外的压缩 / 解压缩操作,通常表现为 Broker 端 CPU 使用率飙升。
情况二: Broker 端发生了消息格式转换
所谓的消息格式转换主要是为了兼容老版本的消费者程序,在一个生产环境中, Kafka 集群中同时保存多种版本的消息格式非常常见。为了兼容老版本的格式, Broker 端会对新版本消息执行向老版本格式的转换。
这个过程中会涉及消息的解压缩和重新压缩。一般情况下这种消息格式转换对性能是有很大影响的,除了这里的压缩之外,它还让Kafka 丧失了引以为豪的 Zero Copy 特性。
Zero Copy:说的是当数据在磁盘和网络进行传输时避免昂贵的内核态数据拷贝,从而实现快速的数据传输
何时解压缩
Consumer 端解压缩
通常来说解压缩发生在消费者程序中,也就是说 Producer 发送压缩消息到Broker 后, Broker 照单全收并原样保存起来。当 Consumer 程序请求这部分消息时, Broker 依然原样发送出去,当消息到达 Consumer 端后,Consumer 自行解压缩还原成之前的消息
Consumer 怎么知道这些消息是用何种压缩算法压缩的呢?
Kafka 会将启用了哪种压缩算法封装进消息集合中,这样当 Consumer 读取到消息集合时,它自然就知道了这些消息使用的是哪种压缩算法。
Broker 端解压缩
每个压缩过的消息集合在 Broker 端写入时都要发生解压缩操作,目的就是为了对消息执行各种验证
总结: Producer 端压缩、 Broker 端保持、Consumer 端解压缩
各种压缩算法对比
衡量压缩算法优劣的两个重要指标
压缩比
原先占 100 份空间的东西经压缩之后变成了占 20 份空间,那么压缩比就是 5 ,显然压缩比越高越好
压缩/解压缩的吞吐量
比如每秒能压缩或解压缩多少 MB 的数据。同样地,吞吐量也是越高越好
GZIP
Snappy
LZ4
Zstandard(简写为 zstd)
Kafka中使用结论
吞吐量
LZ4 > Snappy > zstd 和 GZIP
压缩比
zstd > LZ4 > GZIP > Snappy
网络带宽
使用 Snappy 算法占用的网络带宽最多, zstd 最少,这是合理的,毕竟 zstd 就是要提供超高的压缩比
CPU 使用率
各个算法表现得差不多,只是在压缩时 Snappy 算法使用的 CPU 较多一些,而在解压缩时 GZIP 算法则可能使用更多的 CPU
最佳实践
何时启用压缩是比较合适的时机呢?
CPU资源充足
Producer 程序运行机器上的 CPU 资源要很充足。如果 Producer 运行机器本身 CPU 已经消耗殆尽了,那么启用消息压缩无疑是雪上加霜,只会适得其反
如果你的客户端机器 CPU 资源有很多富余,强烈建议你开启 zstd压缩,这样能极大地节省网络资源消耗
带宽资源有限
毕竟万兆网络还不是普通公司的标配,因此千兆网络中 Kafka 集群带宽资源耗尽这件事情就特别容易出现
有条件的话尽量保证不要出现消息格式转换的情况
Java生产者如何管理TCP连接
Java消费者如何管理TCP连接
Kafka副本机制
高级功能
Kafka 拦截器
生产者拦截器
生产者拦截器允许你在发送消息前以及消息
提交成功后植入你的拦截器逻辑
消费者拦截器
消费者拦截器支持在消费消息前以及提交位移后编写特定逻辑
集群监控
性能调优
最佳实践
无消息丢失配置实现
Kafka 到底在什么情况下才能保证消息不丢失呢?
Kafka 只对 “ 已提交 ” 的消息( committed message )做有限度的持久化保证
第一个核心要素是 “ 已提交的消息 ”
当 Kafka 的若干个 Broker 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交。此时,这条消息在 Kafka 看来就正式变为 “ 已提交 ” 消息了。
为什么是若干个 Broker 呢?这取决于你对 “ 已提交 ” 的定义
你可以选择只要有一个 Broker 成功保存该消息就算是已提交
也可以是令所有 Broker 都成功保存该消息才算是已提交
第二个核心要素就是 “ 有限度的持久化保证 ”
这里的有限度就是说 Kafka 不丢消息是有前提条件的
假如你的消息保存在 N 个 Kafka Broker 上,那么这个前提条件就是这 N 个 Broker 中至少有 1个存活。只要这个条件成立, Kafka 就能保证你的这条消息永远不会丢失
“ 消息丢失 ” 案例
案例 1 :生产者程序丢失数据
场景:你写了一个 Producer 应用向 Kafka 发送消息,最后发现 Kafka 没有保存,于是大骂: “Kafka 真烂,消息发送居然都能丢失,而且还不告诉我?!
fire and forget方式
目前 Kafka Producer 是异步发送消息的,也就是说如果你调用的是 producer.send(msg) 这个API ,那么它通常会立即返回,但此时你不能认为消息发送已成功完成。这种发送方式有个有趣的名字,叫 “fire and forget” (发射后不管),它的意思是,执行完一个操作后不去管它的结
果是否成功。
这种方式挺不靠谱,如果出现消息丢失,
我们是无法知晓的。
使用producer.send(msg) 导致消息没有发送成功的原因?
网络抖动,导致消息压根就没有发送到 Broker 端
消息本身不合格导致 Broker 拒绝接收(比如消息太大了,超过了 Broker 的承受能力)
解决方案: Producer 永远要使用带有回调通知的发送 API ,也就是说不要使用producer.send(msg) ,而要使用 producer.send(msg, callback) 。
这里的callback可以针对消息提交失败的情况进行有针对性的处理,处理发送失败的责任在 Producer 端而非 Broker端。
瞬时错误
仅仅让 Producer 重试就可以了
消息不合格
可以调整消息格式后再次发送
Broker 都宕机
无论 Producer 端怎么重试都会失败的
案例 2 :消费者程序丢失数据
Consumer 端丢失数据主要体现在 Consumer 端要消费的消息不见了。 Consumer 程序有个 “ 位移 ” 的概念,表示的是这个 Consumer 当前消费到的 Topic 分区的位置。
针对位移有2种情况会出现数据丢失
将先消费消息,再更新位移顺序颠倒
加入当前位移是90, 我先将位移更新到100,然后开始消费消息,当消费到第95条时出错终止了消费,那么下次消费从100开始,96~99的消息就会丢失。
解决方案:维持先消费消息,再更新位移的顺序,就能最大限度地保证消息不丢失
可能带来的问题是消息的重复消费
多线程消费,某个线程运行失败导致
Consumer 程序从 Kafka 获取到消息后开启了多个线程异步处理消息,而 Consumer 程序自动地向前更新位移。假如其中某个线程运行失败了,它负责的消息没有被成功处理,但位移已经被更新了,因此这条消息对于 Consumer 而言实际上是丢失了
解决方案:如果是多线程异步处理消费消息, Consumer 程序不要开启自动提交位移,而是要应用程序手动提交位移
最佳实践
不要使用 producer.send(msg) ,而要使用 producer.send(msg, callback) 。记住,一定要使用带有回调通知的 send 方法
设置 acks = all 。 acks 是 Producer 的一个参数,代表了你对 “ 已提交 ” 消息的定义。如果设置成all ,则表明所有副本 Broker 都要接收到消息,该消息才算是 “ 已提交 ” 。这是最高等级的 “ 已提交 ” 定义
设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失
设置 unclean.leader.election.enable = false 。这是 Broker 端的参数,它控制的是哪些 Broker有资格竞选分区的 Leader 。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的Leader ,必然会造成消息的丢失。故一般都要将该参数设置成 false ,即不允许这种情况的发生
设置 replication.factor >= 3 。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余
设置 min.insync.replicas > 1 。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是 “ 已提交 ” 。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1
确保 replication.factor > min.insync.replicas 。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可
用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1
确保消息消费完成再提交。 Consumer 端有个参数 enable.auto.commit ,最好把它设置成false ,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的
Kafka Streams
Apache Kafka
Kafka版本命名
例如:kafka_2.11-2.2.1.tgz 前面的版本号是编译Kafka源代码的Scala编译器版本。所以这里实际Kafka的版本号为2.1.1。 Kafka服务端代码完全由Scala编写,而Kafka新版客户端代码完全由Java语言编写。(至于为什么不用Scala,只是因为社区来了一批Java程序员,老的Scala程序员隐退罢了)
由三部分组成,即“大版本号-小版本号-Patch号”
Kafka版本演进
0.7
开源的初始版本,只提供了最基础的消息队列功能,连副本机制都没有,不建议使用。
0.8
正式引入副本机制,成为真正意义上完备的分布式高可靠消息队列解决方案,需使用老版本客户端API,需指定Zookeeper的地址而非Broker地址
1、老版本客户端API存在很多问题,特别是生产者API,它默认使用同步方式发消息,吞吐量不会太高。虽然其支持异步方式,但实际场景可能会造成消息丢失,因此0.8.2.0版本社区引入了新版本Producer API,即需要指定Broker地址的Producer
2、对于使用0.8版本的建议至少升级到0.8.2.2这个版本,因为该版本中老版本消费者API是比较稳定的,这个版本不建议使用新版本Producer API,此时它的bug还非常多。
0.9
这是一个重量级的大版本更新,增加了基础的安全认证/权限功能
同时使用Java重写了新版本消费者API
此时新版本Producer API在这个版本中算比较稳定了,线上环境可以切换新版Producer,但不要使用新版本Consumer API,因为Bug超多
引入了Kafka Connect组件用于实现高性能的数据抽取
0.10
里程碑式的大版本,该版本引入了Kafka Streams,从这个版本起,Kafka正式升级成分布式流处理平台(主要包括0.10.1和0.10.2两个小版本)
在0.10.2.2版本起,新版本Consumer API算是比较稳定了,所以如果使用0.10大版本,建议至少升级到0.10.2.2然后使用新版本Consumer API。并且0.10.2.2修复了一个可能导致Producer性能降低的Bug。
0.11
引入了两个重量级的功能变更:一个是提供幂等性Producer API以及事物(Transaction)API
该特性是Kafka实现流处理结果正确性的基石,没有它们,Kafka Streams在做流处理时无法向批处理那样保证结果的正确性。
由于刚推出,此时的事物API有一些Bug,不算十分稳定
另一个是对Kafka消息格式做了重构
目前最主流的版本之一,这个版本中各大功能组件都变得非常稳定了。如果对1.0版本是否用于线上环境依然感到困惑,至少应升级到0.11.0.3因为这个版本的消息引擎功能已经非常完善了。
1.0
2.0
0 条评论
下一页