02_kafka原理
2024-05-23 21:21:46 0 举报
kafka原理剖析
作者其他创作
大纲/内容
acceptor
2、更新ISR列表对应的follower的LEO3、更新HW=1
将其请求发送到请求队列中,让io线程去处理请求RequestChannel#sendRequest请求队列:requestQueue
当前channel取消对OP_READ的关注这里就可以保证,之后无限循环的时候,当前channel就不在处理新的请求了,也不会从stagedReceives中拿出来给completedReceives了
请求处理线程池KafkaRequestHandlerPool
broker1
移除正在发送响应inflightResponses.remove
partition1_1follower1
将响应设置到kafkachannel中的send中Selector#send让transportLayer关注OP_WRITE事件
设置空的响应RequestChannel#noOperation
1、分段存储2、限定大小:log.segment.bytes,限定了每个日志段文件的大小,超过就进行log rolling3、二分查找:快速定位offset数据4、定时清理:log.retention.hours参数,可以自己设置数据要保留多少天
ISR列表维护机制
offset
ISR 列表:
broker 1
1 消息写入leader partition,此时LEO指向下一位
将读取到的多个请求放入stagedReceives中
打开selector
batch缓冲区
partition1 副本
放入
JoinGroup请求
1、currentTime转到第3格发现,任务还有10ms< 一格20ms,就触发降层2、扔到第1层时间轮的currentTime后的第10格
延时80ms的任务
获取当时请求的channelSelector#channel()
request queue
一次poll()从N个分区获取消息
coordinator broker选择器
tickMs = 第1层interval 20msinterval = 400ms第2层时间轮
读取消息KafkaChannel#read
订阅临时节点,当controller宕机后,zk回调通知,重新争抢controller
下发方案给所有的consumer成员,此时进入stable状态,之后如果有consumer挂了,重新循环此状态机
broker 3
读取配置文件的log目录,如果不存在就创建LogManager#createAndValidateLogDirs
ResponseBody:responses{ topic partition_responses{ partition error_code base_offset log_append_time log_start_offset } }throttle_time_ms
降层/下滑
partition2副本follower
tickMs = 1msinterval = 20ms第一层时间轮
放入响应队列,一个processor对应一个responseQueues<LinkedBlockingQueue>
设置发送sendBufferSize和接收recvBufferSize大小
/topics节点
currentTime
N
打开serverSocketChannel,监听9092
对象序列化器(可自定义序列化)
ISR
自定义拦截器
ProduceResponse
ProduceRequest
无限循环,处理新连接请求,从newConnections中poll新的连接configureNewConnections()
处理新的响应Processor#processNewResponses
5、更新HW此时“a”对消费者可见
所有broker争抢创建一个成功即为:controller大哥他会实时同步元数据给各个broker
创建一个Acceptor
封装ProducerRecord
ISR列表:follower1: LEO:1
初始化1、ZK2、调度器kafkaScheduler3、日志管理器logManager4、网络组件socketServer5、副本管理replicaManager(包含logManager)6、kafka controller7、groupCoordinator8、请求处理方法集合apis9、请求处理线程池KafkaRequestHandlerPool10、健康检查kafkaHealthcheck
时间跨度tickMs
建立socket长连接写入leader partition
tickMs = 第2层的interval400msinterval = 8000ms第3层时间轮
封装ProducePartitionStatus封装 PartitionResponse
消费过程中内存数据:上一次提交offset,当前offset(还未提交),高水位offset,LEO
fetch 定时从leader拉取数据
回调sendResponseCallback响应producer
broker 2
延时2ms的任务
Y
选择coordinator broker建立socket
kafka 事务:https://www.jianshu.com/p/64c93065473e
partition1_0leader
poll()从自己selector中监听请求
创建多个processor
/ids/broker临时节点
延时任务3
轮训指定procer将socketChannel交给其中一个processor处理
将serversocketchannel注册到selector上,并关注OP_ACCEPT事件
层级上升
缓存已消费offset+分代号consumer group generation = 1
Rebalance后,分代机制保证提交的offset如果过期,就丢弃
解析响应processCompletedReceives()
缓存已消费offset
producer生产者
将索引写入mmap(oscache)MappedByteBuffer
RequestHeader:发送消息的接口api_key=0 (PRODUCE)API的版本号api_version 客户端生成唯一标识一次请求correlation_id客户端的idclient_id
KafkaRequestHandler线程池
batch
HW=0
从controller拉取元数据
重新关注OP_READ保证无限循环的时候将stagedReceives的下一个请求放入completedReceives中
follower2
一个消息集合里包含多个日志,最新名称叫做RecordBatch后来消息格式演化为了如下所示:(1)消息总长度(2)属性:废弃了,已经不用(3)时间戳增量:跟RecordBatch的时间戳的增量差值(4)offset增量:跟RecordBatch的offset的增量差值(5)key长度(6)key(7)value长度(8)value(9)header个数(10)header:自定义的消息元数据,key-value对
3、下发分区方案到组内所有consumer他们就可以按照分区方案找到各leader partition所在的broker建立socket进行消费
无限循环
00000000000005367851.indexoffset=5367851开始的日志00000000000005367851.log 00000000000005367851.timeindex
compressor追加batch组件
follower的HW是要取自己的LEO和leader返回的HW两者中的最小值
发送响应Processor#sendResponse
ResponsetHeader:发送消息的接口api_key=0 (PRODUCE)API的版本号api_version 客户端生成唯一标识一次请求correlation_id客户端的idclient_id
封装kafkachannel
消息1
唤醒卡在poll()上的线程selector.wakeup()
消费实例1
发送消息KafkaChannel#write
TimerTaskList
将channel注册到selector上,关注OP_READSelector#register()
accumulate缓冲区(HashMap数据结构)
写logFileMessageSet#append
group.id+topic+partition0 : 12
只能由组内一个consumer消费
partition1_2follower2
Sender线程batch满或linger.ms时间到了
等待acceptor启动完成,在这里卡住acceptor.awaitStartup()
处理完成的响应processCompletedSends
添加到未真正发送响应inflightResponses
维护
Kafka 高水位 LEO ISR机制
超过1G就重新新增一个log文件Log#maybeRoll
当前关注的是OP_READ请求且stagedReceives为空?
consumer.group 消费组1
心跳线程
写os缓存
a
log entry
比如说tickMs = 1ms,wheelSize = 20,那么时间轮跨度(inetrval)就是20ms,刚开始currentTime = 0
写入
消费实例2
启动acceptor线程
2
broker1socket
消息2
0拷贝
LEO=1
broker2socket
若关注的是OP_WRITE事件
通过nio filechannel写入OScache messages.writeFullyTo(channel)
返回响应RequestChannel#sendResponse
各分区都对应一个batch队列,打包成出来的batch必须都是发往同一个分区,这样才能发送一个batch到这个分区的leader broker
消息a
写入本地日志文件ReplicaManager#appendToLocalLog
ISR列表
ISR列表:follower1: LEO:follower2: LEO:... ...
TimerTaskList双向链表
1、第2次拉取数据告诉leader:自己的LEO=1
processor
TimerTask
更新LEOLogEndOffset = 当前topic-partition写入的最后一个offset + 1Log#updateLogEndOffset
accept()newConnections.add(socketChannel)
partition1副本follower
将index和log刷到磁盘LogSegment#flush log.flush() index.flush()
ISR列表数量< 最小要求数量?
consumer.group 消费组2
OS Cache
所有成员都加入组,进入AwaitingSync状态,不允许提交offset,马上rebalance
partition1 leader (数据分片 日志文件)
Processor
java nio的selector多路复用思想
为每个log目录创建8个线程的线程池来load日志文件LogManager#loadLogs
批量启动所有processor
ack == all?
创建响应队列inflightResponses
1、currentTime转到第2格发现,任务还有50ms< 一格400ms,就触发降层2、扔到第2层时间轮的currentTime后的第3格
异地/多机房灾备kafka多机房集群同步
磁盘
partition2 leader
根据topic和partition选择partition的leader和follower通常会把第一个副本作为leader
初始化logManager扫描log目录,找到自己本地有哪些topic的分区如:test-0
将selectionKey和channel关联
发送到leader partition
遍历所有消息messagesPerPartition
Acceptor
00000000000000000000.index 索引00000000000000000000.log 日志 00000000000000000000.timeindex 时间范围索引
kafka协议(发送消息为例)
0.8.2.x版本 :replica.lag.max.messages 规定follower落后leader多少条数据就将其剔除ISR列表弊端:如果瞬间涌入10w条数据,follower来不及瞬间同步完,就可能被剔除ISR,之后同步完再重新加入ISR列表
部分consumer发joinGroupPreparingRebalance状态
ConnectionQuotas每个ip的最大连接数
磁盘读
follower1
延时操作管理器DelayedOperationPurgatory
临时节点增加和删除动态感知集群状态
立即返回响应
partition1 leader
拉取
当log写入4096个字节时,需要写index稀疏索引
coordinator机制
topic : __consumer_offsets默认50个partition
broker3socket
是否有OP_ACCEPT连接?
3
/controller临时节点
无限循环,检查是否有新的连接请求ready = nioSelector.select(500)
写入的数据大于1w条?if (unflushedMessages >= config.flushInterval)
1 保证ISR中至少有一个follower2 在一条数据写入了leader partition之后,必须复制给ISR中所有的follower partition,才能说代表这条数据已提交3 Leader宕机时只能选举同步的Follower
日志追加写入本地磁盘ReplicaManager#appendMessages
thunks每条消息的callback函数
LeaderNotAvailableException:leader切换过程中写入数据NotControllerException:controller broker挂了,其他broker还未选举成controller的过程中写入数据
成为controller
写索引.index全局offset:物理offset
处理请求KafkaApis#handle
KAFKA原理初探
SocketServer 初始化
ack==0?
Consumer Group状态是:Empty
ISR列表:follower1: LEO:0
获取到即将写入日志的topic的partition
HW=1
response queue
创建processor自己的selector,将来就可以让自己关注注册到自己selector上的channel了(即: KafkaSelector)selector = new KSelector
初始化socketChannel队列newConnections = new ConcurrentLinkedQueue[SocketChannel]
消费者参数列表:1、故障感知参数:heartbeat.interval.ms:consumer心跳时间,必须得保持心跳才能知道consumer是否故障了,然后如果故障之后,就会通过心跳下发rebalance的指令给其他的consumer通知他们进行rebalance的操作session.timeout.ms:kafka多长时间感知不到一个consumer就认为他故障了,默认是10秒max.poll.interval.ms:如果在两次poll操作之间,超过了这个时间,那么就会认为这个consume处理能力太弱了,会被踢出消费组,分区分配给别人去消费,一遍来说结合你自己的业务处理的性能来设置就可以了2、消息消费参数:fetch.max.bytes:获取一条消息最大的字节数,一般建议设置大一些max.poll.records:一次poll返回消息的最大条数,默认是500条connection.max.idle.ms:consumer跟broker的socket连接如果空闲超过了一定的时间,此时就会自动回收连接,但是下次消费就要重新建立socket连接,这个建议设置为-1,不要去回收auto.offset.reset:如果下次重启,发现要消费的offset不在分区的范围内,就会重头开始消费;但是如果正常情况下会接着上次的offset继续消费的3、消息提交:enable.auto.commit:开启自动提交auto.commit.inetrval.ms:5000,默认是5秒提交一次,可能导致消息丢失或重复消费问题。4、Consumer Group状态:PreparingRebalance的状态等待一段时间其他成员加入,这个时间现在默认就是max.poll.interval.ms来指定的,所以这个时间间隔一般可以稍微大一点
broker2
把stagedReceives的一个客户端请求放入completedReceives
kafka一次只能处理每个client的一个请求
ZK集群kafka元数据放到zk集群,保证kafka各节点无状态,提高可伸缩性
ack=自动提交时,每隔一段时间同步offset将offset+分代号发送到__comsumer_offsets topic(老版本是发到zk)
延时450ms的任务
时间轮TimerWheel
group.id % 50 算出 leader partition,这个partition所在的broker即coordinator broker
处理线程从requestQueue队列中获取请求RequestChannel#receiveRequest
写入其他replica
剔除
1 选择leader consumer
这个channel是否关注了OP_READ
拿到当前这个topic的当前segment日志文件如offset = 321的文件:00000321.log
2 SyncGroup附带分区方案
socketChannel生产配置
fsync顺序写文件末尾追加
元数据:topicpartitionbroker
mirror maker
制定分区方案
0.9.x版本 :replica.lag.time.max.ms 规定follower落后leader多长时间就将其剔除ISR列表,解决了按落后条数剔除ISR的缺陷
三种rebalance的策略:range顺序、round-robin轮训、sticky原有不变增量重分配
RequestBody:transactional_idackstimeouttopic_data{ topic data { partition record_set }}
获取processor对应的响应队列RequestChannel#receiveResponse
网卡
封装LogAppendInfo,包含各种offset
集群总控 Controller
1
partitionor分区器根据key做hash或轮训路由(implements Partitioner接口可以自定义分区)
延时任务2
放入待处理队列
follower落后leader的原因:1、CPU IO负载高,机器卡顿2、fullGC 导致3、动态调节副本数量,动态增加partition的数量,follower就无法快速追赶上leader的LEO
追加日志到leader partitionPartition#appendMessagesToLeader
获取log对象然后追加日志Log#append
group.id+topic+partition0 : 35
放入completedSends
延时任务1
将apis、replicaManager封装到处理线程池
消费实例3
批量启动8个请求处理线程KafkaRequestHandler
启动完成,唤醒主线程startupLatch.countDown()
基于数组时间复杂度O(1)的时间轮
创建成功
1、负责监控消费组里的各个消费者的心跳,判断是否宕机2、rebalance
生产者参数列表:1、优化吞吐量buffer.memory:设置发送消息的缓冲区,默认值是33554432,就是32MB,如果发送消息出去的速度小于写入消息进去的速度,就会导致缓冲区写满,此时生产消息就会阻塞住compression.type,默认是none,不压缩,也可以使用lz4压缩,压缩之后可以减小数据量,提升吞吐量,但是会加大producer端的cpu开销batch.size,设置每个batch的大小,如果batch太小,会导致频繁网络请求,吞吐量下降;如果batch太大,会导致一条消息需要等待很久才能被发送出去,而且会让内存缓冲区有很大压力,过多数据缓冲在内存里,默认值是:16384,就是16kb,也就是一个batch满了16kb就发送出去,一般在实际生产环境,这个batch的值可以增大一些来提升吞吐量,可以自己压测一下linger.ms,这个值默认是0,意思就是消息必须立即被发送,但是这是不对的,一般设置一个100毫秒之类的,这样的话就是说,这个消息被发送出去后进入一个batch,如果100毫秒内,这个batch满了16kb,自然就会发送出去,但是如果100毫秒内,batch没满,那么也必须把消息发送出去了,不能让消息的发送延迟时间太长,也避免给内存造成过大的一个压力2、消息大小及请求max.request.size:这个参数用来控制发送出去的消息的大小,默认是1048576字节,也就1mbrequest.timeout.ms:这个就是说发送一个请求出去后,有一个超时的时间限制,默认是30秒,如果30秒都收不到响应,那么就会认为异常,会抛出一个TimeoutException来让我们进行处理3、acks:acks=0时producer根本不管写入broker的消息到底成功没有,发送一条消息出去,立马就可以发送下一条消息,这是吞吐量最高的方式,但是可能消息都丢失了,适用于大数据实时分析。acks=all或acks=-1:leader写入成功以后,必须等待其他ISR中的副本都写入成功,才可以返回响应说这条消息写入成功了,此时你会收到一个回调通知acks配合min.insync.replicas = 2实现消息不丢失,ISR里必须有2个副本,一个leader和一个follower,最最起码的一个,acks = -1,每次写成功一定是leader和follower都成功才可以算做成功,leader挂了,follower上是一定有这条数据,不会丢失retries = Integer.MAX_VALUE,无限重试,如果上述两个条件不满足,写入一直失败,就会无限次重试acks=1:只要leader写入成功,就认为消息成功了,默认给这个其实就比较合适的,还是可能会导致数据丢失的4、消息重试max.in.flight.requests.per.connection:参数设置为1,这样可以保证producer同一时间只能发送一条消息。消息重试是可能导致消息的乱序的,因为可能排在后面的消息都发送出去了,现在收到回调失败了才在重试,此时消息就会乱序,所以可以使用这个参数,但是吞吐量会下降。retry.backoff.ms:两次重试的间隔默认是100毫秒
wheelSize:时间格数量时间轮的总时间跨度(interval)就是tickMs * wheelSize
kafka内部每隔一段时间就compact保留最新的那条数据即可
0 条评论
下一页
为你推荐
查看更多