注意红色部分:源码+白话的方式,解析 线程基础&线程池&线程池分类&ThreadPoolExecutor核心方法
2021-10-13 19:46:24 0 举报
对线程池的基础总结理解和线程池的核心方法解析, 主要分析核心类ThreadPoolExecutor 线程池的各种状态 多线程:任务的存取(核心非核心) 任务的执行
作者其他创作
大纲/内容
类图
N
Y
获取任务再启动工作线程java.util.concurrent.ThreadPoolExecutor.Worker.run()
ThreadPoolExecutor
线程池
return;
获取ctl
中断当前线程状态
如果线程池还在接收任务,获取当前工作线程数
首先是获取当前线程的工作任务,如果有就执行,如果没有再到队列中获取。
completedAbruptly = false;
//阻塞,直到所有任务在shutdown后完成执行,或超时发生,或当前线程中断(以先发生的为准)。
获取加入新worker后工作线程的数量,并与largestPoolSize 进行比对,如果大于它,就更新
public SynchronousQueue() {this(false);}
优化版通过线程池来完成
线程池类型
final int COUNT_BITS = Integer.SIZE - 3;//29final int CAPACITY = (1 << COUNT_BITS) - 1;//2的29次方-1 //运行状态final int RUNNING = -1 << COUNT_BITS;//2的COUNT_BITS次方 的负数//停机状态 调用shutdown()final int SHUTDOWN = 0 << COUNT_BITS;//0//停止状态 调用shutdownNow()final int STOP = 1 << COUNT_BITS;//2的0+COUNT_BITS次方//整理状态 线程池和队列都是空或者线程池是空final int TIDYING = 2 << COUNT_BITS;//2的1+COUNT_BITS次方//终止状态 调用终止的钩子方法terminated完成时final int TERMINATED = 3 << COUNT_BITS;//2的2+COUNT_BITS次方-1
return false;
抽象类AbstractExecutorService:将submit实现,并在内部调用execute方法,但不对execute方法进行实现
ExecutorService newWorkStealingPool = Executors.newWorkStealingPool();
线程池类图&分类
try
//构造方法将自定义的Callable传入public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }
//获取当前的工作线程数是否为0else if (workerCountOf(recheck) == 0)
w.lock();
int c = ctl.get();
创建一个线程池,该线程池重用在共享无界队列上运行的固定数量的线程。在任何时候,最多n个线程将是活动的处理任务。如果在所有线程都处于活动状态时提交其他任务,它们将在队列中等待,直到有线程可用。如果任何线程在关机之前的执行过程中由于故障而终止,那么如果需要执行后续任务,将使用一个新线程代替它。池中的线程将一直存在,直到它显示调用shutdown。
jdk1.5之前
工作线程获取工作任务时,是否需要设置超时时间的标记
finally { if (! workerStarted) addWorkerFailed(w); }
核心属性
任务放在非核心工作线程
队列容量,为Integer.MAX_值public LinkedBlockingQueue() {this(Integer.MAX_VALUE); }
用传入的Runnable创建Worker
final Thread t = w.thread;
尝试启动工作线程
10000694
package com.lijie.thread;import java.util.ArrayList;public class Test3 { public static void main(String[] args) throws Exception { final ArrayList<String> list=new ArrayList<String>(); long start = System.currentTimeMillis(); for (int i = 0; i < 10000; i++) { Thread thread=new Thread(){ @Override public void run() { list.add(Thread.currentThread().getName()); } }; thread.start(); //join方法在这里就是如果thread线程还活着,那main线程就会一直等待,//保证了每次循环所创建的thread执行完了,再进行下一次循环,保证数据的准确性 thread.join(); } System.out.println(list.size()); System.out.println(System.currentTimeMillis()-start); }}
//启动有序关机,执行以前提交的任务,但不接受新任务。如果调用已经关闭,则调用没有其他效果。此方法不会等待以前提交的任务完成执行
break retry;//修改成功退出标记位置的循环,也就是说直接终止了内外层循环,执行标记位循环完后面的代码。
主池控制状态ctl是一个原子整数,包含两个概念字段:workerCount:指示线程的有效数量runState:指示是否正在运行、正在关闭等。
int wc = workerCountOf(c);
task.run();
ForkJoinPool
getTask()
private RunnableScheduledFuture<?>[] queue =new RunnableScheduledFuture<?>[INITIAL_CAPACITY];private static final int INITIAL_CAPACITY = 16;private final ReentrantLock lock = new ReentrantLock();
wt.interrupt();
核心方法就是execute。为了兼容需要返回值的需求,又实现了submit方法,submit方法是通过Runnable创建RunnableFuture来获取返回值,不过最终也是内部调用execute方法来执行线程任务。而且submit会将传入的task类封装成jdk字段的自带的task会被封装成jdk自带的RunnableFuture。
N:重新获取c = ctl.get();
主池控制状态ctl
//再次判断工作线程是否已经被其他线程启动了if (t.isAlive())throw new IllegalThreadStateException();
工作线程数超过最大值,或超过最大线程数
线程的基础方式
continue retry;比较消除低位影响后的ctl与之前获取的rs的值是否不等,如果是就代表已经被其他线程修改了,跳过本次循环。接下来执行下一次的外层循环。
ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
任务放到核心线程中
获取工作线程数workerCount
int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s;workerAdded = true;//修改worker添加状态为已完成
//首先判断是否有任务,有才操作,否则直接返回while (task != null || (task = getTask()) != null)
1、在执行任务的线程中将调用beforeExecute和afterExecute等方法,在这些方法中还可以添加日志、计时、监视或者统计信息收集的功能。2、无论任务是从run中正常返回,还是抛出一个异常而返回,afterExecute都会被调用。如果任务在完成后带有一个Error,那么就不会调用afterExecute。3、如果beforeExecute抛出一个RuntimeException,那么任务将不被执行,并且afterExecute也不会被调用。4、在线程池完成关闭时调用terminated,也就是在所有任务都已经完成并且所有工作者线程也已经关闭后,terminated可以用来释放Executor在其生命周期里分配的各种资源,此外还可以执行发送通知、记录日志或者手机finalize统计等操作。
if (workerAdded) { t.start(); workerStarted = true; }
如果线程池已中断不再处理任务或线程池已经关闭不再接收新任务只处理队列中的已有任务但队列为空时:循环递减WorkerCount并返回空
Throwable thrown = null;
通过实现Callable接口解决多线程没有返回的问题
ScheduledThreadPoolExecutor
+ public ScheduledThreadPoolExecutor(int corePoolSize)
定时线程池
if (workerCountOf(c) < corePoolSize)
final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) workers.remove(w); decrementWorkerCount(); tryTerminate(); } finally { mainLock.unlock(); }
rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())
区分单线程和多线程小demo&线程实现的基础方式
DelayedWorkQueue优先队列是定制的优先级队列,只能用来存储RunnableScheduledFutures任务。堆(堆结构是用数组实现的二叉树)是实现优先级队列的最佳选择,而该队列正好是基于堆数据结构的实现。
获取对应的线程
循环for (;;)
源码
finally { mainLock.unlock(); }
继承Thread类
workers.add(w);
runWorker(Worker w)具体逻辑
finally
for (;;)
将要启动的新worker加到workers中
ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
注意红色部分:源码+白话的方式,解析 线程基础&线程池&线程池分类&ThreadPoolExecutor核心方法
int c = ctl.get();int rs = runStateOf(c);
ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(10);
获取当前线程Thread wt = Thread.currentThread();获取工作线程的工作任务Runnable task = w.firstTask;//置空工作任务并解锁w.firstTask = null;w.unlock();boolean completedAbruptly = true;
检查是否可以根据当前池状态和给定的绑定(core或maximum)添加一个新的worker。如果是,则相应地调整worker计数,并且如果可能,将创建并启动一个新的worker,并运行firstTask作为其第一个任务。如果池已停止或符合关闭条件,此方法将返回false。如果线程工厂在被要求创建线程时失败,它也返回false。如果线程创建失败,或者是由于线程工厂返回null,或者是由于异常(通常是thread .start()中的OutOfMemoryError)),直接回滚。
Demo
1000011
task = null;w.completedTasks++;w.unlock();
再次判断当前任务的工作线程是否为空if (t != null)
//isRunning(int c) ===》c < SHUTDOWN==》为true就代表线程池为running状态//offer:如果没有超过最大容量,则立即将指定的元素插入到该队列中,任务排队if (isRunning(c) && workQueue.offer(command))
实现Callable接口,并作为参数创建FutureTask(Callable<V> callable)
加锁final ReentrantLock mainLock = this.mainLock;mainLock.lock();
int rs = runStateOf(ctl.get());if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null))
如果运行的线程少于corePoolSize,启动新线程
jdk1.5
运行的工作线程数workerCount
&是都是1才是1.状态值都是2的29次方的倍数。最大也就是三倍。状态值&-2的29次方,结果都还是自身。-2的29次方二进制:11111111111111111111111111111111111000000000000000000000000000002的29次方:0000000000000000000000000000000000100000000000000000000000000000乘以2也就是01000000000000000000000000000000乘以3也就是01100000000000000000000000000000前19位只要有1,&2的29次方运算时,那些位置还都是1,只要后面20到32位上没有1,最终结果都是自身。
if三种情况都会调用核心方法addWorker
初始化超时标记boolean timedOut = false;
//获取当前主池控制状态ctlint c = ctl.get();int rs = runStateOf(c);//c & ~CAPACITY
boolean workerStarted = false;boolean workerAdded = false;Worker w = null;
if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))
解锁
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; }
Y:线程池状态值大于等于0(即非RUNNING状态,只有running状态才可以接收新任务),且不是下面的场景:(当前状态为停止状态,且等待的任务队列为空,但工作队列不为空)
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);
RUNNING 正在运行:接受新任务和处理排队的任务SHUTDOWN 关闭:不接受新任务,但处理排队的任务STOP 停止:不接受新任务、不处理排队的任务和中断正在进行的任务TIDYING 整理:所有任务都已终止,workerCount为零,转换到状态整理的线程将运行terminated()钩子方法TERMINATED 终止:terminated() 已完成这些值之间的数字顺序很重要,以便进行有序比较。运行状态随时间单调增加,但不需要达到每个状态。这些转变是:运行->关闭:在调用shutdown()时,可能隐式地在finalize()中(运行或关闭)->停止:在调用shutdownNow()时关闭->整理:当队列和池都为空时停止->整理:当池为空时清理->终止:当终止的()钩子方法完成时在awaitTermination()中等待的线程将在状态达到TERMINATED时返回
public void run() { runWorker(this);}
线程池执行器留出的扩展方法
如果ctl第20到32位上出现1,则会被消除,这样可以消除低位的影响compareAndIncrementWorkerCount方法会依次修改工作线程数。
return workerStarted;
w = new Worker(firstTask);
如果线程池不在接收新任务(线程池状态=STOP)或者如果当前线程已被中断(并清除中断状态)且当前线程池也是不再接收新任务;前面只要满足,调用wt.isInterrupted(如果还是中断状态返回true)如果还是
Executor
- void execute(Runnable command);
分类
代码
//在此处打上标记,用于后面直接跳至此处retry:
任务提交分配给线程池的优先级顺序:核心线程>队列>非核心线程任务执行顺序优先级:核心线程>非核心线程>队列
for (;;) 进入第一层循环
线程池核心方法源码解析ThreadPoolExecutor#execute&
largestPoolSize:只要通过mainLock加的worker,都要同步到这个数上。这个数记录的就是当前线程池最新的工作线程数。
基础
这里就是启动工作线程的方法,重点有两点:1、有俩个启动线程前后的预留的扩展口span style=\"font-size: inherit;\
N:进入第二层循环
int recheck = ctl.get();
//拒绝当前任务,抛出异常reject(command);
//当前线程池变成不是running状态//remove:如果该任务存在,则将其从执行器的内部队列中移除,从而使其在尚未启动时无法运行,并返回tureif (! isRunning(recheck) && remove(command))
对该方法的逻辑梳理
多线程join:会使主线程(或者说调用t.join()的线程)进入等待池并等待t线程执行完毕后才会被唤醒。并不影响同一时刻处在运行状态的其他线程。
//修改失败,再次获取ctlc = ctl.get(); //比较线程池状态与之前的是否发生变化if (runStateOf(c) != rs)
worker成功添加后,启动对应的线程,并标记worker已启动
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue;}
实现Runnable接口
如果worker没有被标记启动成功,则需要workers的移除wo新加的这个worker,并将前面cas加上的workerCount值减1
接口ExecutorService
1、工作线程数少于最大核心线程数,调用addWorker通过cas修改工作线程数和状态,并启动工作线程执行传入的工作任务。成功就return。失败就重新获取ctl。2、工作线程数已达到最大核心线程数。 线程池状态为running,且工作线程队列没有超过最大容量并成功将要执行的线程加到工作队列workQueue中, 重新获取当前线程池的ctl值。 2-1、当前线程池状态又变为非running,那就将加进去的工作任务从工作队列中移除并抛出异常 2-2、如果当前线程池的工作线程为0,调用addWorker通过cas修改工作线程数和状态,并启动工作线程。 因为这个任务在前面offer方法加到workQueue中了,而在第一次判断时,只有线程池状态变成非running抛异常前才会调用remove,将其从队列中移除了, 能走到这里,代表没有移除,那么启动worker时,只用传null启动一个worker,否则会导致任务执行两次。3、执行到这里,代表前面判断时,没有异常也没有空闲线程数,在最后再进行一次尝试,尝试启动一个非工作线程。4、都没有成功,启动拒绝策略,调用RejectedExecutionHandler实现类
workerCount是允许启动和不允许停止的工人数量。该值可能暂时不同于活动线程的实际数量,例如,当ThreadFactory在请求时无法创建线程,并且退出线程在终止前仍在执行簿记时。用户可见池大小将报告为工作集的当前大小。运行状态提供主要的生命周期控制,具有以下值:RUNNING 正在运行:接受新任务和处理排队的任务SHUTDOWN 关闭:不接受新任务,但处理排队的任务STOP 停止:不接受新任务、不处理排队的任务和中断正在进行的任务TIDYING 整理:所有任务都已终止,workerCount为零,转换到状态整理的线程将运行terminated()钩子方法TERMINATED 终止:terminated() 已完成这些值之间的数字顺序很重要,以便进行有序比较。运行状态随时间单调增加,但不需要达到每个状态。这些转变是:运行->关闭:在调用shutdown()时,可能隐式地在finalize()中(运行或关闭)->停止:在调用shutdownNow()时关闭->整理:当队列和池都为空时停止->整理:当池为空时清理->终止:当终止的()钩子方法完成时在awaitTermination()中等待的线程将在状态达到TERMINATED时返回检测从关闭到整理的转换比较复杂,因为队列在非空状态下可能变为空,在关闭状态下也可能变为空,但只能在看到workerCount为空后,如果workerCount为0(有时需要重新检查——见下文)终止。
外层for循环结束
任务放入队列
1、首先判断核心线程数是否需要设置超时时间,默认是false。2、如果不需要再判断是否工作线程数大于核心线程数,如果大于也要设置自动作废的超时时间注:allowCoreThreadTimeOut如果为false(默认),则核心线程即使在空闲时也保持活动状态。如果为true,则核心线程使用keepAliveTime超时等待工作。 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
public SynchronousQueue(boolean fair) { //false用栈(先进后出),true用队列FIFO(先进先出) transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();}
if (compareAndIncrementWorkerCount(c))//通过cas将当前工作线程+1
0 条评论
下一页