JUC
2021-11-01 21:44:39 0 举报
并发总结
作者其他创作
大纲/内容
当从一个线程转化为多个线程时,就会出现锁竞争,此时需要先将偏向锁的标记撤销(此动作会带来一些性能损耗),才能升级到轻量级锁
DiscardOldestPolicy
breakBarrier generation.broken = true; count = parties; trip.signalAll();
初始化
thread.sleep()
firstWaiter
ReadLock
ConditionObject
offer( )
prev
共享变量副本
实现
int a = 10; //语句1int r = 2; //语句2a = a + 3; //语句3r = a*a; //语句4
CAS操作,乐观锁的一个实现,采用非阻塞的方式来保证数据的一致性。
corePoolSize不能被回收
对象的内存结构
nextWaiter
NonfairSync
实例数据
CPU调度
final Sync sync;
AbstractQueuedSynchronizer
加锁顺序
轻量级锁(CAS)
lock
package locks;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.LockSupport;public class LockSupport_ { //unpark和park执行顺序没要求,灵活 public static void main(String[] args) throws InterruptedException { //1.开启一个线程后对其进行挂起 Thread T1 =new Thread(()->{ System.out.println(\"线程被挂起了\"); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } LockSupport.park(); System.out.println(\"被唤醒了\"); }); T1.start(); //2.通过主线程去唤醒这个线程 LockSupport.unpark(T1); }}
maximumPoolSize
nonfairTryAcquire(1)
next
加锁步骤:1. 初始化reentrantLock时指定用公平锁还是非公平锁;2. 选择公平锁的话,如果锁空闲,那么当前线程先尝试去获取锁,获取不到就进入CLH队列进行等待唤醒获取;3. 选择非公平锁的话,不论锁是否空闲,当前线程都先尝试去获取一下,获取不到就进入CLH队列等待唤醒获取;
waitStatus
ALU
CPU认为从内存中读取一个数据,下一次访问的很有可能是它旁边的数据,所以会进行预读取,目前工业界一次性预读取的大小一般为64Byte,这64个字节的大小一般称为缓冲行,也就是说CPU在读取数据的时候一次性读取一个缓冲行大小。缓存行越大,局部性空间效率越高,但读取时间慢;缓存行越小,局部性空间效率越低,但读取时间快;
分发
tryAcquireSharedreturn (getState() == 0) ? 1 : -1;
AbstractExecutorService
获取值
继承
runnable
Thread=Thread-3
北桥芯片
head
Thread=Thread-1
boolean tryAcquire(int acquires);boolean tryRelease(int releases);boolean tryReadLock();boolean tryWriteLock();ConditionObject newCondition();int fullTryAcquireShared(Thread current);
Worker(Runnable firstTask);run();boolean isHeldExclusively();boolean tryAcquire(int unused);boolean tryRelease(int unused);void lock();boolean tryLock();void unlock();boolean isLocked();void interruptIfStarted();
FairSync
缓存行...
MESI(修改-独占-共享-失效):当前CPU修改完数据后,当前CPU的数据状态为M(修改),修改其他CPU中的数据状态为I(失效),这样当其他CPU对这个数据进行操作时会先从内存中读取,从而保证数据的一致性。
node
handler
从理论上来说,我们希望存储器速度快、体积小、成本低、能耗低、空间大、散热好、断电数据不丢失。但是在现实中,这些条件是无法同时满足的。比如 存储器的体积越小,那么存储空间就会受到制约,电子元件的密度越大,产生的热量越集中,散热就会越差。所以在现实中我们会对上面的一些要求进行权衡和取舍,根据数据的使用频率使用不同的存储器:高频使用的数据,读写越快越好,因此用最贵的材料,放到离CPU最近的位置;使用频率越低的数据,就放到离CPU越远的位置,用越便宜的材料。
Executors 框架基于 ExecutorService实现
LockSupport 的作用 在没有LockSupport之前,线程的挂起和唤醒都是通过Object的wait和notify/notifyAll方法实现。与LockSupport相比较之下,LockSupport更具有灵活性,因为其不需要在同步代码块里,线程之间也不需要维护一个共享的同步对象;而且unpark函数可以先于park()调用,不需要担心线程的执行顺序。
Sync
00000000 00000000 00000000 00000000
什么叫进程?什么叫线程?进程:操作系统分配资源的基本单位线程:CPU调度运行的基本单位
handler的拒绝策略:第一种AbortPolicy:不执行新任务,直接抛出异常,提示线程池已满第二种DisCardPolicy:不执行新任务,也不抛出异常第三种DisCardOldSetPolicy:将消息队列中的第一个任务替换为当前新进来的任务执行第四种CallerRunsPolicy:直接调用execute来执行当前任务
N
重度竞争
//1. 使线程进入waiting状态,一直到被唤醒才继续往下执行 LockSupport.park(this); //------------------阻塞ing------------------- //2. 如果被唤醒,查看自己是不是被中断而唤醒的 return Thread.interrupted();
REGEISTERS
判断当前线程节点是否处于队列中第二个待唤醒的位置
重量级锁(由OS来锁总线)
共享变量
Runnable
指令缓存
countDown()
//1. 线程被打断,直接抛异常if (Thread.interrupted()) throw new InterruptedException();//2. 若当前还有线程在运行if (tryAcquireShared(arg) < 0) //2.1. 阻塞当前线程 doAcquireSharedInterruptibly(arg);
示例:Load1;LoadLoad;Load2
本地内存A
Thread=Thread-2
lastWaiter
旧值改变了
g.broken=false;count=2-1=1;
等待
Thread=Thread-4
队列是否已满
缺点:面对并发量大的情况,CLH队列会越来越长,加长自旋时间,CAS操作使CPU频繁切换,消耗更多的CPU资源
notify & wait
Generation
await()
1、SynchronousQueue ------- 直接提交队列(直接就提交,没有队列进行等待)不存储元素,常用作消息传递2、ArrayBlockingQueue -------有界任务队列(不能及时执行的任务,放到自定义的队列中等待执行)3、LinkedBlockingDeque -----无界任务队列(等待队列的大小是无界,队列是无限大,理论上大小取决于内存大小)4、PriorityBlockingQueue ----优先任务队列(这是一种特殊的无界任务队列,可以按照优先级来执行任务)5、DelayedWorkQueue -----这种队列的内部元素会按照延迟时间的长短对任务进行排序,延时时间越短地就排在队列的前面,越先被执行,他的内部采用的是“堆”的数据结构什么情况选用这些队列?线程数不限制,任务队列来限制,反之亦然
true
获取到锁
虚拟节点
unsafe.compareAndSwapInt()
先试一下获取
tryAcquireShared(arg) 0
准备唤醒
Node
acquire(1)
Class Node
本地内存B
示例:Store1;StoreLoad;Load2
Unsafe类?
workerCount
创建工作线程addWorker( )
Condition
waitStatus >0
核心线程
唤醒同步队列第一个线程!
node3nextWaiter
dowait final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; //1. 当前栅栏被打翻则抛出异常 if (g.broken) throw new BrokenBarrierException(); //2. 当前线程被中断则唤醒所有线程,并打翻当前栅栏 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } //3. 计数器减1 int index = --count; //4. 当计数器为0,执行打翻栅栏前的最后一个任务,并唤醒所有线程,还原计数器,开始新一轮 if (index == 0) { boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) //4.1. 保证执行最后一个任务出问题也能唤醒所有线程不至于一直阻塞 breakBarrier(); } } //5. 计数器不为0 for (;;) { try { if (!timed) //5.1. 非定时等待 trip.await(); else if (nanos > 0L) //5.2. 定时等待 nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { //5.3. 如果栅栏还没被打翻,但当前线程被中断则打翻栅栏,唤醒所有线程 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { //5.4. 如果刚好打翻栅栏,则直接中断线程 Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); //6. 如果线程因为换代操作而被唤醒则返回计数器的值 if (g != generation) return index; //7. 如果线程因为时间到了而被唤醒则打翻栅栏并抛出异常 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); }
unlock()
Thread
释放锁,移动到CONDITION队列末尾
blocked
当前节点
00100000 00000000 00000000 00000000
Semaphore
running
PrivilegedCallableUsingCurrentClassLoader
lock.lock()
markword
font color=\"#d32f2f\
线程的状态切换
new
将新值替换原值
得到锁
是否达到最大线程数
原值
启动工作线程worker.thread.start( )
tail
存储器总线
执行顺序可能是: 2--->1--->3--->4 :不影响结果 2--->1--->4--->3 :不允许此操作,因为处理器在进行重排序时是会考虑指令之间的数据依赖性,语句3 必定比语句4先执行
只有一个线程情况下,new Object( )
非核心线程
共享模式下获取锁
核心思想:如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列实现的,即将暂时获取不到锁的线程加入到队列中。CLH队列:是一个虚拟队列,即不存在队列实例,仅存在节点之间的关联关系。AQS是将每条请求共享资源的线程封装成一个CLH队列的节点(Node)来实现锁的分配
寄存器
如果是被中断,则设置true
tidying
lock()
cpu1
time_waiting
unpark & park
由于锁已被占用,所以将节点放到CLH队列中等待获取锁
执行到Condition.await()
AbortPolicy
Class ConditionObject
线程池的状态
final AtomicInteger ctl;static final int COUNT_BITS;static final int CAPACITY;static final int RUNNING = -1 << COUNT_BITS;static final int SHUTDOWN = 0 << COUNT_BITS;static final int STOP = 1 << COUNT_BITS;static final int TIDYING = 2 << COUNT_BITS;static final int TERMINATED = 3 << COUNT_BITS;BlockingQueue<Runnable> workQueue;ReentrantLock mainLock;HashSet<Worker> workers;Condition termination;int largestPoolSize;long completedTaskCount;volatile ThreadFactory threadFactory;volatile RejectedExecutionHandler handler;volatile long keepAliveTime;volatile boolean allowCoreThreadTimeOut;volatile int corePoolSize;volatile int maximumPoolSize;static final RejectedExecutionHandler defaultHandler = new AbortPolicy();static final RuntimePermission shutdownPerm = new RuntimePermission(\"modifyThread\");final AccessControlContext acc;
addWaiter(Node.EXCLUSIVE)
unlinkCancelledWaiters()//1. 获取头节点Node t = firstWaiter;//2. 声明一个追踪的节点变量Node trail = null;//3. 循环直到队列为空while (t != null) { //3.1. 获取头节点的下一个节点 Node next = t.nextWaiter; //3.2. 如果头节点状态不是等待状态就移除 if (t.waitStatus != Node.CONDITION) { //3.3. 先将头节点的指向置null t.nextWaiter = null; //3.4. 追踪节点用来记录下一个节点 if (trail == null) firstWaiter = next; else trail.nextWaiter = next; //3.5. 到队尾,将追踪节点设置为尾节点 if (next == null) lastWaiter = trail; } else trail = t; t = next; }
lock.unlock()
Executor
CPU 总线嗅探机制
比较值
nextGeneration trip.signalAll(); count = parties; generation = new Generation();
ExecutorService
corePool
Y
CyclicBarrier
Semaphore作用:是一个计数信号量,常用于限制对某个公共资源进行访问的线程数量;场景:假如 甲乙丙三人去就餐,餐前需要洗手,而只有两个洗手盆,其中一人就需要等洗手盆空出才能用。 这里洗手盆的数量就是 计数信号量,限制了访问洗手盆的人数(线程数)
ThreadPoolExecutor
时间到 / notify() / thread.interrupt()
负值表示结点处于有效等待状态,而正值表示结点已被取消。所以源码中很多地方用>0、<0来判断结点的状态是否正常。
解锁成功
CAS操作
由于不能直接操作主内存,所以各线程对共享变量的修改不能及时更新主内存,使得各线程取得的变量值不是最新的-----引出:内存可见性和缓存一致性协议
Condition 作用:在Synchronized加锁状态时,是使用wait/notify/notifyAll进行线程间的通信。那么在使用ReentrantLock加锁时,是如何实现线程间通信问题的呢?在JUC中既然提供了Lock,也提供了用作其线程间通信的方式,再次引入了Condition。Condition 的 await() 等价于 Object 的 wait()Condition 的 signal() 等价于 Object 的 notify()Condition 的 signalAll() 等价于 Object 的 notifyAll()
CLH队列
什么是指令重排序?处理器为了提高运行效率,可能会对输入代码进行优化,它不保证程序中各个语句的执行顺序同代码中的顺序一致,但是它会保证程序最终执行结果和代码顺序执行结果是一致的禁止指令重排序:指令重排序分三种:1. 编译器优化重排序:编译器在不改变单线程程序语义的前提下,可以重新安排语句的执行顺序。2. 指令级并行重排序:现代处理器采用了指令级并行技术来将多条指令重叠执行,如果不存在数据依赖性,处理器可以改变语句对应的机器指令的执行顺序。3. 内存系统重排序:由于处理器使用缓存和读/写缓冲区,这使得加载和存储操作看上去可能是在乱序执行。什么叫数据依赖性?如果两个操作访问同一个变量,且这两个操作中有一个为写操作,此时这两个操作之间就存在数据依赖性。这里所说的数据依赖性仅针对单个处理器中执行的指令序列和单个线程中执行的操作,不同处理器之间和不同线程之间的数据依赖性不被编译器和处理器考虑。从 Java 源代码到最终执行的指令序列,会分别经历下面三种重排序:
Unsafe类使Java拥有了像C语言的指针一样操作内存空间的能力
线程被中断,则其余等待线程会全部被唤醒并抛出中断异常
stop
Thread-2
WriteLock
JMM 的抽象示意图
时间片用完
判断node前驱状态是否为SIGNAL,是则直接返回true。node前驱状态不是SIGNAL,有可能是ws>0,说明前驱取消了,自旋跳过取消的节点,并寻找链接一个正常的前驱。node前驱状态不是SIGNAL,有可能是0(初始化状态)或PROPAGATE(传播状态),修改node前驱状态为SIGNAL。
waitStatus<=0
原值未变
transient volatile Node head;transient volatile Node tail;volatile int state;
Runnable barrierCommand;int count;Generation generation = new Generation();final ReentrantLock lock = new ReentrantLock();final int parties;final Condition trip = lock.newCondition();
主内存
sync.acquireSharedInterruptibly(1);
//1. 先将状态-1int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException();boolean free = false;//2. c为0代表锁处于空闲状态if (c == 0) { free = true; setExclusiveOwnerThread(null);}setState(c);return free;
trip.await()阻塞
Thread-3
sync = lock.sync;
唤醒(非中断)
ALU运算器
修改值
如何实现内存可见性?使用 volatile 关键字、加锁。加锁为什么能保证内存可见性?线程获取到锁,会清空本地内存,然后从主内存中复制一份共享变量到本地内存,执行代码,然后将新值刷到主内存中,最后释放锁。volatile 的作用?实现可见性和禁止指令重排序。volatile是如何实现内存可见性?使用volatile关键字修饰的共享变量后,各线程从主存拷贝变量到本地内存并修改更新主存后,会通过CPU总线嗅探机制告知其他线程的本地内存副本已失效,需要重新到主存获取,其实就是使用缓存一致性协议来实现的。注意:由于总线嗅探机制,会不断地监听总线,如果大量使用volatile会引起总线风暴,所以,volatile的使用要适合具体场景。
DiscardPolicy
创建节点
ReentrantReadWriteLock
本地内存C
缓存一致性原理:因为多缓存的出现,所以才需要保证缓存一的致性。缓存一致性是根据 CPU总线嗅探机制来实现的。嗅探机制工作原理:每个处理器通过监听在总线上传播的数据来检查自己的缓存值是不是过期了,如果处理器发现自己缓存行对应的内存地址修改,就会将当前处理器的缓存行设置无效状态,当处理器对这个数据进行修改操作的时候,会重新从主内存中把数据读到处理器缓存中。缓存一致性协议有哪些?如MSI,MESI,MOSI,Synapse,Firefly及DragonProtocol等等。如何解决缓存一致性问题?1. 通过总线加锁;2. 通过缓存一致性协议;这两种方式都是硬件层面上的。
执行拒绝策略
语句1 和 语句2 谁先执行,这对结果没有影响
南桥芯片
原值已变
PrivilegedThreadFactory
class pointer
解锁步骤:1. 先将锁状态-1;2. 如果状态为0,则完全释放锁,设置好锁的状态和持有锁线程变量为null后,开始对CLH队列下一节点进行唤醒;3. 如果状态>0,则锁并未被完全释放;
跳过节点2
ReadWriteLock
尾节点移动
if (tryReleaseShared(arg)) { //1. 如果计数器为0,唤醒被await方法阻塞的所有线程 doReleaseShared(); return true;}return false;
唤醒
CAS概念:比较并替换缺点:1. ABA问题:线程A和线程B同时改变内存v(值为0),线程A正在准备开始执行CAS时,线程B已经将内存v的值改为1后又改为0,对于线程A进行CAS时是察觉不到值已经被修改过了解决:加版本号 或者 使用AtomicStampedReference2. 自旋问题:由于一次CAS并不一定能够成功,所以往往会将CAS操作放入到循环中,而每次CAS都需要CPU切换线程来进行比较,一旦线程数多了,CPU来回切换频繁而且又不成功,白白浪费CPU资源,所以在高并发场景下CAS效率并不高3. 范围不能灵活控制:CAS只能对单个变量进行比较,而非多个变量
JMM定义了线程和主内存之间的抽象关系,线程间的共享变量(不包括局部变量,局部变量是线程私有,不参与竞争)存储在主内存中,每个线程都有一个私有内存,存储了共享变量的副本。1. 各本地内存间不能直接互相访问,只能通过主内存进行数据交换2. 线程对变量的操作必须在本地内存中进行,不能直接读写主内存
缓存一致性协议 或锁总线
自旋
如果获取失败,就放入队列等待唤醒
控制器
ReadLock readerLock;WriteLock writerLock;final Sync sync;
for (;;) { //1. 判断是否有前置节点 if (hasQueuedPredecessors()) return -1;b style=\
等待获取锁 / thread.join()
读锁和写锁:读锁:非公平锁实现写锁:公平锁实现(和上面reentrantLock 公平锁实现一样)读读共享:多个线程相继获取读锁后,假如某线程想获取写锁去修改值的话,需要等已获取读锁的线程们释放锁后才能修改。多线程获取和释放读锁是可以同时获取,线程间互不影响。读写互斥:多个线程在获取读锁时,如果有新线程获取到写锁,则所有线程获取读锁失败。从获取写锁的角度看,在获取写锁时,如果发现有读线程,则需要读线程先释放读锁,这样才能获取到写锁。写写互斥:写锁是独占锁
Worker
不能一下就获取到锁
Lock readLock();Lock writeLock();
第二次循环检测到head 的 waitStatus = -1时,就调用parkAndCheckInterrupt将第二个节点挂起(阻塞)
AbstractOwnableSynchronizer
//1. 将锁标识设置为倒计时数量(相当于有多少个线程持有锁)Sync(int count) { setState(count); }
节点3
对齐填充
可以被回收
CONDITION队列
确保操作1在操作2之前
when pool is empty
阻塞
CallerRunsPolicy
二级缓存
IO设备
造价高(更小更快)
shutdown
初始化ReentrantLock(boolean fair)
doReleaseShared for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) {span style=\"font-weight: normal;\
指令级并行重排序
tryAcquire(arg)
如果是处于队列第二位,则进行阻塞
doAcquireSharedInterruptibly //1. 当前线程添加到同步队列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { /* 加入队列后,还要对线程进行挂起 */ for (;;) { //2. 获取当前节点的前置节点 final Node p = node.predecessor(); //3. 前置节点是头节点 if (p == head) { //3. 再次判断 int r = tryAcquireShared(arg); //4. 线程全部执行完 if (r >= 0) { //4.1. 唤醒后续线程(就是当前线程),最后通过 //LockSupport.unpark()唤醒线程span style=\
对象头
再次读取原值并确认原值是否被改变
添加到CLH队列
Synchronized 和 Lock 区别1. Synchronized 内置的java关键字,Lock是一个java类2. Synchronized 无法获取锁的状态,Lock 可以判断是否获取到了锁3. Synchronized 会自动释放锁,Lock必须要手动释放锁!如果不释放锁,就会死锁4. Synchronized 线程1(获得锁,阻塞)、线程2(等待,傻傻地等),Lock就不一定等待下去(可以设置超时等待或者中断)5. Synchronized 可重入锁,不可以中断,非公平;Lock 可重入锁,可设置公平或者非公平6. Synchronized 适合锁少量的同步代码,Lock 适合锁大量的同步代码
for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 ||font color=\"#9c27b0\
acquire(1);
内存系统重排序
多核
class pointer:指向class 对象的指针;实例数据:就是本实例中的成员变量,实例数据的大小,取决于声明成员变量的类型的大小;
volatile是如何禁止指令重排序?被volatile 关键字修饰的变量,所生成的汇编代码前会被添加上一个LOCK前缀指令,这个前缀指令相当于一个内存屏障,这个屏障提供3个功能:1. 它确保指令重排序时不会把其后面的指令排到屏障之前,也不会把前面的指令排到屏障的后面,也就是说当执行到内存屏障这句指令时,在它前面的操作已经全部完成;2. 它会强制将对缓存的修改立即写入主存;3. 如果是写操作,它会导致其他CPU中对应的缓存行无效。
if (count < 0) throw new IllegalArgumentException(\"count < 0\");this.sync = new Sync(count);
常见的线程池:1. CachedThreadPool:可缓存的线程池,没有核心线程,有需要时创建线程来执行任务,线程数量为Integer.max_value,相当于无限大;2. ScheduleThreadPool:周期性执行任务的线程池,有核心线程和非核心线程,非核心线程数量和CachedThreadPool一样;3. SingleThreadPool:只有一个线程的线程池,最普通的线程池;4. FixedThreadPool:定长线程池,有核心线程,没有非核心线程;使用场景:1. CachedThreadPool:适合量大,耗时小的任务。任何任务都会被立即执行,优先使用空闲线程来执行任务,否则就创建新线程来执行;2. ScheduleThreadPool:主要用于执行定时任务和具有固定周期的重复任务,非核心线程有时间限制,只要超时或者闲置就会被回收;3. SingleThreadPool:用于执行有顺序的任务,因为只有一个线程,所以确保了所有任务按顺序在同一个线程中执行;4. FixedThreadPool:当所有线程都处于活动状态时,新任务会处于等待,直到有空闲线程;推荐的创建线程池的方式:在项目中一定要注意线程池的使用要严谨,不要使用Executors去直接调用API去创建线程池,要使用ThreadPoolExecutor(阿里内部也明确规定这一点)原因是这样的处理方式能让我们更加明确线程池的运行规则,规避资源耗尽的风险。Executors 返回线程池对象的弊端如下: FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致OOM。 CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致OOM。
01000000 00000000 00000000 00000000
maximunPool
销毁线程processWorkerExit
span style=\
线程模型Jvm线程模型(HotSpotspan style=\
刚启动时,会出现多个内置线程对资源空间进行分配,会产生资源的争抢,待这些启动所需要的线程启动完了(JMM启动成功),才会执行main线程,这里就有一个时间差----延时。待该启动的线程完成启动后,才会进行偏向锁的设置
线程池运行过程
非公平
boolean writerShouldBlock();boolean readerShouldBlock();
内存
JMM控制
doSignal(Node first)do { //1. 如果头节点无下一个节点 if ( (firstWaiter = first.nextWaiter) == null) //1.1. 设置尾节点为null lastWaiter = null; //2. 将头节点指向下一节点为null,GC first.nextWaiter = null;} //循环直到头节点移到同步队列或队列为空while (!transferForSignal(first) &&(first = firstWaiter) != null);
是否需要超时
初始化if (parties <= 0) throw new IllegalArgumentException();this.parties = parties;this.count = parties;this.barrierCommand = barrierAction;
一级缓存
读取分析指令
ranAction = true;nextGeneration();
解锁顺序
Thread-1
native boolean compareAndSwapInt()
线程C
解锁失败
加入阻塞队列
waiting
workQueue
Sync sync;
CountDownLatch
ReadLock(ReentrantReadWriteLock lock)
node2nextWaiter
主要是MarkWord,下图是64位
节点1
三级缓存
addConditionWaiter()
sync.releaseShared(1);
当最后一个线程countDown() 完成后,由该线程去唤醒同步队列第一个线程;第一个线程被唤醒后,会继续await()中的代码,由 setHeadAndPropagate() 继续唤醒剩下的线程
cpu2
01100000 00000000 00000000 00000000
Thread thread;Runnable firstTask;volatile long completedTasks;
DefaultThreadFactory
await( )
贯穿整个系统的是一组电子管道,它携带byte 并负责在各个 部件 之间传递。 一般 传送的是 固定长度的 byte块,也就是 word(字)
unparkSuccessor(h)
64位操作系统:4字节;32位操作系统:4字节;
初始化ReentrantLock()
造价低(更大更慢)
fullyRelease(Node node)
doAcquireShared(1)//1. 创建当前线程节点final Node node = addWaiter(Node.SHARED);boolean failed = true; try { boolean interrupted = false; or (;;) { //2. 获取当前线程节点的前一个节点 final Node p = node.predecessor(); //3. 前一节点是头节点 if (p == head) { //3.1. 尝试获取锁 int r = tryAcquireShared(arg); //3.2. 获取锁成功 if (r >= 0) { //3.2.1. 唤醒后续读节点线程,不唤醒写互斥锁线程!!!font color=\"#000000\
示例:Load1;LoadStore;Store2
Node predecessor();boolean isShared();
1. 将线程加入CLH对列2. 再尝试在队列获取
when pool and queue is empty
Node SHARED = new Node();Node EXCLUSIVE = null;CANCELLED = 1;SIGNAL = -1;CONDITION = -2;PROPAGATE = -3;int waitStatus;Node prev;Node next;Thread thread;Node nextWaiter;
头节点移动
ReentrantLock
tryAcquire失败
锁升级的过程
shutdownNow()
terminated
ScheduledExecutorService
指令缓存:用于暂时存储并向CPU递送各类指令;数据缓存:用于暂时存储并向CPU递送运算所需数据;CPU执行指令的速度非常快,所以要先预读一些指令到一级指令缓存中,如果数据和指令都放在L1中,一旦数据覆盖了指令,那么计算机将无法正确执行了,因此L1需要划分为两个区域,而L2和L3不需要参与指令的预读,所以不需要划分
node1nextWaiter
FinalizableDelegatedExecutorService
尝试去获取读锁(若有其他线程持有写锁,则获取读锁失败),获取成功后更新自己的ThreadLocal 存放的HoldCounter
int i = 0; boolean flag = false;i = 1; //语句1 flag = true; //语句2
ReadLock(ReentrantReadWriteLock lock);void lock();lockInterruptibly();Condition newCondition();boolean tryLock();void unlock();
shutdown()
公平锁实现
AQS原理
RejectedExecutionHandler
Sync sync;ReentrantReadWriteLock.ReadLock readerLock;ReentrantReadWriteLock.WriteLock writerLock;
C++底层实现Unsafe类中的compareAndSwapXX是一个本地方法,该方法的实现位于unsafe.cpp中1. 先想办法拿到变量value在内存中的地址;2. 通过Atomic::cmpxchg实现比较替换,其中参数X是即将更新的值,参数e是原内存的值;
tryAcquireShared(1)
worker
sync.acquireShared(1);
DelegatedExecutorService
JMM启动
doAcquireSharedInterruptibly(1)
fullyRelease(Node node)boolean failed = true;try { //1. 获取锁的状态(指new ConditionObject 的锁) int savedState = getState(); //2. 进行解锁 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) //3. 释放锁失败,就将当前线程节点设为失效不再获取锁 node.waitStatus = Node.CANCELLED;}
CountDownLatch(int count)
thread.start()
RunnableAdapter
开启工作任务runWorker( )
p:前置节点node:当前线程节点
磁盘
不会阻塞线程,调用countDown的线程可以继续执行
transferForSignal(first)font color=\"#000000\
acquire()
WriteLock(ReentrantReadWriteLock lock);ReentrantReadWriteLock(boolean fair);
加入同步队列,等到有多余的信号量就唤醒
CountDownLatch 和 CyclicBarrier作用类似,都是等待一组线程都结束才执行下一个任务。但CyclicBarrier 可以重复使用,而CountDownLatch不能CyclicBarrier 简单,通常用于多线程计算,最后合并结果
数据依赖性:举例:
parkAndCheckInterrupt()
在第一次循环中,执行shouldParkAfterFailedAcquire将头节点状态改为 -1
when terminated() method has completed
waitStatus=0
执行到Condition.signal()或Condition.signalAll()
偏向锁
Node firstWaiterNode lastWaiter
确保操作1(更新主存)在操作2之前
try { return font color=\"#d32f2f\
g.broken=false;count=3-1=2;
获取新值
线程非常多的时候
提交任务execute( )
StampedLock 是对 读写锁的升级较之读写锁区别是:1. 相比于普通的ReentranReadWriteLock主要多了一种乐观读的功能;2. 对于乐观读(如果没有进入写模式)可以减少一次读锁的性能消耗,并且不会阻塞写入的操作(乐观读遇到写后转化为悲观,相当于滞后一步);3. 不支持重入;总结相比直接用悲观读锁,乐观读锁可以:1、进入悲观读锁前先看下有没有进入写模式(说白了就是有没有已经获取了悲观写锁)2、如果其他线程已经获取了悲观写锁,那么就只能老老实实的获取悲观读锁(这种情况相当于退化成了读写锁)3、如果其他线程没有获取悲观写锁,那么就不用获取悲观读锁了,减少了一次获取悲观读锁的消耗和避免了因为读锁导致写锁阻塞的问题,直接返回读的数据即可(必须再tryOptimisticRead和validate之间获取好数据,否则数据可能会不一致了,试想如果过了validate再获取数据,这时数据可能被修改并且读操作也没有任何保护措施)
高速缓存
线程B
源代码
主要作用:对CLH队列循环唤醒,唤醒头节点获取锁,如果当前线程处于靠后位置,则对当前线程节点进行排队,如果不是位于第二位置则继续自旋
doSignal(Node first)
是否达到核心线程数
Node addConditionWaiter()unlinkCancelledWaiters()await()signal()signalAll()
if (tryAcquireShared(1) < 0) //获取锁失败后 doAcquireShared(1);
节点2
循环
11100000 00000000 00000000 00000000
多线程 情况下: 由于语句1和语句2不存在数据依赖性,所以可能会发生指令重排。当语句2 比语句1 先执行,同一时间,线程2获取到 语句2的值 为true,跳出循环继而往下执行,由于语句1可能还没执行完,导致线程2调用context时出现错误
//线程1:context = loadContext(); //语句1inited = true; //语句2 //线程2:while(!inited ){ sleep()}doSomethingwithconfig(context);
执行任务task.run( )
注意:countDown()是不涉及CLH队列的,只是单纯修改锁标识而已
N take( )
公平
读锁
是否获取到任务
内存屏障:JMM把内存屏障分为4类:LoadLoad屏障:StoreStore屏障LoadStore屏障StoreLoad屏障
signal( )
REGEISTERS寄存器(存储当前指令和执行数据)
PrivilegedCallable
64位操作系统:8字节;32位操作系统:4字节;
结束
hasQueuedPredecessors Node t = tail; Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
系统总线
tryRelease(int releases)
网络存储
static final int SHARED_SHIFT = 16;static final int SHARED_UNIT = (1 << SHARED_SHIFT);static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;Thread firstReader = null;int firstReaderHoldCount;HoldCounter cachedHoldCounter;ThreadLocalHoldCounter readHolds;
barrierCommand
DelegatedScheduledExecutorService
public ThreadPoolExecutor(font color=\"#d32f2f\
workers
void execute(Runnable command);
if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true;}return false;
更新值
示例:Store1;StoreStore;Store2
默认
也就是说,要想并发程序正确地执行,必须要保证原子性、可见性以及有序性。只要有一个没有被保证,就有可能会导致程序运行不正确。
ScheduledThreadPoolExecutor
package locks;import java.util.concurrent.TimeUnit;public class LockSupport_ { //notify 和 wait 执行顺序严格,必须控制代码先wait 后 notify //避免 notify 先执行 而导致线程的 空等待 public static void main(String[] args) throws InterruptedException { final Object o = new Object(); //1.开启一个线程后对其进行挂起 new Thread(()->{ synchronized (o){ try { //2. 休眠3秒,目的是让下面的 notify()先执行,这样本线程就会一直空等待。 TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(\"上锁了\"); try { o.wait(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(\"被唤醒了\"); } }).start(); //TimeUnit.SECONDS.sleep(3); //2.通过主线程去唤醒这个线程 synchronized (o){ o.notify(); } }}
keepAliveTime 作用于非核心线程,超时就关闭非核心线程
workQueue(等待执行的任务队列)
缺点:造成cpu频繁切换,如果线程数量很大,会有相当大部分的CPU资源浪费在循环比较值的环节上
sync.release(1)
计算机结构图
addConditionWaiter()//1. 获取尾节点Node t = lastWaiter;//2. 有尾节点 并且 尾节点不是等待状态if (t != null && t.waitStatus != Node.CONDITION) { //2.1. 清除不为等待状态的节点 font color=\"#f44336\
解锁
这个比较失败后的循环操作,是CPU进行操作,也就是当有多个线程比较失败后,都是由各自的CPU进行再比较。此时,因CPU某刻只能操作一个线程,所以需要CPU来回切换线程,这样各线程才能实现再比较操作
Sync
非公平锁实现
tryAcquireShared(1)//1. 获取当前线程Thread current = Thread.currentThread();//2. 获取AQS的 state锁状态int c = getState();//3. 如果有线程持有独占锁(写锁)并且当前线程不是持有写锁的线程,那么获取读锁失败if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1;//4. 获取持有共享锁的线程数int r = sharedCount(c);//5. span style=\
编译器优化重排序
线程A
数据缓存
在早期的CPU当中,是通过在总线上加LOCK#锁的形式来解决缓存不一致的问题。因为CPU和其他部件进行通信都是通过总线来进行的,如果对总线加LOCK#锁的话,也就是说阻塞了其他CPU对其他部件访问(如内存),从而使得只能有一个CPU能使用这个变量的内存。比如上面例子中 如果一个线程在执行 i = i +1,如果在执行这段代码的过程中,在总线上发出了LCOK#锁的信号,那么只有等待这段代码完全执行完毕之后,其他CPU才能从变量i所在的内存读取变量,然后进行相应的操作。这样就解决了缓存不一致的问题。 但是上面的方式会有一个问题,由于在锁住总线期间,其他CPU无法访问内存,导致效率低下。 所以就出现了缓存一致性协议。最出名的就是Intel 的MESI协议,MESI协议保证了每个缓存中使用的共享变量的副本是一致的。它核心的思想是:当CPU写数据时,如果发现操作的变量是共享变量,即在其他CPU中也存在该变量的副本,会发出信号通知其他CPU将该变量的缓存行置为无效状态,因此当其他CPU需要读取这个变量时,发现自己缓存中缓存该变量的缓存行是无效的,那么它就会从内存重新读取
获取任务getTask( )
旧值没改变
最终执行的指令序列
通过这个类能操作内存空间
waitStatus=-1
g.broken=false;count=1-1=0;
Sync(int count);int getCount();int tryAcquireShared(int acquires);boolean tryReleaseShared(int releases);
Executors
false
if (h != null && h.waitStatus != 0)
if (Thread.interrupted()) throw new InterruptedException();//1. 将当前线程添加到等待队列 Node node = addConditionWaiter();//2. 释放当前线程持有的锁b style=\
0 条评论
下一页
为你推荐
查看更多