Elastic-job
2021-12-27 23:17:01 0 举报
Elastic-job源码
作者其他创作
大纲/内容
RegistryCenterConnectionStateListenerimplements ConnectionStateListener
- jobName:String- serverService- instanceService- shardingService- executionService
2.3 创建作业调度控制器2.6 调度作业
ExecutorServiceObject
- threadPoolExecutor- workQueue:BlockingQueue<Runnable>
+createExecutorService()
LiteJobConfiguration
- typeConfig: JobTypeConfiguration- monitorExecution: boolean- maxTimeDiffSeconds: int- monitorPort: int- jobShardingStrategyClass: String- reconcileIntervalMinutes: int- disabled: boolean- overwrite: boolean
AbstractElasticJobExecutor
JobScheduler
+ setGuaranteeServiceForElasticJobListeners()+ init()+ createJobDetail()+ createElasticJobInstance()+ createScheduler()+ getBaseQuartzProperties()
JobInstance
- jobInstanceId
+ getIp():String
ShardingListenerManager
- jobName:String-configNode- instanceNode- serverNode- shardingService- ShardingTotalCountChangedJobListener- ListenServersChangedJobListener
根据不同的作业类型,返回对应的作业执行器。
ZookeeperConfiguration
+ attribute1:type = defaultValue+ attribute2:type- attribute3:type
+ operation1(params):returnType- operation2(params)- operation3()
AbstractJobListenerimplements TreeCacheListener
+ childEvent(params):returnType- dataChanged(params)
AbstractListenerManager
- jobNodeStorage:JobNodeStorage
+ start()+ addDataListern(listern TreeCacheListern)
JobRootConfiguration
+ getTypeConfig():JobTypeConfiguration
exception.RegExceptionHandler
注册中心
elastic-job-common-core.com.dangdang.ddframe.job.reg
属性
ShardingService
- jobName: String- leaderService- configService: ConfigurationService- instanceService- serverService- executionService- jobNodePath
+ setReshardingFlag() 设置需要重新分片的标记+ isNeedSharding():boolean 判断是否需要重分片+ shardingIfNecessary()+ blockUntilShardingCompleted()+ waitingOtherJobCompleted()+ resetShardingInfo(shardingTotalCount:int)+ getShardingItems(jobInstanceId:String)+ getLocalShardingItems()+ hasShardingInfoInOfflineServers()
ExecutorServiceHandler
+ createExecutorService()
不同服务的监听管理器,均继承AbstractListenerManager
LiteJob
- elasticJob- jobFacade
+ execute(context:JobExecutionContext ) + span style=\"font-size: inherit;\
zookeeper.ZookeeperRegistryCenter
2.2 设置当前作业分片总数2.4 添加作业调度控制器
作业注册中心的监听器管理者。管理者两类组件:不同服务的监听管理器,注册中心连接状态监听器
DataflowJobConfiguration
- coreConfig:JobCoreConfiguration- jobType:jobType.DATAFLOW- jobClass:String- streamingProcess
2.1 检查 作业执行环境(校验本机时间是否合法)2.2 获取 当前作业服务器的分片上下文2.3 发布作业状态追踪事件(State.TASK_STAGING)2.4 跳过 存在运行中的被错过作业2.5 执行 作业执行前的方法2.6.5 注册作业完成信息2.7 执行 被跳过触发的作业2.8 执行 作业失效转移2.9执行 作业执行后的方法
com.dangdang.ddframe.job.config
SchedulerFacade
- jobName:String- configService 等各种服务- listenerManager
+ newJobTriggerListener()+ updateJobConfiguration(liteJobConfig)+ registerStartUpInfo(enabled)+ shutdownInstance()
base.CoordinatorRegistryCenter
线程池服务处理器注册表
com.dangdang.ddframe.job.lite.internal
elastic-job-lite-core.com.dangdang.ddframe.job.lite.internal
作业配置
2.6 执行多个作业的分片 span style=\"font-size: inherit;\
JobFacade
JobRegistry
ScriptJobExecutor
2.1更新作业配置
<Interface>base.RegistryCenter
调用 ExecutorServiceObject 的 #createExecutorService(....) 方法创建线程池
2.5.2 选举主节点2.5.3 持久化作业服务器上线信息2.5.4 持久化作业运行实例上线相关信息2.5.5 设置需要重新分片的标记2.5.6 初始化作业监听服务2.5.7 初始化 调解作业不一致状态服务
JobExecutorFactory
+ getJobExecutor():AbstractElasticJobExecutor
使用 ExecutorServiceHandler 创建线程池。
作业注册表.维护了单个 Elastic-Job-Lite 进程内作业相关信息,可以理解成其专属的 Spring IOC 容器。因此,其本身是一个单例。
2.1 更新作业配置2.5 注册作业启动信息
SimpleJobExecutor
ExecutorServiceHandlerRegistry
作业配置1.创建作业调度器2. 初始化
+ getJobType()+ getJobClass()+ getCoreConfig()
ListenerManager
- jobNodeStorage:JobNodeStorage- electionListenerManager- shardingListenerManager- failoverListenerManager- monitorExecutionListenerManager- shutdownListenerManager- triggerListenerManager- rescheduleListenerManager- guaranteeListenerManager- regCenterConnectionStateListener
+ startAllListeners()
SimpleJobConfiguration
- coreConfig:JobCoreConfiguration- jobType:jobType.SIMPLE- jobClass:String
ScriptJobConfiguration
- coreConfig:JobCoreConfiguration- jobType:jobType.SCRIPT- jobClass:String- scriptCommandLine:String
guaranteeListenerManager
2.5.1开启 所有监听器
注册中心,定义了简单的增删改查注册数据和查询时间的接口方法。
JobCoreConfiguration
- jobName:String- cron:String- shardingTotalCount:int- shardingItemParameters:String- jobParameter:String- failover:boolean- misfire:boolean- description:String- jobProperties
ElectionListenerManager
作业执行1.创建作业执行器2.执行
CloudJobConfiguration
- appName:String- typeConfig:JobTypeConfiguration- cpuCount: double- memoryMB: double- jobExecutionType:CloudJobExecutionType- beanName:String- applicationContext:String
用于协调分布式服务的注册中心,定义了持久节点、临时节点、持久顺序节点、临时顺序节点等目录服务接口方法,隐性的要求提供事务、分布式锁、数据订阅等特性。
JobScheduleController
- scheduler- jobDetail- triggerIdentity: String
+ scheduleJob(cron:String)+ rescheduleJob(cron:String)+ createTrigger(cron:String)+ isPaused():boolean+ pauseJob()+ resumeJob()+ triggerJob()+ shutdown()
ElasticJobListener
ExecutorService
elastic-job-common-core
DefaultExecutorServiceHandler
封装了CoordinatorRegistryCenter,使用jobNodePath.getFullPath(node)函数,只需要传作业节点名称node即可
DataflowJobExecutor
使用 Apache Curator 进行 Zookeeper 注册中心。
JobNodeStorage
- regCenter- jobName- jobNodePath
elastic-job-lite-core
注册中心监听器
收藏
收藏
0 条评论
下一页