kafka 延迟任务定时器架构
2022-05-27 20:45:52 0 举报
kafka 延迟任务定时器架构
作者其他创作
大纲/内容
已构建的分层时间轮
tickMs: Long = 1wheelSize: Int = 20
ShutdownableThread
timingWheel
overflowWheel
tickMs=tickMs * wheelSizewheelSize: Int = 20
超时时间不在本层
TimerTaskList
如果任务加入时间轮返回false,并且任务还未取消
拉取的任务不为空
poll
任务的超时时间在当前时间轮的刻度范围之外
taskExecutor = Executors.newFixedThreadPool(1)
上一层时间轮引用
任务添加到 bucket
doWork
offer
从延迟队列拉取任务阻塞超时 200ms
addTimerTaskEntryTimerTaskEntry
ExpiredOperationReaper
add TimerTask
任务的超时时间在当前时间轮的刻度范围内
超时任务收割机
超时时间 bucket.getExpiration
advanceClock
timerTaskEntry保存到时间轮
添加调度任务
创建上一层时间轮
推动时间轮advanceClock
delayQueue DelayQueue[TimerTaskList]
We only need to enqueue the bucket when its expiration time has changed
taskExecutor.submit(timerTaskEntry.timerTask)
Remove all task entries and apply the supplied function to each of them
bucket.flush
SystemTimer
执行任务
timeTask包装为 TimerTaskEntry
reinsert
delayQueue
0 条评论
下一页