kafka producer架构
2020-08-25 10:31:33 4 举报
AI智能生成
kafka producer架构
作者其他创作
大纲/内容
1、KafkaProducer是生产者的入口,也是主线程,它还维护sender子线程。
2、在主线程中,不断往RecordAccumulator中追加消息。
3、RecordAccumulator是一个消息的仓库,当有消息batch封箱完成时,KafkaProducer会唤醒Sender线程做消息的发送处理。
4、Sender首先把batch按照要发往的node分组,生成ClientRequest请求对象。
5、Sender再通过NetworkClient的send方法,把ClientRequest需要的资源准备好,如Channel,数据等。
6、Sender最后通过NetworkClient的poll方法,底层通过nio把准备好的请求最终发送出去。
7、Sender再统一处理response,进行重试或者回调。
2、在主线程中,不断往RecordAccumulator中追加消息。
3、RecordAccumulator是一个消息的仓库,当有消息batch封箱完成时,KafkaProducer会唤醒Sender线程做消息的发送处理。
4、Sender首先把batch按照要发往的node分组,生成ClientRequest请求对象。
5、Sender再通过NetworkClient的send方法,把ClientRequest需要的资源准备好,如Channel,数据等。
6、Sender最后通过NetworkClient的poll方法,底层通过nio把准备好的请求最终发送出去。
7、Sender再统一处理response,进行重试或者回调。
KafkaProducer.send()
kafakProducer.send(TOPIC,MSG)
拦截器预处理
判断producer是否可用
判断topic所在的metadata是否可用
对key(TOPIC)和value(MSG)进行序列化
计算出要消息要发往的TOPIC的PARTATION,并将要发往的TOPIC和PARTATION封装
计算序列化后的大小
通过RecordAccumulator.append()把消息加入到batch中
如果batch满了或者新建了一个batch,就唤醒sender执行发送
RecordAccumulator.append()
通过原子操作,把追加消息的线程数+1
获取主题分区对应的ProducerBatch队列,如果没有此队列则新建
同步代码中尝试将消息tryAppend()会出现以下情况:
a:如果dq中有batch就向队尾的batch加入消息
a.1:添加成功
a.2:如果添加失败则说明队尾的那个batch已满,需要继续往队尾新建batch
b:如果dq中没有batch,返回null
a:如果dq中有batch就向队尾的batch加入消息
a.1:添加成功
a.2:如果添加失败则说明队尾的那个batch已满,需要继续往队尾新建batch
b:如果dq中没有batch,返回null
预估size大小,从BytePool申请ByteBuffer
同步代码块中,再次添加看是否成功
若还未成功,则创建batch,并且创建ProducerBatch里面的MemoryRecordsBuilder
然后真正的调用batch的tryAppend()方法去append消息
batch的tryAppend()和RecordAccumulator的tryAppend()不同
batch的tryAppend()和RecordAccumulator的tryAppend()不同
将新建的batch加入dq,并将batch存入一个专门用来存未发送的batch的set
释放buffer,并且原子操作使追加消息的线程数-1
ProducerBatch.tryAppend()
调用MemoryRecordsBuilder.hasRoomFor()看看是否有足够空间容纳消息
调用MemoryRecordsBuilder.append()追加消息
保存存放了callback和对应FutureRecordMetadata对象的thunk到List<Thunk> thunks中
MemoryRecordsBuilder.append()
把字节数组形式的key和value转成Bytebuffer
计算写入的offset,如果不是第一次写入,那么lastOffset+1,否则是baseOffset
如果magic号大于1,那么调用appendDefaultRecord(),否则调用appendLegacyRecord()
在DefaultRecord.writeTo()方法中,通过调用Utils.writeTo()往DataOutputStream写入key,value,header
更新状态例如消息的数量,offset
Sender.run()
进入runOnce()方法,sender的主要逻辑实际在这个方法中。一旦running为false跳出主循环,根据状态判断是继续发送完成,还是强制关闭。强制关闭的话,通过accumulator.abortIncompleteBatches()把RecourdAccumulator中incomplete集合中保存的未完成ProducerBatch做相应的处理,对他们进行封箱,防止继续有新的消息被追加进来,然后从所属Deque中删除掉,释放掉BufferPool中的空间
Sender.runOnce()
long pollTimeout = sendProducerData(now)
client.poll(pollTimeout, now)触发网络IO,把消息真正发送出去
client.poll(pollTimeout, now)触发网络IO,把消息真正发送出去
Sender.sendProducerData()
根据accumulator中batch中的消息中的不同的消息对应不同的TOPIC,PARTATON的数据,然后根据TOPIC,PARTATION查找到kafka集群中相对应的node,看看那些node可用,哪些node不可用,然后返回ReadyCheckResult
如果获得到的某个node为null,也就是未知节点,也就是ReadyCheckResult 中的unknownLeaderTopics有值,那么则需要更新Kafka集群元数据,也就是更新kafka的metadata
循环返回的result中的readyNodes,检查KafkaClient对该node是否符合网络IO的条件,不符合的从集合中删除
通过accumulator.drain()方法把待发送的消息,从之前的ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches按node进行分组转换成Map<Integer, List<ProducerBatch>> batches
获取所有过期的batch,循环做过期处理
计算接下来外层程序逻辑中调用NetWorkClient的poll操作时的timeout时间
循环按照node调用sendProduceRequests(batches, now)方法,将待发送的ProducerBatch封装成为ClientRequest,然后“发送”出去。注意这里的发送,其实只是加入发送的队列。等到NetWorkClient进行poll操作时,才发生网络IO
返回client的poll操作timeout时间
Sender.sendProduceRequest()
入参时将上一步的batches的key(node)和value(信息)拆分,循环value;TopicPartition tp = batch.topicPartition; MemoryRecords records = batch.records(); 获取ProducerBatch中的TopicPartition信息和MemoryRecords信息。按照主题分区分堆ProducerBatch和MemoryRecords得到两个以TopicPartition为Key的Map,Map<TopicPartition, MemoryRecords>produceRecordsByPartition和Map<TopicPartition, ProducerBatch>recordsByPartition。此时node,TopicPartation,以及信息都已经明文获得
声明ProduceRequest.Builder对象,他内部有引用指向produceRecordsByPartition
生成ClientRequest对象,它包含ProduceRequest.Builder的引用。
调用NetWorkClient的send方法,做好发送ClientRequest的准备
NetworkClient.send()
檢查NetWorkClient是否為存活狀態
檢查是否是內部請求,如果不是內部請求,進行連接狀態等的檢查
獲取request中的AbstractRequest.Builder,也就是我們上一個方法創建的ProduceRequest.Builder
调用doSend(clientRequest, isInternalRequest, now, builder.build(version));這裡的builderb.build(version)就是ProduceRequest
NetWorkClient.doSend()
生成RequestHeader對象,包括apiKey 、version、clientId、 correlation这些属性
生成待发送的Send对象,这个Send对象封装了目的地和header生成的ByteBuffer对象,Send底层是nio的ByteBuffer
生成InFlightRequest請求。它持有ClientRequest,request,send等对象,把InFlightRequest添加到inFlightRequests中,
InFlightRequests中按照node的id存储InFlightRequest的队列
InFlightRequests中按照node的id存储InFlightRequest的队列
调用select.send()发送数据,这里是伪发送,只负责把对应的channel和send绑定起来,也就是把数据设置到相应的channel中。并设置channel关注的事件是WRITE
NetWorkClient.poll()
底层通过nio把准备好的请求最终发送出去
0 条评论
下一页