AQS 源码分析
2022-02-14 18:07:21 0 举报
AQS(AbstractQueuedSynchronizer)是Java并发包中的一个抽象类,它为构建锁和其他同步器提供了一个框架。AQS的主要组件是一个FIFO队列,用于存放等待线程。当一个线程请求锁时,它将被放入队列中等待。一旦锁可用,队列中的头线程将被唤醒并获取锁。AQS还支持公平锁和非公平锁,公平锁要求等待时间最长的线程获得锁,而非公平锁则不保证这一点。AQS的设计使得它可以很容易地实现各种同步器,如ReentrantLock、CountDownLatch等。总之,AQS是一个灵活且强大的并发工具,它为Java程序员提供了一种简单的方式来解决并发问题。
作者其他创作
大纲/内容
就好比一个企业里面有10个(core)正式工的名额,最多招10个正式工,要是任务超过正式工人数(task > core)的情况下,工厂领导(线程池)不是首先扩招工人,还是这10人,但是任务可以稍微积压一下,即先放到队列去(代价低)。10个正式工慢慢干,迟早会干完的,要是任务还在继续增加,超过正式工的加班忍耐极限了(队列满了),就的招外包帮忙了(注意是临时工)要是正式工加上外包还是不能完成任务,那新来的任务就会被领导拒绝了(线程池的拒绝策略)
阻塞的位置 唤醒的位置
SYNC aqs子类await
否
获取锁成功 执行业务逻辑finally unlock独占锁的逻辑复用
条件队列await
AQS父类acquireSharedInterruptibly(1)
条件队列 trip.await();
tail初始化
release
使用CompletionService实现先获取的报价先保存到数据库//创建线程池ExecutorService executor = Executors.newFixedThreadPool(10);//创建CompletionServiceCompletionService<Integer> cs = new ExecutorCompletionService<>(executor);//异步向电商S1询价cs.submit(() -> getPriceByS1());//异步向电商S2询价cs.submit(() -> getPriceByS2());//异步向电商S3询价cs.submit(() -> getPriceByS3());//将询价结果异步保存到数据库for (int i = 0; i < 3; i++) { Integer r = cs.take().get(); executor.execute(() -> save(r));}
doSignalAll(first);
threadlocal get方法 获取当前线程 并通过线程拿到 threadlocalmap的引用set方法设置共享的内容对象会把threadlocal 对象作为key 放到当前线程的threadlocalmap 中
keepAliveTime
next=null
获取前驱waittes ta状态是否魏可被唤x w醒-1
T3 count 自减等于0
判断state状态等于0
1.管道(pipe)及有名管道(namedpipe):管道可用于具有亲缘关系的父子进程间的通信,有名管道除了具有管道所具有的功能外,它还允许无亲缘关系进程间的通信。2.信号(signal):信号是在软件层次上对中断机制的一种模拟,它是比较复杂的通信方式,用于通知进程有某事件发生,一个进程收到一个信号与处理器收到一个中断请求效果上可以说是一致的。3.消息队列(messagequeue):消息队列是消息的链接表,它克服了上两种通信方式中信号量有限的缺点,具有写权限得进程可以按照一定得规则向消息队列中添加新信息;对消息队列有读权限得进程则可以从消息队列中读取信息。4.共享内存(sharedmemory):可以说这是最有用的进程间通信方式。它使得多个进程可以访问同一块内存空间,不同进程可以及时看到对方进程中对共享内存中数据得更新。这种方式需要依靠某种同步操作,如互斥锁和信号量等。5.信号量(semaphore):主要作为进程之间及同一种进程的不同线程之间得同步和互斥手段。6.套接字(socket):这是一种更为一般得进程间通信机制,它可用于网络中不同机器之间的进程间通信,应用非常广泛。
LockSupport.unpark(s.thread);
头节点下一个节点获取锁成功跳出自旋 如果是中断唤醒时 自定义标示为真
可以尝试CAS加锁compareAndSetState
t.join()/t.join(long millis),当前线程里调用其它线程t的join方法,当前线程进入WAITING/TIMED_WAITING状态,当前线程不会释放已经持有的对象锁。线程t执行完毕或者millis时间到,当前线程进入就绪状态。
比较主内存中的值是否与工作内存中的值相等 相等则更新 ,不相等则重新读取做内存刷新工作内存
前驱节点是否是head
enq 入队自旋+CAS 操作 保证必须成功
唤醒
CountDownLatch
Thread.sleep(long millis),一定是当前线程调用此方法,当前线程进入TIMED_WAITING状态,但不释放对象锁,millis后线程自动苏醒进入就绪状态。作用:给其它线程执行机会的最佳方式。
fullyRelease
try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); }
cyclicBarrier
tryAcquire
容器CLH双向链表
doReleaseShared
内部类SYN继承AQS 模版设计模式 子类实现具体的控制实现 完成框架的算法
失败
nonfairTryAcquireShared 扣减state如果资源够则更改剩余资源数量 并返回 CAS 否则返回负数代表获取失败
tryReleaseShared模板方法子类实现
线程的异常 InterruptedException
T1 count 自减等于2
独占锁
LockSupport.park(this);//阻塞park中断可被唤醒 执行获取中断的状态返回中断的结果 Thread.interrupted()获取中断状态,并且擦除中断标记
内部通过阻塞队列+FutureTask,实现了任务先完成可优先获取到,即结果按照完成先后顺序排序,内部有一个先进先出的阻塞队列,用于保存已经执行完成的Future,通过调用它的take方法或poll方法可以获取到一个已经执行完成的Future,进而通过调用Future接口实现类的get方法获取最终的结果
unparkSuccessor
tryRelease
LockSupport.unpark(s.thread);
对象头记录线程
单链表入队
pre
再次尝试获取锁
唤醒head节点的下一个节点就是下一个等待的node 原因因为获取锁之后node就是head
nodeT1.pre=原来的tailcsa 设置 tail =nodeT1原来的tail的next=nodeT1
T2 count 自减等于1
去除原来的head
sync子类
重量级锁
成功 并且剩余资源大于零
ReentrantLock.lockInterruptibly获取可中断锁
LockSupport.unpark(s.thread);唤醒
AQS ReentrantLock
acquireQueued
java 线程 中断标志位
阻塞线程
用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供了如下阻塞队列:1、ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;2、LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;3、SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;4、priorityBlockingQuene:具有优先级的无界阻塞队列;
成功
条件队列转移到同步等待队列
SYNC aqs子类acquire
unlock
用来缓存 每个Thread对象都有一个ThreadLocal 的内部静态类ThreadLocal.ThreadLocalMap threadLocals = null;
判断条件
tryAcquire1上面的过程
单链表出队操作收尾节点都置为null private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; 先取出下一个节点 first.nextWaiter = null; 断开指向下一个节点的指针 transferForSignal(first); first = next; 将下一个节点赋值给已经清空指针的节点循环如此遍历所有节点直到没有下一个节点 断开所有指针 单链表出队 } while (first != null); }
handler线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:1、AbortPolicy:直接抛出异常,默认策略;2、CallerRunsPolicy:用调用者所在的线程来执行任务;3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;4、DiscardPolicy:直接丢弃任务;上面的4种策略都是ThreadPoolExecutor的内部类。当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
指定资源数量 没有获取到的线程阻塞排队限流场景
thread=null
nextGeneration
sync aqs 子类lock
线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime;
setHead(node)设置当前节点为头节点
线程池
不等于0 直接-1 入队阻塞
可被打断的锁
head=null
调用一个线程的interrupt() 方法中断一个线程,并不是强行关闭这个线程,只是跟这个线程打个招呼,将线程的中断标志位置为true,线程是否中断,由线程本身决定。
应用场景总结当需要批量提交异步任务的时候建议你使用CompletionService。CompletionService将线程池Executor和阻塞队列BlockingQueue的功能融合在了一起,能够让批量异步任务的管理更简单。CompletionService能够让异步任务的执行结果有序化。先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待,同时还可以快速实现诸如Forking Cluster这样的需求。线程池隔离。CompletionService支持自己创建线程池,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。
非0
final ReentrantLock lock = this.lock; lock.lock();
dowait
枷锁是否成功
parkAndCheckInterrupt
await
执行优先栅栏的线程代码
自旋锁
准备阻塞等待获取锁死循环自旋 等待唤醒复用独占锁逻辑等待获取锁 ,获取失败再次阻塞 等待唤醒
是
轻量级
线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize;
非公平实现类SYNC 子类
shouldParkAfterFailedAcquire
还原state=0独占线程置空
state 加1可重入
返回节点
场景:线程上下文可以共享 session 数据库连接 线程之间的隔离
next
3.阻塞(BLOCKED):表示线程阻塞于锁。4.等待(WAITING):进入该状态的线程需要等待其他线程做出一些特定动作(通知或中断)。5.超时等待(TIMED_WAITING):该状态不同于WAITING,它可以在指定的时间后自行返回。6. 终止(TERMINATED):表示该线程已经执行完毕。
将前驱节点状态cas改为可被唤醒-1
线程状态
获取锁
返回成功
如条件队列 单链表尾部,阻塞线程
count 计数器partys 副本
maximumPoolSize
结果汇总人满发车
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
enq 入队
node
Y 满了
doAcquireInterruptibly等同与 acquireQueued当中断发生时抛出异常
小于零代表资源数不等于零
加锁 await
waitestate节点的生命状态:信号量 SIGNAL = -1 //可被唤醒 CANCELLED = 1 //代表出现异常,中断引起的,需要废弃结束 CONDITION = -2 // 条件等待 PROPAGATE = -3 // 传播 0 - 初始状态Init状态
AQS父类sync.releaseShared(1);
释放锁
acquire父类AQS的方法
waitStatus
threadlocal
队列为空初始化
添加到工作队列
pre=null
tryReleaseShared
处理中断异常cancelAcquire
线程池监控public long getTaskCount() //线程池已执行与未执行的任务总数public long getCompletedTaskCount() //已完成的任务数public int getPoolSize() //线程池当前的线程数public int getActiveCount() //线程池中正在执行任务的线程数量
LockSupport.park(this);//阻塞 Thread.interrupted()获取中断状态,并且擦除中断标记
线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
如果还有资源
tryAcquireShared
doReleaseShared 继续唤醒后续节点性能提升
不等于0
AQS模版方法tryReleaseShared
NonfairSync 非公平所实现类
T3 unlock
Semaphore
偏向锁
共享锁 在获取资源后可以传播唤醒后续节点
N
返回自定义中断标示 默认为false 当节点被阻塞之后被中断唤醒标示成功
addWaiter
countDown
Y小于
去除原来的head 出队
条件队列转入同步等待队列
setHead(node)设置当前节点为头节点
如果是第一个等待节点则需要唤醒后续的健康节点
Node node = addConditionWaiter();
acquire父类AQS的方法release
Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev;
// 前半段: await 释放锁,进入条件队列单链表,然后阻塞线程 T1 T2 加入条件队列 等待被唤醒 //过渡阶段:满足条件的时候 被其他调用singal/singalAll的线程唤醒 (前提:要在同步队列中) // 调用singal/singalAll的线程: 条件队列转同步队列, // 可以在释放锁的时候唤醒head的后续节点所在的线程 // 后半段: 获取锁( 如果有竞争,cas获取锁失败,还会阻塞), // 释放锁(唤醒同步队列中head的后续节点所在的线程) // 后半段的逻辑其实就是独占锁的逻辑
node初始化节点
工具类
CompletionService原理
CountDownLatch小于零代表资源数不等于零 不等于零就入队阻塞
Semaphore共享锁
condition 条件等待队列 单向链表释放锁资源线程阻塞将节点带条件等待队列
用前驱节点的状态判断下一节点是否可以被阻塞
sync aqs 子类调用父类AQS的release
obj.notify()唤醒在此对象监视器上等待的单个线程,选择是任意性的。notifyAll()唤醒在此对象监视器上等待的所有线程。
suspend() 和 resume() 方法:挂起 恢复 过期方法 挂起不释放资源
当前线程数是否小于最大线程数量
waitStatus=0
head初始化成空node
1. 初始(NEW):新创建了一个线程对象,但还没有调用start()方法。
tail =null
workQueue
T2
trip.signalAll()
队列是否满
synchronized
设置独占
能够获取到,节点出队,并且把head往后挪一个节点,新的头结点就是当前节点
内存泄露的情况使用方法:对象不再使用 内存却没有释放 threadlocalmap的生命周期和thread一样长 threadlocal 是个弱引用每次使用完ThreadLocal都调用它的remove()方法清除数据将ThreadLocal变量定义成private static,这样就一直存在ThreadLocal的强引用,也就能保证任何时候都能通过ThreadLocal的弱引用访问到Entry的value值,进而清除掉
同步队列unlock时候会唤醒头结点的 下一个节点
do{}while 循环条件队列全部出队 并入队同步等待队列
0
等待唤醒 暂停cpu调度如果线程是因为中断唤醒的时候设置自定义中断标示为true
方法里如果抛出InterruptedException,线程的中断标志位会被复位成false,如果确实是需要中断线程,要求我们自己在catch语句块里再次调用interrupt()。也只是打标志 中断与否自行控制
当前线程可以安全被parkAndCheckInterrupt用来阻塞线程
阻塞 LockSupport.park(this);
核心思想
ArrayBlockingQueue
doReleaseShared唤醒
cas waitstate -2改为 0
资源不是零就阻塞countDown就释放一个资源 资源数为0的时候就唤醒 共享传播唤醒所有线程场景一 发令枪:模拟并发多线程阻塞等待一起执行场景二 一个线程等待汇总结果 其他线程全部结束 后等待线程唤醒
tryAcquireShared 模板方法子类实现
如果时中断唤醒获取锁需要再次处理中断
if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) //共享锁
Thread.yield(),一定是当前线程调用此方法,当前线程放弃获取的CPU时间片,但不释放锁资源,由运行状态变为就绪状态,让OS再次选择线程。作用:让相同优先级的线程轮流执行,但并不保证一定会轮流执行。实际中无法保证yield()达到让步目的,因为让步的线程还有可能被线程调度程序再次选中。Thread.yield()不会导致阻塞。该方法与sleep()类似,只是不能由用户指定暂停多长时间。
aqs 父类
lock.lock
acquireQueued ,准备阻塞等待获取锁死循环自旋
doAcquireSharedInterruptibly
提交任务数量小于 corePoolSize
static方法interrupted() 判定当前线程是否处于中断状态,同时中断标志位改为false。
线程方法
释放同步锁资源
锁的升级
移除前驱节点是cancel的节点 将自己的状态改为canceled 等待后面的节点将自己移除
lockInterruptibly
线程池流程
创建新的核心线程执行任务
有资源扣减成功 并且扣减结果为0
线程阻塞的位置唤醒的时候回通过共享所得传播机制继续唤醒其他线程 因为资源为0 其他所有线程的获取锁方法都为 1 都可以继续唤醒
N
thread=T1
模板方法不同子类实现不一样
put
准备入队 阻塞 doAcquireSharedInterruptibly(arg);
不断循环占用cpu
T1线程通过自旋+cas插入尾部成功
如果自己是为节点cas将自己的前驱设置为为节点 尾部前移
protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error(\"Maximum permit count exceeded\
不是需要阻塞
准备入条件队列 释放锁 在阻塞线程
2. 运行(RUNNABLE):Java线程中将就绪(ready)和运行中(running)两种状态笼统的称为“运行”。线程对象创建后,其他线程(比如main线程)调用了该对象的start()方法。该状态的线程位于可运行线程池中,等待被线程调度选中,获取CPU的使用权,此时处于就绪状态(ready)。就绪状态的线程在获得CPU时间片后变为运行中状态(running)。
释放锁过程
计数器自减等于0
非公平的实现 lock
创建新临时的线程
禁止指令重排,保证可见性,不能保证原子性
Y 小于
ThreadLocal、InheritableThreadLocal、父子线程的浅拷贝TransmittableThreadLocal线程池 避免都复用第一个提交的线程 改为从提交的任务的线程中复制threadlocal
AQS父类 sync.acquireSharedInterruptibly(1);
volatile
while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; }
AQS ReentrantLock unlock
先锁定资源 awite的线程阻塞排队 当所有资源释放之后再执行awite异步转同步的场景
T1
obj.wait(),当前线程调用对象的wait()方法,当前线程释放对象锁,进入等待队列。依靠notify()/notifyAll()唤醒或者wait(long timeout) timeout时间到自动唤醒。
结果为负数 获取失败的需要入队阻塞 覆改前驱节点的装状态为 -1
入队同步等待队列
nonfairTryAcquire尝试获取非公平锁
单利双重检查class Singleton{ private volatile static Singleton instance = null; private Singleton() { } public static Singleton getInstance() { if(instance==null) { synchronized (Singleton.class) { if(instance==null) instance = new Singleton(); } } return instance; }}
不是
CAS枷锁原子操作 只能有一个成功
线程自我处理中断逻辑线程正在处理重要的逻辑时候不能强制打断 stop禁用的原因 线程中断是协作式的
setHeadAndPropagate传播
阻塞队列
addWaiter加入同步等待队列 尾部 把当前线程的引用传入node 指定mode 独占 或者共享
失败 资源小于0
SYNC内部类AQS子类releaseShared
加锁
selfInterrupt 因为在阻塞唤醒时擦除过一次中断信号
拒绝策略来处理
acquireInterruptibly * 与acquireQueued逻辑相似,唯一区别节点还不在队列当中需要先进行入队操作
thread=T2
自旋
判断当前独占线程是否是自己
corePoolSize 核心线程数
枷锁
Thread.currentThread().interrupt()
对象类型整形值long型值cmpxchg执行
自旋➕cas释放资源state增加回去
transferForSignal(first);
0 条评论
下一页