kafka消息发送流程
2021-04-28 16:43:07 16 举报
kafka producer消息发送脉络
作者其他创作
大纲/内容
runOnce()
序列化key valuekeySerializer.serialize() valueSerializer.serialize
doSend
获取元数据信息waitOnMetadata()
获取准备好发送的分区列表
for循环 按每个节点传输List<ProducerBatch>>sendProduceRequest
sender线程
生产者
kafkaProducer.send()kafkaProducer.doSend()
累加到accumulator
kafkaTemplate.send()
构造clientRequest
broker
强制更新元数据this.metadata.requestUpdate()
for循环找出最小的magic
从所有的batches中获取达到投递时间的batchthis.accumulator.expiredBatches(now)
// 获取该topic的元数据信息1.waitOnMetadata(...)// 序列化key value2. keySerializer.serialize() valueSerializer.serialize// 根据分区路由规则获取到发送到哪个分区3.partition()4. 本条消息的大小(字节数)AbstractRecords.estimateSizeInBytesUpperBound()5.验证 大小是否超过限制ensureValidRecordSize()6. accumulator.append() 缓存消息 将消息累加到对应的批次中7. 若是新的批次6会累加失败,会再一次accumulator.append() 创建一个新的Deque放到batchs里 并将该消息放到最后位置8. 如果该批次空间大小已满或者是一个新的批次唤醒sender线程9.return future
创造发送请求this.accumulator.drain
拦截器
return future
this.accumulator.ready()
对应代码
sendProduceRequests
根据分区规则获取发动到的partition
for循环将records根据magic向下转换
构造requestBuildercallback
如果不是内部请求 !isInternalRequest验证是否能够发送 !canSendRequest
partitioner
更新发送请求相关的metricsupdateProduceRequestMetrics
interceptor
获取version
producer
selector.send(send);
producer.send()
如果存在不知道leader节点的topic
long pollTimeout = sendProducerData(currentTimeMs)
验证该条消息大小
1. inProcess == null 没有在进行中的请求 & 元数据更新时间到了2 链接状态没有ready || 终止时间小于现在时间 3. 该node的kafakaChannel为空或者不是ready4. 该node在InFlightRequest的Deque为空 或者里面的个数大于配置的maxInFlightRequestsPerConnection
将准备发送的批次放到inflightBatch中addToInflightBatches
分区路由
转换后的record放入到一个mapproducerbatch放到一个map
代码发送到RecordAccumulator流程
NetWorkClient.doSend()
序列化器
释放该batch申请的空间从inflightbatch中移除
将上诉两步获取的list合并,for循环failBatch
ProducerBatch
kafka发送流程图
从inflightbatch获取达到投递时间的batchgetExpiredInflightBatches
本地缓存RecordAccumulator
KafkaTemplatedoSend()
移除没有准好的节点
构造了两个map
serializer
收藏
0 条评论
下一页