高并发编程的艺术
2019-08-21 13:25:44 34 举报
AI智能生成
Java高并发编程的艺术
作者其他创作
大纲/内容
并发编程的挑战
上下文切换
多线程与单线程的速度比较
因为线程创建和上下文切换的开销,所以多线程不一定比单线程快
如何减少上下文切换
无锁并发编程
CAS算法
Compare And Swap,例如Atomic包下的类
使用最少线程
使用线程池,避免创建过多或不需要的线程
协程
在单线程里实现多任务调度,并在单线程里维持多个任务间的切换
死锁
避免死锁的几个常见方法
避免一个线程同时获取多个锁
避免一个线程在锁内同时占用多个资源,尽量保证每个锁只占用一个资源
尝试使用定时锁
对于数据库锁,加锁和解锁必须在一个数据库连接里,否则会出现解锁失败的情况
资源限制的挑战
什么是
资源限制是指在进行并发编程时,程序的执行速度受限于计算机硬件资源或软件资源
引发的问题
受于资源限制,并发执行可能仍然保持串行执行,这时候因为增加了上下文切换和资源调度的时间,程序将不仅不会执行得更快,反而变得更慢
如何解决
硬件资源限制:使用集群
软件资源限制:使用资源池
Java并发机制的低层实现原理
volatile的应用
volatile的定义与实现原理
如果一个字段被声明称volatile,Java线程内存模型确保所有线程看到这个变量的值是一致的
volatile的实现主要依靠JVM给处理器发送的Lock前缀指令
Lock前缀指令会引起处理器缓存回写到内存。Lock前缀指令导致在执行指令期间,声言处理器的LOCK#信号。
锁总线
处理器可以独占任何共享内存【锁住总线,导致其他CPU不能访问总线,不能访问总线就意味着不能访问系统内存】
锁缓存
锁住处理器的内部缓存,回写到主内存中,并使用缓存一致性机制来保证修改的原子性。
一个处理器的缓存回写到内存会导致其他处理器的缓存无效。
其他处理器会使共享变量所在缓存行无效,下次访问强制从主内存中读取值
volatile的使用优化
将变量追加到64字节
why?
很多处理器的L1、L2或L3缓存的高速缓存行是64个字节宽,不支持部分填充缓存行(即缓存不够64字节也不会自动填充够64字节),这意味着,如果有两个volatile变量都不够64字节,处理器会将他们都读到同一个高速缓存行中,在多处理器下每个处理器都是同样的做法,当一个处理器试图修改一个变量时,会将这个缓存行锁定,那么在缓存一致性机制的作用下,会导致另外一个变量也不能被修改了。
not
缓存行非64字节宽的处理器
共享变量不会被频繁地写
synchronized的实现原理与应用
Java对象头
Java对象头里的Mark Word里默认储存对象的HashCode、分代年龄和锁标记位
32位
64位
锁的升级与对比
偏向锁
大多数情况下,锁不仅不存在多线程竞争,而且总会由同一线程多次获得。
获取:当一个线程访问同步块并获取锁时,会在对象头和栈帧中里存储锁偏向的线程ID,以后该线程在进入和退出同步块时,不需要进行CAS操作来加锁和解锁,只需要简单地测试一下对象头的Mark Word里是否存储着执行当前线程的偏向锁
撤销:偏向锁使用了一种等到竞争出现才释放锁的机制,所以当其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁
关闭偏向锁:
轻量级锁
加锁:线程在执行同步之前,JVM会先在当前线程的栈帧中创建用于存储锁记录的控件,并将对象头中的Mark Word复制锁记录中,官方称为Displaced Mark Word。然后线程尝试使用CAS将对象头中的Mark Word替换为指向锁记录的指针。如果成功,当前线程获得锁,如果失败,表示其他线程竞争锁,当前线程便尝试使用自旋锁来获取锁。
解锁:轻量级解锁时,会使用原子的CAS操作将Displaced Mark Word替换回到对象头,如果成功,则表示没有竞争发现。如果失败,表示当前锁存在竞争,锁会膨胀成重量级锁。
锁的优缺点对比
原子操作的实现原理
相关术语定义
缓存行:缓存的最小单位
比较并交换:也就是CAS。CAS操作需要输入两个数值,一个旧值(期望操作前的值)和一个新值,在操作期间先比较旧值有没有发现变化,如果没有发生变化,才交换成新值,发生了变化则不交换。
CPU流水线:CPU流水线的工作方式就像工业生产上的装配流水线,在CPU中由5~6个不同功能的电路单元组成一条指令处理流水线,然后将一条X86指令分成5~6步后再由这些电路单元分别执行,这样就能实现在一个CPU时钟周期完成一条指令,因此提高CPU的运算速度
内存顺序冲突:内存顺序冲突一般是由假共享引起的,假共享是指多个CPU同时修改同一个缓存航的不同部分而引起其中一个CPU的操作无效,当出现这个内存顺序冲突时,CPU必须清空流水线
处理器如何实现原子操作
两种锁定方式:
总线锁定:使用总线锁就是使用处理器提供的一个LOCK#信号,当一个处理器在总线上输出此信号时,其他处理器的请求将被阻塞住,那么该处理器可以独占共享内存。
缓存锁定:处理器不在总线上声言LOCK#信号,而是修改内部的内存地址,并且利用缓存一致性机制来保证操作的原子性,因为缓存一致性机制会组织同时修改由两个处理器缓存的内存区域数据。
处理器不会使用缓存锁定的情况
当操作的数据不能被缓存在处理器内部,或操作的数据跨多个缓存行,处理器会调用总线锁定。
处理器不支持缓存锁定。
Java如何实现原子操作
使用循环CAS实现原子操作
三大问题
ABA问题
解决思路:增加版本号
循环时间长开销大
只能保证一个共享变量的原子操作
使用锁机制实现原子操作
Java内存模型
Java内存模型的基础
并发编程模型的两个关键问题
线程之间如何通信:线程之间以何种机制来交换信息
共享内存:线程之间共享程序的公共状态,通过写-读内存中的公共状态进行**隐式**通信
消息传递:线程之间没有公共状态,线程之间必须通过发送消息来**显示**进行通信
线程之间如何同步:程序中用于控制不同线程间操作发生相对顺序的机制
在共享内存并发模型里,同步是显示进行的,程序员必须显示指定某个方法或某段代码需要在线程之间互斥执行
在消息传递的并发模型里,由于消息的发送必须在消息的接收之前,因此同步是隐式进行的。
Java内存模型的抽象结构
在Java中,所有实例域、静态域和数组元素都存储在堆内存中,堆内存在线程之间共享(“共享变量”这个术语代指这三个东西)。局部变量,方法定义参数和异常处理器参数不会在线程之间共享,它们不会有内存可见性问题,也不受内存模型的影响。
Java线程之间的通信由Java内存模型(JMM)控制。JMM定义了线程和主内存之间的抽象关系:线程之间的共享变量存储在主内存中,每个线程都有一个私有的本地内存,本地内存中存储了共享变量的副本。注意:本地内存是JMM的一个抽象概念,并不真实存在。
从源代码到指令序列的重排序
编译器优化的重排序
在不改变单线程程序语义的前提下,可以重新安排语句的执行顺序。
指令级并行的重排序【处理器】
现代处理器采用了指令级并行技术来将多条指令重叠执行。如果不存在数据依赖性,处理器可以改变语句对应机器指令的执行顺序。
内存系统的重排序【处理器】
由于处理器使用缓存和读/写缓冲区,这使得加载和存储操作看上
ps:JMM属于语言级的内存模型,它确保在不同的编译器和不同的处理器平台之上,通过禁止特定类型的编译器重排序和处理器重排序(通过内存屏障指令),提供一致的内存可见性保证。
happens-before简介
与程序员密切相关的happens-before规则
程序顺序规则:一个线程中的每个操作,happens-before于该线程的后续操作。
监视器锁规则:对一个锁的解锁,happens-before于随后对这个锁的加锁。
volatile变量规则:对一个volatile域的写,happens-before于任意后续对这个volatile域的读。
传递性:如果A happens-before B,且B happens-before C,那么A happens-before C。
重排序
数据依赖性
写后读
写后写
读后写
as-if-serial语义
不管怎么重排序,(单线程)程序的执行结果不能被改变。编译器、runtime和处理器都必须遵守。遵守as-if-serial语义,编译器和处理器不会对存在数据依赖关系的操作做重排序。
程序顺序规则
只要重排序和顺序执行的结果一致,JMM会允许对于程序顺序的重排序
重排序对多线程的影响
在单线程程序中,对存在控制依赖的操作重排序不会改变执行结果(这也是as-if-serial语义允许对存在控制依赖的操作做重排序的原因);但在多线程程序中,则可能会改变程序的执行结果。
顺序一致性
JMM对正确同步的多线程程序的内存一致性做了如下保证:如果程序是正确同步的,程序的执行将具有顺序一致性———即程序的执行结果与该程序在顺序一致性内存模型中的而执行结果相同。【这里的同步是指广义上的同步,包括对常用同步原语(synchronized、volatile和final)】
顺序一致性内存模型
两大特性
一个线程中的所有操作必须按照程序的顺序来执行
(不管程序是否同步)所有线程都只能看到一个单一的操作执行顺序。在顺序一致性内存模型中,每个操作都必须原子执行且立刻对所有线程可见。
同步程序的顺序一致性效果
在JMM中,临界区内的代码可以重排序,但绝不可以逸出到临界区之外
未同步程序的执行特性
对于未同步或未正确同步的多线程程序,JMM只提供最小安全性:线程执行时读取到的值,要么是之前某个线程写入的值,要么是默认值(0,Null,False),JMM保证线程读操作读取的值不会是无中生有的冒出来。
未同步程序在JMM和顺序一致性内存模型的几个差异
顺序一致性模型保证单线程内的操作会按程序的顺序执行,而JMM不保证。
顺序一致性模型保证所有线程只能看到一致的操作执行顺序,而JMM不保证。
JMM不保证对64位的long型和double型变量的写操作具有原子性,而顺序一致性模型保证对所有的内存读/写操作具有原子性。
volatile的内存语义
特性
可见性。对一个volatile变量的读,总是能看到(任意线程)对这个volatile变量最后的写入。
原子性:对任意单个volatile变量的读/写具有原子性,但类似于volatile++这种复合操作不具有原子性。
写-读建立的happens-before关系
假设线程A执行writer()方法之后,线程B执行reader()方法。根据happens-before规则,这个立的happens-before关系可以分为3类:
1)根据程序次序规则,1 happens-before 2;3 happens-before 4。
2)根据volatile规则,2 happens-before 3。
3)根据happens-before的传递性规则,1 happens-before 4。
因为1 happens-before4,所以保证了flag变量的内存可见性。
1)根据程序次序规则,1 happens-before 2;3 happens-before 4。
2)根据volatile规则,2 happens-before 3。
3)根据happens-before的传递性规则,1 happens-before 4。
因为1 happens-before4,所以保证了flag变量的内存可见性。
volatile写-读的内存语义
volatile写的内存语义:当写一个volatile变量时,JMM会把该线程对应本地内存中的共享变量值刷新到主内存。
volatile读的内存语义:当读一个volatile变量时,JMM会把该线程对应的本地内存置为无效。线程接下来将从主内存中读取共享变量。
volatile内存语义的实现
为了实现volatile内存语义,JMM会限制编译器重排序和处理器重排序
当第二个操作是volatile写时,不管第一个操作是什么,都不能重排序。
当第一个操作是volatile读时,不管第二个操作是什么,都不能重排序。
当第一个操作是volatile写,第二个操作是volatile读时,不能重排序。
为了实现volatile的内存语义,编译器在生成字节码时,会在指令序列中插入内存屏障来禁止特定类型的处理器重排序。
volatile写
在每个volatile写操作的前面插入一个StoreStore屏障。
在每个volatile写操作的后面插入一个StoreLoad屏障。
volatile读
在每个volatile读操作的后面插入一个LoadLoad屏障。
在每个volatile读操作的后面插入一个LoadStore屏障。
JSR-133为什么要增强volatile
旧的Java内存模型允许volatile变量与普通变量重排序
为了提供一种比锁更轻量级的线程之间通信的机制,JSR-133专家组决定增强volatile的内存语义:**严格限制编译器和处理器对volatile变量与普通变量的重排序,确保volatile的写-读和锁的释放-获取具有相同的内存语义**。
锁的内存语义
锁的释放-获取建立的happens-before关系
根据程序顺序规则:1 happens-before 2,2happens-before 3;4 happens-before 5,5 happens-before 6。
根据监视器锁规则:3 happens-before 4。
根据happens-before的传递性:2 happens-before 5。
因为2 happens-before 5,因此,线程A在释放锁之前所有的可见的共享变量,在线程B同一个锁后,将立刻变得对线程B可见。
锁的释放和获取的内存语义
当线程释放锁时,JMM会把该线程对应的本地内存中的共享变量刷新到主内存中。
当线程获取锁时,JMM会把该线程对应的本地内存置为无效。从而使得被监视器保护的临界区代码必须从主内存中读取共享变量。
锁内存语义的实现
利用volatile变量的写-读所具有的内存语义。
利用CAS所附带的volatile读和volatile写的内存语义。
concurrent包的实现
通用的实现模式
首先,声明共享变量为volatile
然后,利用CAS的原子条件更新来实现线程之间的同步
同时,配合以volatile的读/写和CAS所具有的volatile读和写的内存语义来实现线程之间的通信。
final域的内存语义
final域的重排序规则
对于final域,编译器和处理器要遵守两个重拍规则
在构造函数内对一个final域的写入,与随后把这个被构造对象的引用赋值给一个引用变量,这两个操作之间不能重排序。
初次读一个包含final域的对象的引用,与随后初次读这个final域,这两个操作之间不能重排序。
写final域的重排序规则
JMM禁止编译器把final域的写重排序到构造函数之外。
编译器会在final域的写之后,构造函数return之前,插入一个StoreStore屏障。这个屏障禁止处理器把final域的写重排序到构造函数之外。
读final域的重排序规则
在一个线程中,初次读对象引用与初次读该对象包含的final域,JMM禁止处理器重排序这两个操作(注意,这个规则仅仅针对处理器)。编译器会在读final域操作的前面插入一个LoadLoad屏障。
final域为引用过类型
对于引用类型,写final域的重排序规则对编译器和处理器增加如下约束:在构造函数内对一个final引用的对象的成员域的写入,与随后在构造函数外把这个被构造函数对象的引用赋值給一个引用变量,这两个操作之间不能重排序。
final引用不能从构造函数内溢出
在构造函数内部,不能让这个被构造对象的引用为其他线程所见,也就是对象引用不能在构造函数中“溢出”
final语义在处理器中的实现
在x86处理器中:由于X86处理器不会对写-写操作做重排序,所以在X86处理器中,写final域需要的StoreStore障屏会被省略掉。同样,由于X86处理器不会对存在间接依赖关系的操作做重排序,所以在X86处理器中,读final域需要的LoadLoad屏障也会被省略掉。也就是说,在X86处理器中,final域的读/写不会插入任何内存屏障!
JSR-133为什么要增强final的语义
问题:在旧的Java内存模型中,一个最严重的缺陷就是线程可能看到final域的值会改变。比如,一个线程当前看到一个整型final域的值为0(还未初始化之前的默认值),过一段时间之后这个线程再去读这个final域的值时,却发现值变为1(被某个线程初始化之后的值)。最常见的例子就是在旧的Java内存模型中,String的值可能会改变。
解决:为了修补这个漏洞,JSR-133专家组增强了final的语义。通过为final域增加写和读重排序规则,可以为Java程序员提供初始化安全保证:只要对象是正确构造的(被构造对象的引用在构造函数中没有“逸出”),那么不需要使用同步(指lock和volatile的使用)就可以保证任意线程都能看到这个final域在构造函数中被初始化之后的值。
happens-before
JMM的设计
JMM向程序员提供的happens-before规则能满足程序员的需求。JMM的happens-before规则不但简单易懂,而且也向程序员提供了足够强的内存可见性保证。
JMM对编译器和处理器的束缚已经尽可能少。从上面的分析可以看出,JMM其实是在遵循一个基本原则:只要不改变程序的执行结果(指的是单线程程序和正确同步的多线程程序),编译器和处理器怎么优化都行。
happens-before的定义
如果一个操作happens-before另一个操作,那么第一个操作的执行结果将对第二个操作可见,而且第一个操作的执行顺序排在第二个操作之前。
两个操作之间存在happens-before关系,并不意味着Java平台的具体实现必须要按照happens-before关系指定的顺序来执行。如果重排序之后的执行结果,与按happens-before关系来执行的结果一致,那么这种重排序并不非法(也就是说,JMM允许这种重排序)。
happens-before与as-if-serial的语义比较
as-if-serial语义保证单线程内程序的执行结果不被改变,happens-before关系保证正确同
步的多线程程序的执行结果不被改变。
步的多线程程序的执行结果不被改变。
as-if-serial语义给编写单线程程序的程序员创造了一个幻境:单线程程序是按程序的顺
序来执行的。happens-before关系给编写正确同步的多线程程序的程序员创造了一个幻境:正
确同步的多线程程序是按happens-before指定的顺序来执行的。
序来执行的。happens-before关系给编写正确同步的多线程程序的程序员创造了一个幻境:正
确同步的多线程程序是按happens-before指定的顺序来执行的。
happens-before规则
程序顺序规则:一个线程中的每个操作,happens-before于该线程中的任意后续操作。
监视器锁规则:对一个锁的解锁,happens-before于随后对这个锁的加锁。
volatile变量规则:对一个volatile域的写,happens-before于任意后续对这个volatile域的
读。
读。
传递性:如果A happens-before B,且B happens-before C,那么A happens-before C。
start()规则:如果线程A执行操作ThreadB.start()(启动线程B),那么A线程的ThreadB.start()操作happens-before于线程B中的任意操作。
join()规则:如果线程A执行操作ThreadB.join()并成功返回,那么线程B中的任意操作happens-before于线程A从ThreadB.join()操作成功返回。
双重检查锁定与延迟初始化
由来
非线程安全的延迟初始化对象
加synchronized来实现线程安全
双重检查锁定来实现延迟初始化对象
问题的根源
对象的初始化可能被重排序
基于volatile的解决方案
volatile保证共享变量的读/写具有原子性
基于类初始化的解决方案
JVM在类的初始化阶段(即在Class被加载后,且被线程使用之前),会执行类的初始化。在执行类的初始化期间,JVM会去获取一个锁。这个锁可以同步多个线程对同一个类的初始化。
静态内部类
静态成员变量
类初始化情况
T是一个类,而且一个T类型的实例被创建。
T是一个类,且T中声明的一个静态方法被调用。(*饿汉模式是这个*)
T中声明的一个静态字段被赋值。
T中生命的一个静态字段被使用,而且这个字段不是一个常量字段。(*静态内部类是这个*)
T是一个顶级类(Top Level Class,见Java语言规范的§7.6),而且一个断言语句嵌套在T内部被执行。
Java内存模型综述
处理器的内存模型
注意,因为处理器要遵守as-if-serial语义,处理器不会对存在数据依赖性的两个内存操作做重排序。
各种内存模型之间的关系
JMM是一个语言级的内存模型,处理器内存模型是硬件级的内存模型,顺序一致性内存模型是一个理论参考模型。
处理器内存模型比语言内存模型要弱,处理器内存模型和语言内存模型都比顺序一致性内存模型要弱。同处理器内存模型一样,越是追求执行性能的语言,内存模型设计得会越弱。
处理器内存模型比语言内存模型要弱,处理器内存模型和语言内存模型都比顺序一致性内存模型要弱。同处理器内存模型一样,越是追求执行性能的语言,内存模型设计得会越弱。
JMM的内存可见性保证
单线程程序。单线程程序不会出现内存可见性问题。编译器、runtime和处理器会共同确保单线程程序的执行结果与该程序在顺序一致性模型中的执行结果相同。
正确同步的多线程程序。正确同步的多线程程序的执行将具有顺序一致性(程序的执行结果与该程序在顺序一致性内存模型中的执行结果相同)。这是JMM关注的重点,JMM通过限制编译器和处理器的重排序来为程序员提供内存可见性保证。
未同步/未正确同步的多线程程序。JMM为它们提供了最小安全性保障:线程执行时读取到的值,要么是之前某个线程写入的值,要么是默认值(0、null、false)。
JSR-133对旧内存模型的修补
增强volatile的内存语义。旧内存模型允许volatile变量与普通变量重排序。JSR-133严格限制volatile变量与普通变量的重排序,使volatile的写-读和锁的释放-获取具有相同的内存语义。
增强final的内存语义。在旧内存模型中,多次读取同一个final变量的值可能会不相同。为此,JSR-133为final增加了两个重排序规则。在保证final引用不会从构造函数内逸出的情况下,final具有了初始化安全性。
Java并发编程基础
线程简介
什么是线程
为什么使用多线程
更多的处理器核心
更快的响应时间
更好的编程模型
线程优先级
线程的状态
Daemon线程
启动和终止线程
构造线程
在运行线程之前首先要构造一个线程对象,线程对象在构造的时候需要提供线程所需要的属性,如线程所属的线程组、线程优先级、是否是Daemon线程等信息。
启动线程
中断
过期的suspend()、resume()和stop()
安全地终止线程
中断状态是线程的一个标识位,而中断操作是一种简便的线程间交互方式,而这种交互方式最适合用来取消或停止任务。除了中断以外,还可以利用一个boolean变量来控制是否需要停止任务并终止该线程。
线程间通信
volatile和synchronized关键字
关键字volatile可以用来修饰字段(成员变量),就是告知程序任何对该变量的访问均需要从共享内存中获取,而对它的改变必须同步刷新回共享内存,它能保证所有线程对变量访问的可见性。
关键字synchronized可以修饰方法或者以同步块的形式来进行使用,它主要确保多个线程在同一个时刻,只能有一个线程处于方法或者同步块中,它保证了线程对变量访问的可见性和排他性。
等待/通知机制
等待/通知的相关方法是任意Java对象都具备的,因为这些方法被定义在所有对象的超类java.lang.Object上
使用wait()、notify()和notifyAll()时需要先对调用对象加锁。
调用wait()方法后,线程状态由RUNNING变为WAITING,并将当前线程放置到对象的等待队列。
notify()或notifyAll()方法调用后,等待线程依旧不会从wait()返回,需要调用notify()或notifAll()的线程释放锁之后,等待线程才有机会从wait()返回。
notify()方法将等待队列中的一个等待线程从等待队列中移到同步队列中,而notifyAll()方法则是将等待队列中所有的线程全部移到同步队列,被移动的线程状态由WAITING变为BLOCKED。
从wait()方法返回的前提是获得了调用对象的锁。
等待/通知的经典范式
等待方遵循规则
通知方遵循规则
管道输入/输出流
Thread.join()的使用
看源码我们可知,join()是利用Object的等待/通知方法
ThreadLocal的使用
其实ThreadLoca操作的就是当前线程的TreadLocalMap成员变量。
线程应用实例
等待超时模式
等待超时模式就是在等待/通知范式基础上增加了超时控制,这使得该模式相比原有范式更具有灵活性,因为即使方法执行时间过长,也不会“永久”阻塞调用者,而是会按照调用者的要求“按时”返回。
数据库连接池
TODO 使用等待超时模式来构造一个简单的数据库连接池
线程池技术
基于线程池技术的简单Web服务器
小结
Java中的锁
Lock接口
Java SE 5之后,并发包中新增了Lock接口(以及相关实现类)
Lock接口提供synchronized关机子锁不具备的几项特性
尝试非阻塞的获取锁:当前线程尝试获取锁,如果这一时刻锁没有被其他线程获取到,则成功获取并持有锁
能被中断的获取锁:与synchronized不同,获取到锁的线程能够响应中断,当获取到锁的线程被中断时,中断异常将会被抛出,同时锁会被释放
超时获取锁:在指定的截止时间之前获取锁,如果截止时间到了仍旧无法获取锁,则返回
Lock是一个接口,定义了锁的获取和释放的基本操作
void lock():获取锁时,调用该方法当前线程将会获取锁,当获得锁后,从该方法返回
void lockInterruptibly() throws InterruptedException:可中断地获取锁,和lock()方法的不同之处在于该方法会响应中断,即在锁的获取中可以中断该线程
boolean tryLock():尝试非阻塞的获取锁,调用该方法后立刻返回,如果能获取则返回true,否则返回false。
boolean tryLock(long time,TimeUint unit) throws InterruotedException:超时的获取锁,当线程在以下3种情况下会返回:①当线程在超时时间内获得了锁②当前线程在超时时间内被中断③超时时间结束,返回false
vold unlock():释放锁
Condition newCondition():获取等待通知组件,该组件和当前的锁绑定,当前线程只有获得了锁,才能调用该组件的awit()方法,而调用后,当前线程将释放锁。
队列同步器
队列同步器的接口
同步器提供的3个方法来访问后修改同步状态
getState():获取当前同步状态
setState(int newState):设置当前同步状态
compareAndSetState(int expect,int update):使用CAS设置当前状态,该方法能够保证状态
设置的原子性。
设置的原子性。
同步器可重写的方法
同步器提供的模版方法
队列同步器的实现分析
同步队列
同步器依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。
同步队列的基本结构
节点加入到同步队列
首节点设置
独占式同步状态获取与释放
获取
如果获取锁失败 && 获取锁失败后入同步队列也失败,则中断线程
获取锁失败后?
1.节点的构造&加入同步队列
2.自旋直到获取到同步状态
释放
线程释放同步状态之后,会唤醒其后续节点,进而使后续节点重新尝试获取同步状态
总结:在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;移出队列(或停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用tryRelease(int arg)方法释放同步状态,然后唤醒头节点的后继节点。
共享式同步状态获取与释放
获取
获取失败也是加入同步队列&自旋直到获取到同步状态
释放
该方法在释放同步状态之后,将会唤醒后续处于等待状态的节点。
tryReleaseShared(int arg)方法必须确保同步状态(或者资源数)线程安全释放,一般是通过循环和CAS来保证的,因为释放同步状态的操作会同时来自多个线程。
独占式超时获取同步状态
通过调用同步器的doAcquireNanos(int arg,long nanosTimeout)方法可以超时获取同步状态,即在指定的时间段内获取同步状态,如果获取到同步状态则返回true,否则,返回false。
源码
流程图
自定义同步组件-TwinsLock
同一时刻,只允许至多两个线程同时访问,即初始状态为2
同一时刻支持多个线程访问,就是共享式访问,重写tryAcquireShared和tryReleaseShared
在同步状态变更时,需要使用compareAndSet(int expect,int update)方法做原子性保障。
源码
重入锁
实现重进入
线程再次获取锁
锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再
次成功获取。并且同步状态+1
次成功获取。并且同步状态+1
锁的最终释放
线程重复获取了n次锁,就要n次释放锁,即直到同步状态为0
源码(非公平锁)
获取锁
释放锁
公平与非公平获取锁的区别
非公平获取锁:只要CAS设置同步状态成功即可
公平锁获取锁:会加多一个判断,当前线程节点是否有前驱节点
读写锁
介绍
之前提到锁(如Mutex和ReentrantLock)基本都是排他锁,这些锁在同一时刻只允许一个线程进行访问,而读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大提升。
提供的特性
公平性选择
支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平
重进入
该锁支持重进入,以读写线程为例:读线程在获取了读锁之后,能够再次获取读锁。而写线程在获取了写锁之后能够再次获取写锁,同时也可以获取读锁
锁降级
遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级成为读锁。
方法
int getReadLockCount()
返回当前读锁被获取的次数。该次数不等于获取读锁的线程数。因为锁可重入。
int getReadHoldCount()
返回当前线程获取读锁的次数。该方法在Java 6中加入到ReentrantReadWriteLock中,使用ThreadLocal保存当前线程获取的次数。
boolean isWriteLocked()
判断写锁是否被获取
int getWriteHoldCount()
返回当前线程获取写锁的次数。
使用示例
使用读写锁来保证作为本地缓存的HashMap具有线程安全性
实现分析
读写状态的设计
读写锁同样依赖自定义同步器来实现同步功能,读写状态就是其同步器的同步状态
如果在一个整型变量上维护多种状态,就一定需要“按位切割使用”这个变量,读写锁将变量切分成了两个部分,高16位表示读,低16位表示写
如果在一个整型变量上维护多种状态,就一定需要“按位切割使用”这个变量,读写锁将变量切分成了两个部分,高16位表示读,低16位表示写
写锁的获取与释放
获取
除了重入条件,增加了是否存在读锁的判断,如果存在读锁则写锁不能获取
释放
写锁的释放与ReentrantLock的释放过程基本类似,每次释放均减少写状态,当写状态为0时表示写锁已被释放,从而等待的读写线程能够继续访问读写锁,同时前次写线程的修改对后续读写线程可见。
读锁的获取与释放
获取
如果其他线程已经获取了写锁,则当前线程获取读锁失败,进入等待状态。如果当前线程获取了写锁或者写锁未被获取,则当前线程(线程安全,依靠CAS保证)增加读状态,成功获取读锁。
释放
读锁的每次释放(线程安全的,可能有多个读线程同时释放读锁)均减少读状态,减少的值是(1<<16)
锁降级
锁降级指的是写锁降级成为读锁。如果当前线程拥有写锁,然后将其释放,最后再获取读锁,这种分段完成的过程不能称之为锁降级。锁降级是指把持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程。
示例
锁降级中读锁的获取是否必要呢?
LockSupport工具
用处
当需要阻塞或唤醒一个线程的时候,都会使用LockSupport工具类来完成相应工作。LockSupport定义了一组的公共静态方法,这些方法提供了最基本的线程阻塞和唤醒功能,而LockSupport也成为构建同步组件的基础工具。
提供的方法
void park():阻塞当前线程,如果调用unpark(Thread thread)方法或者当前线程被终端,才能从park()方法返回
void parkNanos(long nanos):阻塞当前线程,最长不超过nanos纳秒,返回条件再park()的基础上增加了超时返回
void parkUntil(long deadline):阻塞当前线程,知道deadline时间(从1970年开始到deadline时间的毫秒数)
void unpark(Thread thread):唤醒处于阻塞状态的线程thread
Java 6增强
LockSupport增加了park(Object blocker)、parkNanos(Object blocker,long nanos)和parkUntil(Object blocker,long deadline)3个方法,用于实现阻塞当前线程的功能,其中参数blocker是用来标识当前线程在等待的对象(以下称为阻塞对象),该对象主要用于问题排查和系统监控。
Condition接口
介绍
任意一个Java对象,都拥有一组监视器方法(定义在java.lang.Object上),主要包括wait()、wait(long timeout)、notify()以及notifyAll()方法,这些方法与synchronized同步关键字配合,可以实现等待/通知模式。Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式,但是这两者在使用方式以及功能特性上还是有差别的。
与Object的监视器方法对比
方法
void await() throws InterruptedException
当前线程进入等待状态直到被通知(signal)或中断,当前线程将进入运行状态且从await()方法返回的情况,包括:
①其他线程调用该Condition的signal()或者signalAll()方法,而当前线程被选中唤醒
②其他线程(调用interrupt()方法)中断当前线程
③如果当前等待线程从await()返回,那么表明该线程已经获取了Condition对象所对应的锁
①其他线程调用该Condition的signal()或者signalAll()方法,而当前线程被选中唤醒
②其他线程(调用interrupt()方法)中断当前线程
③如果当前等待线程从await()返回,那么表明该线程已经获取了Condition对象所对应的锁
void awaitUninterruptibly()
当前线程进入等待状态直到被通知,从方法名称上可以看出该方法对中断不敏感
long awaitNanos(long nanosTimeout) throws InterruptedException
当前线程进入等待状态直到被通知、中断或超时。
返回值表示剩余的时间,如果在nanosTimeout纳秒前被唤醒,那么返回值就是(nanosTimeout-实际耗时)。
如果返回值是0或者负数,那么可以认定已经超时了。
返回值表示剩余的时间,如果在nanosTimeout纳秒前被唤醒,那么返回值就是(nanosTimeout-实际耗时)。
如果返回值是0或者负数,那么可以认定已经超时了。
boolean awaitUntil(Date deadline) throws InterruptedException
当前线程进入等待状态直到被通知、中断或者到某个时间。
如果没有到指定时间就被通知,返回true,否则,表示到了指定时间,返回false
如果没有到指定时间就被通知,返回true,否则,表示到了指定时间,返回false
void signal()
唤醒一个等待在Condition上的线程,该线程从等待方法返回前必须获得与Condition相关联的锁
void signalAll()
唤醒所有等待在Condition上的线程,能够从等待方法返回的线程必须获得与Condition相关联的锁
简单使用
实现有界队列
有界队列是一种特殊的队列,当队列为空时,队列的获取操作将会阻塞获取线程,直到队列中有新增元素,当队列已满时,队列的插入操作将会阻塞插入线程,直到队列出现“空位”
实现分析
等待队列
①等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程
②如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。
③节点的定义复用了同步器中节点的定义,也就是说,同步队列和等待队列中节点类型都是同步器的静态内部类
AbstractQueuedSynchronizer.Node。
②如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。
③节点的定义复用了同步器中节点的定义,也就是说,同步队列和等待队列中节点类型都是同步器的静态内部类
AbstractQueuedSynchronizer.Node。
队列基本结构图
Lock的整体结构(一个同步队列和多个等待队列)
等待
调用Condition的await()方法(或者以await开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从await()方法返回时,当前线程一定获取了Condition相关联的锁。
源码
1.将当前线程构造成节点加入等待队列
2.释放同步状态,唤醒同步队列中的后续节点,然后进入等待状态
流程图
通知
调用Condition的signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中
源码
1.检查调用此方法的线程是否获取了锁
2.获取等待队列的首节点,将其移动到同步队列并使用LockSupport唤醒节点中的线程
流程图
Java并发容器和框架
ConcurrentHashMap的实现原理与使用
为什么使用ConcurrentHashMap
线程不安全的HashMap
效率低下的HashTable
ConcurrentHashMap的锁分段技术可有效提升并发访问率
ConcurrentHashMap的结构
ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成
Segment是一种可重入锁(ReentrantLock),在ConcurrentHashMap里扮演锁的角色
HashEntry则用于存储键值对数据
ConcurrentHashMap的初始化
初始化segment数组
初始化segmentShift和segmentMask
初始化每个segment
输入参数initialCapacity是ConcurrentHashMap的初始化容量,loadfactor是每个segment的负载因子,在构造方法里需要通过这两个参数来初始化数组中的每个segment。
定位segment
使用Wang/Jenkins hash的变种算法对元素的hashCode进行一次再散列
代码
目的
ConcurrentHashMap的操作
get操作
Segment的get操作实现非常简单和高效。先经过一次再散列,然后使用这个散列值通过散列运算定位到Segment
不加锁:get方法里将要使用的共享变量都定义成volatile类型,如用于统计当前Segement大小的count字段和用于存储值的HashEntry的value
put操作
为了线程安全,加锁
步骤
判断是否需要扩容
先判断HashEntry数组的容量是否超过阈值,是的话先扩容再插入
为了高效,ConcurrentHashMap不会对整个容器进行扩容,而只对某个segment进行扩容
定位添加元素的位置,然后将其放在HashEntry数组里
size操作
先尝试2次通过不锁住Segment的方式来统计各个Segment大小,如果统计的过程中,容器的count发生了变化,则再采用加锁的方式来统计所有Segment的大小
使用modCount变量,在put、remove和clean方法里操作元素前都会将变量modCount进行加1,那么在统计size前后比较modCount是否发生变化,从而得知容器的大小是否发生变化
ConcurrentLinkedQueue
ConcurrentLinkedQueue的结构
ConcurrentLinkedQueue由head节点和tail节点组成,每个节点(Node)由节点元素(item)和指向下一个节点(next)的引用组成,节点与节点之间就是通过这个next关联起来,从而组成一张链表结构的队列。
入队列
第一是定位出尾节点;第二是使用CAS算法将入队节点设置成尾节点的next节点,如不成功则重试。
出队列
Java中的阻塞队列
什么是阻塞队列
一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。
Java里的阻塞队列
ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列
FIFO
公平/非公平
LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列
默认和最大长度为Integer.MAX_VALUE
FIFO
PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列
默认采取自然顺序升序排列
可定义类实现compareTo()方法指定排序规则
可指定构造参数Comparator来对元素进行排序
DelayQueue:一个支持延时获取元素的无界阻塞队列
使用PriorityQueue来实现
元素必须实现Delayed接口
可用于缓存系统的设计和定时任务调度
SynchronousQueue:一个不存储元素的阻塞队列
每一个put操作必须等待一个take操作
公平/非公平
LinkedTransferQueue:一个由链表结构组成的无界阻塞队列
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列
多了addFirst、addLast、offerFirst、offerLast、peekFirst和peekLast等方法
阻塞队列的实现原理
使用通知模式实现(Condition)
Fork/Join框架
什么是Fork/Join框架
Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架
Fork就是把一个大任务切分为若干子任务并行的执行
Join就是合并这些子任务的执行结果,最后得到这个大任务的结果
工作窃取算法
窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行
为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
Fork/Join框架的设计
步骤
分割任务
执行任务并合并任务
两个类完成上面的两个步骤
ForkJoinTask(任务)
方法
fork()
join()
重写compute()方法
实现
RecursiveAction:用于没有返回结果的任务
RecursiveTask:用于有返回结果的任务
ForkJoinPool(ForkJoinTask需要通过ForkJoinPool来执行)
任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务
使用Fork/Join框架
在compute()方法里,首先需要判断任务是否足够小,如果足够小就直接执行任务。如果不足够小,就必须分割成两个子任务,每个子任务在调用fork方法时,又会进入compute方法,看看当前子任务是否需要继续分割成子任务,如果不需要继续分割,则执行当前子任务并返回结果。使用join方法会等待子任务执行完并得到其结果
Fork/Join框架的异常处理
isCompletedAbnormally()
检查任务是否已经抛出异常或已经被取消
getException()
获取异常
Fork/Join框架的实现原理
ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责将存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务
ForkJoinTask的fork方法实现原理
pushTask()将任务放在ForkJoinTask数组里
signalWork()方法唤醒或创建一个工作线程来执行任务
ForkJoinTask的join方法实现原理
调用了doJoin方法,根据任务的状态返回结果
如果任务状态是已完成,则直接返回任务结果(死循环等待获取)
如果任务状态是被取消,则直接抛出CancellationException
如果任务状态是抛出异常,则直接抛出对应的异常
Java中的原子操作类
原子更新基本类型类
类
AtomicBoolean:原子更新布尔类型
AtomicInteger:原子更新整型
AtomicLong:原子更新长整型
常用方法
int addAndGet(int delta):以原子方式将输入的数值与实例中的值(AtomicInteger里的
value)相加,并返回结果
value)相加,并返回结果
boolean compareAndSet(int expect,int update):如果输入的数值等于预期值,则以原子方
式将该值设置为输入的值
式将该值设置为输入的值
int getAndIncrement():以原子方式将当前值加1,注意,这里返回的是自增前的值
void lazySet(int newValue):最终会设置成newValue,使用lazySet设置值后,可能导致其他线程在之后的一小段时间内还是可以读到旧的值
int getAndSet(int newValue):以原子方式设置为newValue的值,并返回旧值
原理
死循环+CAS
如果要实现char,float等原子更新类
利用Unsafe类(CAS算法)
compareAndSwapObject
compareAndSwapInt
compareAndSwapLong
原子更新数组类
类
AtomicIntegerArray:原子更新整型数组里的元素
AtomicLongArray:原子更新长整型数组里的元素
AtomicReferenceArray:原子更新引用类型数组里的元素
常用方法
int addAndGet(int i,int delta):以原子方式将输入值与数组中索引i的元素相加
boolean compareAndSet(int i,int expect,int update):如果当前值等于预期值,则以原子
方式将数组位置i的元素设置成update值
方式将数组位置i的元素设置成update值
原子更新引用类型类
类
AtomicReference:原子更新引用类型
AtomicReferenceFieldUpdater:原子更新引用类型里的字段
AtomicMarkableReference:原子更新带有标记位的引用类型
原子更新字段类
类
AtomicIntegerFieldUpdater:原子更新整型的字段的更新器
AtomicLongFieldUpdater:原子更新长整型字段的更新器
AtomicStampedReference:原子更新带有版本号的引用类型
步骤
每次使用的时候必须使用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性
更新类的字段(属性)必须使用public volatile修饰符
Java中的并发工具类
等待多线程完成的CountDownLatch
CountDownLatch允许一个或多个线程等待其他线程完成操作
CountDownLatch的构造函数接收一个int类型的参数作为计数器,如果你想等待N个点完成,这里就传入N
调用CountDownLatch的countDown方法时,N就会减1,CountDownLatch的await方法会阻塞当前线程,直到N变成零
同步屏障的CyclicBarrier
介绍
CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞
CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties,Runnable barrier-Action),用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景
应用场景
多线程计算数据,最后合并计算结果
与CountDownLatch的区别
CountDownLatch的计数器只能使用一次
CyclicBarrier的计数器可以使用reset()方法重置
控制并发线程数的Semaphore
用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源
应用场景:流量控制,一共三十个访问,但是同时最多只能十个访问进来
其他方法
intavailablePermits():返回此信号量中当前可用的许可证数
intgetQueueLength():返回正在等待获取许可证的线程数
booleanhasQueuedThreads():是否有线程正在等待获取许可证
void reducePermits(int reduction):减少reduction个许可证,是个protected方法
Collection getQueuedThreads():返回所有等待获取许可证的线程集合,是个protected方
法
法
线程间交换数据的Exchanger
Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方
如果两个线程有一个没有执行exchange()方法,则会一直等待,如果担心有特殊情况发生,避免一直等待,可以使用exchange(V x,longtimeout,TimeUnit unit)设置最大等待时长
Java中的线程池
线程池的实现原理
处理流程
线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则进入下个流程
线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如果工作队列满了,则进入下个流程
线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务
execute方法执行的4种情况
如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)
如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue
如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)
如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法
execute方法的源码(JDK1.8)
工作线程:线程池创建线程时,会将线程封装成工作线程Worker,Worker在执行完任务后,还会循环获取工作队列里的任务来执行
线程池的使用
线程池的创建
创建代码
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,
milliseconds,runnableTaskQueue, handler);
milliseconds,runnableTaskQueue, handler);
核心参数
corePoolSize(线程池的基本大小)
runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列
maximumPoolSize(线程池最大数量):线程池允许创建的最大线程数
RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务
AbortPolicy:直接抛出异常
CallerRunsPolicy:只用调用者所在线程来运行任务
DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务
DiscardPolicy:不处理,丢弃掉
keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间
向线程池提交任务
execute()
execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功
submit()
submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完
关闭线程池
shutdown()
shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程
shutdownNow()
shutdownNow首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表
合理地配置线程池
CPU密集型任务应配置尽可能小的线程,如配置Ncpu+1个线程的线程池
IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如2*Ncpu
优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理
建议使用有界队列
线程池的监控
taskCount:线程池需要执行的任务数量
completedTaskCount:线程池在运行过程中已完成的任务数量,小于或等于taskCount
largestPoolSize:线程池里曾经创建过的最大线程数量。通过这个数据可以知道线程池是否曾经满过。如该数值等于线程池的最大大小,则表示线程池曾经满过
getPoolSize:线程池的线程数量。如果线程池不销毁的话,线程池里的线程不会自动销毁,所以这个大小只增不减
getActiveCount:获取活动的线程数
通过扩展线程池进行监控。可以通过继承线程池来自定义线程池,重写线程池的beforeExecute、afterExecute和terminated方法,也可以在任务执行前、执行后和线程池关闭前执行一些代码来进行监控。例如,监控任务的平均执行时间、最大执行时间和最小执行时间等。这几个方法在线程池里是空方法
线程池的好处
第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗
第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行
第三:提高线程的可管理性
Executor框架
Executor框架简介
Executor框架的两级调度模型
在上层,Java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程
在底层,操作系统内核将这些线程映射到硬件处理器上
Executor框架的结构与成员
任务。包括被执行任务需要实现的接口:Runnable接口或Callable接口
任务的执行。包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。Executor框架有两个关键类实现了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)
异步计算的结果。包括接口Future和实现Future接口的FutureTask类
ThreadPoolExecutor详解
FixedThreadPool详解
FixedThreadPool被称为可重用固定线程数的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue());
}
return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue
}
FixedThreadPool的corePoolSize和maximumPoolSize都被设置为创建FixedThreadPool时指定的参数nThreads
execute()方法
如果当前运行的线程数少于corePoolSize,则创建新线程来执行任务
在线程池完成预热之后(当前运行的线程数等于corePoolSize),将任务加入LinkedBlockingQueue
线程执行完1中的任务后,会在循环中反复从LinkedBlockingQueue获取任务来执行。
FixedThreadPool使用无界队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为Integer.MAX_VALUE)
FixedThreadPool使用无界队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为Integer.MAX_VALUE)
无界队列带来的影响
当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池中的线程数不会超过corePoolSize
由于1,使用无界队列时maximumPoolSize将是一个无效参数
由于1和2,使用无界队列时keepAliveTime将是一个无效参数
由于使用无界队列,运行中的FixedThreadPool(未执行方法shutdown()或shutdownNow())不会拒绝任务(不会调用RejectedExecutionHandler.rejectedExecution方法)
SingleThreadExecutor详解
SingleThreadExecutor是使用单个worker线程的Executor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue()));
}
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue
}
SingleThreadExecutor的corePoolSize和maximumPoolSize被设置为1。其他参数与FixedThreadPool相同SingleThreadExecutor使用无界队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为Integer.MAX_VALUE)
execute()方法
如果当前运行的线程数少于corePoolSize(即线程池中无运行的线程),则创建一个新线程来执行任务。
在线程池完成预热之后(当前线程池中有一个运行的线程),将任务加入LinkedBlockingQueue
线程执行完1中的任务后,会在一个无限循环中反复从LinkedBlockingQueue获取任务来执行
CachedThreadPool详解
CachedThreadPool是一个会根据需要创建新线程的线程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue());
}
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue
}
CachedThreadPool的corePoolSize被设置为0,即corePool为空;maximumPoolSize被设置为Integer.MAX_VALUE,即maximumPool是无界的
keepAliveTime设置为60L,意味着
CachedThreadPool中的空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止
CachedThreadPool中的空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止
CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列,但CachedThreadPool的maximumPool是无界的。这意味着,如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新线程。极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源
execute()方法
首先执行SynchronousQueue.offer(Runnable task)。如果当前maximumPool中有空闲线程正在执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主线程执行offer操作与空闲线程执行的poll操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成;否则执行下面的步2)
当初始maximumPool为空,或者maximumPool中当前没有空闲线程时,将没有线程执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这种情况下,步骤1)将失败。此时CachedThreadPool会创建一个新线程执行任务,execute()方法执行完成
在步骤2)中新创建的线程将任务执行完后,会执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这个poll操作会让空闲线程最多在SynchronousQueue中等待60秒钟。如果60秒钟内主线程提交了一个新任务(主线程执行步骤1)),那么这个空闲线程将执行主线程提交的新任务;否则,这个空闲线程将终止。由于空闲60秒的空闲线程会被终止,因此长时间保持空闲的CachedThreadPool不会使用任何资源
ScheduledThreadPoolExecutor详解
ScheduledThreadPoolExecutor的运行机制
当调用ScheduledThreadPoolExecutor的scheduleAtFixedRate()方法或者scheduleWithFixedDelay()方法时,会向ScheduledThreadPoolExecutor的DelayQueue添加一个实现了RunnableScheduledFutur接口的ScheduledFutureTask
线程池中的线程从DelayQueue中获取ScheduledFutureTask,然后执行任务
ScheduledThreadPoolExecutor为了实现周期性的执行任务,对ThreadPoolExecutor做了如下
的修改
的修改
使用DelayQueue作为任务队列
获取任务的方式不同
执行周期任务后,增加了额外的处理
ScheduledThreadPoolExecutor的实现
ScheduledFutureTask
long型成员变量time,表示这个任务将要被执行的具体时间
long型成员变量sequenceNumber,表示这个任务被添加到ScheduledThreadPoolExecutor中的序号
long型成员变量period,表示任务执行的间隔周期
任务执行步骤
线程1从DelayQueue中获取已到期的ScheduledFutureTask(DelayQueue.take())。到期任务是指ScheduledFutureTask的time大于等于当前时间
线程1执行这个ScheduledFutureTask
线程1修改ScheduledFutureTask的time变量为下次将要被执行的时间
线程1把这个修改time之后的ScheduledFutureTask放回DelayQueue中(Delay-Queue.add())
DelayQueue.take()
获取Lock
获取周期任务
如果PriorityQueue为空,当前线程到Condition中等待;否则执行下面的2.2
如果PriorityQueue的头元素的time时间比当前时间大,到Condition中等待到time时间;否则执行下面的2.3
获取PriorityQueue的头元素(2.3.1);如果PriorityQueue不为空,则唤醒在Condition中等待的所有线程(2.3.2)
释放Lock
DelayQueue.add()
获取Lock
添加任务
向PriorityQueue添加任务
如果在上面2.1中添加的任务是PriorityQueue的头元素,唤醒在Condition中等待的所有线程
释放Lock
FutureTask详解
FutureTask简介
FutureTask除了实现Future接口外,还实现了Runnable接口。因此,FutureTask可以交给Executor执行,也可以由调用线程直接执行(FutureTask.run())
三种状态
未启动。FutureTask.run()方法还没有被执行之前,FutureTask处于未启动状态。当创建一个FutureTask,且没有执行FutureTask.run()方法之前,这个FutureTask处于未启动状态
已启动。FutureTask.run()方法被执行的过程中,FutureTask处于已启动状态
已完成。FutureTask.run()方法执行完后正常结束,或被取消(FutureTask.cancel(…)),或执行FutureTask.run()方法时抛出异常而异常结束,FutureTask处于已完成状态
get()
当FutureTask处于未启动或已启动状态时,执行FutureTask.get()方法将导致调用线程阻塞
当FutureTask处于已完成状态时,执行FutureTask.get()方法将导致调用线程立即返回结果或抛出异常
cancel()
当FutureTask处于未启动状态时,执行FutureTask.cancel()方法将导致此任务永远不会被执行
当FutureTask处于已启动状态时,执行FutureTask.cancel(true)方法将以中断执行此任务线程的方式来试图停止任务
当FutureTask处于已启动状态时,执行FutureTask.cancel(false)方法将不会对正在执行此任务的线程产生影响(让正在执行的任务运行完成)
当FutureTask处于已完成状态时,执行FutureTask.cancel(…)方法将返回false
FutureTask的使用
直接调用FutureTask的run()方法
可以通过ExecutorService.submit()提交任务
每个FutureTask任务最多只能被执行一次
FutureTask的实现
FutureTask的实现基于AbstractQueuedSynchronizer(以下简称为AQS)
基于“复合优先于继承”的原则,FutureTask声明了一个内部私有的继承于AQS的子类,对FutureTask所有公有方法的调用都会委托给这个内部子类
AQS被作为“模板方法模式”的基础类提供给FutureTask的内部子类Sync,这个内部子类只需要实现状态检查和状态更新的方法即可,这些方法将控制FutureTask的获取和释放操作。具体来说,Sync实现了AQS的tryAcquireShared(int)方法和tryReleaseShared(int)方法,Sync通过这两个方法来检查和更新同步状态
0 条评论
下一页