kafka核心
2021-11-22 10:41:10 0 举报
AI智能生成
KAFKA核心
作者其他创作
大纲/内容
kafka原理
备份机制
好处
方便实现“Read-your-writes”
方便实现单调读(Monotonic Reads)
副本角色
基于领导者(Leader-based)的副本机制
追随者副本
不对外提供服务
请求处理
方案
顺序请求
缺陷
吞吐量差
适用
请求发送非常不频繁的系统
每个请求使用单独线程处理
缺陷
开销大
适用
请求发送频率很低的业务场景
Reactor模式
Reactor模式是事件驱动架构的一种是先方式,特别适合应用于
处理多个客户端并发向服务器端发送请求的场景。
处理多个客户端并发向服务器端发送请求的场景。
num.network.threads
默认3,表示每台Broker启动时会创建3个网络线程,专门处理客户端发送的请求
Rebalance全流程解析
Controller
职责
1、主题管理(创建、删除、增加分区)
kafka-topic脚本
2、分区重分配
kafka-reassign-partitions脚本
3、Preferred领导者选举
只要是kafka为了避免Broker负载过量而
提供的一种换Leader的方案。
提供的一种换Leader的方案。
4、集群成员管理(新增Broker、Broker主动关闭、Broker宕机)
包括自定检测新增Broker、Broker主动或被动宕机
5、数据服务
控制器故障转移
(Failover)
(Failover)
当运行中的控制器突然宕机或意外终止时,
kafka能够快速地感知到,并立即启用备用
控制器来替代之前失效的控制器。
kafka能够快速地感知到,并立即启用备用
控制器来替代之前失效的控制器。
高水位
作用
定义消息可见性,即用来标识分区下的哪些消息是可以被消费的
帮助kafka完成副本同步
运维与监控
主题管理
动态配置
消费者组位移管理
KafkaAdminClient
认证机制
MirrorMaker
监控框架
授权管理
kafka调优
流处理应用搭建实例
高级kafka应用
Kafka Streams
Kafka DSL开发
应用实例
基本使用
线上方案制定
操作系统
Windows
只适合于个人测试或用于功能验证,
千万不要应用于生产环境
千万不要应用于生产环境
Linux
最多,最优
I/O模型的使用
阻塞式I/O
非阻塞式I/O
I/O多路复用
信号驱动I/O
异步I/O
数据网络传输效率
社区支持度
能享受到零拷贝基数所带来的的快速数据传输特性
macOS
磁盘
使用机械磁盘完全能够胜任kafka线上环境
追求性价比的公司可以不搭建RAID,使用普通磁盘组成存储空间即可
磁盘容量
新增消息条数
消息留存时间
平均消息大小
备份数
是否启用压缩
带宽
集群配置参数
参数类型
静态参数:指必须在Kafka的配置文件server.properties中进行设置的参数,
同时必须重启Broker进程才能生效。
同时必须重启Broker进程才能生效。
主体级别参数:Kafka提供专门的kafka-configs命令来修改
JVM和操作系统级别参数
Broker端参数
针对存储信息
log.dirs:重要!指定Broker需要使用的若干个文件目录的路径。
没有默认值,需亲自指定。例:/home/kafka1,/home/kafka2,...
没有默认值,需亲自指定。例:/home/kafka1,/home/kafka2,...
提升读写性能:比起单块磁盘,多块物理磁盘同时读写数据,有更高的吞吐量
能够实现在故障转移,即Failover
log.dir:单个路径,对上一个参数的补充
(只设置log.dirs就行,不要设置这个参数)
(只设置log.dirs就行,不要设置这个参数)
Zookeeper参数
Zookeeper的作用
负责协调管理并保存Kafka集群的所有元数据信息,比如集群有哪些Broker在运行、
创建了哪些Topic、每个Topic都有多少分区以及这些分区的Leader副本都在哪些
机器上等信息。
创建了哪些Topic、每个Topic都有多少分区以及这些分区的Leader副本都在哪些
机器上等信息。
zookeeper.connect:Kafka与zookeeper相关最重要的参数,
例如:zk1:2181,zk2:2181,zk3:2181,...2181是zookeeper默认端口
例如:zk1:2181,zk2:2181,zk3:2181,...2181是zookeeper默认端口
chroot
多个kafka集群使用同一套zookeeper集群
zookeeper.connect配置如:
zk1:2181,zk2:2181,zk3:2181/kafka1和
zk1:2181,zk2:2181,zk3:2181/kafka2
切记chroot只需要写1次,而且是加到最后
zk1:2181,zk2:2181,zk3:2181/kafka1和
zk1:2181,zk2:2181,zk3:2181/kafka2
切记chroot只需要写1次,而且是加到最后
Broker之间通讯
listeners
学名监听器,告诉外部连接者要通过什么协议访问
指定主机名和端口开放的kafka服务
指定主机名和端口开放的kafka服务
若干个逗号分隔的三元组,每个三元组的格式为<协议名称,主机名,端口号>
协议名称可以是标准的名字(PLAINTEXE、SSL、TLS等)也可以自定义
协议名称可以是标准的名字(PLAINTEXE、SSL、TLS等)也可以自定义
一旦自定义,就要指定listener.scurity.protocol.map参数,告诉这个协议底层使用了哪种安全协议。
比如listener.scurity.protocol.map=CONTROLLER:PLAINTEXT。表示CONTROLLER这个自定义协议
底层使用明文不加密传输数据。
比如listener.scurity.protocol.map=CONTROLLER:PLAINTEXT。表示CONTROLLER这个自定义协议
底层使用明文不加密传输数据。
主机名建议:全部使用主机名,即Broker端和Client端应用配置中全部填写主机名。
advertisend.listeners
和listeners相比多了个advertised。Advertised的含义表示宣称的、公布的,
就是说这组监听器是Broker用于对外发布的。
就是说这组监听器是Broker用于对外发布的。
host.nome/port
过期的参数,不需要指定,可以忘掉
Topic管理
auto.create.topics.enable:是否允许自动创建Topic
建议设置成false,防止线上存在稀奇古怪的topic
unclean.leader.election.enable:是否允许Unclean Leader选举
默认false,建议显示设置成false
auto.leader.rebalance.enable:是否云系定期进行Leader选举
建议生产环境设置成false
数据留存
log.retention.{hours|minutes|ms}:都是控制一条消息数据被保存多长时间,
优先级:ms>minutes>hours
优先级:ms>minutes>hours
log.retention.bytes:指定Broker为消息保存的总磁盘容量大小,
默认-1,表明不限制。
默认-1,表明不限制。
message.max.bytes:控制Broker能够接受的最大消息大小。
默认1000012,不到1MB,太少了。建议设置一个比较大的值。
默认1000012,不到1MB,太少了。建议设置一个比较大的值。
Topic级别参数
retention.ms:规定该消息被保存的时长,默认7天。
设置该参数会覆盖Broker端的全局参数。
设置该参数会覆盖Broker端的全局参数。
retention.bytes:规定了要为Topic预留多大空间,默认设置为-1.
该参数再多租户的Kafka集群中会有用武之地。
该参数再多租户的Kafka集群中会有用武之地。
设置方式
创建Topic时进行设置
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic transaction --partitions 1 --replication-factor 1 --config retention.ms=15552000000 --config max.message.bytes=5242880
修改Topic时设置
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name transaction --alter --add-config max.message.bytes=10485760
个人建议使用这种方式
JVM参数
将JVM堆大小设置成6GB
GC设置
Java7
如果Broker所在机器的CPU资源非常充裕,建议使用CMS收集器。
启用方法是指定XX:+UseCurrentMarSweepGC
启用方法是指定XX:+UseCurrentMarSweepGC
否则使用吞吐量收集器。开启方法时指定XX:+UseParalleGC
Java8
手动设置使用G1收集器
设置(环境变量)
KAFKA_HEAP_OPTS:指定堆大小
KAFKA_JVM_PERFOR<ANCE_OPTS:指定GC参数
例:
$> export KAFKA_HEAP_OPTS=--Xms6g --Xmx6g
$> export KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true
$> bin/kafka-server-start.sh config/server.properties
$> export KAFKA_HEAP_OPTS=--Xms6g --Xmx6g
$> export KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true
$> bin/kafka-server-start.sh config/server.properties
操作系统参数
文件描述符限制
ulimirt -n 1000000
文件系统类型
ext3
ext4
XFS
最好选用XFS
Swappiness
配置成一个接近0但不是0的值
提交时间
适当拉大提交间隔
kafka入门
消息引擎基础
编码格式
使用纯二进制字节序列
传输协议
点对点模型
(也叫消息队列模型)
(也叫消息队列模型)
发布/订阅模型
主题(Topic)
发布者(Publisher)
订阅者(Subscriber)
kafka基本术语
消息(Record)
主题(topic)
发布订阅的对象叫做主题
客户端(clients)
生产者(Producer)
向主题发送消息的客户端应用程序叫生产者
消费者(Consumer)
订阅主题消息的客户端应用程序叫做消费者
消费者位移(Consumer Offset)
记录当前消费了分区的哪个位置上
(消费者消费进度的指示器)
(消费者消费进度的指示器)
消费者组(Consumer Group)
多个消费者实例共同组成一个组来消费一组主题。
这个组主题中的每个分区都只会被组内的一个消费者实例消费
这个组主题中的每个分区都只会被组内的一个消费者实例消费
服务器端
由被称为Broker的服务进程构成
Broler负责接受和处理客户端发送过来的请求,以及对消息进行持久化
通常将不同的Broker分散运行在不同的机器上,
避免一台机器宕机,全部Broker进程都挂掉。
避免一台机器宕机,全部Broker进程都挂掉。
高可用
备份机制(Replication)
就是把相同的数据拷贝到多台机器上,
而这些相同的数据拷贝称为副本(Replica)
而这些相同的数据拷贝称为副本(Replica)
领导者副本(Leader Replica)
对外(客户端)进行交互
追随者副本(Follower Replica)
只是被动地追随领导者副本而已,
不能与外界交互
不能与外界交互
副本的工作机制
生产者总是向领导者副本写消息
消费者总是从领导者副本读消息
追随者副本只做一件事:向领导者副本发送请求,
请求领导者把最新生产的消息发给它,这样它能
保持与领导者的同步。
请求领导者把最新生产的消息发给它,这样它能
保持与领导者的同步。
伸缩性(Scalability)
分区(Partitioning)
将每个主题划分成多个分区,每个分区是一组有序的消息日至。
生产者生产的每条消息只会发送到一个分区中。
生产者生产的每条消息只会发送到一个分区中。
位移(Offset)
每条消息在分区中的位置信息由一个叫位移的舒俱来表征
三层消息架构
第一层是主题层
每个主题可以配置M个分区,
而每个分区又可以配置N个副本。
而每个分区又可以配置N个副本。
第二层是分区层
每个分区的N个副本中只能有一个充当领导角色,对外提供服务。
其他N-1个副本是追随者副本,只是提供数据冗余之用。
其他N-1个副本是追随者副本,只是提供数据冗余之用。
第三层是消息层
分区中包含若干条消息,每条消息位移从0开始,依次递增。
持久化
kafka使用消息日志(Log)来保存数据,一个日志就是在磁盘上一个只能追加写(Append-only)消息的物理文件
因为是追加,避免了缓慢的随机I/O操作,采用顺序I/O写操作
删除旧日志:通过日志段(Log Segment)机制
kafka后台有定时任务定期检查老日志段是否被删除,
实现回收磁盘空间的目的
实现回收磁盘空间的目的
重平衡(Rebalance)
消费者组内某个消费者实例挂掉后,
其他消费者实例自动重新分配订阅
主题分区的过程。Rebalance是kafka
消费者端实现高可用的重要手段。
其他消费者实例自动重新分配订阅
主题分区的过程。Rebalance是kafka
消费者端实现高可用的重要手段。
kafka角色定位
是消息引擎系统,也是一个分布式流处理平台
优势
更容易实现端对端的正确性
kafka自己对于流式计算的定位
kafka版本选择
Apache Kafka
社区版
Confluent Kafka
商业化
跨数据中心备份
Schema注册中心
集群监控工具
Cloudera/Hortonworks Kafka
提供大数据平台
版本号
客户端
生产者
分区机制
作用:提升负载均衡的能力,实现系统的高伸缩性(Scalability)
分区策略
默认分区策略
自定义分区策略
需要显示地配置生产者端的参数partitioner.class
轮询策略(Round-robin,顺序分配)
轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地
平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是
最常用的分区策略之一。
平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是
最常用的分区策略之一。
随机策略(Randomness)
按消息键保存策略(Key-ordering)
其他策略
基于地理位置的分区策略
一般只针对于大规模Kafka集群,特别是跨城市、跨国家、
甚至是跨大洲的集群
甚至是跨大洲的集群
压缩算法
合适压缩
生产者端
生产者程序中配置compression.type参数表示启用指定的压缩算法
Broker端
2.1.0版本前
GZIP
Snappy
Lz4
2.1.0后
增加zstd(Zstandard)算法
算法优劣
吞吐量
LZ4>Snappy>zstd和GZIP
压缩比
zstd>LZ4>GZIP>Snappy
无消息丢失配置
kafka只对“已提交”的消息(committed message)做有限度的持久化保证
生产者丢失数据
producer.sens(msg)
不等待确认消息,立即返回
producer.send(msg,callback)
带回调通知的发送API
消费者程序丢失数据
解决方案:维持先消费消息,再更新位移的顺序
重复消费(多线程消费)
Consumer程序不要开启自动提交位移,而是要应用程序手动提交位移
最佳实践
1、不是要使用producer.send(msg),而是用producer.send(msg,callback)
2、设置acks=all。acks是Producer的一个参数,代表了对“已提交”消息的定义,
如果设置成all,则表明所有副本Broker都要接收到消息,该消息才是“已提交”。
这是最高等级的“已提交”定义
如果设置成all,则表明所有副本Broker都要接收到消息,该消息才是“已提交”。
这是最高等级的“已提交”定义
3、设置retries为一个较大的值。这里的retries是Producer的参数,对应Producer自动重试。
当出现网络的瞬时抖动时,消息发送可能会失败,此时设置了retries>0的Producer能够自动
重试,避免消息丢失。
当出现网络的瞬时抖动时,消息发送可能会失败,此时设置了retries>0的Producer能够自动
重试,避免消息丢失。
4、设置unclean.leader.election.enable=false。这是Broker端的参数,它控制的是
哪些Broker有资格竞选分区Leader。如果一个Broker落后原先的Leader太多,那么它
一旦成为新的Leader,就会造成数据丢失,设置为false是不允许这种情况发生。
哪些Broker有资格竞选分区Leader。如果一个Broker落后原先的Leader太多,那么它
一旦成为新的Leader,就会造成数据丢失,设置为false是不允许这种情况发生。
5、设置replication.factor>=3。Broker端的参数,标识最好将消息保存几份,
毕竟目前防止消息丢失的主要机制就是冗余。
毕竟目前防止消息丢失的主要机制就是冗余。
6、设置min.insync.replicas>1。Broker端参数,控制的是消息至少要被写入
多少个副本才算“已提交”。设置成大于1可以提升消息持久性。实际环境中
千万不要使用默认值1.
多少个副本才算“已提交”。设置成大于1可以提升消息持久性。实际环境中
千万不要使用默认值1.
7、确保replication.factor>min.insync.replicas。如果两者相等,那么只要其中一个
副本挂机,整个分区就无法正常工作了。设置成replication.factor=min.insybc.replicas+1
副本挂机,整个分区就无法正常工作了。设置成replication.factor=min.insybc.replicas+1
8、确保消息消费完再提交。Counsumer端有个参数enable.auto.commit,最好把它设置成false,并采用手动提交位移的方式。
高级功能
Kafka拦截器
生产者拦截器
允许再发送消息前以及消息提交后植入拦截器逻辑
消费者拦截器
下消费消息前以及提交为以后编写特定逻辑
TCP连接管理
幂等性生产者与事务
消息可靠性保证
最多一次(at most once):消息可能会丢失,但绝不会被重复发送
(默认)至少一次(at least once):消息不会丢失,但有可能被重复发送
精确一次(exactly once):消息不会丢失,也不会被重复发送
幂等性(Idempotence)
指某些操作或函数能够被执行多次,但每次得到的结果都是不变的。
幂等性Producer
producer默认不是幂等的
props.put("enable.idempotence",true)或者props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true)。
enable.idempotence被设置成true后,Producter自动升级成幂等性Producer。
enable.idempotence被设置成true后,Producter自动升级成幂等性Producer。
所用范围
只能保证单分区上的幂等性
只能实现单会话上的幂等性
事务(Transaction)
事务Producer
开启enable.idempotence=true
设置Producer端参数transactional.id。最好设置一个有意义的名字
API
初始化事务:initTranscation
事务开始:beginTransaction
事务提交:commitTransaction
终止事务:abortTransaction
Consumer端设置
设置isolation.level参数
read_uncommitted:这事默认值,表明Consumer能读取kafka写入的任何消息。
read_committed:表明Consumer只会读事务型Producer成功提交事务写入的消息。
当然也能看到非事务型Producer写入的所有信息。
当然也能看到非事务型Producer写入的所有信息。
消费者
消费者组(Consumer group)
Consumer Group是kafka提供的可扩展且具有容错性的消费者机制
理解
1、consumer Group下可以有一个或者多个Consumer实例。这里的实例可以是一个
单独的进程,也可以是同一个进程下的线程。在实际场景中,使用进程更为常见。
单独的进程,也可以是同一个进程下的线程。在实际场景中,使用进程更为常见。
2、Group ID是一个字符串,在一个kafka集群中,它标识位移的一个Consumer Group。
3、Consumer Group下所有实例订阅的主题的单个分区,只能分配给组内的某个
Consumer实例消费。这个分期当然也可以呗其他的Group消费。
Consumer实例消费。这个分期当然也可以呗其他的Group消费。
位移主题(_consumer_offsets)
原理
将Consumer的位移数据作为一条条普通的kafka消息,提交到_consumer_offsets中,
_consumer_offsets的主要作用是保存Kafka消费者的位移信息
_consumer_offsets的主要作用是保存Kafka消费者的位移信息
格式
key格式
唯一主题的Key中应该保存3部分内容<Group Id,主题名,分区号>
消息格式
第一种格式:位移值、时间戳、用户自定义的数据等
第二种格式:用于保存Consumer Group信息的消息
第三种格式(墓碑消息/delete mark):用于删除Group过期位移设置是删除Group的消息
创建
当kafka集群中的第一个Consumer程序启动时,Kafka会自动创建位移主题
分区数
Broker端offsets.topic.num.paritions的取值,默认50
副本数和备份因子
Broker端offsets.topic.replication.factor参数,默认3
提交位移
参数
Consumer端的enable.auto.commit
true时自动提交
auto.commit.interval.ms
控制提交间隔
自动提交位移
问题:只要Consumer启动着,就会无限期地向位移主题写入消息
手动提交位移
API:consumer.commitSync等
删除过期消息
Compact策略
Kafka提供了专门的后台线程定期地巡检待Compact的主题,
看看是否存在满足条件的可删除数据
看看是否存在满足条件的可删除数据
重平衡(Rebalance)
本质上是一种协议,规定了一个Consumer Group下的所有Consumer如何达成一致,
来分配订阅的每个分区。
来分配订阅的每个分区。
触发条件
组成员数发生变更
订阅主题数发生变更
订阅主题的分区发生变更
缺点
对Consumer Group消费国成有极大的影响。在Reblalance过程中,
所有的Consumer实例都会停止消费,等待Rebalance完成。
所有的Consumer实例都会停止消费,等待Rebalance完成。
Rebalance的设计是所有Consumer实例共同参与,全部重新分配所有分区。
Rebalance太慢
非必要Rebalance
未及时发送信条,导致Consumer被踢出
设置session.timeout.ms=6s
设置:heartbeat.interval.ms=2s
保证Consumer实例在被判定为“dead”之前,能够发送至少
3轮心跳请求,即session.timeout.ms>=3*heartbeat.interval.ms
3轮心跳请求,即session.timeout.ms>=3*heartbeat.interval.ms
Consumer消费时间过程导致
设置max.poll.interval.ms大一点的值
位移提交
用户角度
自动提交
缺陷:比如5秒提交一次,3秒的时候发生Rebalance,导致3面数据重复消费
手动提交
缺陷:在调用commitSync()时,COnsumer程序会处于阻塞状态,知道Broker返回提交结果。
consumer.commitAsync():异步操作,不会阻塞
commitASync()不能替代commitSync()
commitAsync()失败后不会自动重试
Consumer角度
同步提交
异步提交
异常处理
原因
CommitFailedException,就是Consumer客户端在提交位移时出现了错误
或异常,而且还是那种不可恢复的严重异常。
或异常,而且还是那种不可恢复的严重异常。
在手动提交位移时发生(commitSync方法)
解决办法
增加期望的时间间隔max.poll.interval.ms参数值
减少poll方法一次性返回的消息数量,即减少max.poll.records参数值
发生场景
场景一:当消息处理的总时间超过了预设的max.poll.inerval.ms参数值时
1、缩短单条消息处理的时间
2、增加Consumer端允许下游系统消费一批消息的最大时长(max.poll.interval.ms)
3、减少下游系统一次性消费的消息总数(max.poll.records参数值)
4、下游系统采用多线程来加速消费
场景二:设置了相同group.id值的消费者组程序和
独立消费者程序。在独立消费者程序手动提交位移
时抛出异常。
独立消费者程序。在独立消费者程序手动提交位移
时抛出异常。
多线程开发实例
1、消费者程序启动多个线程,每个线程维护专属的kafkaConsumer实例
负责完整的消息获取、消息处理流程。
负责完整的消息获取、消息处理流程。
2、消费者程序使用单或多线程获取消息,同时创建
多个消费者线程执行消息处理逻辑。
多个消费者线程执行消息处理逻辑。
TCP连接管理
group监控
消费进度\滞后进度
(消费者Log\Consumer Log)
(消费者Log\Consumer Log)
指消费者当前落后生产者的程度
监控方法
1、使用kafka自带的命令行工具kafka-consumer-groups脚本
bin/kafka-consumer-groups.sh(bat)
2、使用kavka javaConsumerAPI编程
3、使用kafka自带的JMX监控指标
0 条评论
下一页