并发编程
2021-09-12 19:55:18 2 举报
AI智能生成
并发编程
作者其他创作
大纲/内容
并发编程
0. 简介
说明
Java是一个支持多线程的开发语言,多线程可以在包含多个CPU核心的机器上同时处理多个不同的任务,优化资源的使用率,提升程序的性能,再一些对性能要求比较高的场合,多线程是Java调优的重要方面
涉及方面
并发编程三要素
原子性
指一个或多个操作要么全部执行成功要么全部执行失败
有序性
程序执行的顺序按照代码的先后顺序执行(处理器可能会对指令进行重排序)
可见性
当多个线程访问同一个变量时,如果其中一个线程对其做了修改,其他线程能立即获取到最新的指
线程五大状态
创建
当用new操作符创建一个线程时
就绪
调用start(),处于就绪状态的线程并不一定马上就会执行run(),还需要等待CPU的调度
运行
CPU开始调度线程,并开始执行run()
阻塞
线程的执行过程中由于一些原因进入阻塞状态,如调用sleep(),尝试取得到一个锁
死亡
run()执行完毕或执行过程中遇到一个异常
悲观锁/乐观锁
悲观锁
每次操作都会加锁,会造成线程阻塞
乐观锁
每次操作不加锁而是假设没有冲突而去完成某项操作,如果因为冲突失败就重试,直到成功为止,不会造成线程阻塞
线程间协作
wait/notify/notifyAll
synchronized
一种同步锁,修饰对象有
一个代码块
被修饰的代码块称为同步语句块,作用范围是{}括起来的代码,作用对象是调用这个代码块的对象
一个方法
被修饰的方法称为同步方法,其作用范围是整个方法,作用的对象是调用这个方法的对象
一个静态方法
起作用范围是整个静态方法,作用的对象是这个类的所有对象
一个类
起作用范围是synchronized后面括号括起来的部分,作用主的对象是这个类的对象
CAS
Compare And Swap,即比较替换,是实现并发应用到的一种技术
操作包含三个操作数:内存位置(V)、预期原值(A)和新值(B)
如果内存位置的值与预期原值相匹配,则处理器会自动将该位置值更新为新值,否则,处理器不做任何操作
存在问题:ABA问题;循环时间长开销大;只能保证一个共享变量的原子操作
线程池
如果使用线程时就去创建一个线程,虽然简单,但是存在很大问题
如果并发的线程数量很多,且每个线程都是执行一个时间很短的任务就结束,这样频繁创建线程就会大大降低系统的效率
线程池通过复用可以大大减少线程频繁创建与销毁带来的性能上的损耗
1. 多线程/并发设计原理
多线程
Thread/Runnable
创建执行线程的方法
扩展Thread类
实现Runnable接口
线程的特征和状态
所有Java程序,无论是否并发,都有一个名为主线程的Thread对象
执行该程序时,Java虚拟机将创建一个新Thread并在该线程中执行main(),这是非并发应用程序中唯一的线程,也是并发应用程序中的第一个线程
Java中的线程共享应用程序中的所有资源,包括内存和打开的文件,快速而简单地共享信息,但必须使用同步避免数据竞争
Java中的所有线程都有一个优先级,这个整数值介于Thread.MIN_PRIORITY(1)和Thread_MAX_PRIORITY(10)之间,默认为Thread_NORM_PRIORITY(5)
线程的执行顺序并没有保证,通常较高优先级的线程建在较低优先级的线程之前执行
Java,可以创建两种线程
守护线程和非守护线程,区别在于它们如何影响程序的结束
Java程序结束执行过程
程序执行Runtime类的exit(),而用户有权执行该方法
应用程序的所有非守护线程均已结束执行,无论是否有正在运行的守护线程
守护线程通常用在作为垃圾收集器或缓存管理器的应用程序中,执行辅助任务
再线程start之前调用isDaemon()检查线程是否为守护线程
可以使用setDaemon()将某个线程确立为守护线程
Thread.States
定义线程的状态
NEW
Thread对象已创建,但还没有开始执行
RUNNABLE
Thread对象正在Java虚拟机中运行
BLOCKED
Thread对象正在等待锁定
WAITING
Thread对象正在等待另一个线程的动作
TIME_WAITING
Thread对象正在等待另一个线程的操作,但是有时间限制
TERMINATED
Thread对象已经完成了执行
getState()
获取Thread对象的状态,可以直接更改线程的状态
在给定时间内,线程只能处于一个状态,这些状态时JVM使用的状态,不能映射到操作系统的线程状态
Runnable
Runnable接口只定义了一个方法:run(),这是每个线程的主方法,当执行start()启动线程时,它将调用run()
Thread常用方法
获取和设置Thread对象信息
getId()
该方法返回Thread对象标识符,该标识符是在线程创建时分配的正整数,在线程的整个生命周期中是唯一且无法改变的
getName()/setName()
获取或设置Thread对象的名称,该名称是一个String对象,可以在Thread类的构造函数中建立
getPriority()/setPriority()
可以使用这两种方法来获取或设置Thread对象的优先级
isDaemon()/setDaemon()
允许获取或设置Thread对象的守护条件
getState()
该方法返回Thread对象的状态
interrupt()
中断目标线程,给目标线程发送一个中断信号,线程被打上中断标记
interrupted()
判断目标线程是否被中断,但是将清除线程的中断标记
isinterrupted()
判断目标线程是否被中断,不会清除中断标记
sleep()
该方法将线程的执行暂停ms时间
join()
暂停线程的执行,直到调用该方法的线程执行结束为止
可以使用该方法等待另一个Thread对象结束
setUncaughtExceptionHandler()
当线程执行出现未校验异常时,该方法用于建立未校验异常的控制器
currentThread()
Thread类的静态方法,返回实际执行改代码的Thread对象
Callable
一个与Runnable接口非常相似的接口
主要特征
接口
有简单类型参数,与call()的返回类型相对应
声明了call()
执行器运行任务时,该方法会被执行器执行,它必须返回声明中指定类型的对象
call()可以抛出任何一种校验异常
可以实现自己的执行器并重载afterExecute()来处理这些异常
synchronized
锁的对象
synchronized关键字给某个对象加锁
实例方法的锁加在对象实例上,静态方法的锁加载对象类上
锁的本质
如果一份资源需要多个线程同时访问,需要给该资源加锁,加锁之后,可以保证同一时间只能有一个线程访问该资源
资源可以是一个变量、一个对象或一个文件
锁是一个对象
这个对象内部得有一个标志位(state变量),记录自己有没有被某个线程占用
最简单的情况是这个state有1/0两个取值,分别表示有没有线程占用这个锁
如果这个对象被某个线程占用,记录这个线程的thread_ID
这个对象维护一个thread_id_list,记录其他所有阻塞的、等待获取拿这个锁的线程
在当前线程释放锁之后从这个thread_id_list里面取一个线程唤醒
资源和锁合二为一,使得在Java里面,synchronized关键字可以加在任何对象的成员上,这意味着,这个对象既是共享资源,同时也具备锁的功能
实现原理
在对象头里,有一块数据叫Mark_Word,在64位机器上,它是8字节的
Mark_Word有2个重要字段:锁标志位和占用该锁的thread_ID,因为不同版本的JVM实现,对象头的数据结构会有各种差异
wait/notify
生产者-消费者模型
一个常见的多线程编程模型
一个内存队列,多个生产者线程往内存内列中放数据,多个消费者线程从内存队列中取数据
1、内存队列本身要加锁,才能实现线程安全
2、阻塞,当内存队列满了,生产者放不进去,会被阻塞,当内存队列为空,消费者无事可做,会被阻塞
3、双向通知,消费者被阻塞后,生产者放入新数据,要notify消费者,反之,生产者被阻塞后,消费者消费了数据,要notify生产者
如何阻塞
1、线程自己阻塞自己,即生产者、消费者线程各自调用wait()/notify()
2、用一个阻塞队列,当取不到或放不进数据时,入队/出队函数本身就是阻塞的
如何双向通知
1、wait()和notify()机制
2、Condition机制
为什么需要和synchronized一起使用
Java中,wait()/notify()是Object的成员函数
两个线程之间要通信,对于同一个对象来说,一个线程调用该对象的wait(),另一个对象调用该对象的notify(),该对象本身就需要同步
所以,调用wait()、notify()之前,需要先通过synchronized同步给对象,即给该对象加锁
synchronized可以加载任何对象的实例方法上,任何对象都可能成为锁,因此,wait()、notify()只能放在Object里面
wait()需要释放锁
当线程A进入synchronized(obj1)中之后,即对obj1上锁,此时,调用wait()进入阻塞状态;那么,线程B就永远无法进入synchronized(obj1)同步块中,永远无法调用notify(),发生死锁
在wait()内部,会先释放锁obj1,然后进入阻塞状态,之后,它被另外一个线程用notify()唤醒,重新获取锁
其次,wait()调用完成后,执行后面的业务逻辑代码,然后退出同步块,再次释放锁
如此可避免死锁
问题
生产者在通知消费者时,也通知了其他生产者,消费者在通知生产者时,也通知了其他消费者
原因在于wait()/notify()所作用的对象和synchronized所作用的对象是同一个,只能由一个对象,无法区分队列空和队列满两个条件
这正是Condition要解决的问题
InterruptedException
抛出场景
只有那些声明了会抛出InterruptedException的函数才会抛出异常
即sleep()、wait()、join()
阻塞类型
轻量级阻塞
能够被中断的阻塞,对应的线程状态是WAITING或TIMED_WAITING
重量级阻塞
synchronized这种不能被中断的阻塞,对应的状态是BLOCKED
Thread.interrupted()
精确含义是“唤醒轻量级阻塞”,而不是字面上的“中断一个线程”
它相当于给线程发送了一个唤醒信号,所以如果线程此时恰好处于WAITING/TIMED_WAITING状态,就会抛出一个InterruptedException,且线程被唤醒
如果此时线程没有被阻塞,则线程什么都不会做,但在后续,线程可以判断自己是否收到过其他线程发来的中断信号,然后做一些对应的处理
线程优雅关闭
stop/destroy
线程是一段运行中的代码,一个运行中的方法。运行到一半的线程不能强制杀死
如果强制杀死线程,则线程中所使用的资源,如文件描述符、网络连接等无法正常关闭
因此,一个线程一旦运行起来,不要强制关闭,合理的做法是让其运行完毕,干净地释放掉所有资源,然后退出
如果是一个不断循环运行的线程,就需要用到线程间的通信机制,让主线程通知其退出
守护线程
当在一个JVM进程内开多个线程时,这些线程会被分成两类:守护线程/非守护线程,默认都是非守护线程
Java中规定:当所有非守护线程退出后,整个JVM进程就会退出,即守护线程不影响整个JVM进程的退出
垃圾回收线程就是守护线程,它们在后台工作,当开发者的所有前台线程都退出后,整个JVM进程就退出了
设置关闭的标志位
开发中一般通过设置标志位的方式,停止循环运行的线程
并发核心概念
并发与并行
定义
在单个处理器上采用单核执行多个任务即为并发,这种情况下,操作系统的任务调用程序会很快从一个任务切换到另一个任务,因此看起来所有任务都是同时运行的
同一时间内在不同计算机、处理器或处理器核心上同时运行多个任务,即所谓的“并行”
另外的定义
在系统上同时运行多个任务(不同的任务)即并发
同时在某个数据集的不同部分上运行同一任务的不同实例即并行
额外的定义
并行即系统同时运行了多个任务
并发即一种解释程序员将任务和它们对共享资源的访问同步的不同技术和机制的方法
同步
在并发中,可以将同步定义位一种协调两个或更多任务以获得预期结果的机制
方式
控制同步
如当一个任务的开始依赖于另一个任务的结束时,第二个任务不能在第一个任务完成之前开始
数据访问同步
当两个或更多任务访问共享变量时,在任意时间内,只有一个任务可以访问该变量
临界段
一段代码,由于它可以访问共享资源,因此在任何给定时间内,只能被一个任务执行
互斥是用来保证这一要求的机制,且可以采用不同的方式来实现
粒度
同步可以帮助在完成并发任务时避免一些错误,但它也为算法引入了一些开销
必须仔细地计算任务的数量,这些任务可以独立执行,而无需并行算法中的互通信,这就涉及并发算法的粒度
如果算法有粗粒度,同步方面的开销就比较低,反之同步方面的开销就会很高,且该算法的吞吐量可能不会很好
并发系统中的同步机制
信号量 semaphore
一种用于控制对一个或多个单位资源进行访问的机制
它有一个用于存放可用资源数量的变量,且可以采用两种原子操作来管理该变量
互斥是一种特殊类型的信号量,它只能取两个值(资源空闲/资源忙),只有将互斥设置为忙的那个进程才可以释放它
互斥可以通过保护临界段来避免出现竞争条件
监视器
一种在共享资源上实现互斥的机制,它有一个互斥、一个条件变量、两种操作(等待条件/通报条件)
一旦通报了该条件,在等待它的任务中只有一个会继续执行
线程安全
如果共享数据的所有用户都受到同步机制的保护,则代码(方法、对象)就是线程安全的
数据的非阻塞的CAS原语是不可变的,这样就可以在并发应用程序中使用改代码而不会出任何问题
不可变对象
一种特殊的对象,在其初始化后,不能修改其可视状态(其属性值),如果想修改一个不可变对象,则必须创建一个新的对象
主要优点在于它是线程安全的,可以在并发应用程序使用它而不会出现任何问题
一个例子就是Java中的String类,当给一个String对象赋新值时,会创建一个新的String对象
原子操作和原子变量
与其他任务相比,原子操作是一种发生在瞬间的操作,在并发应用程序中,可以通过一个临界段来实现原子操作,以便对整个操作采用同步机制
原子变量是一种通过原子操作来设置和获取其值的变量,可以使用某种同步机制来实现一个原子变量,或使用CAS以无锁方式来实现一个原子变量,该方式并不需要任何同步机制
共享内存与消息传递
任务可以通过两种不同方式来相互通信
共享内存
通常用于在同一台计算机上运行多任务的情况,任务在读取和写入值时使用相同的内存区域,为了避免出现问题,对该共享内存的访问必须在一个由同步机制保护的临界段内完成
消息传递
通常用于在不同计算机上运行多任务的情形,当一个任务需要与另一个任务通信时,它会发送一个遵循预定义协议的消息
如果发送方保持阻塞并等待响应,则该通信是同步的,如果发送方在发送消息后继续执行自己的流程,则该通信是异步的
并发的问题
数据竞争
如果有两个或多个任务在临界段之外对一个共享变量进行写入操作,即没有使用任何同步机制,那么应用程序可能存在数据竞争
在这些情况下,应用程序的最终结果可能取决于任务的执行顺序
死锁
当两个(多个)任务正在等待必须由另一线程释放的某个共享资源,而该线程有正在等待必须由前述任务之一释放的另一个共享资源时,并发应用程序就出现了死锁
当系统中同时出现以下条件,就会导致死锁,将其称为Coffman条件
互斥
死锁中涉及的资源必须是不可共享的,一次只有一个任务可以使用该资源
占有并等待条件
一个任务在占有某一互斥资源时又请求另一互斥资源,当它在等待时,不会释放任何资源
不可剥夺
资源只能被那些持有它们的任务释放
循环等待
任务1正等待任务2占有的资源,任务2正在等待任务3占有的资源,依次类推,最终任务n又正在等待任务1所占有的资源,这样就出现了循环等待
有一些机制可以用来避免死锁
忽略
最常用的机制,可以假设自己的系统绝不会出现死锁,而如果发生死锁,结果就是停止应用程序且重新执行
检测
系统中有一项专门分析系统状态的任务,可以检测是否发生了死锁,如果检测到了死锁,可以采取一些措施来修复
如,结束某个任务或强制释放某一资源
预防
如果想防止系统出现死锁,就必须预防Coffman条件中的一条或多条出现
规避
如果可以在某一任务执行之前得到该任务使用资源的相关信息,则死锁是可以规避的
一个任务要开始执行时,可以对系统中空闲的资源和任务所需的资源进行分析,这样就可以判断任务是否能够开始执行
活锁
如果系统中两个任务,总是因为对方的行为而改变自己的状态,那么就出现了活锁,最终结果是它们陷入了状态变更的循环而无法继续向下执行
资源不足
当某个任务在系统中无法获取其继续执行所需的资源时,就会出现资源不足
当有多个任务在等待某一资源且该资源被释放时,系统需要选择下一个可以使用该资源的任务
如果系统中没有设计良好的算法,则系统中有些线程很可能要为获取该资源等待很长时间
解决这一问题就要确保公平原则,所有等待某一个资源的任务必须在某一给定时间之内占有该资源
可选方案之一就是实现一个算法,在选择下一个将占有某一资源的任务时,对任务已等待该资源的时间因素加以考虑
然而实现锁的公平需要增加额外的开销,这可能会降低程序的吞吐量
优先权反转
当一个低优先权的任务持有了一个高优先级任务所需的资源时,就会发生优先权反转
这样,低优先权的任务就会在高优先权任务之前执行
JMM内存模型
JMM与happen-before
内存可见性问题
因为存在CPU缓存一致性协议,多个CPU核心之间缓存不会出现不同步的问题,不会有“内存可见性”问题
缓存一致性协议对性能有很大损耗,为了解决这个问题,又进行了各种优化,如Store_buffer、Load_buffer等
新加的buffer和CPU一级缓存是异步的,向内存中写入一个变量,这个变量会保存在Store_buffer中,稍后才异步写入L1,同时同步写入主内存
多CPU,每个CPU多核,每个核上可能还有多个硬件线程,对于操作系统来说,就相当于一个个逻辑CPU,每个逻辑CPU都有自己的缓存,这些缓存和主内存之间不是完全同步的
重排序
Store_buffer的延迟写入是重排序的一种,称为内存重排序,除此之外,还有编译器和CPU的指令重排序
类型
编译器重排序
对于没有先后依赖关系的语句,编译器可以重新调整语句的执行顺序
CPU指令重排序
在指令级别,让没有依赖关系的多条指令并行
CPU内存重排序
CPU有自己的缓存,指令的执行顺序和写入主内存的顺序不完全一致
CPU内存重排序是造成“内存可见性”问题的主因
内存屏障
为了禁止编译器重排序和CPU重排序,在编译器和CPU层面都有对应的指令,也就是内存屏障(MemoryBarrier),这也正是JMM和happen-before规则的底层实现原理
编译器的内存屏障,只是为了告诉编译器不要对指令进行重排序,当编译完成之后,这种内存屏障就消失了,CPU并不会感知到编译器中内存屏障的存在
CPU的内存屏障是CPU提供的指令,可以由开发者显式调用
内存屏障是很底层的概念,对于开发者来说,一般用volatile就足够了
理论层面上,可以把基本的CPU内存屏障分为
LoadLoad
禁止读和读的重排序
StoreStore
禁止写和写的重排序
LoadStore
禁止读和写的重排序
StoreLoad
禁止写和读的重排序
as-if-serial
单线程程序的重排序规则
无论什么语言,站在编译器和CPU角度来说,不管怎么重排序,单线程程序的执行结果不能改变
即只要操作之间没有数据依赖性,编译器和CPU都可以任意重排序,因为执行结果不会改变,代码看起来就像是完全串行地一行行从头执行到尾,这也就是ai-if-serial语义
对于单线程程序来说,编译器和CPU可能做了重排序,但开发者感知不到,也不存在内存可见性问题
多线程程序的重排序规则
对于多线程,线程之间的数据依赖性太复杂,编译器和CPU没有办法完全理解这种依赖性,并据此做出最合理的优化
编译器和CPU只能保证每个线程的as-if-serial语义
线程之间的数据依赖和相互影响,需要编译器和CPU的上层来确定
上层要告知编译器和CPU在多线程场景下什么时候可以重排序,什么时候不能重排序
happen-before
说明
使用happen-before描述两个操作之间的内存可见性
Java内存模型是一套规范,在多线程中
一方面,要让编译器和CPU可以灵活地重排序
另一方面,要对开发者做一些承诺,明确告知开发者不需要感知什么样的重排序,需要感知什么样的重排序
然后根据需要决定这种重排序对程序是否有影响,有则需要开发者显式的通过线程同步机制来禁止重排序
如果A_happen-before_B,意味着A的执行结果必须对B可见,即保证跨线程的内存可见性
这不代表A一定在B之前执行,因为对于多线程而言,两个操作的执行顺序是不确定的
基于这种描述方法,JMM对开发者做出了一系列承诺
1、单线程中的每个操作,happen-before对应该线程中任意后续操作
2、对volatile变量的写入,happen-before对应后续对这个变量的读取
3、对synchronized的解锁,happen-before对应后续对这个锁的加锁
传递性
如果A_happen-before_B,B_happen-before_C,则A_happen-before_C
volatile关键字
64位写入的原子性(Half_Write)
因为JVM的规范并没有要求64位的long或double的写入是原子的,在32位机器上,一个64位变量的写入可能被拆分成两个32位的写操作来执行
这样,读取的线程就可能读到“一半”的值,解决方法,long前面加上volatile
实现原理
由于不同的CPU架构的缓存体系不一样,重排序策略不一样,所提供的内存屏障指令也有差异
一种参考做法
1、在volatile写操作前面插入一个StoreStore屏障,保证volatile写操作不会和之前的写操作重排序
2、在volatile写操作后面插入一个StoreLoad屏障,保证volatile写操作不会和之后的读操作重排序
3、在volatile读操作后面插入一个LoadLoad+LoadStore屏障,保证volatile读操作不会和之后的读操作、写操作重排序
JSR-133
从JSR-133内存模型开始(JDK5),仅仅只允许把一个64位long/double的变量的写操作拆分为两个32位的写入来执行,任意的读操作都必须具有原子性(即必须要在单个读事务中执行)
final关键字
相应的happen-before语义
1、对final域的写(构造方法内部),happen-before于后续对final域所在对象的读
2、对final域所在对象的读,happen-before于后续对final域的读
2. JUC
并发容器
BlockingQueue
说明
在所有的并发容器中,BlockingQueue是最常见的一种
它是一个带阻塞功能的队列,当入队列时,若队列已满,则阻塞调用者,当出队列时,若队列为空,则阻塞调用者
在Concurrent包中,BlockingQueue是一个接口,有许多不同的实现类
ArrayBlockingQueue
一个用数组实现的环形队列,在构造方法中,会要求传入数组的容量
LinkedBlockingQueue
一种基于单向链表的阻塞队列,因为队头和队尾是2个指针分开操作的,所以用了2把锁+2个条件,同时有一个AtomicInteger的原子变量记录count数
构造方法中,可以指定队列的总容量,如果不指定,默认为Integer.MAX_VALUE
区别于ArrayBlockingQueue
1、为了提高并发度,用了2把锁,分别控制队头、队尾的操作
意味着在put()和put()之间、take()和take()之间是互斥的,put()和take()之间并不互斥
但对于count变量,双方都需要操作,所以必须是原子变量
2、因为各自拿了一把锁,所以当需要调用对方的condition的signal时,还必须再加上对方的锁,就是signalNotEmpty()和signalNotFull()
3、不仅put会通知take,take也会通知put
当put发现非满时,也会通知其他put线程
当take发现非空时,也会通知其他take线程
PriorityBlockingQueue
队列通常是先进先出的,而该队列是按照元素优先级从小到大出队列的
因此,队列元素之间需要可以比较大小,并实现Comparable接口
构造方法中,如果不指定初始大小,内部会设定一个默认值11,当元素个数超过该值,会自动扩容
阻塞的实现方面,和ArrayBlockingQueue机制类似
主要区别是用数组实现了一个二叉堆,从而实现按优先级从小到大出队列
另一个区别是没有notFull条件,当元素个数超过数组长度,执行扩容操作
DelayQueue
即延迟队列,一个按延迟时间从小到大出列的PriorityQueue
放入该队列的元素,必须实现Delayed接口
1、如果getDelay的返回值小于等于0,则说明该元素到期,需要从队列中拿出来执行
2、该接口首先继承了Comparable接口,所以要实现该接口,必须实现Comparable接口
SynchronousQueue
一种特殊的BlockingQueue,它本身没有容量
先调put(),线程会阻塞;直到另一个线程调用了take(),两个线程才同时解锁,反之亦然
BlockingDeque
定义了一个阻塞的双端队列接口,继承了BlockingQueue接口,同时增加了对应的双端队列操作接口
该接口仅有一个实现:LinkedBlockingDeque,底层使用了一个双向链表
CopyOnWrite
说明
指在“写”时,不是直接“写”源数据,而是把数据拷贝一份进行修改,再通过悲观锁或乐观锁的方式写回
这是为了在“读”的时候不加锁
CopyOnWriteArrayList
核心数据结构也是一个数组
CopyOnWriteArraySet
用Array实现的一个Set,保证所有元素都不重复,其内部是封装的一个CopyOnWriteArrayList
ConcurrentLinkedQueue/Deque
实现原理
和AQS内部的阻塞队列类似,同样是基于CAS,通过head/tail指针记录队列头部和尾部,实现入队和出队
首先,它是一个单向链表
其次,在AQS的阻塞队列中,每次入队后,tail一定后移一个位置,每次出队,head一定后移一个位置,以保证head指向队列头部,tail指向链表尾部
但在ConcurrentLinkedQueue中,head/tail的更新可能落后于节点的入队和出队,因为它不是直接对head/tail执行进行CAS操作,而是对Node中的item进行操作
初始化
初始时,head和tail都指向一个null节点
入队列
即使tail没有移动,只要对p的next指针成功进行CAS操作,就算成功入队
只有当p!=tail时,才会后移tail指针,即每连续追加2个节点,才后移一次tail指针,即使CAS失败也没关系,可以由下一个线程来移动tail指针
出队列
出队的判断并非观察tail指针的位置,而是依赖于head指针后续的节点是否为NULL这一条件
只要对节点的item执行CAS操作,置为NULL成功,则出队成功,即使head指针没有成功移动,也可以由下一个线程继续完成
队列为空
因为head/tail并不是精确地指向队列头部和尾部,所以不能简单地通过比较head/tail指针来判断队列是否为空
而是需要从head指针开始遍历,找第一个不为NULL的节点,如果找到,则队列不为空,如果找不到,则队列为空
ConcurrentHashMap
原理
HashMap通常实现方式是“数组+链表”,这个方式被称为“拉链法”,ConcurrentHashMap在这个基本原理上进行了各种优化
首先是所有数据都放在一个大的HashMap中,其次是引入了红黑树
如果头节点是Node类型,则尾随它的就是一个普通链表,如果头节点是一个TreeNode,则后面就是一个红黑树
链表和红黑树之间可以相互转换,初始时是链表,当链表元素超过某个阈值,把链表转换为红黑树,反之,当红黑树元素少于某个阈值,再转换为链表
设计原理
使用红黑树,当一个槽里有很多元素时,其查询和更新速度会比链表快很多,Hash冲突能得到很好的解决
加锁的粒度,并非整个ConcurrentHashMap,而是对每个头节点分别加锁,即并发度就是Node数组的长度(初始为16)
并发扩容
构造方法
cap(Node数组长度)保持为2的整数次方
tableSizeFor()根据传入的初始容量,计算出一个合适的数组长度
sizeCtl,用于控制再初始化或并发扩容时的线程数,默认初始值为cap
初始化
构造方法中没有对数组进行初始化,当多个线程都往里面放入元素时,在进行初始化
多个线程的竞争通过对sizeCtl进行CAS操作实现,如果某个线程成功把sizeCtl设置为-1,它就拥有了初始化的权利,初始化完成,再将sizeCtl设置回去,其他线程一直执行while循环,自旋等待,直到数组不为null
初始化成功后,sizeCtl不再等于数组长度,而是n-(n>>>2)=0.75n,表示下一次扩容的阈值
put()实现
for循环分支
1、整个数组的初始化
2、所在槽为空,说明该元素是该槽的第一个元素,直接新建一个头节点,然后返回
3、该槽正在进行扩容,帮助其扩容
4、把元素放入槽内,槽内可能是链表/红黑树
包裹再synchronized(f)内,f对应的数组下标位置的头节点,意味着每个数组元素有一把锁,并发度等于数组长度
扩容
MIN_TREEIFY_CAPCITY=64
意味着当数组长度没有超过64是,数组每个节点都是链表,只会扩容,不会转换为红黑树
当数组长度大于等于64时,才考虑把链表转换为红黑树
transfer()
基本原理
首先建一个新的HashMap,其长度是旧数组的2倍,然后把旧元素逐个迁移过来
方法有两个参数:1是tab扩容前的HashMap,2是nextTab扩容后的HashMap
该方法会被多个线程调用,所以每个线程只是扩容旧的HashMap部分,这就涉及如何划分任务的问题
并行扩容任务划分
旧数组长度是N,每个线程扩容一段,一段的长度用变量stride表示,transferIndex表示整个数组扩容的进度
stride在单核模式下直接等于n,因为没有办法多个线程并行扩容,多核模式下为(n>>>3)/NCPU,且保证最小值为16,需要线程个数约为n/stride
transferIndex是一个成员变量,初始值为n,从大到小扩容,每次减stride个位置,最终减至n<=0,表示整个扩容完成
transferIndex会被多个线程并发修改,因此需要通过CAS进行操作
扩容期间
扩容未完成前,有的数组下标对应的槽已经迁移到新数组,此时所有线程还是会访问旧HashMap
为此,当Node[0]迁移成功,会新建一个ForwardingNode,即转发节点,内部记录新的ConcurrentHashMap的引用,线程访问到ForwardingNode之后,会去查询新的HashMap
迁移
因为数组长度为2的整数次方,每次扩容又是2倍,则意味着处于第i个位置的元素,扩容后一定处于第i或i+n个位置
tryPresize()
输入是整个Hash表的元素个数,在函数里面,根据需要对整个Hash表进行扩容
它的核心是调用transfer函数
第一次扩容时,sizeCtl会被设置成一个很大的负数U.compareAndSwapInt(this,SIZECTL,sc,(rs<
之后每一个线程扩容,sizeCtl就+1,待扩容完成后,sizeCtl-1
ConcurrentSkipListMap/Set
说明
ConcurrentHashMap是一种key无序的HashMap,而ConcurrentSkipListMap则是Key有序的,实现了NavigableMap接口
ConcurrentSkipListMap
无锁链表
无锁队列、栈都是只在队头、队尾进行CAS操作,通常不会有问题,如果在链表的中间进行插入或删除,按照通常的CAS做法,就会出现问题
跳查表
解决了无锁链表的插入和删除问题,也就解决了跳查表的一个关键问题,因为跳查表就是多层链表叠起来的
put
在通过findPredecessor()找到待插入的元素在[b,n]之间后,并不能马上插入,因为其他线程也在操作这个链表,b、n都有可能被删除,所有在插入之前执行了一系列的检查逻辑,这也正是无锁链表的复杂之处
ConcurrentSkipListSet
只是对ConcurrentSkipListMap的简单封装
同步工具类
Semaphore
说明
即信号量,提供了资源数量的并发访问控制
原理
实现原理和锁基本相同,资源总数即state的初始值,在acquire里对state变量进行CAS减操作,减到0之后,线程阻塞,在release里对state变量进行CAS加操作
方法
acquire()
工作线程每获取一份资源,就在该对象上记下来
release()
工作线程每归还一份资源,就在该对象上记下来
此时资源可以被其他线程使用
CountDownLatch
使用场景
假设一个主线程需要等待n个Worker线程执行完才能退出,可以使用CountDownLatch来实现
await()
调用的是AQS的模板方法,只要state!=0,调用await()的线程便会被放入AQS的阻塞队列,进入阻塞状态
countDown()
调用AQS的模板方法releaseShared(),只有state=0,才会返回true,然后一次性唤醒队列中所有阻塞的线程
总结
由于是基于AQS阻塞队列来实现,所以可以让多个线程都阻塞在state=0条件上,通过countDown()一直减state,减到0后一次性唤醒多个线程
CyclicBarrier
使用场景
用于协调多个线程同步执行操作的场合
实现原理
基于ReentrantLock+Condition实现
说明
CyclicBarrier是可以被重用的,每一轮被称为一个Generation,即一次同步点
CyclicBarrier会响应中断,当线程没有到齐,如果有线程收到中断信号,所有阻塞的线程也会被唤醒,然后count被重置为初始值,重新开始
Exchanger
使用场景
用于线程之间交换数据
实现原理
核心机制和Lock一样,也是CAS+park/unpark
Exchanger内部有两个内部类:Paritcipant、Node
每个线程调用exchange()交换数据时,会先创建一个Node对象
该对象就是对该线程的包装,包含了3个重要字段
该线程要交互的数据
对方线程交换来的数据
该线程自身
一个Node只能支持2个线程之间交换数据,要实现多个线程并行地交换数据,需要多个Node,因此Exchanger里定义了Node数组
exchange(V x)
Phaser
说明
从JDK7开始,新增了一个同步工具类Phaser,其功能比CyclicBarrier和CountDownLatch更加强大
Phaser没有基于AQS来实现,但具备AQS的核心特性:state变量、CAS操作、阻塞队列
新特性
1、动态调整线程个数
CyclicBarrier所要同步的线程个数是在构造方法中指定的,之后不能更改,而Phaser可以在运行期间动态地调整要同步的线程个数
register()
注册一个
bulkRegister()
注册多个
arriveAndDeregister()
解除注册
2、层次Phaser
多个Phaser可以组成树状结构,可以通过在构造方法中传入父Phaser来实现
Phaser内部结构中,每个Phaser记录了自己的父节点,但并没有记录自己的子节点列表
每个Phaser知道自己的父节点,但不知道自己有多少个子节点,对父节点的操作,是通过子节点来实现的
对于树状Phaser上的每个节点来说,可以当作一个独立的Phaser来看待,其运作机制和一个单独的Phaser一样
父Phaser并不用感知子Phaser的存在,当子Phaser中注册的参与者数量大于0,会把自己向父节点注册,当子Phaser中注册的参与者数量等于0,会自动向父节点解除注册
父Phaser把子Phaser当作一个正常参与的线程即可
state变量
分为4部分
1位:是否完成同步,0/1,默认为0
31位:轮数
16位:总线程数
16位:未到达线程数
Phaser提供了一系列成员方法来从state中获取state的不同部分
阻塞和唤醒
基于state变量,对其执行CAS操作,并进行相应的阻塞和唤醒
主线程会调用awaitAdvance()进行阻塞,工作线程调用arrive()对state进行CAS的累减操作,当未到达线程数减到0,唤醒阻塞的主线程
阻塞使用的是一个称为Treiber_Stack的数据结构,而不是AQS双向链表
Treiber_Stack是一个无锁的栈,它是一个单向链表,出栈入栈都在链表头部,所以只需要一个head指针,而不需要tail指针
为了减少并发冲突,定义了两个Treiber_Stack,当phase为奇数轮时,阻塞线程放在oddQ中,当phase为偶数轮时,阻塞线程放在evenQ中
arrive()
arrive()和arriveAndDeregister()内部都是调用doArrive(boolean),区别在于前者只是把“未到达线程数”减1,后者则把“未到达线程数”和“下一轮总线程数”都减1
awaitAdvance()
Atomic类
AtomicInteger/AtomicLong
说明
对于一个整数的加减操作,要保证线程安全,需要加锁,即加synchronized
但有了Concurrent的Atomic相关类之后,synchronized可以用相关Atomic类代替,其性能更好
AtomicInteger
getAndIncrement()和getAndDecrement()都调用了U.getAndAddInt(),该方法基于CAS实现
getAndAddInt()具有volatile语义,即对所有线程都是同时可见的
悲观锁/乐观锁
对于悲观锁,认为数据发生并发冲突的概率很大,读操作之前就上锁,synchronized、ReentrantLock都是悲观锁的典型
对于乐观锁,认为数据发生并发冲突的概率较小,读之前不上锁,等到写操作时,再判断数据在此期间是否被其他线程修改了
如果修改了,就把数据重新读出来,重复该过程
如果没修改,就写回去
判断数据是否被修改,同时写回新值,这两个操作要合成一个原子操作,即CAS(CompareAndSet)
AtomiInteger的实现就是典型的乐观锁
Unsafe的CAS
Unsafe类是整个Concurrent包的基础,里面所有方法都是native的
compareAndSetInt(Object, long, int, int)
第二个参数是long型,经常被称为xxxOffset,表示某个成员变量在对应的类中的内存偏移量(该变量在内存中的位置),表示该成员变量本身
所有调用CAS的地方,都会先通过objectFieldOffset1()把成员变量转换成一个Offset
自旋/阻塞
当一个线程拿不到锁时,有两种基本的等待策略
1、放弃CPU,进入阻塞状态,等待后续被唤醒,再重新被操作系统调度
2、不放弃CPU,空转,不断重试,即所谓的“自旋”
对于单核CPU,只能用策略1,而对于多核或多CPU,可以使用策略2,因为没有线程切换的开销
两种策略并不互斥,可以结合使用,如果获取不到锁,先自旋,如果还拿不到,再阻塞,synchronized关键字就是这样的实现策略
AtomicInteger的实现就用的是自旋策略,如果拿不到锁,就会一直重试
AtomicBoolean/AtomicReference
为什么需要AtomicBoolean
因为往往需要实现先判断后赋值的操作,即两个操作合在一起的原子性,即CAS提供的功能
支持boolean/double
Unsafe类中,只提供了三种类型的CAS操作:int、long、Object(引用类型)
在JDK实现中,这三种CAS操作都是由底层实现的,其他类型的CAS操作都要转换为这三种之一进行操作
AtomicBoolean
用int来代替,入参时,将boolean转换为int,返回时,将int转换为boolean
double类型
依赖double类型提供的一对double类型核long类型互转的方法
longBitsToDouble()
doubleToRawLongBits()
AtomicStampedReference/AtomicMarkableReference
ABA问题
CAS都是基于“值”来做比较的,如果另一个线程把变量从A改为B,再从B改为A,则尽管修改了两次,但在当前线程做CAS操作时,却会因为值没变而认为数据没有被其他线程修改过
要解决ABA问题,不仅要比较值,还要比较“版本号”,这正是AtomicStampedReference的事情
说明
compareAndSet
这里的CAS有4个参数,后两个参数就是版本号的旧值和新值
因为要同时比较数据的“值”和“版本号”,而Integer或Long类型的CAS没有办法同时比较两个变量,所以只能把值和版本号封装成一个对象(Pair内部类),然后通过对象引用的CAS来实现
AtomicMarkableReference
原理与AtomicStampedReference类似,只是Pair里的版本号是boolean类型,而不是整型的累积变量
因为是boolean类型,只能有true、false两个版本号,所以并不能完全避免ABA问题,只是降低了ABA发生的概率
AtomicXXXFieldUpdater
说明
如果一个类是自己编写的,则可以在编写时把成员变量定义为Atomic类
但如果是一个已有的类,在不更改源码情况下,要实现对其成员变量的原子操作,就需要AtomicXXXFieldUpdater
原理
AtomicIntegerFieldUpdater为例,它是一个抽象类,构造方法是protected,不能直接构造其对象,必须通过它提供的一个静态方法来创建
newUpdater()
传入的是要修改的类(非对象)和对应的成员变量的名字,内部通过反射拿到这个类的成员变量,然后包装成一个AtomicIntegerFieldUpdater对象
所以这个对象表示的是类的某个成员,而不是对象的成员变量
若要修改某个对象的成员变量的值,再传入相应的对象
getAndIncrement(T obj)
限制条件
如果想使用AtomicXXXFieldUpdater修改成员变量,成员变量必须是volatile的int类型(不能是Integer包装类)
AtomicXXXArray
说明
Concurrent包提供了AtomicXXXArray(Integer、Long、Reference)三个数组元素的原子操作
并不是对整个数组的操作,而是针对数组中某个元素的原子操作
使用方式
相比对应的AtomicXXX,各类操作方法中多了一个传入参数:数组的下标(i)
实现原理
其底层的CAS方法直接调用VarHandle中native的getAndAdd()
Striped64/LongAdder
说明
从JDK8开始,针对Long型的原子操作,又提供了LongAdder、LongAccumulator
针对Double型,提供了DoubleAdder、DoubleAccumulator
LongAdder原理
AtomicLong内部是一个volatile_long型变量,由多个线程对这个变量进行CAS操作,然而在高并发下仍不够快,需要再提高性能
把一个变量拆成多份,变为多个变量,类似于ConcurrentHashMap的分段锁
把一个Long型拆成一个base变量外加多个Cell,每个Cell包装一个Long型变量
当多个线程并发累加时,如果并发度低,就直接加到base变量上
如果并发度高,冲突大,平摊到Cell上,最后取值时,再把base和Cell求sum运算
由于无论是long,还是double都是64位的,但因为没有double的CAS操作,所以通过把double转化成long来实现
最终一致性
在最终求和时,并没有对Cells数组加锁,即线程对其执行求和和修改可能是并行的,即最终一致性,而不是强一致性
因此,LongAdder适合高并发的统计场景,而不适合要对某个Long型变量进行严格同步的场景
伪共享和缓存行填充
Cell类定义中,用了一个独特的注解@sum.misc.Contended
这是JDK8之后有的,涉及了一个很重要的优化原理:伪共享和缓存行填充
每个CPU都有自己的缓存,缓存与主内存进行数据交换的基本单位叫CacheLine(缓存行)
在64位x86架构中,缓存行是64字节,即8个Long型大小,即意味着当缓存失效,要刷新到主内存时,最少要刷新64字节
伪共享
假设主内存中有变量X、Y、Z(均为Long型),被CPU1、CPU2分别读入自己的缓存,放在同一行的CacheLine中
当CPU1修改了X,它要失效整行CacheLine,即往总线上发消息,通知CPU2对应的CacheLine失效
由于CacheLine是数据交换的基本单位,失效X会导致失效整行CacheLine,即失效Y、Z缓存
解决
需要用到所谓的“缓存行填充”,分别在X、Y、Z后面填充7个无用的Long型,填充整个缓存行,使X、Y、Z处于不同的缓存行
声明@jdk.internal.vm.annotation.Contended即可实现缓存行填充,之所以这里要用缓存行填充,使为了不让Cell数组中相邻的元素落到同一个缓存行中
核心实现
LongAdder最核心的累加方法add(long),自增、自减操作都是通过调用该方法实现
当一个线程调用add()时,首先会尝试使用casBase把x加到base变量上,如果不成功,则使用c.cas()尝试把x加到Cell数组的某个元素上,如果还不成功,则调用longAccumulate()
Cells数组的大小始终是2的整数次方,在运行中不断扩容,每次扩容增长为2倍
对于一个线程来说,它并不在意到底是把x累加到base上,还是累加到Cells数组上,只要累加成功即可,因此,使用随机数来实现Cell的长度取模
LongAccumulator
类似LongAdder,但功能更加强大
LongAdder只能进行累加操作,且初始值默认为0
LongAccumulator可以自定义一个二元操作符,且可以传入一个初始值
LongBinaryOperator
二元操作符
DoubleAdder/DoubleAccumulator
其实也是long型实现,因为没有double类型的CAS方法
Lock/Condition
互斥锁
锁的可重入性
可重入锁指当一个线程调用oject.lock()获取到锁,进入临界区后,再次调用object.lock(),仍然可以获取到锁
通常锁都要设计成可重入的,否则就会发生死锁,synchronized就是可重入锁
类继承层次
Lock
一个接口,常用方法是lock/unlock()
lock()不能被中断,对应的lockInterruptibly()可以被中断
ReentrantLock
本身没有代码逻辑,实现都在内部类Sync中
Sync
AbstractQueuedSynchronizer
NonfairSync
FairSync
锁的公平性/非公平性
说明
Sync是一个抽象类,有两个子类FairSync/UnfairSync,分别对应公平锁和非公平锁
ReentrantLock构造方法中,可以传入一个布尔类型变量fair指定锁是否公平,默认非公平
一个新的线程来了,看到有很多线程在排队,自己排到队伍末尾,这叫公平锁,线程来了之后直接去抢锁,这叫做不公平
默认设置是非公平锁,是为了提高效率,减少线程切换
基本实现原理
Sync的父类AbstractQueuedSynchronizer被称作队列同步器(AQS),它的父类是AbstractOwnableSynchronizer
为了实现一把具有阻塞或唤醒作用的锁,需要几个核心要素
1、需要一个state变量,标记锁的状态
至少有两个值:0,1
对state的操作,使用CAS保证线程安全
2、需要记录当前是哪个线程持有锁
3、需要底层支持对一个线程进行阻塞或唤醒操作
4、需要有一个队列维护所有阻塞的线程
这个队列必须是线程安全的无锁队列,也需要使用CAS
AQS的state取值不仅可以是0/1,还可以大于1,就是为了支持锁的可重入性
Unsafe类提供了阻塞或唤醒线程的一对操作原语,即park()/unpark(),LockSupport工具类对这对原语做了简单封装
在当前线程中调用park(),该线程会被阻塞,在另外一个线程中调用unpark(Thread),传入一个阻塞的线程,就可以唤醒阻塞在park()处的线程
unpark(Thread)实现了一个线程对另一个线程的精准唤醒,而notify只是唤醒某一个线程,但无法指定具体唤醒哪个线程
AQS中利用双向链表和CAS实现了一个阻塞队列
它是整个AQS的核心中的核心
head指向链表头部,tail指向链表尾部
入队就是把新的Node加到tail后面,然后对tail进行CAS操作
出队就是对head进行CAS操作,把head向后移一个位置
初始时,head=tail=null,然后在往队列中加入阻塞线程时,会新建一个空的Node,让head和tail都指向这个空Node,之后在后面加入被阻塞的线程对象
公平/非公平锁实现差异
非公平
如果state为0,直接将当前线程设置为排他线程,同时设置state的值
如果state不为0,但是排他线程就是当前线程,则直接设置state的值
否则返回false,获取失败
公平
如果state为0,且队列中没有等待的线程,则设置当前线程为排他线程,并设置state的值
如果state不为0,但是排他线程就是当前线程,则直接设置state的值
否则返回false,获取失败
阻塞队列/唤醒机制
addWaiter()
为当前线程生成一个Node,然后把Node放入链表尾部
此时只是把Thread对象放入一个队列而已,线程本身并未阻塞
之后工作依靠acquireQueued()完成
acquireQueued()
线程一旦进入方法就会被无限期阻塞,即使有其他线程调用inerrupt()也不能将其唤醒,除非有其他线程释放了锁,且该线程拿到了锁,才会从该方法返回
方法返回的一刻,即拿到锁/被唤醒时,会删除队列的第一个元素,head指针前移1个节点
该方法不会中断响应,但它会记录被阻塞期间有没有其他线程向它发送过中断信号,如果有,返回true,否则返回false
该方法返回true时,会调用selfInterrupt(),自己给自己发送中断信号,即自己把自己的中断标志位设为true
这样做是因为自己在阻塞期间,收到其他线程中断信号没有及时响应,现在要进行补偿
这样,如果该线程在lock代码块内部有调用sleep()之类阻塞方法,就可以抛出异常,响应该中断信号
parkAndCheckInterrupt()
阻塞发生在该方法中,线程调用park(),自己把自己阻塞起来,直到被其他线程唤醒,该方法返回
park()返回情况
1、其他线程调用了unpark(Thread)
2、其他线程调用了t.interrupt()
注意lock()不能响应中断,但LockSupport.park()能响应中断
因为LockSupport.park()可能被中断唤醒,accquireQueued()才写了一个for死循环,唤醒之后,如果发现自己排在队列头部,就去拿锁,如果拿不到,就再次自己阻塞自己,不断重复此过程,直到拿到锁
被唤醒后,通过Thread.interrupted()来判断是否被中断唤醒
unlock()
unlock()不区分是否公平,内部直接调用sync.release()
release()
当前线程要释放锁,先调用tryRelease(),如果返回true,则取出head,让head获取锁
tryRelease()
先计算当前线程释放锁后的state值
如果当前线程不是排他线程,则抛异常,因为只有获取锁的线程才可以释放锁
此时设置state,没有使用CAS,因为是单线程操作
unparkSuccessor()
唤醒队列中的后继者
lockInterruptibly()
可以被中断,直接调用sync.acquireInterruptibly()
acquireInterruptibly()
AQS的模板方法,内部的tryAcquire()分别被FairSync/UnfairSync实现
doAcquireInterruptibly()
当parkAndCheckInterrupt)返回true,说明有其他线程发送中断信号,直接抛出InterruptedException,跳出for循环,整个方法返回
tryLock()
实现基于调用非公平锁的tryAcquire(),对state进行CAS操作,如果成功就拿到锁,如果不成功则直接返回false,也不阻塞
读写锁
说明
和互斥锁相比,ReentrantReadWriteLock就是读线程和读线程之间不互斥
读读不互斥,读写互斥,写写互斥
类继承层次
Lock
ReadWriteLock
一个接口,内部由两个Lock接口组成
readLock()
writeLock()
ReentrantReadWriteLock
实现了ReadWriteLock接口
当使用ReadWriteLock时,并不是直接使用,而是获得其内部的读锁和写锁,然后分别调用lock/unlock
实现原理
表面上看,ReadLock和WriteLock是两把锁,但实际上它只是一把锁的两个视图而已
readerLock和writerLock实际上公用同一个sync对象,它同互斥锁一样,分为非公平/公平两种策略,并继承自AQS
读写锁也用state变量来表示锁状态,在内部类Sync中,对state变量进行了重新定义,即把state变量拆成两半
低16位,用来记录写锁
高16位,用来“读”锁
因为无法用一次CAS同时操作两个int变量,所以用一个int变量的高低16位分别表示读锁和写锁的状态
state=0,说明既没有线程持有读写,也没有线程持有写锁
当state!=0,要么有线程持有读锁,要么有线程持有写锁,两者不能同时成立,因为读写互斥
AQS的两对模板方法
方法
acquire()/release()
互斥锁和读写锁的写锁基本这两个方法实现
acquireShared()/releaseShared()
读写锁的读锁基于这两个方法实现
实现
读锁的公平实现
Sync.tryAccquireShared() + FairSync中的两个重写的子方法
读锁的非公平实现
Sync.tryAccquireShared() + NonfairSync中的两个重写的子方法
写锁的公平实现
Sync.tryAccquire() + FairSync中的两个重写的子方法
写锁的非公平实现
Sync.tryAccquire() + NonfairSync中的两个重写的子方法
说明
公平
不论是读锁/写锁,只要队列中有其他线程在排队,就不能直接去抢锁,要排在队列尾部
非公平
写锁
写线程能抢锁,前提是state=0,只有在没有其他线程持有读锁/写锁的情况下,它才有机会去抢锁
或者state!=0,而持有写锁的线程是自己,再次重入
写线程是非公平的,即writerShouldBlock()一直返回false
读锁
假设当前线程被读线程持有,然后其他读线程非公平地去抢,可能导致写线程永远拿不到锁,所以对于读线程的非公平,需要做一些“约束”
当发现队列的第一个元素是写线程时,读线程也要阻塞,不能直接去抢
WriteLock实现
写锁是排他锁,实现策略类似于互斥锁
tryLock()
当state不是0,如果写线程获取锁的个数为0,或写线程不是当前线程,则返回抢锁失败
只要不是以上情况,则通过CAS设置state值,如果成功,就将排他线程设置为当前线程,返回true
tryLock()和lock()不区分公平/非公平
unlock()
不区分公平/非公平
ReadLock实现
读锁是共享锁,实现策略和排他锁有很大差异
tryLock()
如果写线程占用锁或当前线程不是排他线程,则抢锁失败
如果获取锁的值达到极限,则抛异常
使用CAS设置读线程锁的state值,如果成功
如果第一个读线程就是当前线程,表示读线程重入读锁
如果不是当前线程,则从ThreadLocal中获取当前线程的读锁个数,并设置当前线程持有的读锁个数
unlock()
因为读锁是共享锁,多个线程会同时持有读锁,所以对读锁的释放不能直接减1,而是需要通过一个for循环+CAS操作不断重试
Condition
与Lock的关系
Condition本身也是一个接口,其功能和wait()/notify()类似
Condition必须和Lock一起使用,因此Lock接口中,有一个与Condition相关的接口
实现原理
由于Condition必须和Lock一起使用,所以Condition的实现也是Lock的一部分
读写锁中的ReadLock不支持Condition,读写锁的WriteLock和互斥锁都支持Condition
每一个Condition对象上,都阻塞了多个线程,所以在ConditionObject内部也有一个双向链表组成的队列
await()
关键点说明
1、线程调用await()时,肯定已经拿到了锁
所以addConditionWaiter()内部,对这个双向链表的操作不需要执行CAS操作,线程天生是安全的
2、在线程执行wait操作前,必须先释放锁
即fullyRelease(Node),否则会发生死锁
3、线程从wait中被唤醒后,必须用acquireQueued(node,saveState)重新拿锁
4、checkInterruptWhileWaiting(node)在park()之后,是为了检测在park期间是否受到过中断信号
当线程从park中醒来,有两种可能
一是其他线程调用了unpark
另一是收到中断信号
await()是可以响应中断的,所以当发现自己是被中断唤醒,会直接退出while循环,然后返回
5、isOnSyncQueue(node)用于判断该Node是否在AQS的同步队列中
初始时,Node只在Condition队列中,而不在AQS队列里
但执行notify()操作时,会放进AQS的同步队列
awaitUninterruptibly()
不会响应中断,其方法定义中不会有中断异常抛出
当线程唤醒后,如果被中断过,仅记录不处理,继续进行while循环
notify()
同await()一样,调用notify()时,必须先拿到锁(否则会抛出异常),因为前面执行await()时,把锁释放了
然后从队列中取出firstWaiter,唤醒它,在通过调用unpark()唤醒前,先用enq(node)把这个Node放入AQS的锁对应的阻塞队列中
StampedLock
引入原因
StampedLock是JDK8中新增的,从ReentrantLock到StampedLock,并发度依次提高
StampedLock引入了“乐观读”策略,读时不加读锁,读出来发现数据被修改了,再升级为“悲观读”,相当于降低了读的地位,避免写线程被饿死
StampedLock读读不互斥,读写不互斥,写写互斥
使用场景
“乐观读”实现原理
state
StampedLock是一个读写锁,因为也会像读写锁一样,把state拆分,分别表示读锁和写锁的状态
同时它需要一个数据的version,但一次CAS没办法操作两个变量,所以这个state本身还表示了数据的version
最低的8位表示读和写的状态,第8位表示写锁状态,低7位表示读锁状态,因为写锁只有一个bit位,所以写锁不可重入
初始值不为0
悲观读/写
StampedLock也要进行悲观的读锁和写锁操作,但它不是基于AQS实现,而是内部重新实现了一个阻塞队列
阻塞队列实现锁的调用策略和AQS不一样
AQS中,一个线程CAS_state失败后,会立即加入阻塞队列,并进入阻塞状态
StampedLock中,CAS_state失败后,会不断自旋,直到足够多次数后,如果还拿不到锁,才进入阻塞状态
为此根据CPU的核数,定义了自旋次数的常量值
acquireWrite()
两个大的for循环,内部实现了非常复杂的自旋策略
第一个循环中,目的就是把Node加入队列的尾部,一边加入,一边通过CAS操作尝试获得锁
如果获得,则整个方法返回
如果不能获得,会一直自旋,直到加入队列尾部
第二个循环中,即该Node已经在队列尾部
这时如果发现自己也在队列头部,说明队列除了空的Head节点,就是当前线程
此时再进行一轮自旋,直到达到MAX_HEAD_SPINS,然后进入阻塞
另一个不同于AQS阻塞队列的是,在每个WNode里有一个cowait指针,用于串联起所有读线程,这样一个读线程被唤醒,其他读线程会随之一同唤醒,因为读读不互斥
3. 线程池与Future
实现原理
调用方不断向线程池中提交任务,线程池中有一组线程,不断地从队列中取任务,这是一个典型的生产者-消费者模型
实现需要考虑
1、队列设置多长,如果是无界的,调用方不断往队列放任务,可能导致内存耗尽,如果是有界的,队列满了之后,调用方如何处理
2、线程池中的线程个数是固定的,还是动态变化的
3、每次提交新任务,是放入队列,还是开新线程
4、当没有任务时,线程是睡眠了一小段时间,还是进入阻塞,如果进入阻塞,如何唤醒
问题4做法
1、不使用阻塞队列,只使用一般的线程安全队列,也无阻塞/唤醒机制
当队列为空,线程池中的线程只能睡眠一会儿,然后醒来去看队列中有无新任务,如此不断轮询
2、不使用阻塞队列,但在队列外部、线程池内部实现阻塞/唤醒机制
3、使用阻塞队列
很明显,该做法最完善
类继承体系
Executor
先线程池提交的每个任务,都必须实现Runnable接口,通过该接口的execute(Runnable)向线程池提交任务
ExecutorService
定义了线程池关闭接口shutdown(),还定义了可以有返回值的任务,即Callable
ThreadPoolExecutor
ScheduledThreadPoolExecutor
不仅可以执行某个任务,还可以周期性地执行任务
ThreadPoolExecutor
核心数据结构
成员
AtomicInteger ctl
线程数量和线程池状态变量
BlockingQueue workQueue
存放任务的阻塞队列
ReentrantLock mainLock
对线程池内部各种变量进行互斥访问控制
HashSet workers
线程集合
Worker
每一个线程是一个Worker对象,Worker是ThreadPoolExecutor的内部类
Worker继承于AQS,即Worker本身就是一把锁,这把锁用于线程池的关闭、线程执行任务的过程中
核心配置参数
ThreadPoolExecutor在构造方法中提供了几个配置参数,来配置不同策略的线程池
参数
corePoolSize
线程池中始终维护的线程个数
maxPoolSize
在corePoolSize已满,队列也满的情况下,扩充线程至该值
keepAliveTime/TimeUnit
maxPoolSize中的空闲线程销毁所需要的时间
总线程数收缩回corePoolSize
blockingQueue
线程池所用的队列类型
threadFactory
线程创建工厂,可以自定义,默认为Executors.defaultThreadFactory()
RejectedExecutionHandler
corePoolSize已满,队列已满,maxPoolSize已满,最后的拒绝策略
提交任务处理流程
1、判断当前线程数是否大于等于corePoolSize,如果小于,则新建线程执行,如果大于,则进入Step2
2、判断队列是否已满,如未满,则放入,如已满,进入Step3
3、判断当前线程数是否大于等于maxPoolSize,如果小于,则新建线程执行,如果大于,进入Step4
4、根据拒绝策略,拒绝任务
优雅关闭
说明
线程池的关闭,较之线程的关闭更为复杂
关闭线程池时,有的线程还在执行某个任务,有的调用者正在向线程池提交任务,且队列中可能还有未执行的任务
因此,关闭过程不可能是瞬时的,而是需要一个平滑的过渡,这就涉及线程池的完整生命周期管理
生命周期
JDK7中,把线程数量和线程池状态两个变量打包存储在一个字段,即ctl变量,而在JDK6中,它们是分开的
最高3位存储线程池状态
其余29位存储线程个数
线程池状态
RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED
关闭方法
shutdown()/shutdownNow(),它们会让线程池切换到不同状态
shutdown()不会清空任务队列,会等所有任务执行完成,shutdownNow()清空任务队列
shutdown()只会中断空闲线程,shutdownNow()会中断所有线程
在队列为空,线程池也为空时,进入TIDYING状态,最后执行一个钩子方法terminated(),进入TERMINATED状态,线程池才真正关闭
状态迁移
有一个关键特征,从小到大迁移,只会从小的状态往大的状态迁移
关闭的正确步骤
在调用shutdown()/shutdownNow()之后,线程池不会立即关闭,接下来需要调用awaitTermination()来等待线程池关闭
awaitTermination()内部不断循环判断线程池是否到达了最终状态,如果是返回,如果不是,则通过termination条件阻塞一段时间,之后继续
tryTerminator()
shutdown()/shutdownNow()都调用了tryTerminator()
它不会强行终止线程池,只是做了一下检测
当workerCount为0,workerQueue为空时,先把状态切换为TIDYING,然后调用terminated()
当terminated()执行完成,把状态改为TERMINATED,接着调用termination.signalAll(),通知前面阻塞在awaitTermination的所有调用者线程
任务提交过程
execute()
addWorker()
用于启动新线程
任务执行过程
任务提交过程中,可能会开启一个新的Worker,并把任务本身作为firstTask赋给该Worker
但是对于一个Worker来说,不是只执行一个任务,而是源源不断地从队列中取任务执行,这是一个不断循环的过程
调用shutdown()时,可能出现
所有线程都处于空闲状态
这意味着任务队列一定是空的,此时所有线程都会阻塞在getTask()
然后所有线程都会收到interruptIdleWorkers()发来的信号,getTask()返回null,所有Worker退出while循环,之后执行processWorkerExit
所有线程都处于忙碌状态
此时队列可能是空或非空,interruptIdleWorkers()内部的tryLock()调用失败,什么都不做,所有线程继续执行自己当前的任务
之后所有线程会执行完队列中的任务,直到队列为空,getTask()返回null,所有Worker退出while循环,之后执行processWorkerExit
部分线程忙碌,部分线程空闲
有部分线程空闲,说明队列一定为空,这些线程肯定阻塞在getTask()上
空闲线程和场景1一样处理,不空闲线程和场景2一样处理
调用shutdownNow()
与上面类似,只是多了一个环节,即清空任务队列
如果一个线程正在执行某个业务代码,即使向它发送中断信号,也没有用,只能等它把代码执行完成
因此,中断空闲线程和中断所有线程区别并不是很大,除非线程当前正好阻塞在某个地方
当一个Worker最终退出时,会调用processWorkerExit()执行清理工作
拒绝策略
execute()最后调用了reject(command)执行拒绝策略,实际是调用RejectedExecutionHandler.rejectedExecution()
RejectedExecutionHandler
是一个接口,定义了四种实现,分别对应四种不同的拒绝策略
CallerRunsPolicy
调用者直接在自己的线程里执行,线程池不处理
AbortPolicy
默认策略,线程池抛异常
DiscardPolicy
线程池直接丢掉任务
DiscardOldestPolicy
删除队列中最早的任务,将当前任务入队列
Executors工具类
concurrent包提供了Executors工具类,利用它可以创建不同类型的线程池
对比
单线程的线程池
Executor.newSingleThreadPoolExecutor()
固定数目线程的线程池
Executor.newFixedThreadPool()
每接收一个请求,就创建一个线程来执行
Executor.newCachedThreadPool()
单线程具有周期调度功能的线程池
Executor.newSingleThreadScheduledExecutor()
多线程,有调度功能的线程池
Executor.newScheduledThreadPool()
最佳实践
不同的线程池,都是由关键配置参数配置而成的
AlibabaJava开发手册中,明确禁止使用Executors创建线程池,并要求开发者直接使用ThreadPoolExecutor/ScheduledThreadPoolExecutor进行创建
这样做是为了强制开发者明确线程池的运行策略,使其对线程池的每个配置参数心中有数,以规避因使用不当而造成的资源耗尽的风险
ScheduledThreadPoolExecutor
说明
实现了按时间调度来执行任务
类型
延迟执行任务
schedule(Callable, long, TimeUnit)
schedule(Runnable, long, TimeUnit)
周期执行任务
scheduleAtFixedRate(Runnable, long, long, TimeUnit)
按固定频率执行,与任务本身执行时间无关
任务执行时间必须小于间隔时间
scheduleWithFixedDelay(Runnable, long, long, TimeUnit)
按固定间隔执行,与任务本身执行时间有关
原理
延迟执行任务依靠的是DelayQueue,它在内部实现了一个特定的DelayQueue
其原理和DelayQueue一样,但针对任务的取消进行了优化
周期性执行任务是执行完一个任务之后,再把该任务扔回到任务队列中,这样可以对一个任务反复执行
延迟执行 schedule()
在内部通过decorateTask()把Runnable包装成一个ScheduleFutureTask对象,而DelayedWorkQueue中存放的就是该类型对象,该对象实现了Delayed接口
任务的执行过程复用了ThreadPoolExecutor,延迟的控制在DelayedWorkerQueue内部完成
周期性执行
和schedule()基本一样,也是包装一个ScheduledFutureTask对象,只是在延迟时间参数外多了一个周期参数,然后放入DelayedWorkerQueue
两个方法区别在于setNextRunTime
atFixedRate
period>0,下一次开始执行时间等于上次开始执行时间+period
withFixedDelay
period<0,下一次开始执行时间等于triggerTime(p),为now+(-period),now即上次执行结束时间
CompletableFuture
说明
JDK8开始,Concurrent包提供了一个强大的异步编程工具CompletableFuture
JDK8前,异步编程可以通过线程池和Future来实现,但功能不够强大
CompletableFuture实现了Future接口,所以具有Future特性
调用get()会阻塞当前线程,直到结果返回
另一个线程调用complete()完成该Future,所有阻塞在get()的线程都会获得返回结果
runAsync/supplyAsync
runAsync()
没有返回值的任务,提交的是Runnable,返回CompletableFuture
supplyAsync()
有返回值的任务,提交的是Supplier,返回的是CompletableFuture(String)
thenRun/thenAccept/thenApply
CompletableFuture可以在结果上再加一个callback,依次往复
三个方法都是在任务执行完成后,接着执行回调,只是回调形式不同
thenRun()
跟的是一个无参数、无返回值的方法,即Runnable
最终返回CompletableFuture
thenAccept()
跟一个有参数、无返回值的方法,称为Consumer
返回值也是CompletableFuture
thenApply()
跟一个有参数、有返回值的方法,称为Function
返回值是CompletableFuture
参数接收的是之前supplyAsync()这个任务的返回值
thenCompose/thenCombine
thenCompose()
thenApply接收的是一个Function,如果这个Function返回值是另一个CompletableFuture,就会出现嵌套的CompletableFuture
如果希望返回值非嵌套,可以使用thenCompose
它传入的参数是Function,其返回值必须是CompletionStage子类,也即CompletableFuture类型
thenCombine()
第一个参数是一个CompletableFuture,第二个参数是BiFunction(2个输入参数,一个返回值)
功能是在两个CompletableFuture完成之后,把两个返回值传进去,再额外做一些事
任意组合
allOf和anyOf可以组合任意多个CompletableFuture
它们都是静态方法,参数是变长的CompletableFuture集合,它们的区别是前者是“与”,后者是“或”
allOf返回值是CompletableFuture,因为每个传入的CompletableFuture的返回值可能不同,所以组合的结果是无法确定的
anyOf的含义是只要任意一个CompletableFuture结束,就可以做接下来的事情,由于每个CompletableFuture的返回值类型可能不同,意味着无法判断类型,所以其返回值是CompletableFuture
任务原型
提交给CompletableFuture执行的任务有四种
Runnable
无返回值,无参数
Consumer
无返回值,有参数
Supplier
有返回值,无参数
Function
有返回值,有参数
因为初始时没有CompletableFuture对象,也没有参数可传,所以提交的只能是Runnable/Supplier,只能是静态方法
通过静态方法生成CompletableFuture之后,便可以链式提交其他任务(Runnable、Consumer、Function),且都是成员方法
CompletableStage
该接口定义的是各种链式方法、组合方法
其所有方法的返回值都是CompletableStage类型,正因为如此,才能实现链式调用
thenApply()接收的是一个有参数/返回值的Function,而这个Function的
输入参数必须是?_super_T类型(T或T的父类型),T代表调用thenApply的对象的类型
返回值必须是?_Extends_U类型(U或U的子类型),U即thenApply返回值的CompletableStage类型
内部原理
构造 ForkJoinPool
CompletableFuture中任务的执行依靠ForkJoinPool
任务类型的适配
ForkJoinPool接受的任务是ForkJoinTask类型,所以需要一个适配机制,把任务类型转换成ForkJoinTask,然后提交给ForkJoinPool
为了完成转换,CompletableFuture内部定义了一系列的内部类
supplyAsync()内部,会把一个Supplier转换为一个AsyncSupply,然后提交
runAsync()内部,会把一个Runnable转换为一个AsyncRun,然后提交
thenRun/thenAccept/thenApply内部,分别把Runnable/Consumer/Function转换为UniRun/UniAccept/UniApply对象,然后提交
链式执行过程
supplyAsync(Supplier)
其内部构造了一个AsyncSupply对象,该对象有三个关键点
1、继承自ForkJoinTask,所以能够提交给ForkJoinPool执行
2、封装了Supplier f,即它所执行任务的具体内容
3、该任务的返回值,即CompletableFuture,也被封装在里面
ForkJoinPool执行AsyncSupply(ForkJoinTask类型),该任务输入是Supply,输出是CompletableFuture
thenApply()
这里提交的任务不可能立即执行,在此构建了一个UniApply对象,即一个ForkJoinTask类型的任务,这个任务放入了上面的任务的栈中
每一个CompletableFuture对象内部都有一个栈,存储后续依赖它的任务,这个栈也是Treiber_Stack
UniApply类似于上面的AsyncSupply,构造方法传入了4个参数
1、执行它的ForkJoinPool
2、输出一个CompletableFuture对象,即thenApply()的返回值,用来链式执行下一个任务
3、其依赖的前置任务
4、输入,即一个Function对象
UniApply被放入上面的CompletableFuture的栈中,上面的任务执行完成之后,就会从栈中弹出并执行
thenRun()
类似thenApply(),不过构建的是UniRun对象
thenApply/thenApplyAsync
它们调用的是同一个方法,只不过传入参数不同
如果前置任务没有完成,它们都会将当前任务的下一个任务入栈,然后再出栈执行
只有再当前任务已完成情况下,thenApply才会立即执行,不会入栈再出栈,不会交给ForkJoinPool;thenApplyAsync还是将下一个任务封装为ForkJoinTask,入栈再出栈执行
任务的网状执行
如果任务只是链式执行,便不需要在每个CompletableFuture内部设置一个栈了,用一个指针使所有任务组成链表即可
实际上,任务不只是链式执行,而是网状执行
所有任务组成一个有向无环图
1、在每个任务的返回值里,存储了依赖它的下个执行任务,即每个任务的CompletableFuture对象的栈里,存储了该节点的出边对应的任务集合
2、对于AND逻辑,节点依赖多个上级任务,则它会被触发多次,会判断是否所有上级任务都完成,如果不是就不会执行
3、对于OR逻辑,节点依赖多个上级任务,则它会被触发多次,但只会执行一次,只要有一个上级任务完成,就会执行
4、正因为有AND和OR两种关系,因此对应了BiApply和OrApply两个对象,它们构造方法几乎一样,只是内部执行时,一个是AND逻辑,一个是OR逻辑
5、BiApply和OrApply都是二元操作符,即只能传入两个被依赖的任务,任何一个多元操作,都会被转换为多个二元操作的叠加
allOf内部的计算图
4. ForkJoinPool
用法
ForkJoinPool是JDK7提供的一种“分治算法”的多线程并行计算框架
Fork为分叉,Join为合并,相互配合,形成分治算法
也可以把ForkJoinPool看作是单机版的Map/Reduce,多个线程并行计算
相比于ThreadPoolExecutor,它可以更好地实现计算的负载均衡,提高资源利用率
利用ForkJoinPool,可以把大任务拆分为很多小任务,然后这些小任务被所有线程执行,从而实现任务计算的负载均衡
核心数据结构
与ThreadPoolExecutor不同,除一个全局的任务队列之外,每个线程还有一个自己的局部队列
成员
volatile long ctl
状态变量
WorkQueue[] workQueues
工作线程队列
ForkJoinWorkerThreadFactory factory
工作线程工厂
int indexSeed
下一个worker的下标
工作窃取队列
说明
全局队列WorkQueue并非使用BlockingQueue,而是基于一个普通的数组得以实现
该队列又名工作窃取队列,为ForkJoinPool的工作窃取算法提供服务
工作窃取算法
指一个Worker线程在执行完毕自己队列中的任务之后,可以窃取其他线程队列中的任务来执行,从而实现负载均衡,以防有的线程很空,有的线程很忙
操作
1、Worker线程自己,在队列头部通过对top指针执行加、减操作,实现入队或出队,这是单线程的
2、其他Worker线程,在队列尾部,通过对base进行累加,实现出队操作,也就是窃取,这是多线程的,需要通过CAS操作
关键点
1、整个队列是环形的,即一个数组实现的RingBuffer
Base会一直累加,不会减少,Top会累加、减少
最后Base、Top的值都会大于整个数组的长度,计算数组下标是,会取top&(queue.length-1),base&(queue.length-1)
当top-base=queue.length-1,队列为满,此时需要扩容
当top=base,队列为空,Worker线程即将进入阻塞状态
2、当队列满了之后会扩容,所以被称为是动态的
扩容
在base一端,是多线程访问的,但它们只会使base变大,即使队列中的元素变少
所以队列未满,肯定发生在top端,对top累加时,这端却是单线程的
队列的扩容恰好利用了单线程的特性,即在扩容中,不可能有线程对top进行修改
扩容之后,数组长度变成之前的二倍,但top、base的值不变,通过top、base对新数组长度取模,仍然可以定位到元素在新数组中的位置
状态控制
状态变量ctl
一个long型变量,64个比特位被分成5个部分
组成
1、AC
最高的16位,表示Active线程数-parallelism,parallelism是构造方法传入的参数
2、TC
次高的16位,表示Total线程数
3、ST
1位,如果为1,表示整个ForkJoinPool正在关闭
4、EC
15位,表示阻塞栈的栈顶线程的wait_count
5、ID
16位,表示阻塞栈的栈顶线程对应的ID
阻塞栈 Treiber Stack
要实现多个线程的阻塞、唤醒,除了park/unpark之外,还需要一个无锁链表实现的阻塞队列,把所有阻塞的线程串在一起
在ForkJoinPool中,没有使用阻塞队列,而是使用了阻塞栈,把所有空闲的Worker线程放在一个栈内,这个栈同样通过链表实现,名为Treiber_Stack
首先,WorkQueue有一个Id变量,记录了自己在WorkQueue数组中的下标,id相当于每个WorkQueue或ForkJoinWorkerThread对象的地址
其次,ForkJoinWorkerThread还有一个stackPred变量,记录了前一个阻塞线程的id,这个变量相当于链表的next指针,把所有阻塞线程串联起来,组成一个Treiber_Stack
最后,ctl的最低16位,记录了栈顶线程的id,中间15位,记录了栈顶线程被阻塞的次数,即wait_count
ctl初始值
初始时,ForkJoinPool中的线程个数为0,所以AC=0-parallelism,TC=0-parallelism
这意味着只有高32位的AC、TC填充了值,低32位都是0填充
ForkJoinWorkerThread状态和个数
ForkJoinPool只传入了一个parallelism参数,且这个参数不是实际的线程数
实际运行过程中,线程数决定于
1、空闲状态
放在Treiber_Stack中
2、活跃状态
正在执行某个ForkJoinTask,未阻塞
3、阻塞状态
正在执行某个ForkJoinTask,但阻塞了,于是调用join(),等待另一个任务的结果返回
ctl判断
高32位:u=(int)(ctl>>>32)
分为AC和TC,AC>0,说明有活跃线程;AC<=0,说明没有空闲线程,且还未超出parallelism
TC>0,说明总线程数>parallelism
低32位:c=(int)ctl
c>0,说明Treiber_Stack不为空,有空闲线程
c=0,说明没有空闲线程
Worker线程的阻塞/唤醒
说明
ForkJoinPool没有使用BlockingQueue,所以不利用其阻塞/唤醒机制,而是利用了park/unpark原语,并自行实现了Treiber_Stack
阻塞-入栈
当一个线程窃取不到任何任务,即处于空闲状态时,就会阻塞入栈
唤醒-出栈
在新任务到来之后,空闲线程被唤醒,其核心逻辑在signalWork()中
任务的提交过程
externalSubmit()
将一个可能是外部任务的子任务入队列
可以通过调用该方法的线程类型来区分任务是否内部任务
如果线程类型是ForkJoinWorkerThread,说明是线程池内部的某个线程在调用该方法,则把该方法放入该线程局部队列
否则,是外部线程在调用该方法,则将该任务加入全局队列
push(task)
内部提交任务,会放入线程的工作窃取队列中
由于工作窃取队列的特性,操作是单线程的,所以不需要执行CAS操作
externalPush(task)
外部多个线程会调用该方法,所以要加锁,入队列和扩容的逻辑和线程内部的队列基本相同
最后调用signalWork(),通知一个空闲线程来取
任务的执行过程
全局队列有任务,局部队列也有任务,每个Worker线程都会不间断地扫描这些队列,窃取任务来执行
ForkJoinTask(fork/join)
说明
对于分治算法来说,分解出来的一个个任务不是独立的,而是相互依赖,一个任务的完成需要依赖另一个前置任务的完成
这种依赖关系是通过ForkJoinPool的join()来实现的
fork()
说明
代码逻辑很简单,就是把自己放入当前线程所在的局部队列中
如果是外部线程调用fork(),则直接将任务添加到共享队列中
join()
层层嵌套阻塞原理
join会导致线程的层层嵌套阻塞,所有的任务其实组成了一个有向无环图DAG
站在ForkJoinTask角度来看,每个ForkJoinTask,都可能有多个线程在等待它完成,有1个线程在执行它,所以每个ForkJoinTask就是一个同步对象,线程调用join()时,阻塞在这个同步对象上,执行完成之后,再通过这个同步对象通知所有等待的线程
利用sychronized和wait()/notify()机制,实现了线程的等待-唤醒机制,调用join()的线程,内部其实是调用ForkJoinTask对象的wait(),执行该任务的Worker线程,在任务执行完毕之后,顺便调用notifyAll()
ForkJoinTask状态解析
要实现fork()/join()这种线程间同步,对应的ForkJoinTask一定是有各种状态的,这个状态变量是实现fork/join的基础
volatile int status
初始时,status=0,status>=0,未完成,status<0,已完成
所以通过判断status,就可以直到任务是否完成,进而决定调用join()的线程是否需要被阻塞
详细实现
getRawResult()
ForkJoinTask的一个模板方法,分别被RecursiveAction和RecursiveTask实现
doJoin()
阻塞主要发生在该方法中,其中调用t.join()的线程会阻塞,然后等待任务t执行完成,再唤醒该阻塞线程,方法返回
方法返回时,返回值就是任务的完成状态,即status
externalAwaitDone()
外部线程的阻塞过程
wt.pool.awaitJoin(w, this, 0L)
内部Worker线程的阻塞
for循环是死循环,且只有一个返回点(task.status<0),即任务完成
否则会不断自旋,若自旋还不行,会调用task.internalWait(ms)阻塞
唤醒
任务的执行发生在doExec()内,任务执行完成后,调用一个setDone()通知所有等待的线程
优雅关闭
说明
和ThreadPoolExecutor一样,ForkJoinPool的关闭也不可能是“瞬时”的,而是需要一个平滑的过渡过程
工作线程退出
对于一个Workker线程来说,它会在一个for循环中不断轮询队列中的任务,如果有任务,则执行,处在活跃状态,如果没有任务,则进入空闲等待状态
runWorker()
如果(int)ctl<0,即低32位的最高位为1,说明线程池已经进入关闭状态
但线程池进入关闭状态,不代表所有线程都会立即关闭
shutdown()/shutdownNow()
两者代码基本相同,都是调用tryTerminate(),但参数传入不同
tryTerminate()意为视图关闭ForkJoinPool,并不保证一定可以关闭成功
shutdown()只拒绝新提交的任务,shutdownNow()会取消现有的全局队列和局部队列中的任务,同时唤醒所有空闲线程,让这些线程自动退出
5. 多线程设计模式
Single Threaded Execution
说明
指“以一个线程执行”,该模式用于设置限制,以确保同一时间只能让一个线程执行处理
有时也称为“临界区”或“临界域”,Single_Threaded_Execution名称侧重于执行处理的线程,临界区或临界域侧重于执行范围
总结
SharedResource 共享资源
该模式中出现了一个发挥SharedResource作用的类,它是可以被多个线程访问的类,包含很多方法
这些方法主要分为两类
safeMethod
多个线程同时调用也不会发生问题的方法
unsafeMethod
多个线程同时访问会发生问题,因此必须加以保护的方法
本模式会保护unsafeMethod,使其同时只能由一个线程访问,java则是通过将unsafeMethod声明为synchronized来进行保护
我们将只允许单个线程执行的程序范围称为临界区
适用场景
多线程时
在单线程程序中使用synchronized并不会破坏程序的安全性,但是调用synchronized方法要比调用一般方法花费时间,稍微降低程序性能
多个线程访问时
当SharedResource角色的实例有可能被多个线程同时访问时,就需要使用本模式
即便是多线程程序,如果所有线程都是完全独立操作的,也无需使用本模式,这种状态称为线程互不干涉
某些处理多个线程的框架中,有些线程的独立性是由框架控制的,此时框架使用者就无需考虑是否使用本模式
状态有可能变化时
如果在创建实例后,实例状态再不发生变化,就无需使用本模式
需要确保安全性时
只有在需要确保安全性时,才需要使用本模式
Java的集合类大多是非线程安全的,这时为了在不需要考虑安全性时提高程序运行速度
用户在使用类时,需要考虑自己要用的类是否是线程安全的
死锁
使用Signle_Threaded_Execution时,存放发生死锁的危险
满足以下条件时,会发生死锁
存在多个SharedResource角色
线程在持有某个SharedResource角色锁的同时,还想获取其他SharedResource角色的锁
获取SharedResource角色的锁的顺序不固定
SharedResource角色是对称的
临界区大小和性能
一般情况下,Single_Threaded_Execution会降低程序性能
原因
1、获取锁花费时间
进入synchronized方法时,线程需要获取对象的锁,该处理会花费时间
如果SharedResource角色数量减少了,则要获取的锁的数量也会相应地减少,从而就能够抑制性能的下降
2、线程冲突引起的等待
当线程执行临界区内的处理时,其他想要进入临界区的线程会阻塞,这种状况称为线程冲突
发生冲突时,程序的整体性能会随着线程等待时间的增加而下降
Immutable
说明
即不变的,该模式中存在着确保实例状态不发生改变的类
在访问这些实例时不需要执行耗时的互斥处理,如果能用好该模式,就可以提高程序性能
如String就是一个不可变类,immutable
Immutable角色
Immutable角色是一个类,该角色中的字段值不可修改,也不存在修改字段内容的方法
无需对该角色应用Single_Threaded_Execution模式,无需使用synchronized关键字
适用场景
1、创建实例后,状态不再发生改变
必须是实例创建后,状态不再发生变化的
实例的状态由字段的决定,即使字段是final的且不存在setter,也有可能不是不可变的,因为字段引用的实例有可能发生变化
2、实例是共享的,且被频繁访问时
该模式优点是不需要使用synchronized关键字进行保护,意味着在不失去安全性和生存性的前提下提高性能
当实例被多个线程共享,且有可能被频繁访问时,该模式优点明显
注意
StringBuffer类表示字符串的可变类,String表示字符串的不可变类,String实例表示的字符串不可修改,执行操作的方法都不是synchronized修饰的,引用速度更快
如果需要频繁修改字符串内容,则使用StringBuffer,如果不需要修改字符串内容,只是引用,则使用String
JDK中的不可变模式
java.lang.String
java.math.BigInteger
java.math.Decimal
java.util.regex.Pattern
java.lang.Boolean
java.lang.Byte
java.lang.Character
java.lang.Double
java.lang.Float
java.lang.Float
java.lang.Integer
java.lang.Long
java.lang.Short
java.lang.Void
Guarded Suspension
说明
Guarded表示被守护、被保卫、被保护,Suspension表示暂停
本模式表示如果执行现在的处理会造成问题,就让执行处理的线程进行等待
本模式通过让线程等待来保证实例的安全性,也称为Guaded_wait、spin_lock等
GuardedObject角色
GuardedObject角色是一个持有被保护(GuardedMethod)方法的类,当线程执行GuardedMethod方法时,若守护条件成立,则立即执行,反之则等待
守护条件随之GuardedObject角色的状态不同而变化
除了GuardedMethod之外,GuardedObject角色也可以持有其他改变实例状态(stateChangeingMethod)的方法
Java中,GuardedMethod通过while语句和wait()来实现,stateChangingMethod通过notify/notifyAll方法实现
Balking
说明
所谓Balk,就是停止并返回的意思
该模式和Guarded_Suspension一样,也存在守护条件
在该模式中,如果守护条件不成立,则立即中断处理
GuardedObject角色
GuardedObject角色是一个拥有被保护的方法(GuardedMethod)的类,当线程执行GuardedMethod时,若保护条件成立,则执行实际的处理,若不成立,则不执行实际的处理,直接返回
保护条件的成立与否随着GuardedObject角色状态的改变而变动
除了GuardedMethod之外,GuardedObject角色还有可能由其他改变状态的方法(stateChangingMethod)
执行时机
不需要执行时
不需要等待守护条件成立时
Balking模式的特点就是不等待,若条件成立就执行,若不成立就不执行,立即进入下一个操作
守护条件仅在第一次成立时
当“守护条件仅在第一次成立”时,可以使用Balking模式
比如各种类的初始化操作,检查一次是否初始化了,如果初始化了就不用执行,如果没有初始化则进行初始化
Balk结果的表示
1、忽略balk
最简单的方式就是不通知调用端“发生了balk”
2、通过返回值表示balk
通过boolean值表示balk,若返回true,表示未发生balk,需要执行并执行了处理,若false,则表示发生了balk,处理已执行,不再需要执行
有时也会使用null来表示“发生了balk”
3、通过异常表示balk
有时也通过异常表示“发生了balk”,即当balk时,程序并不从方法return,而是抛异常
Balking和GuardedSuspension之间
介于“直接balk并返回”和“等待到守护条件成立为止”这两种极端之间的还有一种“在守护条件成立之前等待一段时间”,如果到时条件还未成立,则直接balk
这种操作称为计时守护或超时(timeout)
java.util.concurrent中的超时
1、通过异常通知超时
当发生超时抛出异常时,不适合使用返回值表示超时,需要使用java.util.concurrent.TimeoutExcption异常
如
java.util.concurrent.Future.get()
java.util.concurrent.Exchanger.exchange()
java.util.concurrent.CyclicBarrier.await()
java.util.concurrent.CountDownLatch.await()
2、通过返回值通知超时
当执行多次try时,则不使用异常,而使用返回值表示超时
如
java.util.concurrent.BlockingQueue接口,当offer()返回值为false,或poll()返回值为null,表示发生了超时
java.util.concurrent.Semaphore,当tryAcquire()返回值为false,表示发生了超时
java.util.concurrent.locks.Lock接口,当tryLock()返回值为false时,表示发生了超时
Producer-Consumer
说明
生产者安全地将数据交给消费者
当生产者和消费者以不同的线程运行时,两者之间的处理速度差异会有问题
生产者消费者模式用于消除线程间处理速度的差异带来的问题
在该模式种,生产者和消费者都有多个,当生产者和消费者只有一个时,称之为管道模式
角色
1、Data
Data角色由Producer角色生成,供Consumer角色使用
2、Producer
Producer角色生成Data角色,并将其传递给Channel角色
3、Consumer
Consumer角色从Channel角色获取Data角色并使用
4、Channel角色
Channel角色管理从Producer角色获取的Data角色,还负责响应Consumer角色的请求,传递Data角色,为了安全,Channel角色会对Producer角色和Consumer角色进行互斥处理
当Producer角色将Data角色传递给Channel角色时,如果Channel角色状态不能接收Data角色,则Producer角色将一致等待,直到Channel可以接收Data角色为止
当Consumer角色从Channel角色获取Data角色时,如果Channel角色状态没有可以传递的Data角色,则Consumer角色将一直等待,直到Channel角色状态转变为可以传递Data角色为止
当存在多个Producer角色Consumer角色时,Channel角色需要对它们做互斥处理
守护安全性的Channel角色
在生产者消费者模型中,承担安全守护责任的是Channel角色
Channel角色执行线程间的互斥处理,确保Producer角色正确地将Data角色传递给Consumer角色
不要直接传递
Consumer角色想要获取Data角色,通常是因为想使用这些Data角色来执行某些处理
如果Producer角色直接调用Consumer的方法,执行处理的就不是Consumer的线程,而是Producer角色的线程了
这样一来,异步处理变同步处理,会发生不同Data间的延迟,降低程序的性能
传递Data角色的顺序
1、队列
先生产后消费
2、栈
先生产后消费
3、优先队列
“优先”的先消费
Channel意义
线程的协调要考虑“放在中间的东西”,线程的互斥要考虑“应该保护的东西”
为了让线程协调运行,必须执行互斥处理,以防止共享的内容被破坏,线程的互斥处理时为了线程协调运行而执行的
JUC包和该模式
JUC中提供了BlockingQueue接口及其实现类,相当于Producer-Consumer模式中的Channel角色
如
BlockingQueue接口
阻塞队列
ArrayBlockingQueue
基于数组的BlockingQueue
LinkedBlockingQueue
基于链表的BlockingQueue
PriorityBlockingQueue
带有优先级的BlockingQueue
DelayQueue
一定时间后才可以take的BlockingQueue
SynchronousQueue
直接传递的BlockingQueue
ConcurrentLinkedQueue
元素个数没有最大限制的线程安全队列
Read-Write Lock
说明
当线程读取实例的状态时,实例的状态不会发生变化,实例的状态仅在线程执行写入操作时才会发生变化
从实例状态变化来看,读取和写入有本质的区别
本模式中,读取操作和写入操作分开考虑,在执行读取操作之前,线程必须获取用于读取的锁,在执行写入操作之前,线程必须获取用于写入的锁
可以多个线程同时读取,读取时不可写入
当线程正在写入时,其他线程不可以读取或写入
一般来说,执行互斥会降低程序性能,如果把写入的互斥和读取的互斥分开考虑,则可以提高性能
角色
Reader
该角色对共享资源角色执行读取操作
Writer
该角色对共享资源角色执行写操作
ShardResource
共享资源角色,表示Reader和Writer共享的资源
提供了不修改内部状态的操作(读取)和修改内部状态的操作(写)
ReadWriteLock
读写锁提供了共享资源角色实现读操作和写操作时需要的锁
要点
1、利用读取操作的线程之间不会冲突的特性来提高程序性能
该模式利用了读操作的线程之间不会冲突的特性
由于读取操作不会修改共享资源的状态,所以彼此之间无需加锁
因此,多个Reader角色同时执行读取操作,从而提高程序性能
2、适合读取操作负载较大的情况
如果单纯使用SingleThreadedExecution模式,则read也只能运行一个线程
如果read负载很重,可以使用Read-Write Lock模式
3、适合少写多读
Read-Write_Lock模式优点是Reader之间不会冲突
如果写入很频繁,Writer会频繁停止Reader的处理,也就无法体现出该模式的优势了
锁的含义
synchronized可以用于获取实例的锁,Java中同一个对象锁不能由两个以上的线程同时获取
用于读取的锁和用于写入的锁与使用synchronized获取的锁是不一样的,开发者可以通过修改代码来改变锁的运行
JUC包的Read-Wirte Lock模式
JUC包提供了已实现Read-Write_Lock模式的ReadWriteLock接口和ReetrantReadWriteLock类
ReadWriteLock接口的功能在于该接口用于读取的锁和写入的锁是通过其他对象来实现的
ReetrantReadWriteLock特征
公平性
当创建该类实例时,可以选取锁的获取顺序是否要设置为fair,如果创建的实例是公平的,则等待时间久的线程将可以优先获取锁
可重入性
该类的锁是可重入的,Reader角色的线程可以获取用于写入的锁,Write角色的线程可以获取用于读取的锁
锁降级
该类可以按照以下顺序将用于写入的锁降级为用于读取的锁
获取用于写入的锁
获取用于读取的锁
释放用于写入的锁
用于读取的锁不能升级为用于写入的锁
快捷方法
该类提供了获取等待中的线程个数的方法getQueueLength()
以及检查是否获取了用于写入锁的方法isWriteLocked()
Thread-Per-Message
说明
该模式可以理解为“每个消息一个线程”,消息这里可以理解为命令或请求,每个命令或请求分配一个线程,由这个线程来处理
该模式中,消息的委托方和执行方是不同的线程
角色
Client
Client向Host发起请求,而不用关心Host如何实现该请求处理
Host
Host收到Client请求后,创建并启用一个线程,新建的线程使用Helper来处理请求
Helper
Helper为Host提供请求处理的功能,Host创建的新线程调用Helper
要点
1、提高响应性,缩短延迟时间
该模式能够提高与Client对应的Host的响应性,降低延迟时间
尤其是当handle操作非常耗时或handle操作需要等待输入/输出时,效果很明显
为了缩短线程启动花费的时间,可以使用Worker_Thread模式
2、适用于操作顺序没有要求
在该模式中,handle()并不一定按照request()的调用顺序来执行
3、适用于不需要返回值
在该模式中,request()并不会等待handle()的执行结束,request得不到handle的结果
当需要获取操作结果时,可以使用Future模式
4、应用于服务器
JUC中的Thread-Per-Message模式
java.lang.Thread
最基本的创建、启动线程的类
java.lang.Runnable
线程锁执行的任务接口
java.util.concurrent.ThreadFactory
将线程创建抽象化的接口
java.util.concurrent.Executors
用于创建实例的工具类
java.util.concurrent.Executor
将线程执行抽象化的接口
java.util.concurrent.ExecutorService
将被复用的线程抽象化的接口
java.util.concurrent.ScheduledExecutorService
将被调用线程的执行抽象化的接口
Worker Thread
说明
在该模式中,工人线程会逐个取回工作并进行处理,当所有工作全部完成后,工人线程会等待新的工作到来
该模式也被称为Background_Thread模式,或Thread_Pool模式
角色
Client 委托者
Client创建Request并将其传递给Channel
Channel
Channel接收来自Client的Request,并将其传递给Worker
Worker
Woker从Channel中获取Request,并执行其逻辑
当一项工作结束后,继续从Channel获取另外的Request
Request
Request表示工作,其中保存了工作的逻辑
优点
1、提高吞吐量
如果将工作交给其他线程,当前线程就可以处理下一项工作,称为Thread-Per-Message模式
由于启动新线程消耗时间,可以通过Worker_Thread模式轮流和反复地使用线程来提高吞吐量
2、容量控制
Worker的数量可以通过传递参数指定
Worker越多,可以并发处理的逻辑越多,同时增加Worker会增加消耗的资源,必须根据程序时机运行环境调整Worker的数量
3、调用与执行分离
该模式和Thread-Per-Message模式一样,方法的调用和执行是分开的
这样可以
提高响应速度
控制执行顺序,因为执行不受调用顺序的制约
可以取消和反复执行
进行分布式部署,通过网络将Request发送到其他Worker计算节点进行处理
4、Runnable接口的意义
该接口有时用于Worker_Thread模式的Request角色,即可以创建Runnable接口的实现类对象表示业务逻辑,然后传递给Channel
Runnable对象可以作为方法参数,可以放到队列中,可以跨网络传输,也可以保存到文件中,如此则Runnable不论传输到哪个计算节点,都可以执行
5、多态的Request角色
Channel接收到的只是Request实例,但WrokerThread并不知道Request类的详细信息
即使传递的是Request的子类给Channel,WorkerThread也可以正常执行
通过Request的多态,可以增加任务的种类,而无需修改Channel和Worker
JUC包的Worker_Thread
ThreadPoolExecutor
该类是管理Worker线程的类,可以轻松实现Worker_Thread模式
通过JUC创建线程池
java.util.concurrent.Executors
Future
说明
假设一个方法需要长时间执行才能获取结果,则一般不会让调用的程序等待,而是先返回它一张“提货卡”,获取提货卡并不消耗很多时间,该“提货卡”就是Future角色
获取Future角色的线程稍后使用Future角色来获取运行结果
角色
Client
Client向Host发出请求,并立即接收到请求的处理结果——VirtualData,也即Future
Client不必直到返回值是RealData还是Future,稍后通过VirtualData来操作
Host
Host创建新的线程,由新线程创建RealData,同时Host将Future返回给Client
VirtualData
是让Future与RealData具有一致性的角色
RealData
表示真实数据,创建该对象需要花费很多时间
Future
即RealData的“提货单”,由Host传递给Client
对Client而言,Future就是VirtualData,当Client操作Future是线程会wait,直到RealData创建完成
Future将Client的操作委托给RealData
要点
1、使用Thread-Per-Message模式,可以提高程序响应性,但不能获取结果,Future模式也可以提高程序响应性,还可以获取处理结果
2、利用Future模式异步处理特性,可以提高程序吞吐量,虽然并没有减少业务处理的时长,但是如果考虑到I/O,当程序进行磁盘操作时,CPU只是处理等待状态,CPU有空闲时间处理其他的任务
3、准备返回值和使用返回值的分离
4、如果想等待处理完成后获取返回值,还可以考虑采用回调处理方式
当处理完成后,由Host启动的线程调用Client的方法,进行结果的处理,此时Client中的方法需要线程安全地传递返回值
JUC包的Future模式
java.util.concurrent.Callable
该接口将“返回值的某种处理调用”抽象化了
它声明了call(),类似于Runnable.run(),但是call()有返回值
java.util.concurrent.Future
声明了get()来获取结果,但是没有声明设置值的方法
声明设置值的方法需要在Future接口的实现类中声明
除了get(),Future接口还声明了用于中断运行的cancel()
java.util.concurrent.FutureTask
实现了Future接口的标准类,声明了用于获取值的get(),用于中断运行的cancel(),用于设置值的set(),以及用于设置异常的setException()
由于实现了Runnable接口,还声明了run()
0 条评论
下一页
图形选择
思维导图
主题
补充说明
AI生成
提示
关闭后当前内容将不会保存,是否继续?
取消
确定