powerjob-work
2020-07-29 20:18:44 28 举报
PowerJob-Work 是一款强大的分布式任务调度与执行框架,致力于解决大规模分布式系统中的任务调度问题。它具有简单易用、高效稳定、高可扩展性等特点,能够帮助企业轻松实现复杂的业务逻辑和数据处理。通过 PowerJob-Work,开发者可以轻松地将任务拆分成多个子任务,并分配给不同的执行器进行处理,从而实现任务的并行执行和负载均衡。同时,PowerJob-Work 还提供了丰富的监控和管理功能,帮助用户实时了解任务执行情况,确保系统稳定运行。总之,PowerJob-Work 是企业实现高效、可靠、稳定的分布式任务调度的理想选择。
作者其他创作
大纲/内容
创建scheduledPool线程池
初始化实例变量
ServerDiscoveryService#discovery()请求http://xxx/server/acquire?appId=1&tServer=xx获取server地址
是否创建异常
1.提交任务到线程池执行
执行成功||执行失败
结束
FrequentTaskTracker#initTaskTracker
CommonTaskTracker#initTaskTracker
普通计算节点,处理来自 TaskTracker 的请求
是
OhMyWorker#afterPropertiesSet()初始化工作
LogSubmitter#run向服务端server发送日志固定延迟 5S
根任务处理
处理TaskTrackerStartTaskReq请求提交Task到线程池执行
广播
创建PT
是否广播
查询task
派发任务到 ProcessorTracker
最终任务执行
最终任务处理
MapReduceProcessor#reduce
taskPersistenceService#getAllTaskResult查询所有Task执行结果,reduce阶段 或 postProcess阶段 使用
固定延迟 5S
TaskTracker#fetchRunningStatus()查询任务实例的详细运行状态返回给Server端的InstanceServer
ProcessorTracker#initTimingJob()初始化定时任务
BasicProcessor#process
是否广播执行
开始
处理server的任务执行请求和worker的task请求
默认
9.初始化定时任务
上报 worker正在执行 状态
ProcessorRunnable#reportStatus上报Task状态
8.服务发现 ServerDiscoveryService#discovery()
ProcessorTracker#initProcessor()初始化Processor处理器
创建TaskTracker
ScheduledExecutorService timingPool 线程池
span style=\"font-size: inherit;\
ProcessorTracker#initThreadPool()初始化 线程池
TaskTracker#create调用构建方法 构建基本属性
ProcessorRunnable#run任务执行线程
否
ProcessorTracker#submitTask
所有的TaskTracker都保存在TaskTrackerPool
ProcessorTracker#ProcessorTracker(TaskTrackerStartTaskReq req)通过构造方法创建ProcessorTracker
TaskTracker#updateTaskStatus更新Task状态
ServerDiscoveryService#discovery()定时更新server地址固定频率 10S
TaskTrackerReportInstanceStatusReq向服务端发送创建失败请求
修改 worker执行失败状态
启动成功
处理TaskTrackerStopInstanceReq停止ProcessorTracker
更新Task状态
MapReduce
TaskTrackerActor#onReceiveProcessorReportTaskStatusReq
构建TaskContext上下文
taskPersistenceService#getAllUnFinishedTaskByAddress获取本地 ProcessorTracker 未完成的任务
ProcessorTracker#CheckerAndReporter.run定时向TaskTracker汇报 固定频率10S一次
秒级任务会自己维护一个计数器(triggerTimes)按照设置的频率生成子任务subInstance固定频率通过scheduleAtFixedRate调度固定延迟通过schedule调度
正常任务处理
WorkerHealthReporter#run向服务端server发送Worker 的心跳固定频率 15S
查询数据
固定延迟 2S
taskTracker#broadcast生成广播任务
上报执行成功或失败状态
BroadcastProcessor#preProcess
7.初始化存储TaskPersistenceService.INSTANCE.init()用户目录下/power-server/H2 DB
taskTracker#dispatchTask
更新task状态
taskPersistenceService.batchSave批量保存Task数据到本地DB
tell(AskResponse.succeed(null))回复接受成功
SpringApplication.Run
BroadcastProcessor#postProcess
构建子任务subTaskList发送接收子任务结果
收藏
收藏
0 条评论
下一页