xxl-job 2.3.1源码
2022-12-09 03:02:32 1 举报
AI智能生成
简短的语言表述XXL-JOB从调度中心到执行器的整个流程 xxl-job源码地址:https://github.com/xuxueli/xxl-job.git
作者其他创作
大纲/内容
调度中心:xxl-job-admin
启动加载配置XxlJobAdminConfig:底层使用MySQL实现
1,加载JobTriggerPoolHelper:调度执行器,即rpc调用
1.1,创建快线程池
1.2,创建慢线程池
2,启动registryOrRemoveThreadPool线程池:接收执行器的心跳、注册、下线
3,启动registryMonitorThread线程:移除过期的执行器,更新执行器组的地址
4,启动monitorThread线程:执行器执行失败的任务进行告警
如果有配置失败重试,则重试:JobTriggerPoolHelper.trigger
5,启动monitorThread线程:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败
6,启动callbackThreadPool线程池:记录执行器执行结果
7,启动logrThread线程:每分钟上报执行器成功和失败次数,并清除7天前的数据
8,启动scheduleThread线程:选择执行需要被执行的任务
8.1,每4~5秒调度一次
8.2,MySQL自动提交关闭
8.3,上锁:select * from xxl_job_lock where lock_name = 'schedule_lock' for update
这也是为什么调度器支持集群模式
8.4,从MySQL中找到曾经至未来5秒内需要被调度的任务
缺点:xxljob不支持毫秒级别的调度
8.5,处理过期的任务:遍历循环检查查询出来的任务需要被执行的时间是否已经超过(查询MySQL表的时间 - 5秒)
如果已经超过,且调度策略 == 立即执行一次,则调度:JobTriggerPoolHelper.trigger
8.6,处理需要被执行的任务:遍历循环检查查询出来的任务需要被执行的时间是否已经超过查询MySQL表的时间
如果当前任务下次触发时间在未来的5秒内,则放入步骤9的线程队列
8.7,处理需要被执行的任务:排除步骤8.5、步骤8.6外的任务,根据任务的下次触发时间算出在1分钟刻度的多少秒,并放入步骤9的线程队列
8.8,更新下次调度时间
8.9,更新处理过的任务列表
8.10,提交事务
8.11,MySQL自动提交开启
8.12,释放锁
9,启动ringThread线程:每秒执行未来2秒内需要被执行的任务(实现原理:1分钟的时间轮,每秒都挂载着待执行的任务链表)
核心调度方法:JobTriggerPoolHelper.trigger
1,一分钟内,一个任务调度耗时超过500毫秒的次数没有达到十次
则使用快线程池
2,一分钟内,同一个任务调度耗时超过500毫秒,且有十次及其以上
则使用慢线程池
3,根据路由策略调用对应的执行器:XxlJobTrigger.trigger
4,对于超过500ms的请求耗时,记录在超时记录字典中(key:任务id,value:超时次数)
每过一分钟,清空超时记录Map
5,路由策略 == 分片广播
调用每个执行器,传递至少三个参数:当前分片索引,分片总数,任务的阻塞策略
用于拆分一个耗时很大的任务
6,路由策略 != 分片广播
根据路由策略,找到一个执行器,并调用,传递至少三个参数:当前分片索引,分片总数,任务的阻塞策略
路由策略
FIRST(第一个):固定选择第一个机器
LAST(最后一个):固定选择最后一个机器
ROUND(轮询)
RANDOM(随机):随机选择在线的机器
CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上
LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举
LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举
FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度
大白话:就看先注册
BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度
大白话:执行器的任务队列是空的
SHARDING_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务
web页面配置调度任务,参考:https://www.xuxueli.com/xxl-job/
执行器:xxl-job-core
启动方法XxlJobExecutor.start
1,初始化日志目录
如果日志目录不存在,则创建
2,生成调用调度中心接口的rpc类
如果调度中心是集群模式,则需要用调度中心地址要用英文逗号分割
3,启动localThread线程:如果配置了日志保留时间,则会每天定时清空超过日期的日志文件
4,启动triggerCallbackThread线程:把调度任务结果集合,回调给给所有注册中心
5,启动执行器rpc服务,接受调度中心调度指令:http服务器,基于netty实现
5.1,初始化netty,默认reactor模式
5.2,创建bizThreadPool线程池,无核心线程,最大线程200,等待队列长度2000
5.3,等待调度中心请求,若有则放入bizThreadPool线程池进行处理
5种请求类型
心跳请求:立即返回SUCCESS
空闲请求:从jobThreadRepository找到key为jobId的JobThread,若JobThread在运行或者JobThread的等待队列有值,则返回FAIL,否则返回SUCCESS
终止任务请求:从jobThreadRepository找到key为jobId的JobThread,并停止线程
日志请求:根据jobId找到请求日志
任务执行请求:执行任务
1,尝试根据运行模式找到JobHandler(任务处理器)
BEAN模式:任务以JobHandler方式维护在执行器端;需要结合 "JobHandler" 属性匹配执行器中任务
GLUE模式(Java):任务以源码方式维护在调度中心;该模式的任务实际上是一段继承自IJobHandler的Java类代码并 "groovy" 源码方式维护,它在执行器项目中运行,可使用@Resource/@Autowire注入执行器里中的其他服务
GLUE模式(Shell):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "shell" 脚本
GLUE模式(Python):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "python" 脚本
GLUE模式(PHP):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "php" 脚本
GLUE模式(NodeJS):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "nodejs" 脚本
GLUE模式(PowerShell):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "PowerShell" 脚本
2,若JobHandler(任务处理器)有对应的一个线程(JobThread)执行
2.1,任务的阻塞策略 == DISCARD_LATER,则直接返回FAIL
2.2,任务的阻塞策略 == COVER_EARLY,则新建一个线程(JobThread),并放入jobThreadRepository中,key为jobId
并把之前建立的JobThread线程终止
并把之前建立的JobThread线程终止
2.3,否则把当前任务调度请求放入JobThread的等待队列(triggerQueue)中
若当前调度请求的logId已经存在,则返回FAIL(利用triggerLogIdSet集合)
这样可以避免重复请求的问题
3,若JobHandler(任务处理器)没有一个线程执行,则新建一个线程(JobThread),并放入jobThreadRepository中,key为jobId
特殊说明:JobThread
1,调用IJobHandler.init方法
2,若triggerQueue有数据,则执行JobHandler.execute方法
并将执行结果放入triggerCallbackThread的等待队列中,等待triggerCallbackThread通知给所有的调度中心
并将执行结果放入triggerCallbackThread的等待队列中,等待triggerCallbackThread通知给所有的调度中心
JobHandler.execute方法是用户实现的方法
1,移除triggerLogIdSet中的logId
2,找到日志文件名
3,构建请求上下文
4,若任务设置了超时时间,则新建一个Thread并执行执行JobHandler.execute方法,利用返回的FutureTask来设置超时时间
有一个巧妙的点,新建的Thread执行结束或超时后,新建的Thread会调用interrupt方法,防止新建的Thread恶意进行sleep
5,若任务没有设置超时时间,则执行JobHandler.execute方法
6,生成调度日志
3,若90s内,triggerQueue一直没有数据,则当前JobThread会被终止,并从jobThreadRepository移除
4,若当前JobThread被终止,则将终止结果放入triggerCallbackThread的等待队列中,等待triggerCallbackThread通知给所有的调度中心
6,启动registryThread线程:每隔30秒,向所有的调度中心注册,包含:appname、ip地址
架构图
收藏
0 条评论
下一页