Kafka源码——KafkaProducer核心流程(version-0.10.1.0)
2022-01-20 20:15:43 26 举报
KafkaProducer核心流程主要包括以下几个步骤:首先,生产者将消息发送到指定的Topic中;然后,生产者将消息封装成一个个的ProducerRecord对象,并发送给KafkaBroker;接着,KafkaBroker将消息存储在对应的Partition中;最后,当有消费者订阅该Topic时,KafkaBroker会将消息发送给消费者。在整个过程中,KafkaProducer还负责处理网络异常、重试机制以及消息的序列化和反序列化等操作。
作者其他创作
大纲/内容
发送数据
KafkaChannel.write
Cluster(Kafka集群本身的元数)
首先要获取到队列里面一个批次
receive(receive);
RecordBatch last = deque.peekLast();
Set<SelectionKey> immediatelyConnectedKeys
ByteBuffer.allocate(16K)
topic-Partition1leader
List<String> connected完成建立连接的主机
lastRefreshMs上一次更新元数据的时间。
metadataExpireMs多久自动更新一次元数据,默认值是5分钟更新一次
TransportLayer transportLayer封装了SocketChannel、SelectionKey
update()
if (!size.hasRemaining())
boolean metadataFetchInProgress控制元数据拉取是否成功
ByteBuffer.allocate(16K)(availableMemory)
Add SelectionKey.OP_READ
if(channel.finishConnect())
唤醒后继续开始执行
5、synchronized (dq)尝试把数据写入到批次里面(线程一是失败,线程二进来执行这段代码的时候,是成功的)
1
handleInitiateApiVersionRequests(updatedNow)
result = send; send = null;
随机直接对key取一个hash值 % 分区的总数取模
ensureValidRecordSize(serializedSize);
topic-Partition3leader
dq.addLast(batch);
SelectionKey.OP_CONNECT
result = receive; receive = null;
一直在读取数据
发送消息
OP_WRITE
free.deallocate(buffer); return appendResult;
继续读
topic-Partition1follower1
3
Sender——Runnable
for(;;)run()
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
doSend()
int bytesRead = channel.read(size);
是否读完一个完整的响应消息
appendsInProgress.decrementAndGet();
java.nio.channels.Selector nioSelectorjavaNIO里面的Selector,Selector是负责网络的建立,发送网络请求,处理实际的网络IO
RecordBatch
socketChannel.register
归还内存
ByteBuffer[] buffers(write的数据)
如果size还有剩余的内存空间
Deque<RecordBatch> dq = getOrCreateDeque(tp);
queue.size() this.maxInFlightRequestsPerConnection
ByteBuffer buffer数据内容(this.buffer = ByteBuffer.allocate(receiveSize))
2
初始化kafkaProducer
RecordAccumulator(缓存组件)
receive.payload().rewind();
wait(remainingWaitMs);
return availablePartitions.get(part).partition();
释放锁
Metadata(元数据管理)
Dqueue
4、确认一下消息的大小是否超过了最大值
producer = new KafkaProducer<>(props);
handleResponse
TopicPartition 2
lastSuccessfulRefreshMs上一次成功更新元数据的时间
transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
完成网络的连接
completedReceives(处理完了的响应)List<NetworkReceive>
执行网络IO的操作this.selector.poll
6、 给每一条消息都绑定他的回调函数
cluster = metadata.fetch();
Add SelectionKey.OP_CONNECT
6、根据内存大小封装批次线程一到这儿 会根据内存封装出来一个批次
poll()
Broker1(Controller)
send(send)
if (receive.complete())
申请内存
MetadataUpdater metadataUpdater处理更新元数据 实现类DefaultMetadataUpdater
NetworkClient(KafkaClient)
ByteBuffer size其实就是一个int类型的大小(int receiveSize = size.getInt())
Kafka 服务端
轮询
refreshBackoffMs更新元数据的请求的最小的时间间隔,默认值是100ms
NetworkReceive receive 接收到的响应(处理拆包,先读取4字节的数据,(代表的意思就是后面跟着的消息体的大小))
List<String> disconnected没有建立连接的主机
connect()
String clientId(例如:producer-1)
绑定 op_write
5、根据元数据信息,封装分区对象
if (keyBytes == null)
InFlightRequestsproducer向broker发送数据的时候,其实是有多个网络连接,默认值是5,代表每个网络连接可以忍受 producer端发送给broker 消息然后消息没有响应的个数
TopicPartition 1
inFlightRequests.canSendMore(node)
N发生拆包
7、 把消息放入accumulator(32M的一个内存,默认值)然后有accumulator把消息封装成为一个批次一个批次的去发送。
TopicPartition 3
不断的读取数据涉及到粘包和拆包
if(channel.ready() && key.isReadable() && !hasStagedReceive(channel))
3、根据分区器选择消息应该发送的分区
使用分区器进行选择合适的分区
Deque<NetworkClient.InFlightRequest> queue = requests.get(node)
写入队列成功释放内存
maybeHandleCompletedReceive
topic-Partition2leader
Metadata 元数据管理
SelectionKey.OP_WRITE
Send(实现类ByteBufferSend) 发送出去的请求
socketChannel.connect(address);
pollSelectionKeys(this.nioSelector.selectedKeys())
key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
2、对消息的key和value进行序列化
String idibroker的id
写事件
从Selector上找到有多少个key注册了
BufferPool内存池(直接内存)(默认32M)
send.writeTo(transportLayer)
initiateConnect()
3、计算一个批次的大小
立马就要对这个Selector上面的key要进行处理
Y没有拆包的时候,真正的消息体大小
List<String> failedSends建立连接失败的主机
8、唤醒sender线程。他才是真正发送数据的线程
处理返回响应
去建立连接、发送数据this.client.poll
Send send = channel.write()
1、拉取元数据2、发送数据到服务端
this.nioSelector.selectNow()
this.sender.wakeup();
读事件
transportLayer.finishConnect();
检查与要发送数据的主机的网络是否已经建立好this.client.ready()
int receiveSize = size.getInt()
channel.write(buffers)
this.completedSends.add(send)
version元数据是有版本号,每次更新元数据,都会修改一下这个版本号
ByteBufferSend
连接事件
Main线程
1、先根据分区找到应该插入到哪个队列里面如果有已经存在的队列,那么我们就使用存在队列如果队列不存在,那么我们新创建一个队列
唤醒sender线程 this.client.wakeup();-- this.selector.wakeup();
kafka-producer-network-thread
阻塞主线程
topic-Partition2follower2
topic-Partition3follower1
7、 线程一把这个批次放入到这个队列的队尾
if (appendResult != null)
4、根据批次的大小去分配内存(线程一,线程二,线程三,执行到这儿都会申请内存)
接受服务端发送回来的响应(Read事件)
while ((networkReceive = channel.read()) != null)
if (size.hasRemaining())
selector.send(inFlightRequest.send);
分配内存
int readyKeys = select(timeout)
needUpdate上一次成功更新元数据的时间
read += bytesRead;return read
addSelectionKey.OP_WRITE
2、尝试往队列里面的批次里添加数据
Broker2(follower)
addToCompletedReceives();
往批次里面去写数据
先读取4字节的数据(消息体的大小)
数据全部处理完重置标志位
Broker3(follower)
topic-Partition3follower2
4
notifyAll()
queue.peekFirst()
这个last不为空插入数据
int part = Utils.toPositive(nextValue) % availablePartitions.size();
发送网络请求
this.buffer = ByteBuffer.allocate(receiveSize);
maybeCloseOldestConnection(endSelect);
handleCompletedReceives()
写完之后删除OP_WRITE事件
List<Node> nodeskafka的服务器的信息
更新触发拉取数据标志位sender.wakeup();
关闭资源
SelectionKey.OP_READ
topic-Partition2follower1
线程一,就往批次里面写数据,这个时候就写成功了。
selector
SelectionKey key
batches 队列(RecordBatch的默认大小16kb)
1、同步等待拉取元数据maxBlockTimeMs 最多能等待多久
0 条评论
下一页