kafka核心
2021-11-22 10:41:10 0 举报
AI智能生成
KAFKA核心
作者其他创作
大纲/内容
方便实现“Read-your-writes”
方便实现单调读(Monotonic Reads)
好处
基于领导者(Leader-based)的副本机制
不对外提供服务
追随者副本
副本角色
备份机制
吞吐量差
缺陷
请求发送非常不频繁的系统
适用
顺序请求
开销大
请求发送频率很低的业务场景
每个请求使用单独线程处理
方案
Reactor模式是事件驱动架构的一种是先方式,特别适合应用于处理多个客户端并发向服务器端发送请求的场景。
默认3,表示每台Broker启动时会创建3个网络线程,专门处理客户端发送的请求
num.network.threads
Reactor模式
请求处理
Rebalance全流程解析
kafka-topic脚本
1、主题管理(创建、删除、增加分区)
kafka-reassign-partitions脚本
2、分区重分配
只要是kafka为了避免Broker负载过量而提供的一种换Leader的方案。
3、Preferred领导者选举
包括自定检测新增Broker、Broker主动或被动宕机
4、集群成员管理(新增Broker、Broker主动关闭、Broker宕机)
5、数据服务
职责
当运行中的控制器突然宕机或意外终止时,kafka能够快速地感知到,并立即启用备用控制器来替代之前失效的控制器。
控制器故障转移(Failover)
Controller
定义消息可见性,即用来标识分区下的哪些消息是可以被消费的
帮助kafka完成副本同步
作用
高水位
kafka原理
主题管理
动态配置
消费者组位移管理
KafkaAdminClient
认证机制
MirrorMaker
监控框架
授权管理
kafka调优
流处理应用搭建实例
运维与监控
Kafka Streams
Kafka DSL开发
应用实例
高级kafka应用
只适合于个人测试或用于功能验证,千万不要应用于生产环境
Windows
阻塞式I/O
非阻塞式I/O
I/O多路复用
信号驱动I/O
异步I/O
I/O模型的使用
数据网络传输效率
社区支持度
最多,最优
能享受到零拷贝基数所带来的的快速数据传输特性
Linux
macOS
操作系统
使用机械磁盘完全能够胜任kafka线上环境
追求性价比的公司可以不搭建RAID,使用普通磁盘组成存储空间即可
磁盘
新增消息条数
消息留存时间
平均消息大小
备份数
是否启用压缩
磁盘容量
带宽
线上方案制定
静态参数:指必须在Kafka的配置文件server.properties中进行设置的参数,同时必须重启Broker进程才能生效。
主体级别参数:Kafka提供专门的kafka-configs命令来修改
JVM和操作系统级别参数
参数类型
提升读写性能:比起单块磁盘,多块物理磁盘同时读写数据,有更高的吞吐量
能够实现在故障转移,即Failover
log.dir:单个路径,对上一个参数的补充(只设置log.dirs就行,不要设置这个参数)
针对存储信息
负责协调管理并保存Kafka集群的所有元数据信息,比如集群有哪些Broker在运行、创建了哪些Topic、每个Topic都有多少分区以及这些分区的Leader副本都在哪些机器上等信息。
Zookeeper的作用
多个kafka集群使用同一套zookeeper集群
chroot
Zookeeper参数
学名监听器,告诉外部连接者要通过什么协议访问指定主机名和端口开放的kafka服务
一旦自定义,就要指定listener.scurity.protocol.map参数,告诉这个协议底层使用了哪种安全协议。比如listener.scurity.protocol.map=CONTROLLER:PLAINTEXT。表示CONTROLLER这个自定义协议底层使用明文不加密传输数据。
主机名建议:全部使用主机名,即Broker端和Client端应用配置中全部填写主机名。
listeners
和listeners相比多了个advertised。Advertised的含义表示宣称的、公布的,就是说这组监听器是Broker用于对外发布的。
advertisend.listeners
过期的参数,不需要指定,可以忘掉
host.nome/port
Broker之间通讯
建议设置成false,防止线上存在稀奇古怪的topic
auto.create.topics.enable:是否允许自动创建Topic
默认false,建议显示设置成false
unclean.leader.election.enable:是否允许Unclean Leader选举
建议生产环境设置成false
auto.leader.rebalance.enable:是否云系定期进行Leader选举
Topic管理
log.retention.{hours|minutes|ms}:都是控制一条消息数据被保存多长时间,优先级:ms>minutes>hours
log.retention.bytes:指定Broker为消息保存的总磁盘容量大小,默认-1,表明不限制。
message.max.bytes:控制Broker能够接受的最大消息大小。默认1000012,不到1MB,太少了。建议设置一个比较大的值。
数据留存
Broker端参数
retention.ms:规定该消息被保存的时长,默认7天。设置该参数会覆盖Broker端的全局参数。
retention.bytes:规定了要为Topic预留多大空间,默认设置为-1.该参数再多租户的Kafka集群中会有用武之地。
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic transaction --partitions 1 --replication-factor 1 --config retention.ms=15552000000 --config max.message.bytes=5242880
创建Topic时进行设置
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name transaction --alter --add-config max.message.bytes=10485760
个人建议使用这种方式
修改Topic时设置
设置方式
Topic级别参数
将JVM堆大小设置成6GB
如果Broker所在机器的CPU资源非常充裕,建议使用CMS收集器。启用方法是指定XX:+UseCurrentMarSweepGC
否则使用吞吐量收集器。开启方法时指定XX:+UseParalleGC
Java7
手动设置使用G1收集器
Java8
GC设置
KAFKA_HEAP_OPTS:指定堆大小
KAFKA_JVM_PERFOR<ANCE_OPTS:指定GC参数
例:$> export KAFKA_HEAP_OPTS=--Xms6g --Xmx6g$> export KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true$> bin/kafka-server-start.sh config/server.properties
设置(环境变量)
JVM参数
ulimirt -n 1000000
文件描述符限制
ext3
ext4
最好选用XFS
XFS
文件系统类型
配置成一个接近0但不是0的值
Swappiness
适当拉大提交间隔
提交时间
操作系统参数
集群配置参数
基本使用
使用纯二进制字节序列
编码格式
点对点模型(也叫消息队列模型)
发布者(Publisher)
订阅者(Subscriber)
主题(Topic)
发布/订阅模型
传输协议
消息引擎基础
消息(Record)
发布订阅的对象叫做主题
主题(topic)
向主题发送消息的客户端应用程序叫生产者
生产者(Producer)
订阅主题消息的客户端应用程序叫做消费者
记录当前消费了分区的哪个位置上(消费者消费进度的指示器)
消费者位移(Consumer Offset)
消费者(Consumer)
多个消费者实例共同组成一个组来消费一组主题。这个组主题中的每个分区都只会被组内的一个消费者实例消费
消费者组(Consumer Group)
客户端(clients)
Broler负责接受和处理客户端发送过来的请求,以及对消息进行持久化
通常将不同的Broker分散运行在不同的机器上,避免一台机器宕机,全部Broker进程都挂掉。
由被称为Broker的服务进程构成
服务器端
对外(客户端)进行交互
领导者副本(Leader Replica)
只是被动地追随领导者副本而已,不能与外界交互
追随者副本(Follower Replica)
生产者总是向领导者副本写消息
消费者总是从领导者副本读消息
追随者副本只做一件事:向领导者副本发送请求,请求领导者把最新生产的消息发给它,这样它能保持与领导者的同步。
副本的工作机制
就是把相同的数据拷贝到多台机器上,而这些相同的数据拷贝称为副本(Replica)
备份机制(Replication)
伸缩性(Scalability)
将每个主题划分成多个分区,每个分区是一组有序的消息日至。生产者生产的每条消息只会发送到一个分区中。
每条消息在分区中的位置信息由一个叫位移的舒俱来表征
位移(Offset)
分区(Partitioning)
高可用
每个主题可以配置M个分区,而每个分区又可以配置N个副本。
第一层是主题层
每个分区的N个副本中只能有一个充当领导角色,对外提供服务。其他N-1个副本是追随者副本,只是提供数据冗余之用。
第二层是分区层
分区中包含若干条消息,每条消息位移从0开始,依次递增。
第三层是消息层
三层消息架构
kafka使用消息日志(Log)来保存数据,一个日志就是在磁盘上一个只能追加写(Append-only)消息的物理文件
因为是追加,避免了缓慢的随机I/O操作,采用顺序I/O写操作
删除旧日志:通过日志段(Log Segment)机制
kafka后台有定时任务定期检查老日志段是否被删除,实现回收磁盘空间的目的
持久化
消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance是kafka消费者端实现高可用的重要手段。
重平衡(Rebalance)
kafka基本术语
是消息引擎系统,也是一个分布式流处理平台
更容易实现端对端的正确性
kafka自己对于流式计算的定位
优势
kafka角色定位
社区版
Apache Kafka
跨数据中心备份
Schema注册中心
集群监控工具
商业化
Confluent Kafka
提供大数据平台
Cloudera/Hortonworks Kafka
版本号
kafka版本选择
kafka入门
作用:提升负载均衡的能力,实现系统的高伸缩性(Scalability)
默认分区策略
需要显示地配置生产者端的参数partitioner.class
自定义分区策略
轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是最常用的分区策略之一。
随机策略(Randomness)
按消息键保存策略(Key-ordering)
一般只针对于大规模Kafka集群,特别是跨城市、跨国家、甚至是跨大洲的集群
基于地理位置的分区策略
其他策略
分区策略
分区机制
生产者程序中配置compression.type参数表示启用指定的压缩算法
生产者端
Broker端
合适压缩
GZIP
Snappy
Lz4
2.1.0版本前
增加zstd(Zstandard)算法
2.1.0后
LZ4>Snappy>zstd和GZIP
吞吐量
zstd>LZ4>GZIP>Snappy
压缩比
算法优劣
压缩算法
kafka只对“已提交”的消息(committed message)做有限度的持久化保证
不等待确认消息,立即返回
producer.sens(msg)
带回调通知的发送API
生产者丢失数据
解决方案:维持先消费消息,再更新位移的顺序
Consumer程序不要开启自动提交位移,而是要应用程序手动提交位移
重复消费(多线程消费)
消费者程序丢失数据
2、设置acks=all。acks是Producer的一个参数,代表了对“已提交”消息的定义,如果设置成all,则表明所有副本Broker都要接收到消息,该消息才是“已提交”。这是最高等级的“已提交”定义
3、设置retries为一个较大的值。这里的retries是Producer的参数,对应Producer自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时设置了retries>0的Producer能够自动重试,避免消息丢失。
4、设置unclean.leader.election.enable=false。这是Broker端的参数,它控制的是哪些Broker有资格竞选分区Leader。如果一个Broker落后原先的Leader太多,那么它一旦成为新的Leader,就会造成数据丢失,设置为false是不允许这种情况发生。
5、设置replication.factor>=3。Broker端的参数,标识最好将消息保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
6、设置min.insync.replicas>1。Broker端参数,控制的是消息至少要被写入多少个副本才算“已提交”。设置成大于1可以提升消息持久性。实际环境中千万不要使用默认值1.
7、确保replication.factor>min.insync.replicas。如果两者相等,那么只要其中一个副本挂机,整个分区就无法正常工作了。设置成replication.factor=min.insybc.replicas+1
8、确保消息消费完再提交。Counsumer端有个参数enable.auto.commit,最好把它设置成false,并采用手动提交位移的方式。
最佳实践
无消息丢失配置
允许再发送消息前以及消息提交后植入拦截器逻辑
生产者拦截器
下消费消息前以及提交为以后编写特定逻辑
消费者拦截器
Kafka拦截器
高级功能
TCP连接管理
最多一次(at most once):消息可能会丢失,但绝不会被重复发送
(默认)至少一次(at least once):消息不会丢失,但有可能被重复发送
精确一次(exactly once):消息不会丢失,也不会被重复发送
消息可靠性保证
指某些操作或函数能够被执行多次,但每次得到的结果都是不变的。
producer默认不是幂等的
props.put(\"enable.idempotence\
幂等性Producer
只能保证单分区上的幂等性
只能实现单会话上的幂等性
所用范围
幂等性(Idempotence)
开启enable.idempotence=true
设置Producer端参数transactional.id。最好设置一个有意义的名字
初始化事务:initTranscation
事务开始:beginTransaction
事务提交:commitTransaction
终止事务:abortTransaction
API
事务Producer
read_uncommitted:这事默认值,表明Consumer能读取kafka写入的任何消息。
read_committed:表明Consumer只会读事务型Producer成功提交事务写入的消息。当然也能看到非事务型Producer写入的所有信息。
设置isolation.level参数
Consumer端设置
事务(Transaction)
幂等性生产者与事务
生产者
Consumer Group是kafka提供的可扩展且具有容错性的消费者机制
1、consumer Group下可以有一个或者多个Consumer实例。这里的实例可以是一个单独的进程,也可以是同一个进程下的线程。在实际场景中,使用进程更为常见。
2、Group ID是一个字符串,在一个kafka集群中,它标识位移的一个Consumer Group。
3、Consumer Group下所有实例订阅的主题的单个分区,只能分配给组内的某个Consumer实例消费。这个分期当然也可以呗其他的Group消费。
理解
消费者组(Consumer group)
将Consumer的位移数据作为一条条普通的kafka消息,提交到_consumer_offsets中,_consumer_offsets的主要作用是保存Kafka消费者的位移信息
原理
唯一主题的Key中应该保存3部分内容<Group Id,主题名,分区号>
key格式
第一种格式:位移值、时间戳、用户自定义的数据等
第二种格式:用于保存Consumer Group信息的消息
第三种格式(墓碑消息/delete mark):用于删除Group过期位移设置是删除Group的消息
消息格式
格式
当kafka集群中的第一个Consumer程序启动时,Kafka会自动创建位移主题
Broker端offsets.topic.num.paritions的取值,默认50
分区数
Broker端offsets.topic.replication.factor参数,默认3
副本数和备份因子
创建
true时自动提交
Consumer端的enable.auto.commit
控制提交间隔
auto.commit.interval.ms
参数
问题:只要Consumer启动着,就会无限期地向位移主题写入消息
自动提交位移
API:consumer.commitSync等
手动提交位移
提交位移
Compact策略
Kafka提供了专门的后台线程定期地巡检待Compact的主题,看看是否存在满足条件的可删除数据
删除过期消息
位移主题(_consumer_offsets)
本质上是一种协议,规定了一个Consumer Group下的所有Consumer如何达成一致,来分配订阅的每个分区。
组成员数发生变更
订阅主题数发生变更
订阅主题的分区发生变更
触发条件
对Consumer Group消费国成有极大的影响。在Reblalance过程中,所有的Consumer实例都会停止消费,等待Rebalance完成。
Rebalance的设计是所有Consumer实例共同参与,全部重新分配所有分区。
Rebalance太慢
缺点
设置session.timeout.ms=6s
设置:heartbeat.interval.ms=2s
保证Consumer实例在被判定为“dead”之前,能够发送至少3轮心跳请求,即session.timeout.ms>=3*heartbeat.interval.ms
未及时发送信条,导致Consumer被踢出
设置max.poll.interval.ms大一点的值
Consumer消费时间过程导致
非必要Rebalance
重平衡(Rebalance)
缺陷:比如5秒提交一次,3秒的时候发生Rebalance,导致3面数据重复消费
自动提交
缺陷:在调用commitSync()时,COnsumer程序会处于阻塞状态,知道Broker返回提交结果。
consumer.commitAsync():异步操作,不会阻塞
commitAsync()失败后不会自动重试
commitASync()不能替代commitSync()
手动提交
用户角度
同步提交
异步提交
Consumer角度
位移提交
CommitFailedException,就是Consumer客户端在提交位移时出现了错误或异常,而且还是那种不可恢复的严重异常。
在手动提交位移时发生(commitSync方法)
原因
增加期望的时间间隔max.poll.interval.ms参数值
减少poll方法一次性返回的消息数量,即减少max.poll.records参数值
解决办法
1、缩短单条消息处理的时间
2、增加Consumer端允许下游系统消费一批消息的最大时长(max.poll.interval.ms)
3、减少下游系统一次性消费的消息总数(max.poll.records参数值)
4、下游系统采用多线程来加速消费
场景一:当消息处理的总时间超过了预设的max.poll.inerval.ms参数值时
场景二:设置了相同group.id值的消费者组程序和独立消费者程序。在独立消费者程序手动提交位移时抛出异常。
发生场景
异常处理
1、消费者程序启动多个线程,每个线程维护专属的kafkaConsumer实例负责完整的消息获取、消息处理流程。
2、消费者程序使用单或多线程获取消息,同时创建多个消费者线程执行消息处理逻辑。
多线程开发实例
指消费者当前落后生产者的程度
消费进度\\滞后进度(消费者Log\\Consumer Log)
bin/kafka-consumer-groups.sh(bat)
1、使用kafka自带的命令行工具kafka-consumer-groups脚本
2、使用kavka javaConsumerAPI编程
3、使用kafka自带的JMX监控指标
监控方法
group监控
消费者
客户端
KAFKA核心
0 条评论
回复 删除
下一页