Kafka源码——KafkaServer(version-0.10.1.0)
2022-01-30 21:34:04 25 举报
Kafka源码——KafkaServer(version-0.10.1.0)
作者其他创作
大纲/内容
loadLogs()
val topicPartition = Log.parseTopicPartitionName(logDir)
updateIsr(newInSyncReplicas)
val prev = addSegment(segment)
new LogManager
this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint))
updateLogEndOffset(nextOffsetMetadata.messageOffset)
//构建一个数据信息 //比如有version //有自己broker id号 //有时间戳
默认启动8个线程,一般情况下,生产环境里面我们是要去设置这个参数。
val channel = newConnections.poll()
Utils.newThread(\"kafka-socket-acceptor-%s-%d\
val keys = nioSelector.selectedKeys()
启动服务整个Kafka 服务端的功能 都是在这个里面
inflightResponses += (response.request.connectionId -> response)
NIO的服务端
start processing requests
val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
Replica(Leader)分区
RequestChannel.Request
registerSessionExpirationListener()
leaderReplica.highWatermark = newHighWatermark
//按照一定的频率 //但是我们发现这个频率的阈值 这儿控制,kafka给的是一个long的最大值。 //也就是意味kafka这儿是不会主动的把内存里面的数据刷写到磁盘 //把内存里面的数据刷写到磁盘这个操作是由 操作系统完成的。 //当然也可以自己去配置这个值。
获取到当前partition的所有replica的LEO的值
7、返回数据,带上Leader的HW
SocketChannel
this.nioSelector.wakeup();
val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
logManager.startup()
核心代码方法
replicaManager
var connectionQuotas: ConnectionQuotas = _连接队列
Selector
从requestQueue获取request对象
val brokerId = config.brokerId
selector.wakeup()
case RequestChannel.SendAction发送OP_WRITE请求
获取到key
threads(i) = Utils.daemonThread(\"kafka-request-handler-\
if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || oldHighWatermark.onOlderSegment(newHighWatermark))
lock synchronized
缓存响应到Map中
Kafka.scala
scheduler.schedule(\"kafka-log-flusher\
val maxQueuedRequests = config.queuedMaxRequests最大请求队列,默认500
KafkaServer
case RequestChannel.CloseConnectionAction
当前broker主机的id
val recoveryPoints = this.logsByDir.get(dir.toString)
processors.foreach { processor => Utils.newThread(\"kafka-network-thread-%d-%s-%d\
返回响应
val serverChannel = ServerSocketChannel.open()
加锁
return amILeader
用来处理已经完成的请求,为临时response队列的response执行回调函数
创建目录
wakeup()
也就是当前的服务器就是controller服务器了
if(leaderId != -1)
zkUtils = initZk()
scheduler.schedule(\"flush-log\
log.flush
LogSegment
如果是客户端发送过来 要进行网络连接的请求。
channel.force(true)
注册某个监听器
NIO封装
启动更新ISR数据线程
selecotr 查看是否有 事件注册上来。
KafkaRequestHandler
for(log <- allLogs; if !log.config.compact)
6、尝试更新自己的HW,min(ISR 中所有副本的LEO)
遍历所有注册上来的key
创建索引文件
start scheduler
poll()
创建线程池处理的队列里面的请求
def flush(): Unit = flush(this.logEndOffset)
case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request)每一台broker都会接受到元数据更新的请求
把request请求放入队列
链接ZK
transportLayer.removeInterestOps(SelectionKey.OP_READ);
val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
新获取一个偏移量
val segment = new LogSegment
selector.unmute(send.destination)
新建一个文件。用LEO的名字作为文件名
case ApiKeys.FETCH => handleFetchRequest(request)follower发送过来拉取数据的请求(同步数据)
start group coordinator
//要去获取controller的 id号。 //我们用的是场景驱动的方式,此时此刻应该就是我们的第一台服务器 //第一次启动,那么肯定还没有controller的,所以他这儿应该是获取不到 //ID //如果是第一次启动,那么返回值是-1
KafkaChannel channel = channelOrFail(send.destination());
MappedByteBuffer(java.nio内存映射工具类,通过操作内存把数据同步到文件中)
遍历所有partition的ISR
index.flush()
5、尝试更新ISR列表
shutdownLatch.await()
解析配置文件里面的参数
val pool = Executors.newFixedThreadPool(ioThreads)
回调封装响应
返回新创建的segment
zkCheckedEphemeral.create()
更新isr数据
val ready = nioSelector.select(500)
向客户端生产者返回响应
调用flush的方法
KafkaRequestHandlerPool默认是8个线程
while循环
从队列里面获取Request对象
serverChannel.socket().setReceiveBufferSize(recvBufferSize)
遍历所有的目录(配置的存储日志的目录)
leaderId = brokerId
创建线程KafkaRequestHandler
segment.flush()
调用javaNIO的方法,强制把数据从内存里面刷写到磁盘
获取或创建cluster_id
构建一个Acceptor对象,它是一个核心的线程,监听并且接受客户端的请求,同时建立数据传输通道ServerSocketChannel,然后轮询的方式交给一个后端的Processor线程处理(在Acceptor类的主构造函数里面,启动了3个Processor线程)
if (key.isAcceptable)
根据SelectionKey获取到serverSocketChannel
解析请求
zkUtils.zkClient.subscribeStateChanges(sessionExpireListener)
写索引
多个Process线程,一个线程就对应一个响应队列。这个id就是一个Process线程的编号。
交给KafkaApis进行最终的处理
ISR列表里面 减去 要被移除出去的 等于 新的ISR列表
apis.handle(req)
metadataCache = new MetadataCache(config.brokerId)
val sendBufferSize = config.socketSendBufferBytes val recvBufferSize = config.socketReceiveBufferBytes
唤醒NIO
Partition 1
case ApiKeys.PRODUCE => handleProducerRequest(request)处理生产者发送过来的请求
为每个目录都创建一个线程池后面肯定是启动线程池里面的线程去加载Log
val nioSelector = NSelector.open()
if(outOfSyncReplicas.nonEmpty)
LogManager
.index 文件
注册OP_WITER
等待acceptor启动完成
var curr = requestChannel.receiveResponse(id)
selector.poll(300)
初始化logManager
KafkaChannel channel = this.channels.get(id);
.log 文件
解析,获取SocketChannel里面的各种参数
Partition 0
kafkaHealthcheck
//processors(线程一,线程二,线程三)//Acceptor线程启动起来以后 //如果有请求发送过来,会把这些请求轮询的发送给不同的 //Processor线程去处理。 //processors(0) = 第一个线程处理 //processors(1) = 第二个线程处理//prodcessrs(2) = 第三个线程处理currentProcessor = (currentProcessor + 1) % processors.length
MappedByteBuffer.force()
val newOffset = logEndOffset
val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
serverChannel.socket.bind(socketAddress)
磁盘
requestChannel.receiveResponse(id)
While循环
8、更新自己的LEO
checkpointRecoveryPointOffsets
创建对象
注册某个监听器,但是这个监听目前对于我们来说暂时不重要,所以我们就不看了。
把segment添加到某一个数据结构里面
Processor 线程2
receive
cleanupLogs
logDir <- dirContent if logDir.isDirectory
后续源码见:Kafka源码——KafkaProducer核心流程(version-0.10.1.0)
服务端副本同步Leader和Follower的LEO/HW更新流程
是
case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)提交偏移量
如果创建完了,自己就成为了leader ,也就是controller了
recoveryPointCheckpoints
val resp = inflightResponses.remove(send.destination)
获取对应线程的对应的队列里面的响应对象
获取到一个KafakChannel
RequestChannel.Response
迭代获取每一个Response对象
val numProcessorThreads = config.numNetworkThreadsKafka服务端处理的网络线程数个数,默认是3
adminManager
while (isRunning)
groupCoordinator
轮询
Y完成选举了
2)定时把内存里面的数据刷写到磁盘
val maxConnectionsPerIp = config.maxConnectionsPerIp默认Int.MaxValue
对当前的这个连接移除OP_READ事件
等待服务执行完成
val iter = keys.iterator()
服务端启动,客户端(生产者)发送过来请求,服务端对请求进行处理,服务端给客户端发送响应。客户端接受到响应以后 -》 下一个请求的发送
接收和发送 请求的时候一些缓存的大小
循环启动三个processor线程
endpoints.values
时间索引
for (dir <- this.logDirs)
scheduler.schedule(\"isr-change-propagation\
服务一直就在不断的循环
注册OP_READ事件
this.logDirs.foreach(checkpointLogsInDir)
1、写数据到LeaderPartition
3、定时更新一个检查点的文件
NIO内存映射1、内存映射文件是由一个文件到一块内存的映射,可以理解为将一个文件映射到进程地址,然后可以通过操作内存来访问文件数据。说白了就是使用虚拟内存将磁盘的文件数据加载到虚拟内存的内存页,然后就可以直接操作内存页数据。2、我们读写一个文件使用redis()和write()方法,这两个方法是调用系统低层接口来传输数据,因为内核空间的文件页和用户空间的缓冲区没有一一对应,所以读写数据时会在内核空间和用户空间之间进行数据拷贝,在操作大量文件数据时会导致性能很低,使用内存映射文件可以非常高效的操作大量文件数据。3、通过内存映射机制操作文件比使用常规的方法和使用FlieChannel读写高效的多。4、MappedByteBuffer底层就是使用的NIO内存映射技术,通过操作MappedByteBuffer对象,吧数据同步到文件中。
kafkaController
注册OP_ACCEPT
Broker
metadataCache
Processor 线程1
val response = responseQueues(processor).poll()
requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize) queueSize默认是500
segments:segment文件的集合,使用了ConcurrentSkipListMap类型(原理可以另行百度,这里不做详细分析)。key为baseOffset,即系列2里说的,例如000.log/index则baseOffset=0,238.log/index则baseOffset=238;value为Segment(后续分析)nextOffsetMetadata:主要记录message的offset,Segment的baseOffset,Segment的log的大小dir:Segment文件在磁盘上对应位置的File对象,即xxx/topic-artition/logEndOffset:nextOffsetMetadata的messageOffset的属性,即下一个消息的offset,也可以当做是当前最后的一个offset
newConnections.add(socketChannel)
processCompletedReceives()
ReplicaManager
默认创建3个newProcessor线程,但是我们一般 搭建kafka集群的时候回去配置这个参数
register()
Broker2
Log
Y
req = requestChannel.receiveRequest(300)
1)定时检查文件,清理超时的文件。
segment
serverChannel.configureBlocking(false)
获取到老的HW的值
responseQueues =new Array[BlockingQueue[RequestChannel.Response]](numProcessors)numProcessors默认是3
KafkaApis工具类
缓存
responseQueues(response.processor).put(response)
讲接收到的request放入request队列中
zkUtils
3、同步数据带上自己的LEO
maybeIncrementLeaderHW(leaderReplica)
客戶端(生产者/消费者)
2、更新自己的LEO
选举
LogDir代表就是一个分区的目录
val processors = new Array[Processor](totalProcessorThreads)
val maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides 默认值“”
SocketServer
消息是否超過4096字節
channel.setSend(send);
N开始选举创建一个临时目录创建这样的一个目录 /controller/,然后把这个目录里面写上一些自己的信息
Partition.maybeShrinkIsr()
processor调用accept方法对socketChannel进行处理
获取到要被移除出去的replica
绑定了一个OP_WRITE事件一旦绑定了这个事件以后,我们就可以往客户端发送响应了
controllerElector.startup
不断迭代
processor.run() while (isRunning)
异步刷新
线程启动起来
请求
configureNewConnections()while (!newConnections.isEmpty) {
移除数据结构里面的一些数据信息
初始化ReplicaMananger核心参数logManager
curr = requestChannel.receiveResponse(id
加载日志
连接队列val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
checkpointLogsInDir
//kafka服务有时候会涉及到重启。//我重启应该要恢复哪些数据? //其实这儿会定时更新一个检查点文件 -》服务于Kafka服务重启的时候恢复数据使用。
循环kafka节点
循环创建了三个Processor的线程
创建日志目录
scheduler.schedule(\"kafka-log-retention\
elect
acceptor.awaitStartup()
total += log.deleteOldSegments()
注册
logManager
val oldHighWatermark = leaderReplica.highWatermark
启动监控更新ISR列表的线程
val totalProcessorThreads = numProcessorThreads * endpoints.size
执行NIO poll,获取对应socketChannel上准备的就绪的IO操作发送请求、响应
创建一个Log对象。
把响应存入到了一个队列里面先从数组里面取出对应Processor一个队列,然后把这个响应放入到这个队列里面
FileChannelImpl.force
对zk上面的/controller目录注册了监听器
log.flush()
val processorEndIndex = processorBeginIndex(默认值0) + numProcessorThreads(默认住3)
val endpoints = config.listenerskafka集群实例信息,一般是一个节点只会配置一个实例
val timestamp = SystemTime.milliseconds.toStringval electString = Json.encode(Map(\"version\
roll()
注册了一个监听器
返回接受的响应
socketServer
connectionQuotas.inc(socketChannel.socket().getInetAddress) socketChannel.configureBlocking(false) socketChannel.socket().setTcpNoDelay(true) socketChannel.socket().setKeepAlive(true)
获取分区信息
if (ready > 0)
把Log对象放入了logs里面
!
删除满足删除条件的文件
while (iter.hasNext && isRunning)
不断的获取连接队列里面的SocketChannel
while (curr != null) curr.responseAction match (匹配响应类型)
val socketChannel = serverSocketChannel.accept()
this.send = send;
leaderId = getControllerID
for (i <- processorBeginIndex until processorEndIndex)
_clusterId = getOrGenerateClusterId(zkUtils)
注册一个response对象,发送response,并将response放入到inflightResponses临时队列
for(i <- 0 until numThreads) {
9、更新自己的HW,min(自己的LEO,Leader的HW)
Broker1
server.startup()
新建出来一个LogSegment
更新ISR列表
OP_ACCEPT
selector.unmute(curr.request.connectionId)
Acceptor.run()
processCompletedSends()
Processor 线程3
val pool = Executors.newFixedThreadPool(ioThreads)
KafkaRequestHandlerPool创建线程池处理的队列里面的请求
KafkaApis
KafkaScheduler
selector.mute(receive.source)
sendResponse(curr)
log.deleteOldSegments
把响应放入队列中
return amILeader
元数据缓存
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
注册OP_READ
启动logManager定时调度了三个任务
for (dir <- this.logDirs)
创建
native 方法
loadLogs()
selector.close(connectionId)
Acceptor(线程)
FollowerPartition
启动acceptor线程
scheduler.schedule(\"kafka-recovery-point-checkpoint\
timeIndex.flush()
LeaderPartition
kafkaServerStartable.awaitShutdown
往KafkaChannel里面绑定一个发送出去的请求
server.awaitShutdown
allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs))
KafakChannel缓存的请求
mmap.force内存映射
broker一启动,就要启动选举的方法。
//如果新的HW的值 大于老的HW的值 //就用心的HW的值作为 leader partition的HW的值
遍历所有当前主机的所有的segment
不断获取Request请求
createAndValidateLogDirs(logDirs)
processNewResponses()
4、更新其他replica(分区)的LEO
尝试启动controller选举
case RequestChannel.NoOpActionOP_READ请求
//LEO=lastoffset+1 //获取LEO的值作为最新的一个偏移量 //举个例子:lastoffset=10001 //LEO=10002 //我们这儿新获取一个偏移量的时候,用的是10002(logEndOffset)这个值。
kafkaServerStartable.startup
ServerScoketChannel9092
一个分区(磁盘上面的一个目录) 对应一个Log
重新监听 OP_READ
transportLayer.addInterestOps(SelectionKey.OP_READ);
val laggingReplicas = candidateReplicas.filter(r => (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
scheduler.schedule(\"isr-expiration\
selector.send(response.responseSend)
每个broker完成注册
............
获取到一个socketChannel
val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
processors(线程组)
completedReceives(处理完了的响应)List<NetworkReceive>
RequestChannel(请求通道)
更新元数据
对于获取到的请求按照协议进行解析,解析出来就是一个一个Request
kafkaScheduler.startup()
生产者
KafkaRequestHandler.runwhile(true)
flushDirtyLogs
onBecomingLeader()
把获取到SocketChannel存入到了自己的队列
取消OP_READ事件
requestChannel.sendResponse(new RequestChannel.Response(×××)
从里面取一个最小值,作为HW值
各个handle××××××Request的sendResponseCallback方法
0 条评论
下一页