ForkJoin 处理流程
2021-06-24 11:27:00 0 举报
Java ForkJoinPool 的原理流程
作者其他创作
大纲/内容
20. join()
18. compute()
14. w.runTask(t)
17. exec()
任务提交
7.2 new WorkQueue()
任务处理
8. wt.start()
2. 如果任务队列数组已经存在,且随机选中的偶数下标的任务队列不为 null 等等条件成立,使用 CAS font color=\"#ff0000\
ForEachTask
7. fac.newThread()
scan() 方法为working stealing 算法核心,内部会一直轮询任务队列数组wokrQueues中的每一条任务队列直到成功偷取一个任务或者工作线程终止才会退出
6. createWorker()
处理任务时先执行偷取的任务,再执行私有任务队列中的任务
push()
((ForkJoinWorkerThread)t).workQueue.push(this)
externalSubmit()内部是一个for空循环,跳出逻辑主要结合runState来控制,内部工作分为3步:1. 完成任务队列数组 workQueues的初始化,扭转 runState2. 计算出一个workQueues数组偶数下标,如该位置没有任务队列,新建一个 Submission Queue 队列,并将其添加到 workQueues 数组对应下标3. 计算出的workQueues数组偶数下标位置已经有任务队列,将任务 CAS 入队,标记任务为已提交,并调用 signalWork(),跳出空循环
进入子类实现,以 ForEachTask 为例
11. pool.runWorker(workQueue)
15.1 (currentSteal = task).doExec()
3. externalSubmit()
工作线程启动之后,调用其run()方法开始工作
9. fork()
19. taskToFork.fork() 任务分解
7.2 为工作线程创建 WorkQueue,并将这条队列放入任务队列数组 workQueues 奇数下标
5. tryAddWorker()
1. invoke()
join()等待任务完成并获取处理结果
21. doJoin()
WorkQueue
16. doExec()
7.2 this.workQueue = pool.registerWorker(this)
4. signalWork()
12. w.growArray()
5. 如果当前只有很少的工作线程,需要去创建工作线程
9. 判断当前线程是否是 ForkJoinWorkerThread 线程,如果是说明是内部大任务分解的小任务,直接将任务插入其私有队列,如果不是说明是外部提交,需要走外部任务提交流程
ForkJoinWorkerThread
15.2 execLocalTasks()
runWork 开始初始化 WorkQueue 工作队列中的 array 数组,并开启了for空循环一直调用scan()方法扫描偷取任务,获取到任务后即执行
CountedCompleter
externalPush(this)
进入子类实现,以 CountedCompleter 为例
7.1 ForkJoinWorkerThread()
2. externalPush()
newThread()
ForkJoinPool
13.scan()
DefaultForkJoinWorkerThreadFactory
10. run()
ForkJoinTask
0 条评论
下一页