大麦任务调度系统mp-tts
2022-05-15 17:57:15 0 举报
大麦任务调度系统mp-tts
作者其他创作
大纲/内容
否
线程池task.execute(),真正调度业务系统执行的接口
TaskDispatchPool.Worker.run(task)
基于metaq的任务分发执行调度器MetaqTaskDispatcherFacade
客户端注册任务taskSchedualClient--register(task)
TaskScheduleClient1.配置consumer2.taskExecutorContext上下文中加入执行器
false
将任务持久化到db任务参数表任务表此处用编程式事务控制
根据客户端执行返回值TaskCallback执行是否成功
init()
初始化本地任务执行线程池初始化任务执行的路由选择器将TaskDispatcherFacade服务提供出去发布监控服务
唤醒任务
执行任务
任务执行选择器,也就是需要执行的excutor
根据task对象生成任务key
任务执行1.任务如何执行2,任务失败如何重试
mp-tts
任务注册
TriggerUnExecutedTasksTimerListener
是
任务注册成功
配置TaskScheduleService的hsf consumer
此处就是调用应用客户端的handle方法
调度业务系统去执行任务dispatchTask(task)
将配置的taskScheduleService作为构造函数入参传入
任务重复注册
构造方法传入
亮点:1.将任务和调度解偶2. 每个任务的handle方法由各个应用本身去做,实现了调度自由3.利用机器分组保证8个分组的机器扫描对应的8个分表中的任务缺点:对数据库要求比较高,与DB频繁交互,不断扫描数据库,拿出任务进行执行;执行任务,改变数据库状态。并且任务数据不断积累进一步思考:大众点评的xxl-job与此相似,可以借鉴
任务执行线程池
task.setExecuteIp(IPUtils.getLocalHostIP());task.setIp(task.getExecuteIp());...省略 callback = task.execute();此处就开始执行我们的客户端自定义的execute方法了
任务分发调度器
hsf方式
如:cn.damai.platform.tts.client.dispatch.dispatcher.TaskDispatcherFacade:1.0.0.pwy.mx-xxxx(应用)
cn.damai.platform.tts.timer.TTSTimerStartup#startlistener.tick()准备开始处理唤醒任务
metaq方式
true
判断是否达到最大重试次数
TaskDispatchPool
TriggerTimeoutTasksTimerListener
发布泛化调用服务
db状态改为待执行,根据客户端设置的重试次数,task.execute()进行重试
QuartzNotifyToBeExecutedTasksTimeListener
程序启动同时初始化QuartzNotifyToBeExecutedTasksTimeListener
注册任务
从数据库拿到任务,quartz定时唤醒,quartzNotify.notify(toBeExecutedTask)cn.damai.platform.tts.awakener.QuartzTaskAwakener#execute乐观锁更新状态为执行中此处的唤醒时间就是注册任务时设置的执行时间taskInvoker.invoke(task)
任务执行的路由选择器TaskExecutorRouter
客户端执行器
系统中有3个比较经典的timeListener
TaskScheduleService注册任务registerTask()
executorReference.get().submit(worker)
更新任务状态结束,无需重试
引入tts的应用,在配置文件中配置TaskScheduleClient后,被Spring容器管理,应用启动执行
持久化是否成功
未处理任务执行
超时任务执行
泛化调用HSFTaskDispatcherFacade.handle(task)
任务执行
任务执行器上下文配置taskExecutorContext
根据taskKey和env查db,tts_task_schedule表是否有正在调度的任务
基于HSF的任务分发执行调度器HSFTaskDispatcherFacade
@Scheduler(\"taskScheduleClient\")TaskScheduleClient
泛化调用
成功,更新任务状态结束,无需重试
该handle方法将任务,放入线程池中taskDispatchPool.put(task)
放入线程池异步
0 条评论
回复 删除
下一页