Java并发工具详解
2022-02-04 16:41:11 58 举报
AI智能生成
完整整理了Java并发包的原理 从底层到上层、从数据结构到示例代码,包含大部分工具的实现机制和使用方式 还有一些扩展知识在完善中,如NIO/分布式锁等
作者其他创作
大纲/内容
并发工具的基本原理:cas
atomic
基于cas提供对基本数据类型线程安全的操作
AtomicBoolean
AtomicReference
AtomicReferenceArray
并发工具的核心实现:AQS
原理
数据结构
共享资源
private volatile int state
同步队列(获取资源)
说明
同步队列针对资源,由AQS对象自己维护,包含所有等待获取资源的线程,操作方式为acquire/release
条件队列针对某一个条件,由Condition对象维护,new AQS().new Condition()之后才能使用;
ConditionObject是AQS的内部类,
从属于已存在的AQS对象
从属于已存在的AQS对象
因此,一个AQS对象:拥有一个资源线程队列;并可以创建0-N个条件线程队列(分别表示不同的条件)
后续若无特殊说明,所有的队列操作都是指同步队列
节点(Node)
线程状态:waitStatus
CANCELLED
1 因timeout或interrupt此节点已取消,已取消的线程不会再阻塞
SIGNAL
-1 此节点的后继节点正在阻塞中,因此在release或cancel时必须唤醒后继节点
CONDITION
-2 当前节点正处于条件队列中
在转移之前它将不会被用作同步队列的节点,转移的时候这个值会被设为0
在转移之前它将不会被用作同步队列的节点,转移的时候这个值会被设为0
PROPAGATE
-3 下一个acquireShared信号需要无条件传播
默认值:0
同步队列的前后节点:
用于实现同步队列的双链表
用于实现同步队列的双链表
volatile Node prev;
volatile Node next;
条件队列的节点
Node nextWaiter;
用于Condition时,表示条件队列的下一个节点
条件队列只需要一个简单的
单链表来保存条件等待中的节点。
单链表来保存条件等待中的节点。
用于同步队列时,表示当前模式
特殊值SHARED则表明当前为共享模式
特殊值SHARED则表明当前为共享模式
说明:同步队列和条件队列都用了Node来作为节点,但使用的是不同的构造方法;节点可以在队列间复用
如上所述,线程队列中nextWaiter属性仅用于标识独占/共享模式;条件队列中的nextWaiter表示下一个节点
如上所述,线程队列中nextWaiter属性仅用于标识独占/共享模式;条件队列中的nextWaiter表示下一个节点
封装的线程
volatile Thread thread;
构造方法
用于同步队列添加节点:addWaiter
用于条件队列添加节点
同步队列节点
头节点
private transient volatile Node head;
头结点要么是new Node,要么是上一个已经释放锁的节点
参见acquire/release流程
参见acquire/release流程
尾节点
private transient volatile Node tail;
条件队列(条件等待)
实现
基本用法
await/signal/signalAll
仅用于独占模式,如ReentrantLock#new Condition()
流程
await
条件等待时:
1.当前线程已经获取到锁,则其节点要么不在同步队列,要么一定是头结点
2.一个新节点添加到了条件队列
3.调用release,当前线程释放锁并唤醒同步队列的下一节点
4.park阻塞
5.唤醒之后检查当前节点是否已添加到同步队列(同步和条件队列共用Node对象)
6.如果是,则重试获取锁,这就回到了同步队列的操作流程中去了,一次条件等待至此结束
7.否则继续park阻塞,等待下一次条件达成之后的唤醒(signal)
1.当前线程已经获取到锁,则其节点要么不在同步队列,要么一定是头结点
2.一个新节点添加到了条件队列
3.调用release,当前线程释放锁并唤醒同步队列的下一节点
4.park阻塞
5.唤醒之后检查当前节点是否已添加到同步队列(同步和条件队列共用Node对象)
6.如果是,则重试获取锁,这就回到了同步队列的操作流程中去了,一次条件等待至此结束
7.否则继续park阻塞,等待下一次条件达成之后的唤醒(signal)
awaitNanos(long)
可设定最大等待时间
底层机制:LockSupport.parkNanos(this, nanosTimeout);
signal
doSignal
从头节点开始,将节点移出条件队列并添加到同步队列(复用同一节点)
transferForSignal
总结一下:
条件等待时当前线程释放锁,并添加到条件队列;
条件唤醒时当前线程从条件队列转移到同步队列再次尝试获取锁并继续执行
条件等待时当前线程释放锁,并添加到条件队列;
条件唤醒时当前线程从条件队列转移到同步队列再次尝试获取锁并继续执行
wait/await/sleep的异同
相同
都会使当前线程暂停运行,把运行机会交给其他线程
不同
wait/await会释放锁,必须用在同步代码块中;
sleep与锁无关,可用于任何位置
sleep与锁无关,可用于任何位置
wait是Object的方法,用于synchronized代码块;
await是Condition的方法,用于Lock同步代码;
sleep是Thread的方法,用于操作当前线程;
await是Condition的方法,用于Lock同步代码;
sleep是Thread的方法,用于操作当前线程;
notify、signal分别用于唤醒wait/await线程,sleep无需外部唤醒
条件队列节点(Condition维护)
头节点
private transient Node firstWaiter;
尾节点
private transient Node lastWaiter;
访问控制和流程
获取和释放资源
独占模式
acquire(int arg)
说明:以独占模式获取锁,不响应中断。
由至少一次的tryAcquire调用实现,如果成功则直接返回。
否则线程将会入队,重复阻塞和唤醒,尝试tryAcquire直到成功。
tryAcquire由子类实现。
可用于实现Lock#lock,如ReentrantLock
由至少一次的tryAcquire调用实现,如果成功则直接返回。
否则线程将会入队,重复阻塞和唤醒,尝试tryAcquire直到成功。
tryAcquire由子类实现。
可用于实现Lock#lock,如ReentrantLock
调用过程
tryAcquire
尝试获取资源,模板方法
成功之后直接执行,不入队
acquireQueued
说明:获取资源失败,入队,休眠并在唤醒之后重试
入队:addWaiter
将当前线程和给定模式封装节点并入队
首先尝试以最快的方式入队,失败后以完整方式入队
enq
尝试入队,直到成功;返回值为被添加节点的前一个节点
如果队列为空,则初始化队列:创建一个新节点,将head/tail都指向这个节点
伪代码,注意引用传递:
pred = tail;
node.prev = pred;
cas(tail,node);
pred.next=node;
pred = tail;
node.prev = pred;
cas(tail,node);
pred.next=node;
当前节点入队成功,并且tail移向当前节点
置换tail时以cas操作保证线程安全
重试:acquireQueued
如果当前节点的前一个节
点是头结点则尝试获取资源
点是头结点则尝试获取资源
获取成功
将当前节点设置为新的头结点
注:头结点其实就是预离队节点,将会在下一个节点获取资源成功之后被移除,见下
注:头结点其实就是预离队节点,将会在下一个节点获取资源成功之后被移除,见下
将旧的头结点的next引用设置为null以从队列中脱离
获取失败
将前一个节点的状态设置为Node.SIGNAL,提示其释放锁之后需要唤醒后继节点
状态设置成功后当前线
程调用park进入阻塞状态等待唤醒
程调用park进入阻塞状态等待唤醒
何时唤醒
前置节点释放锁的时候调用release方法
跟condition的关联
无关
只有锁获取成功之后才能使用condition
release(int arg)
说明:独占模式下释放资源,并从头结点开始唤醒后继节点
可用于实现Lock#unlock,如ReentrantLock
可用于实现Lock#unlock,如ReentrantLock
tryRelease
释放资源,模板方法
unparkSuccessor(node)
释放成功之后从头结点开始,唤醒下一个节点
唤醒的时候从头结点开始唤醒,原因:
结合acquire代码,tryAcquire成功的时候是不入队的,
结合acquire代码,tryAcquire成功的时候是不入队的,
头结点要么是新创建的new Node,要么是上一个已经执行完的节点;
唤醒下一个节点之后头结点自动脱离队列,同一个线程再次获取锁
需要重新排队
唤醒下一个节点之后头结点自动脱离队列,同一个线程再次获取锁
需要重新排队
逻辑很简单:
1.直接找下一个节点,如果存在,唤醒
2.如果不存在,从tail开始往前寻找,找到离当前节点最近的待唤醒节点,唤醒
1.直接找下一个节点,如果存在,唤醒
2.如果不存在,从tail开始往前寻找,找到离当前节点最近的待唤醒节点,唤醒
tryAcquireNanos
指定等待的最大时间,获取不到则返回false,非模板方法
相比于acquire只是入队后多了对等待时间的检测,也是通过tryAcquire实现
相比于acquire只是入队后多了对等待时间的检测,也是通过tryAcquire实现
doAcquireNanos
设定等待时间的实现机制:
LockSupport.parkNanos(this, nanosTimeout)
LockSupport.parkNanos(this, nanosTimeout)
其他设定最大等待时间的也都是这个机制,
后续就不一一列举了
后续就不一一列举了
每次唤醒后更新一下park的等待时间,时间到了之后由系统自动唤醒
acquireInterruptibly(int arg)
是否响应中断的区别只在于对线程中断状态的处理
不响应中断只记录状态,响应中断则抛异常
共享模式
acquireShared
tryAcquireShared
获取共享资源,模板方法
返回值
正数:获取成功,且后续节点也有可能获取成功,一般表示剩下的资源数量
0:获取成功,且后续获取不再能成功,一般表示剩余资源为0
负数:获取失败,一般表示缺少的资源数
doAcquireShared
以共享模式入队阻塞,重试tryAcquire
成功后调setHeadAndPropagate替换头结点
并判断条件决定是否将信号向后传播
并判断条件决定是否将信号向后传播
setHead
满足条件后向后传播信号:doReleaseShared
releaseShared
tryReleaseShared
释放共享资源,模板方法
返回值
true:release之后可以允许一个等待中的acquire请求成功(独占或共享模式)
false:其他情况
doReleaseShared
将资源释放的信息向后传播,并通过循环保证传递到操作期间新添加的节点
tryAcauireSharedNanos
可设置最大等待时间,机制同上
acquireSharedInterruptibly
响应中断
模板方法
tryAcquire(int arg)
ReentrantLock、ThreadPoolExecutor、ReentrantReadWriteLock
tryRelease(int arg)
ReentrantLock、ThreadPoolExecutor、ReentrantReadWriteLock
tryAcquireShared(int arg)
CountDownLatch、Semaphore、ReentrantReadWriteLock
tryReleaseShared(int arg)
CountDownLatch、Semaphore、ReentrantReadWriteLock
isHeldExclusively()
ReentrantLock、ThreadPoolExecutor、ReentrantReadWriteLock
组件
locks
ReentrantLock
tryAcquire
公平锁
公平:线程队列为空才尝试获取锁
可重入机制:递增持有次数
非公平锁
非公平:加入队列前先尝试获取锁,获取失败才加入队列,否则直接执行不进队列
可重入:持有锁的线程再次尝试获取锁时直接获取到,只是记录获取次数
tryRelease
1.释放之前判断当前线程是否持有锁
2.如果持有,扣减已获取的资源
扣减后为0:锁已经释放,重置持有锁的线程为null
不为0:当前线程重入过,需要后续继续释放,继续保有线程
ReentrantReadWriteLock
数据结构
共享资源:用int的高低位分别记录读写锁的状态
模板实现
tryAcquire
tryRelease
tryAcquireShared
tryReleaseShared
特性总结
一个线程要想同时持有写锁和读锁,必须先获取写锁再获取读锁;写锁可以“降级”为读锁;读锁不能“升级”为写锁。
线程进入读锁的前提条件:
没有其他线程的写锁,即同时存在的话写锁优先获取;
没有写请求或者有写请求,但调用线程和持有锁的线程是同一个。
没有其他线程的写锁,即同时存在的话写锁优先获取;
没有写请求或者有写请求,但调用线程和持有锁的线程是同一个。
线程进入写锁的前提条件:
没有其他线程的读锁
没有其他线程的写锁
没有其他线程的读锁
没有其他线程的写锁
参考:https://www.cnblogs.com/xiaoxi/p/9140541.html
StampedLock
JDK1.8之后提供的新的读写锁
提供了乐观读锁,可取代ReentrantReadWriteLock进一步提升并发性能
是不可重入锁
CountDownLatch
await
acquireSharedInterruptibly
可以响应中断
tryAcquireShared
入队后阻塞,通过tryAcquireShared判断是否已经达到同步状态
依据:state是否已扣减到0
依据:state是否已扣减到0
await(long timeout, TimeUnit unit)
可以设定最大等待时间,由tryAcquireSharedNanos实现
countDown
releaseShared
tryReleaseShared
完成一个任务后扣减一次资源,每次release都会唤醒await线程检查状态
Semaphore
acquire
acquireSharedInterruptibly
这两个方法和aqs同名但不是继承的哦
release
releaseShared
信号量的实现很简单,就是设置指定数量的资源,然后以共享模式使用;信号量消耗完之后后续线程入队等待
工具包
基于ReentrantLock
CyclicBarrier
通过Lock和Condition实现,是Lock/Condition机制的良好范例
场景:某一线程执行完任务之后调用await方法进入等待状态,
等到所有线程都到达之后再一起往下执行
等到所有线程都到达之后再一起往下执行
原理
await
doAwait
nextGeneration
BlockingQueue
ArrayBlockingQueue
ReentrantLock lock
Condition
notEmpty
notFull
LinkedBlockingQueue
BlockingDeque
双端队列
LinkedBlockingDeque
Future
JDK提供的异步任务控制接口,定义了结果获取、任务状态查询、取消任务等接口
也可以简单理解为异步运算结果的占位符
也可以简单理解为异步运算结果的占位符
RunnableFuture
RunnableFuture接口继承了Runnable和Future,可以理解为对原任务做了代理并提供Future的功能
关于Callable的解释:
线程池submit提交Callable和Runnable后都是封装为RunnableFuture执行,
只是在RunnableFuture的run方法中要不要处理返回值的选择不同
关于Callable的解释:
线程池submit提交Callable和Runnable后都是封装为RunnableFuture执行,
只是在RunnableFuture的run方法中要不要处理返回值的选择不同
实现类:FutureTask
get
阻塞,等待任务完成并获取结果
awaitDone
run
代理并执行任务
完成后记录结果并唤醒等待的线程
CompletableFuture
工具类,可用于实现异步的链式调用、异常处理等,待完成
ThreadPoolExecutor
submit
submit的作用就是封装,将原任务封装为FutureTask后提交给execute执行,可跟踪查看任务状态、获取结果
原理
由FutureTask完成任务的封装、状态记录、结果记录、取消任务等操作
任务提交之后封装为FutureTask,这也是一个Runnable,可以直接交给execute执行
任务提交之后封装为FutureTask,这也是一个Runnable,可以直接交给execute执行
即真正执行或入队的是封装之后的FutureTask,不是原本的Runnable/Callable
FutureTask的原理如上
execute
Executor接口定义的方法,由实现类实现
1. 当活跃线程小于核心数量时,创建新线程执行任务;
2. 否则入队等待;
3. 入队失败则再次尝试创建新线程执行;
4. 仍然失败则拒绝执行
2. 否则入队等待;
3. 入队失败则再次尝试创建新线程执行;
4. 仍然失败则拒绝执行
addWorker:新增线程
由Worker封装任务,将自身作为Runnable创建一个线程并运行,
将提交的command作为该线程的第一个任务
将提交的command作为该线程的第一个任务
Worker实现了aqs和Runnable,其run方法实现如下
Worker.run
线程通过getTask从队列中获取任务,队列为空时会阻塞
Worker.run中会调用任务Runnable的run方法,
如FutureTask.run/ScheduledFutureTask.run,此时仅为方法调用不会新增线程
如FutureTask.run/ScheduledFutureTask.run,此时仅为方法调用不会新增线程
ScheduledThreadPoolExecutor
可周期性执行任务的线程池,继承于ThreadPoolExecutor,增加了延迟和周期性执行任务的功能
机制
任务执行的主要流程示例
1. 任务封装为ScheduledFutureTask
2. 队列使用DelayedWorkQueue延迟队列,仅入队
3. addWorker复用父类实现,但不传初始任务
2. 队列使用DelayedWorkQueue延迟队列,仅入队
3. addWorker复用父类实现,但不传初始任务
ScheduledFutureTask
继承于FutureTask,增加了对任务执行时间的处理
run
区分是否重复执行
DelayedWorkQueue
通过内部类DelayedWorkQueue作为任务队列以实现延迟执行任务
原理
基于ReentrantLock/Condition实现线程安全和条件等待及唤醒
在take中检查任务的执行时间,并通过Condition.awaitNanos(long)实现任务的延迟获取
offer
take
ThreadLocal
用于在一个线程内传递状态,使得数据可以跨越多个方法传递且线程安全
基本用法
原理
每个Thread对象中都有一个map属性threadLocals
这个map的key为ThreadLocal对象,value即为该对象记录的值
这个map的key为ThreadLocal对象,value即为该对象记录的值
ThreadLocal本身不保存数据,仅仅是将自身作为key,对应着值存到Thread对象
的threadLocals中去,数据还是由线程自身保存,这是每个线程私有的,所以线程安全
的threadLocals中去,数据还是由线程自身保存,这是每个线程私有的,所以线程安全
因此,ThreadLocal和线程是多对多关系
一个ThreadLocal对象可以在多个线程中使用,一个线程也可以使用多个ThreadLocal
一个ThreadLocal对象可以在多个线程中使用,一个线程也可以使用多个ThreadLocal
源码
set
get
remove
synchronized
扩展
NIO和事件驱动
并发虽然可以实现多任务并行同步,并做到线程休眠和唤醒,但休眠期间线程所占用内存和线程切换的开销无法避免
,在IO密集型任务中这样是比较浪费资源的,特别是在高并发场景下,因此有了NIO
,在IO密集型任务中这样是比较浪费资源的,特别是在高并发场景下,因此有了NIO
NIO
底层原理
epoll
事件驱动
reactor模式
乐观锁和悲观锁
分布式锁
收藏
0 条评论
下一页