Java并发编程基础
2021-02-01 14:06:11 9 举报
AI智能生成
JAVA并发编程基础
作者其他创作
大纲/内容
并发基础
三个特性
原子性
含义
一个操作在其他线程看来不可分割
实现方法
Lock
synchronized
可见性
含义
一个线程更新了变量后值对另一个线程立即可见
实现方法
volatile
通过内存屏障来保证
写之前加storestore屏障
写之后加storeload屏障
读之后加loadload屏障、loadstore屏障
synchronized
通过内存屏障来保证
final
被final修饰的字段在<init>、<clinit>中一旦初始化完成,并且构造器没有把“this”的引用传递出去,那在其他线程中就能看见final字段的正确值
原理
SS屏障禁止重排序
有序性
含义
保证感知顺序与源代码顺序一致
synchronized
定义
内部排他可重入锁
阻塞一个线程需要从用户态切换到内核态,引起上下文切换,这是很耗时的操作
内存语义
进入
把在同步块内使用到的变量从线程的工作内存中清除,会重新从主内存获取
monitorenter
JVM会在MonitorEnter对应的机器码指令之后插入一个加载屏障
JVM会在MonitorEnter对应的机器码指令之后插入一个获取屏障
退出
把在同步块内对共享变量的修改刷新到主内存
monitorexit
JVM会在MonitorExit对应的机器码指令之前插入一个释放屏障
JVM会在MonitorExit对应的机器码指令之后插入一个存储屏障
SL屏障
功能
原子性
可见性
有序性
可重入原理
Java虚拟机会为每个内部锁分配一个入口集(EntrySet),用于记录等待获得相应内部锁的线程
多个线程申请同--个锁的时候,只有一个申请者能够成功,而其他申请者的申请操作会失败。
这些申请失败的线程并不会抛出异常,而是会被暂停(生命周期状态变为BLOCKED),
并被存入相应锁的入口集中等待再次申请锁的机会。人口集中的线程就被称为相应内部锁的等待线程
这些申请失败的线程并不会抛出异常,而是会被暂停(生命周期状态变为BLOCKED),
并被存入相应锁的入口集中等待再次申请锁的机会。人口集中的线程就被称为相应内部锁的等待线程
由于Java虚拟机对内部锁的调度仅支持非公平调度,被唤醒的等待线程占用处理器运行时可能还有其他新的活跃线程
(处于RUNNABLE状态,且未进入过入口集)与该线程抢占这个被释放锁,因此被唤醒的线程不一定就能成为该锁的持有线程。
(处于RUNNABLE状态,且未进入过入口集)与该线程抢占这个被释放锁,因此被唤醒的线程不一定就能成为该锁的持有线程。
Java虚拟机如何从一个锁的入口集中选择一个等待线程与Java虚拟机的具体实现有关。
volatile
定义
轻量级锁
功能
原子性
有序性
64位变量读写的原子性
内存语义
写入
变量直接写入主内存,不会被编译器分配到寄存器存储
读取
从主内存获取最新值
monitorexit
JVM原理:禁止重排序
写入
之前插入释放屏障(Release Barrier)
禁止了volatile写操作与该操作之前的任何读、写操作进行重排序,
从而保证了volatile写操作之前的任何读、写操作会先于volatile写操作被提交
从而保证了volatile写操作之前的任何读、写操作会先于volatile写操作被提交
即其他线程看到写线程对volatile变量的更新时,写线程在更新volatile变量之前所执行的内存操作
的结果对于读线程必然也是可见的。更新操作的感知顺序与源代码一致。
的结果对于读线程必然也是可见的。更新操作的感知顺序与源代码一致。
SS屏障实现
之后插入存储屏障(Store Barrier)
冲刷处理器缓存
SL屏障实现
读取
之前插入加载屏障(Load Barrier)
刷新处理器缓存
LL屏障实现
之后插入获取屏障(Acquire Barrier)
禁止了volatile读操作与该操作之后的任何读、写操作进行重排序,
从而保证了volatile读操作对之后的任何读、写操作都是可见的
从而保证了volatile读操作对之后的任何读、写操作都是可见的
LS屏障实现
对应
注意
对于引用型volatile变量,volatile关键字只是保证读线程能够读取到一个指向对象的相对新的内存地址(引用),
而这个内存地址指向的对象的实例/静态变量值是否是相对新的则没有保障。
而这个内存地址指向的对象的实例/静态变量值是否是相对新的则没有保障。
对数组变量的引用,效果只体现在数组本身的引用上,否则需要使用AtomicReferenceArray等类
使用场景
作为状态标志
写入变量时不依赖当前值
该变量不会与其他变量一起纳入不变性条件中
锁
概述
乐观锁与悲观锁
公平锁与非公平锁
ReentrantLock pairLock = new ReentrantLock(true)
ReentrantLock pairLock = new ReentrantLock(false)
默认非公平锁
独占锁与共享锁
可重入锁
原理
在锁内部维护一个线程标示,用来标示该锁目前被哪个线程占用,然后关联一个计数器。
开始计数器值为0, 说明该锁没有被任何线程占用。
当一个线程获取了该锁时,计数器的值会变成1,
这时其他线程再来获取该锁时会发现锁的所有者不是自己而被阻塞挂起。
当一个线程获取了该锁时,计数器的值会变成1,
这时其他线程再来获取该锁时会发现锁的所有者不是自己而被阻塞挂起。
但是当获取了该锁的线程再次获取锁时发现锁拥有者是自己,就会把计数器值加+1,
当释放锁后计数器值-1。当计数器值为0时,
锁里面的线程标示被重置为null,这时候被阻塞的线程会被唤醒来竞争获取该锁。
当释放锁后计数器值-1。当计数器值为0时,
锁里面的线程标示被重置为null,这时候被阻塞的线程会被唤醒来竞争获取该锁。
自旋锁
-XX : PreBlockSpinsh
Java内存模型
解决问题
在编译器中生成的指令顺序,可以与源代码中的顺序不同
编译器还会把变量保存在寄存器而不是内存中
处理器可以采用乱序或并行等方式来执行指令
保存在处理器本地缓存中的值,对于其他处理器是不可见的
缓存可能会改变将写入变量提交到主内存的次序
最小保证
对变量的写入操作在何时对其他线程可见
Happens-Before
若要满足操作A的结果对操作B可见,需要满足A Happens-Before B
如果两个操作之间没有Happens-Before关系,JVM可以任意重排序
包括
程序次序规则
在一个线程内,按照控制流顺序,书写在前面的操作先行发生于书写在后面的操作
管程锁定规则
一个unlock操作先行发生于后面对同一个锁的lock操作
volatile变量规则
对一个volatile变量的写操作先行发生于后面对这个变量的读操作
线程启动规则
Thread对象的start()方法先行发生于此线程的每一个动作
线程终止规则
线程中的所有操作都先行发生于对此线程的终止检测
线程中断规则
对线程interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生
终结器规则
对象的构造方法先行发生于对象的终结方法
传递性
重排序
含义
Java 内存模型允许编译器、处理器、运行时对不存在数据依赖性的指令指令重排序以提高运行性能
乱序来源
JIT编译器
处理器
乱序执行、顺序提交(重排序缓冲器)
写缓冲器
高速缓存
分类
指令重排序
内存重排序
LoadLoad
一个处理器上先后执行两个读内存操作LI和L2,
其他处理器对这两个内存操作的感知顺序可能是L2-L1
其他处理器对这两个内存操作的感知顺序可能是L2-L1
StoreStore
一个处理器上先后执行两个写内存操作W1和W2,
其他处理器对这两个内存操作的感知顺序可能是W2-→W1
其他处理器对这两个内存操作的感知顺序可能是W2-→W1
LoadStore
StoreLoad
伪共享
原因
在Cache内部是按行存储的,其中每一行称为 一个Cache行。
Cache行是Cache与主内存进行数据交换的单位,大小一般为2的幂次数字节。
Cache行是Cache与主内存进行数据交换的单位,大小一般为2的幂次数字节。
当CPU访问某个变量时,首先会去看CPU Cache内是否有该变量,如果有则直接从
中获取,否则就去主内存里面获取该变量,然后把该变量所在内存区域的一个Cache行大
小的内存复制到Cache中。
中获取,否则就去主内存里面获取该变量,然后把该变量所在内存区域的一个Cache行大
小的内存复制到Cache中。
由于存放到Cache行的是内存块而不是单个变量,所以可能会把多个变量存放到一个Cache行中。
当多个线程同时修改一个缓存行里面的多个变量时,由于同时只能有一个线程操作缓存行,
所以相比将每个变量放到-一个缓存行,性能会有所下降,这就是伪共享
当多个线程同时修改一个缓存行里面的多个变量时,由于同时只能有一个线程操作缓存行,
所以相比将每个变量放到-一个缓存行,性能会有所下降,这就是伪共享
解决方法
字节填充
@Contented
内存屏障
含义
被插人到两个指令之间进行使用的屏障,其作用是禁止编译器、处理器重排序从而保障有序性。
使两侧指令无法穿越
基本内存屏障
分类
LL
使处理器根据无效化队列中的Invalidate消息删除其高速缓存中相应的副本。
使处理器有机会将其他处理器对共享变量所做的更新同步到该处理器的高速缓存中
使处理器有机会将其他处理器对共享变量所做的更新同步到该处理器的高速缓存中
SS
将写缓冲器中的现有条目做一个标记,以表示这些条目代表的写操作需要先于该屏障之后的写操作被提交。
处理器在执行写操作的时候如果发现写缓冲器中存在被标记的条目,
此时处理器将写操作的数据写入写缓冲器,从而使得StoreStore屏障之前的任何写操作先于该屏障之后的写操作被提交。
处理器在执行写操作的时候如果发现写缓冲器中存在被标记的条目,
此时处理器将写操作的数据写入写缓冲器,从而使得StoreStore屏障之前的任何写操作先于该屏障之后的写操作被提交。
SL
完全拥有以上两个功能
开销最大
汇编层面原理
LOCK指令前缀
把写缓冲区中所有的数据刷新到内存中
复合内存屏障
分类
可见性保障
加载屏障(Load Barrier)
刷新处理器缓存
存储屏障(Store Barrier)
冲刷处理器缓存
有序性保障
获取屏障(Acquire Barrier)
设立在一个读操作后,禁止之后的任何读、写操作重排序到前面
LL+LS
释放屏障(Release Barrier)
设立在一个写操作前,禁止之前的任何读、写操作重排序到后面
LS+SS
缓存
缓存条目结构
Tag
地址信息
Data Block
缓存行,储存数据
Flag
标志状态信息
Invalid
Shared
Exclusive
Modified
修改过,还没同步回主内存
缓存一致性协议MESI
消息
硬件缓冲区
写缓冲器
无效化队列
引人无效化队列( Invalidate Queue )之后,处理器在接收到Invalidate消息之后并不
删除消息中指定地址对应的副本数据,而是将消息存人无效化队列之后就回复Invalidate
Acknowledge消息,从而减少了写操作执行处理器所需的等待时间。有些处理器(比如x86)
可能没有使用无效化队列。
删除消息中指定地址对应的副本数据,而是将消息存人无效化队列之后就回复Invalidate
Acknowledge消息,从而减少了写操作执行处理器所需的等待时间。有些处理器(比如x86)
可能没有使用无效化队列。
线程基础
线程创建方式
实现 Runnable接口的 run 方法,装入Thread
共用代码逻辑
基于组合的思想
继承 Thread 类并重写 run 方法
方便获取当前线程
方便传参
不支持多继承,任务与代码没有分离
实现 Callable接口的 call 方法,装入 FutureTask<>,再装入Thread
异步执行
可获取返回值
FutureTask#get
线程属性
ID
Name
Daemon
Priority
线程方法
通知与等待
Object#wait(0)
获取一个共享变量的监视器锁
若未获取,下一步会抛出 IllegalMonitorStateException异常。
调用共享变量的wait()方法, 调用线程被阻塞挂起
其他线程调用了该共享对象的notify()或者 notifyAll() 方法
该线程被唤醒
其他线程调用了该线程的 interrupt()方法
该线程抛出 InterruptedException 异常返回。
while循环解决虚假唤醒
Object#wait(long timeoutMillis)
超时也会唤醒
Object#notify()
获取一个共享变量的监视器锁
若未获取,下一步会抛出 IllegalMonitorStateException异常。
调用共享变量的notify()方法, 随机唤醒之前的一个等待线程
Object#notifyAll()
获取一个共享变量的监视器锁
若未获取,下一步会抛出 IllegalMonitorStateException异常。
调用共享变量的notifyAll()方法, 唤醒之前的所有等待线程
Thread#join()
等待目标线程实例执行终止
通过wait实现
睡眠与让权
Thread#sleep(long millis)
休眠指定时间,当前线程不参与CPU调度
不会释放锁
Thread#yield
当前线程直接让出 CPU 使用权,丢弃剩下的时间片,然后处于就绪状态
CPU直接进行下一轮调度
中断
Thread#Interrupt
设置目标线程的中断标志为 true
如果目标线程遇到阻塞,会抛出 InterruptedException 异常而返回,并通常清除中断标志
Thread#isInterrupted
检测目标线程的中断标志
不会清除标志
Thread#Interrupted
检测当前线程的中断标志
清除标志置为false
= currentThread().isinterrupted(true );
设置为守护线程
Thread#setDaemon(true)
线程的状态
并发相关类
ThreadLocal
方法
#set(T value)
设置当前线程的线程特有对象到从当前线程 的 threadLocals 里面
#setInitialValue
#InitialValue
获取线程特有对象的初始值
继承重写此方法来改变默认值
#withInitial(Supplier<? extends S> supplier)
返回一个带有初始值的ThreadLocal子类
#get
从当前线程 的 threadLocals 里面获取当前线程的线程特有对象
#remove
从当前线程 的 threadLocals 里面删除该本地变量
#createMap(Thread t, T firstValue)
初始化目标线程的threadLocals,并装入Vlaue
#getMap(Thread t)
返回目标线程的threadLocals
原理
每个线程的本地变量存放在调用线程的 threadLocals 变量里面,类型为ThreadLocalMap
问题
web应用下内存泄漏
原理
ThreadLocalMap储存数据的结构是一个Entry数组,key为ThreadLocal对象,value为储存的值
Entry类继承了弱引用(WeakReference<ThreadLocal<?>>),引用指向ThreadLocal
当ThreadLocal成为垃圾被回收后,即Key被回收了,对应的Entry的key同时变成null,这个Entry就变成无效条目(stale Entry)。
但是此时Entry引用的value仍然存活,变成垃圾。
但是此时Entry引用的value仍然存活,变成垃圾。
web环境下,由于web类加载器的存在,使得对象引用出现环路,线程不死,此线程使用过的垃圾都无法回收
解决方法
每次set时从计算出的哈希位置依次向后查找,
直到查到当前位置的Entry Key相等,或者Entry为null
直到查到当前位置的Entry Key相等,或者Entry为null
如果发现了当前位置的Entry Key相等,说明之前set过,此时仅仅替换value即可返回
每当如果遇到stale entry就调用replaceStaleEntry,清理掉周围一段距离的stale entry;
如果当前位置Entry为null,说明之前没有set过,新建一个Entry放到当前位置即可,
并还要调用cleanSomeSlots清理掉当前位置和之后一段距离的stale entry
并还要调用cleanSomeSlots清理掉当前位置和之后一段距离的stale entry
每次使用完ThreadLocal调用remove方法
单线程多任务
每次使用ThreadLocal时先清理当前线程的ThreadLocalMap
InheritableThreadLocal
目标
解决子进程无法方法父进程的线程特有对象的问题
重写方法
原理
现在所有操作改为在Thread.inheritableThreadLocals上操作
创建子线程时,子线程会复制父线程的inheritableThreadLocals
使用场景
子线程需要使用存放在 threadlocal 变量 中的用户登录信息
一些中间件需要把统一的 id 追踪的整个调用链路记录下来
ThreadLocalRandom
前景问题
Random在处理并发问题上,将seed指定为原子类,多个线程同时调用时,CAS操作造成自旋降低性能。
原理
种子储存于每个线程中,每个线程生成随机数时都根据自己老的种子计算新的种子,
并使用新种子更新老的种子,再根据新种子计算随机数,就不会存在竞争问题了。
并使用新种子更新老的种子,再根据新种子计算随机数,就不会存在竞争问题了。
方法
#current
获取 ThreadLocalRandom 实例
初始化当前线程中 的 threadLocalRandomSeed和 threadLocalRandomProbe 变量
#nextlnt(int bound)
计算当前线程的下一个随机数
Unsafe
方法
#objectFieldOffset(Field field)
返回指定的变量在所属类中的内存偏移地址,
该偏移地址仅仅在该Unsafe函数中访问指定字段时使用。
该偏移地址仅仅在该Unsafe函数中访问指定字段时使用。
#arrayBaseOffset(Class arrayClass)
获取数组中第一个元素的地址
#arraylndexScale(Class arrayClass)
获取数组中一个元素占用的字节
#compareAndSwapLong(Object obj, long offset, long expect, long update)
比较对象 obj 中偏移量为offset的变量的值是否与 expect相等,相等则使用update值更新 , 然后返回 true ,否则返回 false
#getAndSetLong(Object obj , long offset, long update )
获取对象 obj 中偏移量为 offset 的变量 volatile 语义的当前值 ,并设置变量 volatile 语义的值为 update
#getLongvolatile(Object obj, long offset)
获取对象 obj 中偏移量为 offset 的变量对应 volatile语义的值
#park(boolean isAbsolute, long time )
阻塞当前线程
time=0表示一直阻塞
#unpark(Object thread)
唤醒调用 park 后阻塞的线程
AtomicLong
LongAdder
前景问题
AtomicLong在多个线程同时调用时,CAS操作造成自旋降低性能。
原理
把一个变量分解为多个变量,让同样多的线程去竞争多个资源,
在内部维护多个Cell 变量,每个 Cell 里面有一个初始值为 0 的 long 型变量
多个线程在争夺同一个 Cell 原子变量时如果失败了,并不是在当前 Cell 变量上一直自旋 CAS 重试,
而是尝试在其他 Cell 的变量上进行 CAS 尝试
而是尝试在其他 Cell 的变量上进行 CAS 尝试
获取 LongAdder 当 前值时, 是把所有 Cell 变量 的 value 值累加后再加上 base 返回的
LongAccumulator
JUC
List
CopyOnWriteArrayList
原理
写时复制快照,之后用新数组替换原数组
结构构成
Object lock = new Object()
JDK8?之后改ReentrantLock为synchronized(lock)
volatile Object[] array
accessed only via getArray/setArray
方法
构造
CopyOnWriteArrayList()
增
add(E e)
进入同步
复制一个新数组,长度为旧数组长度+1
设置数组末尾的新值
替换成新数组
删
remove(int index)
复制剩余值到新数组中
改
set(int index, E element)
如果新值与旧值不同,克隆出新数组,在新数组上修改再替换
如果相同,也要再写一遍保证volatile语义
查
get(int index)
elementAt(Object[] a, int index)
其他
iterator
保存着原数组的引用,因此迭代过程中原数组发生的改变不影响此次迭代
弱一致性
Map
ConcurrentHashMap
原理
1.7
Segment+ HashEntry
基于currentLevel(默认16)划分出了多个Segment来对key-value进行存储
put、remove会加锁,get和containsKey不会加锁
get到的值为null时会加锁
#size
在不加锁的情况下遍历所有的段,读取其count以及modCount,
这两个属性都是volatile类型的,并进行统计,再遍历一次所有的段,比较modCount是否有改变。
如有改变,则再尝试两次。
这两个属性都是volatile类型的,并进行统计,再遍历一次所有的段,比较modCount是否有改变。
如有改变,则再尝试两次。
如执行了三次上述动作,仍然有问题,则遍历所有段,分别进行加锁,然后进行计算。
计算完毕后释放所有锁,从而完成计算动作。
计算完毕后释放所有锁,从而完成计算动作。
1.8
CAS+synchronized
每个bin的第一个Node插入、删除、替换使用CAS
#size
累加各个bin中Node的个数计算得到,而且这一过程不加锁,即得到的size值不一定是最新的
结构构成
常量
同HashMap
int MIN_TRANSFER_STRIDE = 16
int RESIZE_STAMP_BITS = 16
int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1
int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS
变量
volatile Node<K,V>[] table
延迟初始化到第一次put
volatile Node<K,V>[] nextTable
volatile long baseCount
volatile int sizeCtl
指示表初始化和扩容,默认值为0
当在初始化的时候指定了大小,这会将这个大小保存在sizeCtl中,大小为数组的0.75
当为负的时候,说明表正在初始化或扩张
-1表示初始化
-N 表示有N-1个线程正在进行扩容操作
如果table未初始化,表示table需要初始化的大小
如果table初始化完成,表示table的扩容阈值,默认是table大小的0.75倍
volatile int transferIndex
volatile int cellsBusy
volatile CounterCell[] counterCells
Unsafe U = Unsafe.getUnsafe()
内部类
Node<K,V>
int hash
K key
volatile V val
volatile Node<K,V> next
TreeNode<K,V>
同HashMap
TreeBin<K,V>
TreeNode<K,V> root
volatile TreeNode<K,V> first
volatile Thread waiter
volatile int lockState
树化后当作整棵树储存在数组中
ForwardingNode<K,V>
Node<K,V>[] nextTable
在转移的时候放在头部的节点,是一个空节点
ReservationNode<K,V>
方法
构造
ConcurrentHashMap()
ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel)
初始化sizeCtl
增
put(K key, V value)
putVal(K key, V value, boolean onlyIfAbsent)
数组为null或长度为0,先初始化
如果哈希位置上没有节点,直接新建节点插入即可
不用加锁,覆盖也是正常情况
否则有节点
否则
同步这个节点
如果取出来的节点的hash值是MOVED(-1)的话,则表示当前正在对这个数组进行扩容并复制到新的数组,则当前线程也去帮助复制,之后再继续插入的工作
如果是链表(hash大于0),进行更新或者插入,并统计链表长度
如果是TreeBin,说明是树,调用TreeBin#putTreeVal添加到树中
如果链表长度大于阈值了,调用#treeifyBin,树化或扩容
链表长度+1
addCount(long x, int check)
删
remove(Object key)
改
casTabAt(Node<K,V>[] tab, int i,Node<K,V> c, Node<K,V> v)
cas原子操作,在指定位置设定值
setTabAt(Node<K,V>[] tab, int i, Node<K,V> v)
原子操作,在指定位置设定值
treeifyBin(Node<K,V>[] tab, int index)
如果数组长度<64,则选择扩容2倍
否则
同步头结点
先遍历链表,处理TreeNode的next指针
新建TreeBin,构造了红黑树
调用#setTabAt,将此处节点换成TreeBin
hash为-2
tryPresize(int size)
查
tabAt(Node<K,V>[] tab, int i)
Unsafe
返回节点数组的指定位置的节点的原子操作
get(Object key)
其他
initTable()
循环判断当前数组是否为null或长度为0,当否时就返回,否则进入循环
如果sizeCtl<0,说明有线程正在初始化,yield当前线程
否则尝试将sizeCtl CAS为 -1,表示当前线程要开始初始化,失败了就回到上一步
否则CAS成功
新建数组,若初始化时指定了长度(保存在sizeCtl)则使用,否则默认长度16
sizeCtl长度变为数组长度的3/4
Queue
ConcurrentLinkedQueue
结构
volatile Node<E> head,tail = new Node<E>()
默认头、尾节点都是指向 item 为 null 的哨兵节点 。 新元素会被插入队列末尾,出队时从队列头部获取一个元素
tail结点不总是最后一个结点
减少volatile变量的写开销
内部类Node
volatile E item
volatile Node<E> next
使用 UnSafe(现在改为VarHandle) 的 CAS 算法来保证出入队时操作链表的原子性。
方法
增
#offer(E e)
新建Node
从tail节点开始一直向后找到last Node
CAS插入
若失败重新循环
CAS更新tail
失败也没关系
如果发现遍历到的节点next指向自身
说明该节点已被删除,不在队列上
只能从head重新开始遍历
删
#poll
查
其他
#size
并发环境下不一定准确
Set
CopyOnWriteArraySet
AbstractQueuedSynchronizer
依赖工具类
LockSupport
作用
调用Unsafe类方法挂起、唤醒线程
结构构成
Unsafe unsafe = Unsafe.getUnsafe()
方法
#park()
=Unsafe#park(false, 0L);
如果调用 park 方法的线程已经拿到了与 LockSupport 关联的许可证,则马上返回
否则调用线程被持续阻塞挂起
#park(Object blocker)
当钱程在没有持有许可证的情况下调用 park 方法而被阻塞挂起时 ,这个 blocker 对象会被记录到该线程内部。
使用诊断工具可以观察线程被阻塞 的原因,所以 JDK 推荐我们使用带有 blocker 参数的 park 方法,
并且blocker被设置为 this ,这样当在打印线程堆栈排查问题时就能知道是哪个类被阻塞了
并且blocker被设置为 this ,这样当在打印线程堆栈排查问题时就能知道是哪个类被阻塞了
诊断工具是通过调用 getBlocker(Thread) 方法来获取 blocker 对象的,
线程被激活后清除blocker
#parkNanos(long nanos ,Object blocker)
#unpark(Thread thread )
调用 park方法而被阻塞的线程会返回
当一个线程调用 unpark 时,如果参数 thread 线程没有持有 thread 与 LockSupport 类关联的许可证, 则 让 thread 线程持有。
所以一个线程不会park两次
如果 thread 之前因调用 park ()而被挂起,则调用unpark 后,该线程会被唤醒。
结构构成
Thread exclusiveOwnerThread
当前拥有此排他锁的线程
volatile Node head
固定是一个dummy node,因为它的thread成员固定为null
volatile Node tail
尾节点,请求锁失败的线程,会包装成node,放到队尾
volatile int state
状态信息,实现类定义具体意义
ReentrantLock
当前线程获取锁的可重入次数
ReentrantReadWriteLock
高16位表示读状态,也就是获取该读锁的次数
低 16 位表示获取到写锁的线程的可重入次数
semaphore
当前可用信号的个数
CountDownlatch
表示计数器当前的值
FutureTask
当前任务的状态
内部类Node
static Node SHARED = new Node()
当做参数,标记该Node的线程是获取共享资源时被阻塞挂起后放入 AQS 队列的
static Node EXCLUSIVE = null
当做参数,标记该Node的线程是获取独占资源时被挂起后放入AQS 队列的
volatile int waitStatus
记录当前线程等待状态
CANCELLED (线程被取消了)
1
线程超时或响应了中断
SIGNAL ( 线程需要被唤醒)
-1
每次都由SIGNAL节点唤醒下一个节点,充当闹钟
CONDITION (线程在条件队列里面等待〉
-2
PROPAGATE (释放共享资源时需要通知其他节点〕
-3
volatile Node prev
volatile Node next
volatile Thread thread
对应进入AQS队列里面的一个线程
Node nextWaiter
队列中下一个节点的类型
内部类 ConditionObject
Node firstWaiter
条件队列的头
Node lastWaiter
条件队列的尾
可以直接访问 AQS 对象内部的变量 ,比如 state 状态值和 AQS 队列
ConditionObject 是条件变量,每个条件变量对应一个条件队列 (单向链表队列),其用来存放调用条件变量的await方法后被阻塞的线程
方法
基础
#isHeldExclusively
子类重写用来判断当前锁是独占还是共享
#enq(Node node)
给定节点,入队
当一个线程获取锁失败后该线程会被转换为Node节点,然后将该节点插入到AQS的阻塞队列。
若未初始化则先初始化,head和tail节点均指向一个空节点
CAS操作先替换tail节点,然后更改next指针
返回旧tail
#addWaiter(Node mode)
给定节点类型,添加新的等待线程节点,入队
返回新tail
#acquireQueued(final Node node, int arg)
用一个死循环不断去执行tryAcquire,直到获取锁
每次循环都会判断是否可以尝试获取锁(p == head),如果可以,那么尝试(tryAcquire(arg))。
如果尝试获取锁成功,那么函数的使命就达到了,执行完相应收尾工作,然后返回。
如果 不可以尝试 或者 尝试获取锁却失败了,那么阻塞当前线程(parkAndCheckInterrupt)。
如果当前线程被唤醒了,又会重新走这个流程。被唤醒时,是从parkAndCheckInterrupt处唤醒,然后从这里继续往下执行。
如果尝试获取锁成功,那么函数的使命就达到了,执行完相应收尾工作,然后返回。
如果 不可以尝试 或者 尝试获取锁却失败了,那么阻塞当前线程(parkAndCheckInterrupt)。
如果当前线程被唤醒了,又会重新走这个流程。被唤醒时,是从parkAndCheckInterrupt处唤醒,然后从这里继续往下执行。
把node的有效前驱(node类型不是CANCELLED的)找到,并且将有效前驱的状态设置为SIGNAL,之后便返回true代表马上可以阻塞了
因为存在去除CANCELLED节点的操作,至少要循环两次才能阻塞自己。第一次设置前驱节点状态,第二次阻塞自己
返回interrupted标志
操作资源
独占资源
#acquire(int arg)
当前线程调用tryAcquire 方法尝试获取资源
具体为设置状态变量 state 的值,成功则直接返回
失败则将当前线程封装为类型为 Node. EXCLUSIVE 的 Node 节点后插入到 AQS 阻 塞 队列的尾部
调用LockSupport#park( this )方法挂起自己
如果循环过程中发生了中断,要重新设置好中断标志
#acquirelnterruptibly(int arg)
获取资源或挂起时会响应中断,将对应Node状态标记为CANCELLED,之后移除该节点
#tryAcquireNanos(int arg, long nanosTimeout)
LockSupport.parkNanos(this, nanosTimeout);
#release(int arg)
当前线程尝试调用 tryRelease 方法释放资源
head状态不为0即唤醒head后继
共享资源
#acquireShared(int arg)
当前线程尝试调用tryAcquireShared方法尝试获取资源
设置状态变量 state 的值,成功则直接返回
失败则调用doAcquireShared将当前线程封装为类型为 Node.SHARED 的 Node 节点后插入到 AQS 阻 塞 队列的尾部
调用LockSupport#park( this )方法挂起自己
#releaseShared(int arg)
当前线程尝试调用 tryReleaseShared 方法释放资源
调用 LockSupport.unpark(thread )方法激活 AQS 队列里面被阻塞的一个线程(thread)
被激活的线程则使用 tryAcquireShared 尝试,看当前状态变量 state的值是否能满足自己的需要,满足则该线程被激活,然后继续 向下运行
否则还是会被放入 AQS 队列并被挂起
条件变量
Condition#await
当线程调用条件变量的await()方法时,在内部会构造一个类型为Node.CONDITION的node节点,
然后将该节点插入条件队列末尾,之后当前线程会释放获取的锁(也就是会操作锁对应的state变量的值),
并被阻塞挂起。
然后将该节点插入条件队列末尾,之后当前线程会释放获取的锁(也就是会操作锁对应的state变量的值),
并被阻塞挂起。
这时候如果有其他线程调用lock.lock()尝试获取锁,就会有一个线程获取
到锁,如果获取到锁的线程调用了条件变量的await ()方法,则该线程也会被放入条件
变量的阻塞队列,然后释放获取到的锁,在await()方法处阻塞。
到锁,如果获取到锁的线程调用了条件变量的await ()方法,则该线程也会被放入条件
变量的阻塞队列,然后释放获取到的锁,在await()方法处阻塞。
Condition#signal
当另外一个线程调用条件变量的signal方法时,在内部会把条件队列里面队头的一个线程节点
从条件队列里面移除并放入AQS的阻塞队列里面,然后调用unpark激活线程。
从条件队列里面移除并放入AQS的阻塞队列里面,然后调用unpark激活线程。
实现类
Lock
ReentrantLock
结构构成
state
当前线程获取锁的可重入次数
Sync sync
内部类
Sync extends AbstractQueuedSynchronizer
#nonfairTryAcquire(int acquires)
#tryRelease(int releases)
FairSync extends Sync
#tryAcquire(int acquires)
如果state=0且队列中前驱只有head节点,CAS尝试获取锁
如果重入,state+1
方法
构造
ReentrantLock()
默认非公平锁
ReentrantLock(boolean fair)
锁
#lock()
#lockInterruptibly()
#tryLock()
ReentrantReadWriteLock
结构构成
Sync sync
state
高16bit作为读锁的计数范围
低16bit作为写锁的计数范围
ReadLock readerLock = new ReadLock(this)
WriteLock writerLock = new WriteLock(this)
内部类
Sync extends AbstractQueuedSynchronizer
NonfairSync extends Sync
FairSync extends Sync
ReadLock implements Lock
WriteLock implements Lock
HoldCounter
记录各个线程分别拿走了多少读锁
int count = 0
long tid = getThreadId(Thread.currentThread())
防止垃圾不被回收
ThreadLocalHoldCounter extends ThreadLocal<HoldCounter>
方法
Queue
ArrayBlockingQueue
结构
items 数组:存放队列元素
int putindex :入队元素下标
int takelndex :出队下标
int count :队列元素个数
ReentrantLock lock
Condition notEmpty
Condition notFull
方法
同下,不过只用了一个全局锁,管理进和出
LinkedBlockingQueue
结构
单向无界链表
Node<E> head,Node<E>last
默认都是指向 item 为 null 的哨兵节点
AtomicInteger count
记录队列元素个数
ReentrantLock putLock
控制同时只能有一个线程可以获取锁,在队列尾部添加元素
Condition notFull = putLock.newCondition();
当队列满时,执行put的线程会被放入这个条件队列进行等待
ReentrantLock takeLock
控制同时只有一个线程可以从队列头获取元素
Condition notEmpty = takeLock.newCondition();
当队列为空时,执行take的线程会被放入这个条件队列进行等待
方法
构造
LinkedBlockingQueue()
默认大小Integer.MAX_VALUE
LinkedBlockingQueue(int capacity)
增
#put(E e)
加锁入队
如果队列满阻塞当前线程,加入notfull等待唤醒
阻塞时可被中断抛出异常
一定条件下唤醒notempty、notfull
#offer(E e)
如果队列满直接返回false
#add(E e)
如果队列满抛出异常
取
#take
从队列头部获取并移除一个元素
如果队列为空则阻塞当前线程直到队列不为空
如果在阻塞时被其他线程设置了中断标志,则被阻塞线程会抛出异常返回
如果在阻塞时被其他线程设置了中断标志,则被阻塞线程会抛出异常返回
#poll
从队列头部获取并移除一个元素
空队列直接返回null
#peek
从队列头部获取一个元素
空队列直接返回null
#remove(Object o)
删除队列里面指定的元素,有则删除并返回true,没有则返回false。
加双重锁
#size
PriorityBlockingQueue
结构
queue 数组:存放队列元素
默认长度11
平衡二叉树实现优先排序
int size :队列元素个数
ReentrantLock lock
Condition notEmpty
队列空时,出队线程将阻塞在这里
无notnull,因为是无界队列,因此put操作非阻塞
volatile int allocationSpinLock
标识当前是否正在扩容
相当于AQS的state,持有这个state才可以准备新数组以扩容
方法
同上
#tryGrow(Object[] array, int oldCap)
如果oldCap < 64, 新容量等于2(oldCap + 1)
如果oldCap >=64, 新容量等于1.5oldCap
#size
加锁获取
DelayQueue
结构
PriorityQueue<E> q:存放队列元素
元素需要实现Delayed接口
只有元素到期后才能将其出队
到期时间短的Delayed元素排在队列前面
ReentrantLock lock
Condition available = lock.newCondition();
Thread leader
Leader- Follower 模式的变体,减少不必要的线程等待
它总是等待获取队首
方法
存
#offer(E e)
如果添加的元素是最先过期的
leader = null;
available.signal();
取
#poll
获取并移除队头过期元素
如果没有过期元素则返回 null
#take
#size
other
CountDownLatch
方法
#await
主线程调用后阻塞直到计数器为0
#countDown
子线程任务调用,计数减一
CyclicBarrier
方法
#await
某一线程调用后,计数器减一
若此时计数器不为0,调用线程阻塞直到计数器为0
否则当前线程执行构造函数里传入的任务,之后唤醒其他阻塞线程
可重用
Semaphore
方法
#release
某一线程调用后,计数器加一
#acquire(int n)
某一线程调用后,阻塞直到信号量为n
Executor
线程池
ThreadPoolExecutor
构成
AtomicInteger ctl
高3位用来表示线程池状态
低29位用来表示线程个数
初始值ctlOf(RUNNING, 0)
线程池状态
RUNNING
接受新任务并且处理阻塞队列里的任务
SHUTDOWN
拒绝新任务但是处理阻塞队列里的任务
STOP
拒绝新任务并且抛弃阻塞队列里的任务 ,同时会中断正在处理的任务
TIDYING
所有任务都执行完(包含阻塞 队列里面的任务)后当前线程池活动线程
数为 0 , 将要调用 terminated 方法
数为 0 , 将要调用 terminated 方法
TERMINATED
终止状态 。 terminated 方法调用完成 以后的状态
CAPACITY
线程最 大个数(低29位) 00011111111111111111111111111111
runStateOf(int c) { return c &~CAPACITY ; )
获取高3位(运行状态)
workerCountOf(int c) { return c & CAPACITY ; )
获取低29位(线程个数)
ctlOf (int rs , int we) { return rs I we; )
计算ctl 新值(线程状态与线程个数 )
ReentrantLock mainLock
控制新增 Worker线程操作的原子性
Condition termination = mainLock.newCondition();
线程调用 awaitTermination 时用来存放阻塞的线程
方法
ThreadPoolExecutor (
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue <Runnable> workQueue ,
ThreadFactory threadFactory,
RejectedExecutionHandler handler
)
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue <Runnable> workQueue ,
ThreadFactory threadFactory,
RejectedExecutionHandler handler
)
corePoolSize :线程池核心线程个数
没有任务执行时线程池的大小
只有阻塞队列满时才会创建超出这个数量的线程
maximunPoolSize : 线程池最大线程数量。
keeyAliveTime :存活时间
如果当前线程池中的线程数量比核心线程数量多 ,并且
是闲置状态, 则这些闲置的线程能存活的最大时间
是闲置状态, 则这些闲置的线程能存活的最大时间
workQueue :用于保存等待执行的任务的阻塞队列
基于数组的有界ArrayBlockingQueue
基于链表的无界 LinkedBlockingQueue
最多只有一个元素的同步队列 SynchronousQueue
优先级队列 PriorityBlockingQueue
RejectedExecutionHandler :饱和策略
当队列满并且线程个数达到 maximunPoolSize后采取的策略
AbortPolicy (抛出异常〉、
CallerRunsPolicy (使用调用者所在线程来运行任务)
DiscardOldestPolicy (调用 poll 丢弃一个任务,执行当前任务)
DiscardPolicy (默默丢弃,不抛出异常〉
#execute(Runnable command)
如果当前线程池中线程个数小 于 corePoolSize , 会向 workers 里面新增一个核心线程( core 线程)执行该任务
否则如果当前线程池处于 RUNNING 状态则添加当前任务到任务队列 。
如果添加任务成功 ,则对线程池状态进行二次校验
如果当前线程池状态不是 RUNNING 了则把任务从任务队列移除,移除后执行拒绝策略
如果二次校验通过,则重新判断当前线程池里面是否还有线程,如果没有则新增一个线程
如果添加任务失败 ,则说明任务队列己满,尝试新开启线程来执行该任务
如果当前线程池中线程个数>maximumPoolSize 则执行拒绝策略。
#submit(callable command)
异步future
#addWorker(Runnable firstTask, boolean core)
通过 CAS 操作增加线程数
把并发安全的任务添加到 workers 里面,并且启动线程执行任务
#shutdown
调用 shutdown 方法后 ,线程池就不会再接受新的任务了,但是工作队列里面 的任务还是要执行的 。
该方法会立刻返回,并不等待 队列任务完成再返回
#shutdownNow
调用 shutdownNow 方法后,线程池就不会再接受新的任务了,并且会丢弃工作队列里面的任务 , 正在执行的任务会被中断 ,
该方法会立刻返回 ,并不等待激活的任务执行完成。返回值为这时候队列里面被丢弃的任务列表。
ScheduledThreadPoolExecutor
Executors
newCachedThreadPool(ThreadFactory threadFactory)
核心线程个数0,和最大线程个数INTEGER.MAX
阻塞队列为同步队列,最大长度为1
SynchronousQueue
keeyAliveTime = 60s
适合频率高,执行时间短
newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
核心线程个数和最大线程个数都为 nThreads
阻塞队列最大长度为INTEGER.MAX
LinkedBlockingQueue
keeyAliveTime = 0
需要主动关闭
newSingleThreadExecutor(ThreadFactory threadFactory)
核心线程个数和最大线程个数都为 1
阻塞队列最大长度为INTEGER.MAX
LinkedBlockingQueue
keeyAliveTime = 0
适合单消费者
0 条评论
下一页