Kafka源码
2022-05-08 18:37:20 14 举报
kafka
作者其他创作
大纲/内容
代码结构
checkstyle
bin 执行脚本
clients 客户端(java写的)
connect 获取数据源
core 消息系统(Scala写的)
examples 使用样例
gradle
streams 流式计算和处理(java)
KafkaProducer
KafkaProducer核心组件
在一个JVM内部,如果要搞多个kafkaproducer的话,每个都会默认生成一个client-id,producer-自增长数字
Partitioner 分区器:用来决定发送的每条消息路由到Topic的哪个分区
Metadata
用来从broker集群去拉取元数据 Topics(Topic -> Partitions(Leader+Followers, ISR)),后面如果写消息到Topic,才知道这个Topic有哪些Partitions,和分区Leader所在的Broker
启动的时候会拉取一次,后面每隔一小段时间会再次发送请求刷新元数据 metadata.max.age.ms 默认值5分钟
发送消息的时候如果发现某个topic本地没有metadata,会尝试拉取这个topic的元数据
核心参数
maxRequestSize 请求最大大小 默认1MB
totalMemorySize 缓冲区大小 默认32MB
retryBackOffMs 重试时间间隔 默认100MS
maxBlockTimeMs 缓冲区填满后阻塞时间 默认60S
requestTimeoutMS 请求超时时间 默认30s
RecordAccumulator 缓冲区 负责消息复杂的缓冲机制
batchSize 一次请求消息打包的大小 默认16K
lingerMS 设置指定时间,如果batchSize没凑够也会发送消息
网络通信组件 NetworkClient
MaxInFlightRequestsPerConnection 同一个broke最多能受到几个没有响应的请求 默认5
ConnectionMaxIDLE空闲网络连接的最大数量
一个网络连接最多空闲时间 默认9分钟
重试连接的时间间隔 默认5MS
SendBuffer Socket发送缓冲区的大小 默认128K
ReceiveBuffer Socket接收缓冲区大小 默认32K
Sender线程类叫“KafkaThread”,线程名字叫"kafka-producer-network-thread" 负责从缓冲区获取消息发送到broke上去
acks 默认是1 只要leader写入成功就返回
retries 失败重试次数 默认0
序列化组件
拦截器组件
KafkaProducer在初始化的时候会不会从集群拉取元数据?
并没有真正的去某一个broke上拉取元数据,但是把配置的broke地址转化为node,放在cluster对象实例里,对集群元数据做了初始化,在后面根据发送消息时候的topic,再去拉取
KafkaProducer.send()执行流程
回调自定义的拦截器
序列化key和value,将数据转换成字节数组
基于获取到的topic元数据,使用Prititioner组件获取消息对应的分区
分区策略
没有指定key时,kafka会使用自己维护的一个AutomicInteger类型的 自增counter均匀的把消息分配到这个topic的分区上
如果指定了key,kafka会使用自己实现的murmur2工具类将key这个字节数组转换为一个hash值,然后对分区数进行取模,可以保证相同的key都在一个分区
检查要发送的消息是否超出消息最大大小,和内存缓冲最大大小
将消息添加到内存缓冲里去
BufferPool里面有一个deque作为队列缓存一些ByteBuffer内存空间,可以复用,大小是batch大小
BytePool数量 * batch 大小 = 已经缓存内存空间大小
availableMemory是剩余还可以使用的空间大小
设置好自定义的callback回调函数以及对应拦截器的回调函数
如果某个分区对应的batch满了或者创建了新的batch,此时就会唤醒sender线程发送batch
bath是如何判定可以被发送出去的
batch已满
时间超过linger.ms
内存耗尽
客户端要关闭了
强制flush所有数据时
判断目标broke是否可以发送请求过去
如果不满足上面条件,如何建立连接
概述
消息队列
Kafka是一个分布式的基于发布/订阅模式的消息队列,主要应用于大数据实时处理领域
解耦
可恢复性
缓冲
灵活性&峰值处理能力
异步通信
模式
点对点
发布/订阅
0 条评论
下一页