Kafka源码学习
2022-06-25 13:24:21 0 举报
kafka消息发送
作者其他创作
大纲/内容
handleCompletedReceives
transportLayer#write()
暂存channel中关注OP_WRITE
MemoryRecordsBuilder
ByteBuffer
添加
返回阻塞等待的时间
OP_CONNECT
waitOnMetadata()
byteBuffer
total momery默认32M
selectionKey
List<ClientRequest>
阻塞等待
是否过期
kafkaSelector
cluster信息
poll
List<ClientResponse> responses
kafkaChannel#receive()
wait(60s)
version
Patitioner将消息路由到分区
kafkaChannel#write()
拉取成功唤醒主线程
free缓冲池是否为空
broker id
获取topic的partition数量
发起连接请求
根据标识位,判断是否更新
kafkaChannel
等待拉取元数据
ProducerBatch
封装请求发送到Broker
是
拉取topic元数据
读取数据处理粘包和拆包问题
更新触发拉取标识位
getOrCreateQueue
allocate()
socketChannel#write()
后台线程
调用write将clientRequest的数据写出去
消息写入内存缓冲区
BufferPool
metadata元信息
否
handleCompletedSends
Deque
数据是否读取完成
哪些broker可以发送数据
peekFirst
List<Send> completedSends发送已完成的请求
inFlightRequestsDeque<NetworkClient.InFlightRequest> reqs 发送后没有收到响应
释放内存块
disConnected
读取响应后删除inFightRequest
topicPartition
brokerId
是否可以读数据
add()
不关注OP_CONNECT,增加OP_READ
kafka thread kafka-producer-network-thread
kafkaChannel#send()
NetworkClient
通过size和buffer是否remaining来解决粘包问题
遍历每个准备好的节点
是否建立连接1. 如果集群元数据信息没有正在更新,或者需要发送更新请求2. 当前连接状态为READY,channel为ready,3. InFlightRequest,为空或者第一个发送成功,或者发送未成功的请求小于5
从池中获取一个byteBuffer
tryAppend()尝试获取batch
设置更新标识
byteBuffer#writeTo()发送多个byteBuffer
broker和client是否可以建立连接
run()
调用每条消息的回调函数
Producer
free queue
Connecting
socketChannel
selector
Senderrunnable
finishConnect
是否有异常
pollFirst
调用自定义的拦截器
TransportLayer
唤醒与此发送线程关联的selector
write
networkClient
broker
this.meta.update()
是否可以写数据
是否可以建立连接
OP_WRITE
topics集合
NetWorkReceive#readFrom()
三次握手连接的建立
OP_READ
broker连接状态更新CONNECTED
发送请求数据
socketChannel#finishConnect
封装
waiters queue
是否为空
kafkaChannel#read()
send
peek.last()
needUpdate标识位
batch重新入队
封装为request
available Memory= available Memery + free
topic是否存在
取消对OP_WRITE的监听添加数据到已完成的list中
append()
初始化topicPartition为空
分配一块byteBuffer
broker是否有连接
监听
遍历每个完成的响应
Connected
尝试跟broker建立连接
Sender线程
处理响应流程
1. 每次先从batch queue 获取batch2. batch不为空或者未满直接写入3. 否则从bufferPoll申请byteBuffer4. bufferPoll 的free queue有,则直接pollFirst5. 否则分配一个byteBuffer6. 并发下,double check多申请byteBuffer,还到到free queue
ClusterContectionStates
是创建一个ByteBuffer
RecordAccumulator内存缓冲区
Kafka Broker
attach
0 条评论
回复 删除
下一页