02_kafka源码
2024-05-23 21:29:09 62 举报
为你推荐
查看更多
kafka源码深入剖析
作者其他创作
大纲/内容
RecordAppendResult appendResult = last.tryAppend(... ...)
Y
网络通信的组件,NetworkClient一个网络连接最多空闲多长时间(9分钟),每个连接最多有几个request没收到响应(5个),重试连接的时间间隔(50ms),Socket发送缓冲区大小(128kb),Socket接收缓冲区大小(32kb)
放入channel管理器中
connectionStates.isConnected(node)Broker是否已经建立了连接了
LRU保证活跃的连接数量,丢弃不常用连接,减轻客户端网络压力idleExpiryManager.update()
org.apache.kafka.clients.KafkaClient#ready检查一下是否准备好向broker发送数据,此时若没跟某个Broker建立好连接,必须在这里把长连接准备好,之后才可以把数据发送过去,直接就基于最底层的NIO来开发的
一个响应是否读取完整NetworkReceive#complete
发送NetworkClient#poll
N
阻塞一段时间maxBlockMs - 获取元数据耗费的时间
多线程争抢创建buffer,并修改这个pool已使用的空间底层是返回nio的byteBufferByteBuffer.allocate(size);
建立连接KafkaSelector
1
全部满足条件,说明网络组件准备好了
上锁第二次 : 尝试放入消息
当有新的batch创建的时候,就唤醒sender线程,因为底层的nioSelector有可能正在执行select(ms)卡住了。this.sender.wakeup();
若发送成功将Send添加到completedSends集合
将KafkaChannel中的NetworkReceive设置为null,标志着一次完整的响应读取,不处理拆包和粘包问题了
释放内存块deallocate
处理超时handleTimedOutRequests
是指定分区key
timeToWaitMs,这个Batch从创建开始算起,最多等待多久就必须去发送,如果是在重试的阶段,这个时间就是重试间隔,但是在非重试的初始阶段,就是linger.ms的时间(100ms)
sender第三次run
这一批响应是否处理完?deque.isEmpty()
计算协议封装后的消息大小消息不能大于maxRequestSize和totalMemorySize
构建RecordBatch
返回新的NetworkReceive
获取到未响应的nodeInFlightRequests#getNodesWithTimedOutRequests
增加BuffePool的可用大小
selectionKey关注着OP_READ事件
Sender#handleProduceResponse
key.isWritable()是否关注OP_WRITE事件
parseResponse
NetworkClient
去broker上拉取了一次集群的元数据过来,后面每隔5分钟会默认刷新一次集群元数据
拿到锁的线程根据创建的buffer构建MemoryRecords
TopicPartition
利用copyOnWrite读多写少的特点,保证线程安全的创建Deque
selector.connect ()开始尝试连接
KafkaProducer
返回头一块空间free.pollFirst()
doubleCheck放入batches
KafkaChannel封装了socketChannel
NetworkClientsocket网络连接组件
Selector#addToCompletedReceives
如果要更新metadata,先拿到version,更新needUpdate=trueversion = metadata.requestUpdate()
while (partitionsCount == null)直到超时,或者获取到partition元数据
唤醒sender线程异步去拉取这个topic的元数据sender.wakeup();
BufferPool
根据TopicPartition创建或get对应的Deque
ByteBuffer
删除stage当这一批响应处理完成后,就删除stage,这样就可以继续往这个broker发送数据了。
selectionKey获取kafkaChannelkey.attachment()
1、就直接将InFlightRequests中的请求移除2、包装一个响应回去
TransportLayer#addInterestOps()关注OP_WRITE事件
遍历batch中的每条消息(thunks)
检查所有准备好的leader partition所在的broker网络情况
清空buffer并放回free的deque中
socketChannel
key的murmur2计算出hash值%partitionCount
元数据管理器 Metadata1、从broker集群去拉取元数据的Topics(Topic -> Partitions(Leader+Followers,ISR))、2、每隔一段时间就再次发送请求刷新元数据,metadata.max.age.ms,默认是5分钟,默认每隔5分钟一定会强制刷新一下3、在发送消息的时候,若发现写入的某个Topic对应的元数据不在本地,会发送请求到broker尝试拉取这个topic对应的元数据,若你在集群里增加了一台broker,也会涉及到元数据的变化
封装List<ClientRequest>请求集合,设置回调函数Sender#createProduceRequests
KafkaChannel#receive()通过transportLayer通过nio读取响应NetworkReceive#readFrom()
第一个线程
batch
是否有异常
返回,等待下次继续读取,同时此次的Buffer和receive等对象留在内存中
expired,当前Batch已经等待的时间(120ms) >= Batch最多只能等待的时间(100ms),已经超出了linger.ms的时间范围了,否则呢,60ms < 100ms,此时就没有过期。如果linger.ms默认是0,就意味着说,只要Batch创建出来了,在这个地方一定是expired = true
删除inFlightRequests滞留的请求。这样才能让后续的请求发送到broker上
inFlightRequests中超时机制
full,Batch是否已满,如果说Dequeue里超过一个Batch了,说明这个peekFirst返回的Batch就一定是已经满的,另外就是如果假设Dequeue里只有一个Batch,但是判断发现这个Batch达到了16kb的大小,也是已满的
有准备好的broker
sendable,综合上述所有条件来判断,这个Batch是否需要发送出去,如果Bach已满必须得发送,如果Batch没有写满但是expired也必须得发送出去,如果说Batch没有写满而且也没有expired,但是内存已经消耗完毕
发送数据
消息分区DefaultPartitioner#partition
inFlightRequests#completeNext处理并移除存放在inFlightRequest中的请求
Y 准备好了,可以发送数据过去
partition进行分组
poll()
sender第二次run
加入待更新的元数据集合,后面尝试再次拉取
层层条件过滤,选出满足的batch
返回ReadyCheckResult1、可以发送数据的leader所在的broker2、最近下一次检查的时间3、没有元数据的partition
设置send
格式化响应体,解析kafka二进制协议parseResponse()
是否可用重试S?ender#canRetry
将申请到的buffer还给BufferPoolfree.deallocate(buffer)
分区器 Partitioner决定,你发送的每条消息是路由到Topic的哪个分区里去的
上锁第一次 : 尝试放入消息
万能poll方法NetworkClient poll()
Sender线程负责从缓冲区里获取消息发送到broker上去,request最大大小(1mb),acks(1,只要leader写入成功就认为成功),重试次数(0,无重试),请求超时的时间(30s),线程类叫做“KafkaThread”,线程名字叫做“kafka-producer-network-thread”,此处线程直接被启动
第一次启动所有broker都没准备好会删除掉batch对应的broker,开始执行poll()
内存缓冲中batch超时检测机制
是否有可发送的batch?
bufferPool.allocate()
batch是否有足够的空间放入这条消息?
包装 DataOutputStream,将二进制压缩流按约定的消息格式写入byteBuffer中,实现消息压缩wrapForOutput(bufferStream)
sender.run()
Deque
构建统计组件Metrics
如果receive == null
最终拿到partition元数据,返回partitionsCount = cluster.partitionCountForTopic(topic);
拿到Deque,获取Deque中最后一个batchRecordBatch last = deque.peekLast();
producer初始化
BufferPool将已经存在的buffer返回最终放入topicPartition的deque中此时pool的 availableSize - bufferSize
Cluster cluster = metadata.fetch();
handleCompletedReceives解析响应,放入响应列表
影响broker的消息发送
没有处理中的响应stagedReceives
重新创建新的batch
last = null ?
如果free队列为空?
发送消息 1、获取partition元数据KafkaProducer#send()
封装exception
取消对OP_CONNECT的关注增加对OP_READ的关注
消息压缩及写入ByteBuffer Compressor
内存缓冲区 RecordAccumulator消息缓冲机制,发送到每个分区的消息会被打包成batch,一个broker上的多个分区对应的多个batch会被打包成一个request,batch size(16kb)默认情况下,如果只考虑batch的机制的话,那么必须要等到足够多的消息打包成一个batch,才能通过request发送到broker上去;但是有一个问题,如果你发送了一条消息,但是等了很久都没有达到一个batch大小所以说要设置一个linger.ms,如果在指定时间范围内,都没凑出来一个batch把这条消息发送出去,那么到了这个linger.ms指定的时间,比如说5ms,如果5ms还没凑出来一个batch,那么就必须立即把这个消息发送出去
发送完成后的处理handleCompletedSends
获取SendKafkaChannel#write()
2
摘出broker
如果大小超过设定的batch大小,那么就会创建一个消息大小的batch。这样导致的问题是,一个消息是一个batch,batch失去意义。所以我们要合理的设定batch.size , max.request.size linger.ms等参数。
发送消息
如果有的partition缺少元数据?
超时检测,获取到超时的batchRecordAccumulator#abortExpiredBatches
判断超时,回调onComplete函数,返回TimeoutExceptionRecordBatch#maybeExpire
Selector#poll
依然无内存
拦截器ProducerInterceptors
此时broker批量返回的响应都已处理完,并全部放到stage的延时队列中。接下来,我们需要每次将取出stage中头一个响应放入complete中,进行完成响应的处理。
获取Cluster通过metadata.fetch()获取之前metadata.update()设置的broker地址
关闭连接Selectable#close()
返回存入缓存结果,等待回调RecordAppendResult
释放掉从缓冲区借来的内存块RecordAccumulator#deallocate
注册到nio selector上关注OP_CONNECT事件
计算消息的最大大小
通过NetWorkClient走底层的网络通信,把每个Broker的ClientRequest给发送过去就可以了,poll方法,他是负责实际的 进行网络IO通信操作的一个核心的方法,负责发送数据出去,也包括读取响应回来
发送出去的数据Send
发送请求
放到responses列表中
直接返回
生成自增clientId
丢弃等待gc
下次什么时候在检查
Sender再次run的时候,需要判断重试的消息是否达到了重试间隔,再次发送RecordAccumulator#ready中的backingOffbackingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
RecordAppendResult appendResult = last.tryAppend(... ...)
说明上次已经读取完整了一个响应创建响应对象NetworkReceive
遍历所有响应responses
最近接收到的数据Receive
关联key和channel,将来可以通过selectionKey获取到channelkey.attach(channel);
sender第一次run
selectionKey
获取kafkaChannel
broker.id
添加处理完成集合completedReceives
当没有batch可以发送并且没到linger.ms,设置nextReadyCheckDelayMs,下次再来检查是否有batch可以发送,起码要等nextReadyCheckDelayMs时间过了以后才可以
如果acks = 0
第二个线程
inFlightRequests.canSendMore(node)Broker同一时间最多容忍5个请求发送过去但是还没有收到响应,超过5个就不再发送
移动position到可读的位置 buffer.flip();
打开长连接SocketChannel socketChannel = SocketChannel.open();设置生产环境socketChannel参数
构建interceptCallback,放入缓冲区之后会根据情况回调
SocketChannel#write()
标记重新拉取元数据,因为有可能是leader partition改变到其他broker上了metadataUpdater.requestUpdate()
匹配请求和响应
拿出当时的request
从BufferPool中切一块缓存创建batch所需要的新的内存空间(ByteBuffer)其实这个pool管理的就是一个Deque<ByteBuffer> free队列,这个队列的每个元素就是代表的每个batch的一小块未使用且待分配内存空间元素数量 * 每个元素大小 = 缓冲区大小。
kafkaChannel
KafkaChannel#read粘包拆包处理
waitedTimeMs,当前时间减去上一次发送这个Batch的时间,若Batch从来没有发送过,上一次发送时间就是Batch被创建出来的那个时间,这个Batch从创建开始到现在已经等待了多久了
有一些batch被发送出去了,获取到了响应,此时就可以释放那个batch底层对应的ByteBuffer,就会被放回到BufferPool里面去,此时就可以唤醒阻塞的线程,再次申请一个新的ByteBuffer构造一个Batch
selector.isChannelReady(node)和broker之间的socketChannel是否已连接
响应对象rewind,将position置0
取消关注OP_WRITE事件TransportLayer#removeInterestOps()
筛选出已经建立连接的broker
RecordAccumulator 数据结构
Selector(kafkaSelector)
重新入队,放入内存缓冲区的头一个RecordAccumulator#reenqueue
放入缓冲区 RecordAccumulator#append
自增integer % partitionCount
放入发送但未响应KafkaClient#send
send()只能暂存一个Send
waitOnMetadata()保证topic有可用的元数据
调用
ByteBufferSend#completed
回调处理RecordBatch#done
响应回调
请求打包
关闭输出流last.records.close()
序列化组件keySerializervalueSerializer
若buffersize = 设定的大小?比如16k
在topicPartition的Deque中增加last batchdq.addLast(batch);
接收响应
已发送但未响应的集合InFlightRequests#add()(Deque<ClientRequest> .addFirst(request) )
抛异常
是否有partition元数据?partitionsCount = cluster.partitionCountForTopic
获取已经可以发送消息的partition
update()更新broker集群地址,维护Metadata的Cluster之后发送消息时,若发现没有topic相关元数据,就会从这个地址去拉取元数据
partition()
发现last batch是有的,就将消息尝试放入这个batch
加载配置构建ProducerConfig
exhuasted,内存是否已经耗尽,可能有人阻塞在写操作,无法申请到内存,在等待新的内存块空闲出来才可以创建新的Batch
KafkaSelectorSelector#send()
放入已连接集合中connected
此时closed,当前客户端要关闭掉,此时就必须立马把内存缓冲的Batch都发送出去
处理接收到的响应handleCompletedReceives
调用callback的onComplete
Sender线程
backingOff,是否重试且上一次发送这个batch的时间 + 重试间隔的时间,是否大于了当前时间
!metadataUpdater.isUpdateDue(now)当前broker不能处于元数据加载的过程
找到所有可发送的batch对应的leader partition所在的broker,放入readyNodes集合
获取有请求的selectionKeynioSelector.select(ms)
ByteBufferSend#writeTo
内存复用
遍历内存缓冲区的batches组件,获取每个partition中头一个batchRecordBatch batch = deque.peekFirst();
从channel中读取响应KafkaChannel#read
获取每个RecordBatch,对每个batch做响应处理Sender#completeBatch
序列化处理keySerializervalueSerializer
对每个Broker都创建一个ClientReqeust,包括了多个Batch,就是在这个Broker上的多个Leader Partition所对应的Batch,聚合起来组成一个ClientRequest,形成一个请求,发送到Broker上去inFlightRequestsselector.send
ByteBuffer输出流ByteBufferOutputStream(buffer)
添加响应到响应集合中(响应和请求配对)List<ClientResponse> responses响应中包含请求信息(ClientRequest)
初始化连接
是否有足够的空间创建buffer
KafkaChannel
加入等待响应的集合incomplete.add(batch)
NetworkClient#poll
如果建立完连接channel.finishConnect()
free(Deque)
TransportLayer 负责网络通信
请求暂存
拉取元数据
更新batch的标志位,不让其他人写入了writable = false
出现了拆包粘包问题上次没读取完整一个响应从内从中拿到上次的NetworkReceive接着处理
0 条评论
回复 删除
下一页