Java并发思维导图(含面试问题整理)
2021-02-08 12:16:39 614 举报
AI智能生成
Java并发思维导图涵盖了Java多线程编程的各个方面,包括线程的创建、同步与互斥、线程间通信、线程池等核心概念。此外,还涉及了面试中常见的问题,如synchronized关键字的作用、ReentrantLock与synchronized的区别、volatile关键字的作用、生产者消费者问题等。通过学习这个思维导图,可以帮助开发者更好地理解Java并发编程的原理和技术,提高编写高效、稳定并发程序的能力。同时,也为面试者提供了一个全面、系统的复习资料,帮助他们在面试中脱颖而出。
作者其他创作
大纲/内容
Java线程
创建线程有哪几种方式?
继承 Thread 类
定义一个Thread类的子类继承Thread类,重写run方法
实现 Runnable 接口
Thread继承和Runnable接口的区别
任务与线程分离(作为参数传入Thread)
用 Runnable 让任务类脱离了 Thread 继承体系,更灵活
用 Runnable 更容易与线程池等高级 API 配合
FutureTask 配合 Thread
Callalbe参数传入FutureTask,支持返回结果,调用FutureTask.get();
FutureTask传入Thread,Thread启动线程。
FutureTask传入Thread,Thread启动线程。
调用:此方法会阻塞主进程的向下执行,同时等待FutureTask线程执行完成
不调用:不阻塞
说一下runnable和callable的区别
相同点
都是单方法接口@FunctionalInterface修饰,可以用lambda表达式
都可以编写多线程程序
都采用Thread.start()启动线程
不同点
有无返回值
Runnable 接口 run 方法无返回值
Callable 接口 call 方法有返回值(泛型,和Future、FutureTask配合使用)可获取异步执行的结果
是否可以捕获异常信息
Runnable 接口 run 方法只能抛出运行时异常,且无法捕获处理
Callable 接口 call 方法允许抛出异常,可以获取异常信息
使用 Executors 工具类创建线程池
线程上下文切换(Thread Context Switch)
时间片轮转,当前线程保存再加载记一次上下文切换
保存恢复现场,程序计数器参与工作
哪些动作会导致上下文切换?
线程的 cpu 时间片用完
垃圾回收(Stop the world)
有更高优先级的线程需要运行
线程自己调用了 sleep、yield、wait、join、park、synchronized、lock 等方法
频繁的上下文切换会影响性能
如:单个CPU执行垃圾回收任务时使用Serial GC ,不使用Parallel GC
选择合适的线程数
如何在 Windows 和 Linux 上查找哪个线程cpu利用率最高?
windows
任务管理器
tasklist
linux
top
Shift+p,cpu利用率最高PID
top -H -p pid
用jstack 排查死锁
线程常用方法
说说线程start()和线程run()方法的区别
start() 方法用于启动线程
start() 只能调用一次
若调用多次,会出现IllegalThreadStateException
run() 方法用于执行线程运行时代码
run() 可以重复调用
为什么我们调用 start() 方法时会执行 run() 方法?
new 一个 Thread,线程进入新建状态
调用 start() 方法,会启动一个线程并进入就绪状态
分配到时间片后开始运行
自动执行 run() 方法的内容,实现多线程工作
为什么我们不能直接调用 run() 方法?
直接执行 run() 方法,会把 run 方法当成一个 main 线程下的普通方法去执行,并不会在某个线程中执行它,所以这并不是多线程工作。
getState()
获取线程状态
说说线程的状态有哪些?
Java的Enum,线程 Thread t
NEW
调用 t.start(),NEW --> RUNNABLE
RUNNABLE
可运行
运行
阻塞(操作系统阻塞、IO阻塞)
WAITING
BLOCKED
TIMED_WAITING
TERMINATED
结束状态
当前线程所有代码运行完毕,进入 TERMINATED
RUNNABLE <--> WAITING
t 线程用 synchronized(obj) 获取了对象锁后
调用 obj.wait() 方法时,进入waitSet,t 线程从 RUNNABLE --> WAITING
调用 obj.notify() , obj.notifyAll() , t.interrupt() 时
竞争锁成功,t 线程从 WAITING --> RUNNABLE
竞争锁失败,t 线程从 WAITING --> BLOCKED(EntryList)
当前线程调用 t.join() 方法时,当前线程从 RUNNABLE --> WAITING
t 线程运行结束,或调用了当前线程的 interrupt() 时,当前线程从 WAITING --> RUNNABLE
当前线程调用 LockSupport.park() 方法会让当前线程从 RUNNABLE --> WAITING
调用 LockSupport.unpark(目标线程) 或调用了线程 的 interrupt() ,会让目标线程从 WAITING --> RUNNABLE
RUNNABLE <--> BLOCKED
t 线程用 synchronized(obj) 获取了对象锁时如果竞争失败,从 RUNNABLE --> BLOCKED(EntryList)
持 obj 锁线程的同步代码块执行完毕,会唤醒该对象上所有 BLOCKED 的线程重新竞争,
如果其中 t 线程竞争成功,从 BLOCKED --> RUNNABLE ,其它失败的线程仍然 BLOCKED
如果其中 t 线程竞争成功,从 BLOCKED --> RUNNABLE ,其它失败的线程仍然 BLOCKED
RUNNABLE <--> TIMED_WAITING
t 线程用 synchronized(obj) 获取了对象锁后
调用 obj.wait(long n) 方法时,t 线程从 RUNNABLE --> TIMED_WAITING
t 线程等待时间超过了 n 毫秒,或调用 obj.notify() , obj.notifyAll() , t.interrupt() 时
竞争锁成功,t 线程从 TIMED_WAITING --> RUNNABLE
竞争锁失败,t 线程从 TIMED_WAITING --> BLOCKED(EntryList)
当前线程调用 t.join(long n) 方法时,当前线程从 RUNNABLE --> TIMED_WAITING
当前线程等待时间超过了 n 毫秒,或t 线程运行结束,或调用了当前线程的 interrupt() 时,当前线程从TIMED_WAITING --> RUNNABLE
当前线程调用 Thread.sleep(long n) ,当前线程从 RUNNABLE --> TIMED_WAITING
当前线程等待时间超过了 n 毫秒,当前线程从 TIMED_WAITING --> RUNNABLE
当前线程调用 LockSupport.parkNanos(long nanos) 或 LockSupport.parkUntil(long millis) 时,
当前线程从 RUNNABLE --> TIMED_WAITING
当前线程从 RUNNABLE --> TIMED_WAITING
调用 LockSupport.unpark(目标线程) 或调用了线程 的 interrupt() ,或是等待超时,
会让目标线程从TIMED_WAITING--> RUNNABLE
会让目标线程从TIMED_WAITING--> RUNNABLE
操作系统层面
新建(new)
线程刚被创建,还没有调用 start() 方法
可运行(runnable)
线程对象创建后,线程调用 start()方法,进入就绪状态,等待线程调度cpu执行
join()等待执行状态
运行(running)
可运行状态(runnable)的线程获得了cpu时间片(timeslice),执行程序代码
阻塞(block)
处于运行状态中的线程由于某种原因,暂时放弃对 CPU的使用权,停止执行,此时进入阻塞状态
直到其进入到就绪状态,才有机会再被 CPU 调用进入运行状态
Monitor对象的EntryList(BLOCKED),waitSet(WAITING)
死亡(dead)
sleep(long n)
调用 sleep 会让当前线程从 Running 进入 Timed Waiting 状态(阻塞)
TimeUnit.SECONDS.sleep(1)可读性更好
让当前执行的线程休眠n毫秒,休眠时让出 cpu的时间片给其它线程
限制对CPU的使用有哪几种方式?
在没有利用 cpu 来计算时,不要让 while(true) 空转浪费 cpu,
这时可以使用 yield 或 sleep 来让出 cpu 的使用权给其他程序
这时可以使用 yield 或 sleep 来让出 cpu 的使用权给其他程序
sleep 适用于无需锁同步的场景
wait 或 条件变量
都需要加锁
需要相应的唤醒操作
一般适用于要进行同步的场景
wait
条件变量
限制对共享资源的使用
semaphore 实现
Thread 类中的 yield 方法有什么作用?
让线程从 Running (执行状态)进入 Runnable 就绪状态
就绪状态,等待时间片分配(可能立即分配)
线程的 sleep()方法和 yield()方法有什么区别?
sleep():执行状态-->阻塞状态
任务调度器不分配时间片给阻塞状态
给低优先级线程运行机会
声明抛出 InterruptedException
yield():执行状态-->就绪状态
任务调度器分配时间片给就绪状态
只会给相同优先级或更高优先级的线程以运行机会
没有声明任何异常
为什么 Thread 类的 sleep()和 yield ()方法是静态的?
在当前正在执行的线程上调用
避免其他非运行线程调用这些方法
同步
同步是由于线程执行的先后、顺序不同、需要一个线程等待其它线程运行到某个点
需要等待结果
join 实现同步
等待线程运行结束,获取结果
需要外部共享变量,不符合面向对象封装的思想
必须等待线程结束,不能配合线程池使用
Future 实现同步
对变量进行了封装
可以方便配合线程池使用
get 方法让调用线程同步等待
自定义实现同步
模式篇:保护性暂停模式
Java 中 interrupted 和 isInterrupted 方法的区别?
interrupt()
阻塞状态打断:sleep、wait、join
监视线程中断状态,抛出interruptedException异常
打断状态置false
正常程序的打断
打断状态置true
Thread.currentThread().isInterrupted()来控制正常程序是否退出
两阶段终止模式
利用 isInterrupted
取代stop()方法
立即停止线程,线程资源未被释放,容易造成死锁
利用停止标记,停止标记用 volatile修饰,保证该变量在多个线程之间的可见性
interrupted()
静态方法
清除打断标记
查看当前中断信号是true还是false
如果一个线程被中断,第一次调用 interrupted 则返回 true,之后的打断标记为 false
Thread.interrupted()
isInterrupted()
非静态方法
不会清除 打断标记
Thread.currentThread().isInterrupted()
守护线程和用户线程有什么区别呢?
用户线程(main、其他thread):有一个线程未结束,JVM就不结束
守护线程:一旦所有用户线程都结束运行,守护线程会随 JVM 一起结束工作
setDaemon(true)必须在start()方法前执行,否则会抛出 IllegalThreadStateException 异常
在守护线程中产生的新线程也是守护线程
垃圾回收器线程就是一种守护线程
Tomcat 中的 Acceptor 和 Poller 线程都是守护线程,所以 Tomcat 接收到 shutdown 命令后,不会等
待它们处理完当前请求
待它们处理完当前请求
不是所有的任务都可以分配给守护线程来执行,比如读写操作或者计算逻辑
守护 (Daemon) 线程中不能依靠 finally 块的内容来确保执行关闭或清理资源的逻辑
一旦所有用户线程都结束运行,守护线程会随 JVM 一起结束工作,所以守护 (Daemon) 线程中的 finally 语句块可能无法被执行。
共享模型
Java如何实现线程安全?
synchronized锁住对象(this / Test.class)
ReentrantLock
CAS乐观锁
JUC(Java Util Concurrent)包下的Atomic,比如AtomicInteger
TLAB
ThreadLocal Allocate Buffer
Java里的锁,有哪几种?
Synchronized
ReentrantLock
读写锁 ReentrantReadWriteLock
共享锁:允许多个线程持有读锁
独占锁:同一时刻只允许一个线程进行写操作,且读写互斥
Semaphore信号量
CountdownLatch
CyclicBarrier
讲讲你怎么理解synchronized和volatile?
Synchronized
线程锁记录、对象头、轻量级锁、偏向锁、自旋优化、锁膨胀、重量级锁
Volatile
内存屏障,讲讲你怎么理解的?
写指令后加入写屏障,保证屏障前共享变量修改同步到主存;
读指令前加入读屏障,保证读取到的共享变量是最新值;
Volatile保证可见性、有序性(防止本线程内指令重排)
为什么volatile不能保证原子性?
写屏障只保证之后的读能读到最新值,不能保证读跑到前面去,不能解决指令交错。
都是什么场景下使用
管程Monitor
共享问题
一段代码块内如果存在对共享资源的多线程读写操作,称这段代码块为临界区
解决
悲观互斥
互斥是保证临界区的竞态条件发生,同一时刻只能有一个线程执行临界区代码
synchronized
增加临界区操作上的原子性
乐观重试
CAS
线程安全分析
变量的线程安全分析
成员变量和静态变量是否线程安全?
没有共享,则线程安全
被共享,根据状态是否能够改变,又分两种情况
只读操作,则线程安全
有读写操作,则这段代码为临界区,需要考虑线程安全
局部变量是否线程安全?
局部变量是线程安全的
局部变量引用的对象
没有逃离方法的作用范围,线程安全
该对象逃离方法的作用范围,需要考虑线程安全
线程安全类
线程安全的是指,多个线程调用它们同一个实例的某个方法时,是线程安全的。
每个方法是原子的
多个方法的组合不是原子的
不可变类
如果一个对象不能够修改其内部状态(属性),那么它就是线程安全的,因为不存在并发修改
内部的状态不可以改变,final,private修饰
说说为什么使用final?
属性用 final 修饰保证了该属性是只读的,不能修改
类用 final 修饰保证了该类中的方法不能被覆盖,防止子类破坏父类行为
private 或 final 提供【安全】,开闭原则中的【闭】
final原理
全局常量:static final 进行修饰
每个全局常量在编译阶段被分配。
在编译期将结果放入常量池
final int a = 20;
final 变量的赋值通过 putfield 指令来完成,在这条指令之后加入写屏障,保证在其它线程读到它的值时不会出现为 0 的情况
保证写屏障之前的指令不会重排到写屏障后面去。
对写屏障之前的操作同步到主存,保证变量可见性
public,子类方法覆盖,线程不安全
String
replace,substring如何保证线程安全?
构造新字符串对象时,会生成新的 char[] value,对内容进行复制
保护性拷贝
若直接赋值,String引用与外引用共用一个数组,无法保证不可变性。
Integer
日期格式化类
SimpleDateFormat可变类
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
sdf.parse("1951-04-21")
DateTimeFormatter不可变类
DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd");
LocalDate date = dtf.parse("2018-10-01", LocalDate::from);
This class is immutable and thread-safe.
享元模式
保证不可变性需要频繁创建对象,重用对象使用享元模式
Boolean,Byte,Short,Integer,Long,Character 等包装类提供了 valueOf 方法
Byte, Short, Long 缓存的范围都是 -128~127
Character 缓存的范围是 0~127
Boolean 缓存了 TRUE 和 FALSE
BigDecimal BigInteger
本身的原子操作是线程安全的,组合起来获取值设置值是不安全的。
案例
Integer i1 = 40;
Integer i2 = 40;//在-128~127之间,对象重用
Integer i3 = 0;
Integer i4 = new Integer(40);
Integer i5 = new Integer(40);
Integer i6 = new Integer(0);
System.out.println("i1==i2\t" + (i1 == i2));//true (i1、i2等于400为false)
System.out.println("i1==i2+i3\t" + (i1 == i2 + i3));//true
System.out.println("i4==i5\t" + (i4 == i5));//false
System.out.println("i4==i5+i6\t" + (i4 == i5 + i6));//true
Integer i2 = 40;//在-128~127之间,对象重用
Integer i3 = 0;
Integer i4 = new Integer(40);
Integer i5 = new Integer(40);
Integer i6 = new Integer(0);
System.out.println("i1==i2\t" + (i1 == i2));//true (i1、i2等于400为false)
System.out.println("i1==i2+i3\t" + (i1 == i2 + i3));//true
System.out.println("i4==i5\t" + (i4 == i5));//false
System.out.println("i4==i5+i6\t" + (i4 == i5 + i6));//true
“==”比较的是两个对象是否为同一个内存地址。
Integer的默认范围是 -128~127
当基本类型的值未超出常量数组的范围,则返回常量数组中的对象,否则会创建新对象
线程池应用
其他
StringBuffer
Random
Vector
Hashtable
java.util.concurrent 包下的类
synchronized
synchronized只锁对象,不是对方法上锁
class Test{
public synchronized void test() {}}
public synchronized void test() {}}
等价于
class Test{
public void test() {
synchronized(this) {}}}
public void test() {
synchronized(this) {}}}
class Test{
public synchronized static void test() {}}
public synchronized static void test() {}}
等价于
class Test{
public static void test() {
synchronized(Test.class) {}}}
public static void test() {
synchronized(Test.class) {}}}
线程八锁分析
底层原理(字节码)
monitorenter
将 lock对象 MarkWord 置为 Monitor 指针
monitorexit
将 lock对象 MarkWord 重置, 唤醒 EntryList
加锁释放锁期间出现异常
由程序计数器跳转,负责释放锁来防止死锁
加Synchronized是重量级锁,加锁之后关联操作系统的monitor对象,性能低,耗用资源大。在JDK6之后对锁进行优化。
没有竞争资源,加锁浪费资源,轻量级锁
有竞争资源,轻量级锁升级为重量级锁
偏向某个线程使用资源,偏向锁。若有其他线程使用资源,升级为轻量级锁。
批量重填项:一个类的偏向锁撤销到达 20 阈值,资源交给另外一个类
轻量级锁
使用场景
加锁时间交错,不产生竞争
说说轻量级锁的流程
线程栈帧包含锁记录结构,内部存储:锁定对象的Mark Word、对象引用
让锁记录中对象引用指向锁对象,并尝试用 cas 替换锁定对象的 Mark Word,
将 Mark Word 的值存入锁记录
将 Mark Word 的值存入锁记录
如果 cas 替换成功,对象头中存储锁记录地址和状态00 ,表示由该线程给对象加锁
如果 cas 失败,有两种情况
如果是其它线程已经持有了该对象的轻量级锁,这时表明有竞争,进入锁膨胀过程(产生竞争,升级重量级锁)
如果是自己执行了 synchronized 锁重入,那么再添加一条 Lock Record 作为重入的计数
当退出 synchronized 代码块(解锁时)如果有取值为 null 的锁记录,表示有重入,这时重置锁记录,表示重入计数减一
当退出 synchronized 代码块(解锁时)锁记录的值不为 null,这时使用 cas 将 Mark Word 的值恢复给对象头
成功,则解锁成功
失败,说明轻量级锁进行了锁膨胀或已经升级为重量级锁,进入重量级锁解锁流程
锁膨胀
尝试加轻量级锁,CAS失败
一种情况:如果是其它线程已经持有了该对象的轻量级锁,这时表明有竞争,进入锁膨胀过程(产生竞争,升级重量级锁)
说说锁膨胀流程
当 Thread-1 进行轻量级加锁时,Thread-0 已经对该对象加了轻量级锁
这时 Thread-1 加轻量级锁失败,进入锁膨胀流程
为对象申请 Monitor 锁,让对象指向重量级锁地址
Thread-1进入 Monitor 的 EntryList 进行阻塞BLOCKED
Monitor的owner指向Thread-0
当 Thread-0 退出同步块解锁时,使用 cas 将 Mark Word 的值恢复给对象头,失败
进入重量级解锁流程,即按照 Monitor 地址找到 Monitor 对象,设置 Owner 为 null,唤醒 EntryList 中 BLOCKED 线程
自旋优化
重量级锁竞争时自旋优化,如果当前线程自旋成功(即持锁线程退出同步块,释放锁),当前线程就可以避免阻塞。
阻塞需要上下文切换,消耗性能
适合多核CPU
单核 CPU 自旋占用 CPU 时间
自旋重试
判断owner是否为null
自适应
对象刚刚自旋操作成功过,那么认为这次自旋成功的可能性会高,就多自旋几次
反之,就少自旋甚至不自旋
偏向锁
使用场景
偏向锁适用只有一个线程多次访问资源
轻量级锁在没有竞争时(线程只有自身),每次重入仍然需要执行 CAS 操作。
Java 6 中引入偏向锁来进一步优化
只有第一次使用 CAS 将线程 ID 设置到对象的 Mark Word ,
之后发现这个线程 ID 是自己的就表示没有竞争,不用重新 CAS。以后只要不发生竞争,这个对象就归该线程所有
之后发现这个线程 ID 是自己的就表示没有竞争,不用重新 CAS。以后只要不发生竞争,这个对象就归该线程所有
批量重偏向
对象虽然被多个线程访问,但没有竞争,这时偏向了线程 T1 的对象仍有机会重新偏向 T2,重偏向会重置对象的 Thread ID
当撤销偏向锁阈值超过 20 次后,jvm 会这样觉得,我是不是偏向错了呢,于是会在给这些对象加锁时重新偏向至加锁线程
批量撤销
当撤销偏向锁阈值超过 40 次后,jvm 会这样觉得,自己确实偏向错了,根本就不该偏向。
于是整个类的所有对象都会变为不可偏向的,新建的对象也是不可偏向的
于是整个类的所有对象都会变为不可偏向的,新建的对象也是不可偏向的
锁消除
局部变量不会逃离方法,对当前局部变量加锁无意义。JIT:java即时编译器,进行锁消除优化
管程Monitor
java对象头都包含哪些内容?
32位虚拟机
普通对象
Object Header (64 bits)
64位,8个字节
Integer类型:8+4(int)=12个字节
int类型占4个字节
Integer类型:8+4(int)=12个字节
int类型占4个字节
Mark Word (32 位)
哈希值hashcode
GC分代年龄age
锁状态标记位lock
偏向锁
偏向锁标记biased_lock
偏向线程IDThread
偏向时间戳epoch
轻量级锁
指向栈中锁记录的指针ptr_to_lock_record
重量级锁
指向线程Monitor的指针ptr_to_heavyweight_monitor
Klass Word (32 bits)
类信息:如Student类、Teacher类
数组对象
多了array length(32bits)数组长度记录
64位虚拟机
每个 Java 对象都可以关联一个 Monitor 对象
Monitor结构
刚开始 Monitor 中 Owner 为 null
当 Thread-2 执行 synchronized(obj) 就会将 Monitor 的所有者 Owner 置为 Thread-2,Monitor中只能有一个 Owner
在 Thread-2 上锁的过程中,如果 Thread-3,Thread-4,Thread-5 也来执行 synchronized(obj),就会进入EntryList BLOCKED
Thread-2 执行完同步代码块的内容,然后唤醒 EntryList 中等待的线程来竞争锁,竞争的时是非公平的
图中 WaitSet 中的 Thread-0,Thread-1 是之前获得过锁,但条件不满足进入 WAITING 状态的线程,配合wait-notify 操作
如果使用 synchronized 给对象上锁(重量级),
该对象头的Mark Word 中会设置指向 Monitor 对象的指针(30位)后2位存状态(10)
该对象头的Mark Word 中会设置指向 Monitor 对象的指针(30位)后2位存状态(10)
wait & notify
Synchronized
Owner 线程发现条件不满足,调用 wait 方法,即可进入 WaitSet 变为 WAITING 状态
BLOCKED 和 WAITING 的线程都处于阻塞状态(操作系统层面),不占用 CPU 时间片
BLOCKED 线程会在 Owner 线程释放锁时唤醒
WAITING 线程会在 Owner 线程调用 notify 或 notifyAll 时唤醒,但唤醒后并不意味者立刻获得锁,仍需进入EntryList 重新竞争(可运行状态)
说说sleep(long n) 和 wait(long n) 的区别
sleep 是 Thread 方法,而 wait 是 Object 的方法
wait 需要和 synchronized 一起使用
sleep 在睡眠的同时,不会释放对象锁的,但 wait 在等待的时候会释放对象锁
线程状态相同,TIMED_WAITING进入阻塞状态
waitSet有多个线程,notify(),存在虚假唤醒问题
解决
notifyAll与while条件+wait联用解决虚假唤醒问题
Park & Unpark
暂停当前线程
LockSupport.park();
恢复某个线程的运行
LockSupport.unpark(暂停线程对象)
与wait & notify的区别
wait,notify 和 notifyAll 必须配合 Object Monitor 一起使用,而 park,unpark 不必
park & unpark 是以线程为单位来【阻塞】和【唤醒】线程,
而 notify 只能随机唤醒一个等待线程,notifyAll 是唤醒所有等待线程,就不那么【精确】
而 notify 只能随机唤醒一个等待线程,notifyAll 是唤醒所有等待线程,就不那么【精确】
park & unpark 可以先 unpark,而 wait & notify 不能先 notify
同步模式:保护性暂停Guarded Suspension
要点
有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
JDK 中,join 的实现、Future 的实现,采用的就是保护性暂停模式
要等待另一方的结果,因此归类到同步模式
同步,只要产生结果,可以立即消耗
异步模式:生产者/消费者BoundedBufferProblem
异步,产生结果后不一定立即被消耗
要点
与保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应
消费队列可以用来平衡生产和消费的线程资源
生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
解耦
JDK 中各种阻塞队列,采用的就是这种模式
代码
Code实现生产者和消费者,一个长度100的buffer,10个生产者线程,10个消费者线程
判定条件的while能不能换成if,为什么?
自旋锁
为什么用notifyAll(signalAll),可不可以换成notify(signal),二者有什么区别?
随机唤醒一个/全部唤醒进行资源竞争
仅通知一个线程/所有线程均收到通知
notifyAll与while ,wait联用解决虚假唤醒问题
活跃性
多把锁:将锁的粒度细分
好处,增强并发度
坏处,如果一个线程需要同时获得多把锁,易发生死锁
死锁
定位死锁有哪些方式?
使用 jps 定位进程 id
再用 jstack 定位死锁,如 jstack 33200(id)
jconsole工具
线程
检测死锁
哲学家就餐问题
活锁
活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束
活锁和死锁的区别:相互抢占资源(死锁),活锁资源在使用,但无法结束
解决:增加随机睡眠时间
饥饿
一个线程由于优先级太低,始终得不到 CPU 调度执行,也不能够结束
ReentrantLock
说说和synchronized的区别
相同点
都支持可重入
不同点
加锁方式
Sychronized是关键字方式,对象加Monitor
创建对象方式加锁,成对出现
基本语法
可以设置超时时间
Synchronized无法获取超时时间(主动打断)
可中断
Synchronized不可中断(被动打断)
支持可打断设置:lock.lockInterruptibly();
支持多个条件变量
Synchronized只有一个条件变量,不满足条件,进入waitset
先有synchronized再wait、notify、notifyall
ReentrantLock多个条件变量使用要点
await 前需要获得锁
await 执行后,会释放锁,进入 conditionObject 等待
await 的线程被唤醒(或打断、或超时)取重新竞争 lock 锁
竞争 lock 锁成功后,从 await 后继续执行
语法lock.newCondition()
可以设置为公平锁,防止饥饿
Syn不会记录阻塞顺序
公平锁一般没有必要,会降低并发度
Synchronized优化前性能比ReenTrantLock差,Synchronized优化后(1.6)引入偏向锁,轻量级锁(自旋锁)后,两者的性能差不多。
可重入锁是什么,非可重入锁又是什么?(可重入是如何实现的)
某个线程已经获得某个锁,可以再次获取锁而不会出现死锁。
Synchronized、ReentrantLock 都是支持可重入的
不可重入锁,那么第二次获得锁时,自己会被锁挡住,嵌套调用会产生死锁。
实现:判断是否上锁的同时,需要计数记录上锁次数
当某个线程获取ReentrantLock失败时,是否会从内核态切换回用户态?
不会
为什么线程切换会导致用户态与内核态的切换?
因为线程的调度是在内核态运行的,而线程中的代码是在用户态运行。
ReentrantLock如何存储阻塞的线程的?什么是自旋锁?
自旋锁不会引起调用者睡眠(在内核态)。如果自旋锁已经被别的进程保持,调用者就轮询(不断的消耗CPU的时间)是否该自旋锁的保持者已经释放了锁。
(synchronized的优化、ReenTrantLock的CAS。试图在用户态就把加锁问题解决,避免进入内核态的线程阻塞。
避免线程进入内核的阻塞状态是分析和理解锁设计的关键钥匙)
避免线程进入内核的阻塞状态是分析和理解锁设计的关键钥匙)
竞争锁失败后先进行短暂的自旋,再挂起(上下文切换)
阻塞进入AQS队列,符合先进先出规则;
(AQS,不断轮询前一个结点状态是否发生改变;state实现可重入)
(非公平锁相对公平锁上下文切换的次数更少,提高并发性;
非公平锁:队首线程与新线程竞争锁,在队首线程唤醒过程中当前新线程竞争锁成功释放后,唤醒线程再来竞争(双赢))
阻塞进入AQS队列,符合先进先出规则;
(AQS,不断轮询前一个结点状态是否发生改变;state实现可重入)
(非公平锁相对公平锁上下文切换的次数更少,提高并发性;
非公平锁:队首线程与新线程竞争锁,在队首线程唤醒过程中当前新线程竞争锁成功释放后,唤醒线程再来竞争(双赢))
Java 内存模型
JMM 即 Java Memory Model,它定义了主存、工作内存抽象概念,底层对应着 CPU 寄存器、缓存、硬件内存、CPU 指令优化等。
程序员直接操作比较复杂
volatile
synchronized对比volatile
Volatile:只保证可见性、有序性
效率高
Synchronized:原子性、可见性、有序性
共享变量被sync完全保护,才保证有序性
若在sync之外还有共享变量,可能会出现有序性问题
缺点:重量级锁,性能低
可见性
不加volatile修饰出现的问题
JIT编译器将线程内变量值缓存至工作内存的高速缓存中,减少对主存中资源的访问,提高效率
一个线程对主存的修改对另外一个线程不可见
作用
修饰共享数据,保证线程间的可见性
可以用来修饰成员变量和静态成员变量
不能修饰局部变量(线程私有的)
可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值
线程操作 volatile 变量都是直接操作主存
有序性
注意CPU指令重排序优化
CPU 可以在一个时钟周期内,同时运行五条指令的不同阶段
提高了指令地吞吐率
内存屏障(Memory Barrier)
可见性
写屏障(sfence)保证在该屏障之前的,对共享变量的改动,都同步到主存当中
读屏障(lfence)保证在该屏障之后,对共享变量的读取,加载的是主存中最新数据
有序性
写屏障会确保指令重排序时,不会将写屏障之前的代码排在写屏障之后
读屏障会确保指令重排序时,不会将读屏障之后的代码排在读屏障之前
说说volatile 的原理
底层实现内存屏障
对 volatile 变量的写指令后会加入写屏障
对 volatile 变量的读指令前会加入读屏障
不能保证临界区的原子性
两个线程一个 i++ 一个 i-- ,只能保证看到最新值,不能解决指令交错
写屏障仅仅是保证之后的读能够读到最新的结果,但不能保证读跑到它前面去
而有序性的保证也只是保证了本线程内相关代码不被重排序
两个线程调度顺序还是由CPU决定
单例模式(见设计模式篇)
happens-before
规定了对共享变量的写操作对其它线程的读操作可见,它是可见性与有序性的一套规则总结
相关案例
案例一
static int x;
static Object m = new Object();
new Thread(()->{
synchronized(m) {
x = 10;//线程解锁 m 之前对变量的写
}
},"t1").start();
new Thread(()->{
synchronized(m) {
System.out.println(x);//接下来对 m 加锁的其它线程对该变量的读可见
}
},"t2").start();
static Object m = new Object();
new Thread(()->{
synchronized(m) {
x = 10;//线程解锁 m 之前对变量的写
}
},"t1").start();
new Thread(()->{
synchronized(m) {
System.out.println(x);//接下来对 m 加锁的其它线程对该变量的读可见
}
},"t2").start();
案例二
volatile static int x;
new Thread(()->{
x = 10;//线程对 volatile 变量的写
},"t1").start();
new Thread(()->{
System.out.println(x);//对接下来其它线程对该变量的读可见
},"t2").start();
new Thread(()->{
x = 10;//线程对 volatile 变量的写
},"t1").start();
new Thread(()->{
System.out.println(x);//对接下来其它线程对该变量的读可见
},"t2").start();
案例三
static int x;
x = 10;//线程 start 前对变量的写
new Thread(()->{
System.out.println(x);//对该线程开始后对该变量的读可见
},"t2").start();
x = 10;//线程 start 前对变量的写
new Thread(()->{
System.out.println(x);//对该线程开始后对该变量的读可见
},"t2").start();
案例四
static int x;
Thread t1 = new Thread(()->{
x = 10;//线程结束前对变量的写
},"t1");
t1.start();
t1.join();//比如其它线程调用 t1.isAlive() 或 t1.join()等待它结束
System.out.println(x);//对其它线程得知它结束后的读可见
Thread t1 = new Thread(()->{
x = 10;//线程结束前对变量的写
},"t1");
t1.start();
t1.join();//比如其它线程调用 t1.isAlive() 或 t1.join()等待它结束
System.out.println(x);//对其它线程得知它结束后的读可见
案例五
static int x;
public static void main(String[] args) {
Thread t2 = new Thread(()->{
while(true) {
if(Thread.currentThread().isInterrupted()) {
System.out.println(x);
break;
}
}
},"t2");
t2.start();
new Thread(()->{
sleep(1);
x = 10;//线程 t1 打断 t2(interrupt)前对变量的写
t2.interrupt();//通过t2.interrupted 或 t2.isInterrupted
},"t1").start();
while(!t2.isInterrupted()) {
Thread.yield();
}
System.out.println(x);//对于其他线程得知 t2 被打断后对变量的读可见
}
public static void main(String[] args) {
Thread t2 = new Thread(()->{
while(true) {
if(Thread.currentThread().isInterrupted()) {
System.out.println(x);
break;
}
}
},"t2");
t2.start();
new Thread(()->{
sleep(1);
x = 10;//线程 t1 打断 t2(interrupt)前对变量的写
t2.interrupt();//通过t2.interrupted 或 t2.isInterrupted
},"t1").start();
while(!t2.isInterrupted()) {
Thread.yield();
}
System.out.println(x);//对于其他线程得知 t2 被打断后对变量的读可见
}
案例六
volatile static int x;
static int y;
new Thread(()->{
y = 10;
x = 20;//对变量默认值(0,false,null)的写,对其它线程对该变量的读可见
},"t1").start();
new Thread(()->{
// x=20 对 t2 可见, 同时 y=10 也对 t2 可见
System.out.println(x);
},"t2").start();
static int y;
new Thread(()->{
y = 10;
x = 20;//对变量默认值(0,false,null)的写,对其它线程对该变量的读可见
},"t1").start();
new Thread(()->{
// x=20 对 t2 可见, 同时 y=10 也对 t2 可见
System.out.println(x);
},"t2").start();
无锁
代码案例
悲观锁代码(不允许修改共享变量)
public synchronized void withdraw(Integer amount) {
balance -= amount;
}
balance -= amount;
}
乐观(无)锁代码(允许修改共享变量,若发生修改进行再次尝试)
public void withdraw(Integer amount) {
while (true) {// 需要不断尝试,直到成功为止
int prev = balance.get();//获取最新值
int next = prev - amount;//修改值
if (balance.compareAndSet(prev, next)) {
break;
}
}
while (true) {// 需要不断尝试,直到成功为止
int prev = balance.get();//获取最新值
int next = prev - amount;//修改值
if (balance.compareAndSet(prev, next)) {
break;
}
}
compareAndSet(CAS 或Compare And Swap)比较并设置,原子操作不可分割(CPU层面)
CAS 的底层是 lock cmpxchg 指令(X86 架构),在单核 CPU 和多核 CPU 下都能够保证【比较-交换】的原子性。
在多核状态下,某个核执行到带 lock 的指令时,CPU 会让总线锁住,当这个核把此指令执行完毕,再开启总线
这个过程中不会被线程的调度机制所打断,保证了多个线程对内存操作的准确性,是原子的。
获取到的值与当前共享变量的最新值对比,如果相等说明其他线程未修改。
如果不相等说明共享变量已被修改。返回false来保证线程安全,while继续尝试。
balance.addAndGet(-1 * amount);替代
说说为什么无锁效率更高?
无锁:可运行状态,其他线程CAS失败依然是可运行状态。
线程数大于CPU核数时,若无法获得时间片,需要上下文切换进入阻塞状态
如果竞争激烈,重试频繁发生,效率也会受影响
有锁:其他未获得锁的线程需要上下文切换,直接进入阻塞状态
线程池实现使用synchronized
CAS适合短时间的CPU空转,长时间的CPU运行不如进入阻塞队列的性能
CAS的特点
适用于线程数少、多核 CPU 的场景下
CAS 必须借助 volatile 才能读取到共享变量的最新值来实现【比较并交换】的效果
volatile保证变量可见性,但不能保证操作原子性
CAS能保证操作上的原子性,不能保证到主存中读取资源
无锁并发
while循环不断尝试的方式
无阻塞并发
需要在可运行状态
JUC(Java Util Concurrent)包下的Atomic
原子整数
AtomicInteger
AtomicInteger i = new AtomicInteger(0);
AtomicInteger的原理
依赖Unsafe类,value变量使用volatile修饰令其他线程可见,自增加减乘除操作使用CAS保证线程安全。
自增1操作
获取并自增(i = 0, 结果 i = 1, 返回 0),类似于 i++
i++底层指令为多条,有读写指令,存在线程安全问题
System.out.println(i.getAndIncrement());
自增并获取(i = 1, 结果 i = 2, 返回 2),类似于 ++i
System.out.println(i.incrementAndGet());
自减1操作
自减并获取(i = 2, 结果 i = 1, 返回 1),类似于 --i
System.out.println(i.decrementAndGet());
获取并自减(i = 1, 结果 i = 0, 返回 1),类似于 i--
System.out.println(i.getAndDecrement());
加减其他值
加值并获取(i = 5, 结果 i = 0, 返回 0)
System.out.println(i.addAndGet(-5));
updateAndGet()/getAndUpdate()使用及源码分析
支持加减乘除操作
其他类型
AtomicBoolean
AtomicLong
原子引用
AtomicReference
AtomicReference<BigDecimal> ref;
ref = new AtomicReference<>(BigDecimal balance);
ref.get();
ABA问题
主线程仅能判断出共享变量的值与最初值 A 是否相同,不能感知到这种从 A 改为 B 又 改回 A 的情况
只要有其它线程【动过了】共享变量,那么自己的 cas 就算失败,这时,仅比较值是不够的,需要再加一个版本号
AtomicStampedReference
stamped,邮戳
static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);
0表示增加的版本号
获取值 A
String prev = ref.getReference();
获取版本号
int stamp = ref.getStamp();
ref.compareAndSet(ref.getReference(), "B", ref.getStamp(), ref.getStamp() + 1));
获取版本号版本号加一
AtomicStampedReference 可以给原子引用加上版本号,追踪原子引用整个的变化过程
如: A -> B -> A -> C ,通过AtomicStampedReference,我们可以知道,引用变量中途被更改了几次。
AtomicMarkableReference
但是有时候,并不关心引用变量更改了几次,只是单纯的关心是否更改过
AtomicMarkableReference<GarbageBag> ref = new AtomicMarkableReference<>(bag, true);
GarbageBag prev = ref.getReference();
ref.compareAndSet(bag, bag, true, false)
原子数组
原子引用类型无法实现数组内元素的修改。原子数组保护数组里的元素。
AtomicIntegerArray
AtomicLongArray
AtomicReferenceArray
提供数组、可以是线程不安全数组或线程安全数组
Supplier<T> arraySupplier
supplier 提供者 无中生有 ()->结果
()-> new AtomicIntegerArray(10)
获取数组长度的方法
function 函数 一个参数一个结果 (参数)->结果 , BiFunction (参数1,参数2)->结果
Function<T, Integer> lengthFun
(array) -> array.length()
自增方法,回传 array, index
consumer 消费者 一个参数没结果 (参数)->void, BiConsumer (参数1,参数2)->void
Consumer<T> printConsumer
array -> System.out.println(array)
字段更新器
保护对象里的字段属性,成员变量
AtomicReferenceFieldUpdater // 域 字段
AtomicIntegerFieldUpdater
AtomicLongFieldUpdater
利用字段更新器,可以针对对象的某个域(Field)进行原子操作,只能配合 volatile 修饰的字段使用,否则会出现异常
原子累加器
LongAdder提高累加操作的性能
for (int i = 0; i < 5; i++) {
demo(() -> new LongAdder(), adder -> adder.increment());
}
demo(() -> new LongAdder(), adder -> adder.increment());
}
for (int i = 0; i < 5; i++) {
demo(() -> new AtomicLong(), adder -> adder.getAndIncrement());
}
demo(() -> new AtomicLong(), adder -> adder.getAndIncrement());
}
性能提升的原因
在有竞争时,设置多个累加单元,Therad-0 累加 Cell[0],而 Thread-1 累加Cell[1]... 最后将结果汇总。
这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能。
Unsafe
Unsafe 对象提供了非常底层的,操作内存、线程的方法
Unsafe 对象不能直接调用,只能通过反射获得
程序员调用会产生线程不安全,所以命名unsafe来提醒防止调用
工具
线程池
类比阻塞队列:平衡生产者消费者速度差异
ThreadPoolExecutor
线程池状态
ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量
从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING(带符号位)
线程池状态+线程数量存储在原子变量 ctl 中,目的:将线程池状态与线程个数合二为一,可以用一次 cas 原子操作进行赋值
构造方法
决定线程池的行为
public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize 核心线程数目 (最多保留的线程数)
maximumPoolSize 最大线程数目
keepAliveTime 生存时间 - 针对救急线程
unit 时间单位 - 针对救急线程
workQueue 阻塞队列
核心线程使用完,新任务加入到阻塞队列
无界队列不存在救急线程
threadFactory 线程工厂 - 可以为线程创建时起个好名字
handler 拒绝策略
对象在用到时才创建,懒惰式;core+救急线程=max最大线程数
工作方式
线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。
当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue 队列排队,直到有空闲的线程
如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线程来救急。
如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。
AbortPolicy 让调用者抛出 RejectedExecutionException 异常,这是默认策略
CallerRunsPolicy 让调用者运行任务
DiscardPolicy 放弃本次任务
DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之
Dubbo (RPC框架)的实现,在抛出 RejectedExecutionException 异常之前会记录日志,并 dump 线程栈信息,方便定位问题
Netty 的实现,是创建一个新线程来执行任务(网络通信框架,达不到限制线程池总数目的)
ActiveMQ (消息队列)的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略
当高峰过去后,超过corePoolSize 的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由keepAliveTime 和 unit 来控制。
newFixedThreadPool
创建固定大小线程池(工厂方法)
new ThreadPoolExecutor(
nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
阻塞队列是无界的,可以放任意数量的任务
适用于任务量已知,相对耗时的任务
newCachedThreadPool
new ThreadPoolExecutor(
0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>()
);
0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>()
);
核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s
全部都是救急线程(60s 后可以回收)
救急线程可以无限创建
队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的(一手交钱、一手交货)
(没有可用线程,任务无法放入)
线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线程。
适合任务数比较密集,但每个任务执行时间较短的情况
newSingleThreadExecutor
new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。
任务执行完毕,这唯一的线程也不会被释放。
说说单个线程和线程池只包含一个线程的区别
自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池正常工作
newSingleThread创建一个线程与newFixedThreadPool(1)
方式创建一个线程的区别
方式创建一个线程的区别
Executors.newSingleThreadExecutor() 线程个数始终为1,不能修改
FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,
因此不能调用 ThreadPoolExecutor 中特有的方法
因此不能调用 ThreadPoolExecutor 中特有的方法
Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改
对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改
提交任务、关闭线程池
任务调度线程池
希望任务被延迟执行
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
pool.schedule
希望任务被反复执行
pool.scheduleAtFixedRate(() -> {
log.debug("running...");
sleep(2);
}, 1, 1, TimeUnit.SECONDS);
log.debug("running...");
sleep(2);
}, 1, 1, TimeUnit.SECONDS);
一开始,延时 1s,接下来,由于任务执行时间 > 间隔时间,间隔被『撑』到了 2s
pool.scheduleWithFixedDelay(()-> {
log.debug("running...");
sleep(2);
}, 1, 1, TimeUnit.SECONDS);
log.debug("running...");
sleep(2);
}, 1, 1, TimeUnit.SECONDS);
一开始,延时 1s,scheduleWithFixedDelay 的间隔是 上一个任务结束 <-> 延时 <-> 下一个任务开始 所以间隔都是 3s
正确处理执行任务异常
主动捉异常
try/catch
使用 Future
加返回值,为callable,使用Future捕获;不加返回值,runnable
Tomcat 线程池
LimitLatch 用来限流,可以控制最大连接个数,类似 J.U.C 中的 Semaphore
Acceptor 只负责【接收新的 socket 连接】
Poller 只负责监听 socket channel 是否有【可读的 I/O 事件】
一旦可读,封装一个任务对象(socketProcessor),提交给 Executor 线程池处理
Executor 线程池中的工作线程最终负责【处理请求】
合理分工,不同线程负责不同的工作
Tomcat 线程池扩展了 ThreadPoolExecutor,行为稍有不同
如果总线程数达到 maximumPoolSize
这时不会立刻抛 RejectedExecutionException 异常
拒绝策略抛出异常
而是再次尝试将任务放入队列,如果还失败,才抛出 RejectedExecutionException 异常
Connector 配置
Tomcat连接器(对外沟通,用到线程池)、容器(实现servlet组件)
Executor 线程配置
有Executor以Executor为准
Fork/Join
Fork/Join 是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的 cpu 密集型运算
所谓的任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。
Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运算效率
Fork/Join 默认会创建与 cpu 核心数大小相同的线程池
充分利用CPU资源
使用
第一步:要创建任务对象
提交给 Fork/Join 线程池的任务需要继承 RecursiveTask(有返回值)或 RecursiveAction(没有返回值)
需要有终止条件
任务拆分,线程池内多个线程来处理,如何拆分是递归思想
第二步:Fork/Join执行任务对象
让一个线程去执行此任务,fork(),来源于ForkJoinPool线程池
合并(join)结果
为什么阿里禁止使用Executors创建线程池?(查询)
Java线程池实现原理及其在美团业务中的实践……
自己设计线程池,要从哪些方面考虑
线程池生命周期管理
running、shutdown阻塞队列任务执行完、stop中断并抛弃、线程数为0即将进入终结 tidying、terminate
线程池参数
核心线程数(可无限等待)、救急线程数(限时)、最大线程数、阻塞队列大小
任务调度
直接申请线程执行任务、缓冲到队列等待线程执行、拒接该任务
工作线程创建
核心线程:传入任务,启动线程立即执行;
救急线程:去阻塞队列获取任务
工作线程回收
线程池维持引用、线程回收消除引用
拒绝策略
记录日志抛出异常、创建一个新任务来执行、带超时等待放入队列
阻塞队列
数组、链表、有界无界、同步获取、延时获取、两端获取
应用场景
快速响应用户请求
设置更高的核心线程数、最大线程数
快速处理批量任务
关注如何使用有限的资源,尽可能在单位时间内处理更多的任务,吞吐量优先问题;
设置队列去缓冲并发任务,调整合适的corePoolSize去设置处理任务的线程数。
设置队列去缓冲并发任务,调整合适的corePoolSize去设置处理任务的线程数。
瓶颈:线程池参数配置
简化线程池配置:corePoolSize、maximumPoolSize,workQueue
提高响应速度:使用同步队列,任务不应该被缓存,而是立即执行
提升吞吐量:使用有界队列缓冲大批量任务,声明队列容量,防止任务无限制堆积
参数可动态修改
封装线程池,允许线程池监听同步外部的消息,根据消息进行修改配置增加线程池监控,告警
J.U.C
AQS 原理(AbstractQueuedSynchronizer)
阻塞式锁(synchronized,ReentrantLock)和相关的同步器工具的框架
特点
用 state 属性(整数)来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
getState - 获取 state 状态
setState - 设置 state 状态
compareAndSetState - cas 机制设置 state 状态(CAS防止多个线程来修改state状态)
独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源(提供访问上限)
提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList
条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet
子类主要实现这样一些方法(默认抛出 UnsupportedOperationException,继承重用父类中的方法)
tryAcquire
// 如果获取锁失败
if (!tryAcquire(arg)) {
// 入队, 可以选择阻塞当前线程 park unpark
}
if (!tryAcquire(arg)) {
// 入队, 可以选择阻塞当前线程 park unpark
}
tryRelease
// 如果释放锁成功
if (tryRelease(arg)) {
// 让阻塞线程恢复运行
}
if (tryRelease(arg)) {
// 让阻塞线程恢复运行
}
tryAcquireShared
tryReleaseShared
isHeldExclusively
是否持有独占锁
实现不可重入锁
不可重入锁:自己加的锁自己再次获取当前锁获取不到。lock.lock()调用两次,第二次被阻塞。锁的大部分功能由同步器类完成AbstractQueueSynchronizer
tryAcquire
尝试获取锁,state初始值为0,未加锁的独占锁。CAS方式保证state原子性。
tryRelease
尝试释放锁,直接setState,此时没有其他线程竞争,所以不用CAS操作。volatile的setState放在置空语句后面,volatile之前语句可以加入写屏障,保证了资源的可见性
AQS 要实现的功能目标
阻塞版本获取锁 acquire 和非阻塞的版本尝试获取锁 tryAcquire
获取锁超时机制
通过打断取消机制
独占机制及共享机制
条件不满足时的等待机制
设计
获取锁的逻辑
while(state 状态不允许获取) {
if(队列中还没有此线程) {
入队并阻塞}
}
当前线程出队
if(队列中还没有此线程) {
入队并阻塞}
}
当前线程出队
释放锁的逻辑
if(state 状态允许了) {
恢复阻塞的线程(s) }
恢复阻塞的线程(s) }
要点
原子维护 state 状态
state 使用 volatile 配合 cas 保证其修改时的原子性
state 使用了 32bit int 来维护同步状态,因为当时使用 long 在很多平台下测试的结果并不理想
阻塞及恢复线程
早期的控制线程暂停和恢复的 api 有 suspend 和 resume,但它们是不可用的,因为如果先调用的 resume 那么 suspend 将感知不到
解决方法是使用 park & unpark 来实现线程的暂停和恢复,先 unpark 再 park 也支持
park & unpark 是针对线程的,而不是针对同步器的,因此控制粒度更为精细
park 线程还可以通过 interrupt 打断
维护队列
使用了 FIFO 先入先出队列,并不支持优先级队列
队列中有 head 和 tail 两个指针节点,都用 volatile 修饰配合 cas 使用,每个节点有 state 维护节点状态入队伪代码,只需要考虑 tail 赋值的原子性
设计时借鉴了 CLH 队列,它是一种单向无锁队列
无锁,使用自旋
快速,无阻塞
ReentrantLock 原理
非公平锁实现原理
加锁解锁流程
先从构造器开始看,默认为非公平锁实现
NonfairSync 继承自 AQS
没有竞争时
CAS修改state 状态0-->1,设置当前线程为独占线程
第一个竞争出现时
Thread-1 执行CAS 尝试将 state 由 0 改为 1,结果失败
进入 tryAcquire 逻辑,这时 state 已经是1,结果仍然失败
接下来进入 addWaiter 逻辑,构造 Node 队列
首次创建,创建两个node节点,双向队列
Node 的创建是懒惰的,其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程
Node 的 waitStatus 状态,0 为默认正常状态
当前线程进入 acquireQueued 逻辑
acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞
如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,当然
这时 state 仍为 1,失败
这时 state 仍为 1,失败
进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的waitStatus 改为 -1(有责任唤醒后继节点,进入阻塞队列),这次返回 false
shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时state 仍为 1,失败
当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回true
进入 parkAndCheckInterrupt, Thread-1 park,进入阻塞状态
Thread-0 释放锁,进入 tryRelease 流程,如果成功
设置 exclusiveOwnerThread 为 null
state = 0
当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor 流程
找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,即为 Thread-1
回到 Thread-1 的 acquireQueued 流程
如果加锁成功(没有竞争),会设置
exclusiveOwnerThread 为 Thread-1,state = 1
head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread
原本的 head 因为从链表断开,而可被垃圾回收
如果这时候有其它线程来竞争(非公平的体现),例如这时有 Thread-4 竞争
若被 Thread-4 抢占
Thread-4 被设置为 exclusiveOwnerThread,state = 1
Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞
可重入原理
如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
支持锁重入, 只有 state 减为 0, 才释放成功
加锁状态自增
解锁状态自减
可打断原理
不可打断模式
在此模式下,即使它被打断,仍会驻留在 AQS 队列中,一直要等到获得锁后方能得知自己被打断了
可打断模式
如果没有获得到锁
进入可打断的获取锁流程
在 park 过程中如果被 interrupt
这时候抛出异常, 而不会再次进入循环等待
公平锁实现原理
与非公平锁主要区别在于 tryAcquire 方法的实现
先检查 AQS 队列中是否有前驱节点, 没有才去竞争
条件变量实现原理
每个条件变量其实就对应着一个等待队列,其实现类是 ConditionObject
await 流程
开始 Thread-0 持有锁,调用 await,进入 ConditionObject 的 addConditionWaiter 流程
创建新的 Node 状态为 -2(Node.CONDITION),关联 Thread-0,加入等待队列尾部
没有占位节点(区别非公平锁),-2条件变量为等待含义
接下来进入 AQS 的 fullyRelease 流程,释放同步器上的锁
unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么 Thread-1 竞争成功
park 阻塞 Thread-0
signal 流程
假设 Thread-1 要来唤醒 Thread-0
进入 ConditionObject 的 doSignal 流程,取得等待队列中第一个 Node,即 Thread-0 所在 Node
执行 transferForSignal 流程,将该 Node 加入 AQS 队列尾部,将 Thread-0 的 waitStatus 改为 0,Thread-3 的waitStatus 改为 -1
读写锁 ReentrantReadWriteLock
当读操作远远高于写操作时,这时候使用 读写锁 让 读-读 可以并发,提高性能。适合读多写少。
提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法
读锁-读锁 可以并发
读锁-写锁 相互阻塞
写锁-写锁 也是相互阻塞的
读锁不支持条件变量
重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待
必须先释放读锁,才能使用写锁
重入时降级支持:即持有写锁的情况下去获取读锁
缓存应用
缓存更新策略
更新时,是先清缓存还是先更新数据库
先清除缓存,再更新数据库;
中间插入多条查询指令,查询会将结果放入缓存,更新数据库为新的值,之后的查询会是旧数据;造成结果不一致
先更新数据库,再清除缓存
更新数据库还未清除缓存时,进行查询会是缓存中旧数据,造成结果不一致
相对先清除缓存好些,解决:可以加锁保证操作的原子性(性能略低);使用读写锁
读写锁实现缓存和数据库的一致性
使用读写锁实现一个简单的按需加载缓存
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
加写锁, 防止其它线程对缓存读取和更改
lock.writeLock().lock();
采取先更新数据库,再清除缓存的方式
lock.writeLock().unlock();
加读锁, 防止其它线程对缓存更改
lock.readLock().lock();
有缓存,直接返回缓存值
lock.readLock().unlock();
没有考虑缓存容量
没有考虑缓存过期
一张表一把锁的方式,可以提高并发性
使用select语句查询时,第一次select语句后面同样的查询使用缓存,如果不存在缓存或参数不一致再去查数据库
读写锁原理
读锁是share共享式,可并发执行;写锁是独占式,所有读锁会被唤醒,进入执行状态,可并发,遇到一个独占锁再阻塞
读写锁用的是同一个 Sycn 同步器,因此等待队列、state 等也是同一个
t1 w.lock,t2 r.lock
t1 成功上锁,流程与 ReentrantLock 加锁相比没有特殊之处,不同是写锁状态占了 state 的低 16 位,而读锁使用的是 state 的高 16 位
t2 执行 r.lock,这时进入读锁的 sync.acquireShared(1) 流程,首先会进入 tryAcquireShared 流程。如果有写锁占据,那么 tryAcquireShared 返回 -1 表示失败
tryAcquireShared 返回值表示
-1 表示失败
0 表示成功,但后继节点不会继续唤醒
正数表示成功,而且数值是还有几个后继节点需要唤醒,读写锁返回 1
这时会进入 sync.doAcquireShared(1) 流程,首先也是调用 addWaiter 添加节点,不同之处在于节点被设置为Node.SHARED 模式而非 Node.EXCLUSIVE 模式,注意此时 t2仍处于活跃状态
t2 会看看自己的节点是不是第二位,如果是,还会再次调用 tryAcquireShared(1) 来尝试获取锁
如果没有成功,在 doAcquireShared 内 for (;;) 循环一次,把前驱节点的 waitStatus 改为 -1,再 for (;;) 循环一次尝试 tryAcquireShared(1) 如果还不成功,那么在 parkAndCheckInterrupt() 处 park
t3 r.lock,t4 w.lock
假设又有 t3 加读锁和 t4 加写锁,这期间 t1 仍然持有锁
t1 w.unlock
这时会走到写锁的 sync.release(1) 流程,调用 sync.tryRelease(1) 成功
接下来执行唤醒流程 sync.unparkSuccessor,即让第二个node恢复运行,这时 t2 在 doAcquireShared 内parkAndCheckInterrupt() 处恢复运行。这回再来一次 for (;;) 执行 tryAcquireShared 成功则让读锁计数加一
这时 t2 已经恢复运行,接下来 t2 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点
在 setHeadAndPropagate 方法内还会检查下一个节点是否是 shared,如果是则调用doReleaseShared() 将 head 的状态从 -1 改为 0 并唤醒第二个,这时 t3 在 doAcquireShared 内parkAndCheckInterrupt() 处恢复运行
这回再来一次 for (;;) 执行 tryAcquireShared 成功则让读锁计数加一
这时 t3 已经恢复运行,接下来 t3 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点
下一个节点不是 shared 了,因此不会继续唤醒 t4 所在节点
t2 r.unlock,t3 r.unlock
t2 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,但由于计数还不为零
t3 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,这回计数为零了,进入doReleaseShared() 将头节点从 -1 改为 0 并唤醒老二
之后 t4 在 acquireQueued 中 parkAndCheckInterrupt 处恢复运行,再次 for (;;) 这次自己是老二,并且没有其他竞争,tryAcquire(1) 成功,修改头结点,流程结束
StampedLock
该类自 JDK 8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用
加解读锁
long stamp = lock.readLock();
lock.unlockRead(stamp);
加解写锁
long stamp = lock.writeLock();
lock.unlockWrite(stamp);
乐观读,StampedLock 支持 tryOptimisticRead() 方法(乐观读),读取完毕后需要做一次 戳校验 如果校验通过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。
无锁,验戳的方式提高性能
验戳不通过(数据有变动),再加上读锁
乐观读锁进行优化,验证戳通过,直接返回结果。验证不成立进行锁的升级
StampedLock 不支持条件变量
StampedLock 不支持可重入
如果一个服务只能支持5个并发,现在有6个用户,你选择什么并发工具?
Semaphore信号量
信号量,用来限制能同时访问共享资源的线程上限。
ReentrantLock独占式的
Semaphore对共享线程数量限制
类比:停车场,公示牌,汽车做线程,Semaphore管理车库
基本使用
acquire方法、release方法(finally代码块释放资源),竞争失败进入 AQS队列park阻塞
创建 semaphore 对象
Semaphore semaphore = new Semaphore(3);
10个线程同时运行
获取许可
semaphore.acquire();
获得许可,许可数减一
finally代码块释放许可,许可加一
释放许可
semaphore.release();
Semaphore 原理
加锁解锁流程
Semaphore 类比停车场,permits 类比停车位数量,当线程获得了 permits 便获得了停车位,停车场显示空余车位减一
刚开始,permits(state)为 3,这时 5 个线程来获取资源
假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列park 阻塞
这时 Thread-4 释放了 permits,state改为1
接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,
unpark 接下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态
unpark 接下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态
CountdownLatch 倒计时门栓
用来进行线程同步协作,等待所有线程完成倒计时。
其中构造参数用来初始化等待计数值
CountDownLatch latch = new CountDownLatch(3);
await() 用来等待计数归零
latch.await();
countDown() 用来让计数减一
latch.countDown();
getCount()可以获取当前计数
latch.getCount();
CountdownLatch与join方法的区别
高级API和底层调用;
使用线程池中的线程,线程可能一直在运行,join无法等到线程的结束
CountdownLatch可以配合线程池使用
一次性使用
应用之同步等待多线程准备完毕
案例:王者荣耀十个玩家加载完毕才能进入游戏
应用之同步等待多个远程调用结束
CyclicBarrier 循环栅栏
CyclicBarrier与CountdownLatch的区别
CountdownLatch倒计时锁,创建之后无法修改值,无法满足循环任务需求,可以使用CyclicBarrier
CyclicBarrier可重用性,可以重新设置值
循环栅栏,用来进行线程协作,等待线程满足某个计数。比喻为『人满发车』
循环使用
构造时设置『计数个数』
CyclicBarrier cb = new CyclicBarrier(2);
个数为2时才会继续执行
注意线程个数要与CyclicBarrier个数一致
每个线程执行到某个需要“同步”的时刻调用 await() 方法进行等待
cb.await();
当个数不足时,等待
当等待的线程数满足『计数个数』时,继续执行
线程安全集合类概述
遗留的线程安全集合
Hashtable
Vector
使用 Collections 装饰的线程安全集合(装饰器模式)
Collections.synchronizedCollection
Collections.synchronizedList
Collections.synchronizedMap
Collections.synchronizedSet
……
java.util.concurrent.*
包含三类关键词
Blocking
大部分实现基于锁,并提供用来阻塞的方法
阻塞队列,不满足条件时进行阻塞,ReentrantLock
CopyOnWrite
CopyOnWrite 之类容器修改开销相对较重
拷贝方式避免多线程读写的并发安全,适用于读多写少
Concurrent
使用CAS,多把锁提高并发度和吞吐量
缺点:弱一致性
遍历时弱一致性
例如,当利用迭代器遍历时,如果容器发生修改,迭代器仍然可以继续进行遍历,这时内容是旧的
遍历线程,修改线程,造成遍历结果不一致,fail-safe
遍历时如果发生了修改,对于非安全容器来讲,使用 fail-fast 机制也就是让遍历立刻失败,抛出ConcurrentModificationException,不再继续遍历
求大小弱一致性
size 操作未必是 100% 准确
读取弱一致性
ConcurrentHashMap
练习:单词计数
ConcurrentHashMap 原理
说说ConcurrentHashMap解决了什么问题
JDK7HashMap出现的多线程死链问题(环)
JDK8改进JDK7由头插法改为尾插法,多线程场景下存在数据丢失问题
提高HashMap的并发安全性
解决
见:读操作get为什么是线程安全的
JDK7中的ConcurrentHashMap
segment分段锁
哈希桶数组包含多个segment,一个segment包含多个HashEntry(桶)
segment上锁不影响其他segment操作
segment继承ReentrantLock
value值volatile修饰
访问可见性
底层用数组+链表实现
get操作
根据 key 计算出 hash 值定位到具体的 Segment ,再根据 hash 值获取定位 HashEntry 对象,并对 HashEntry 对象进行链表遍历,找到对应元素。
volatile 可以保证内存可见性,所以不会读取到过期数据
put操作
先定位到相应的 Segment
获取对象锁成功
定位分段内的桶下标
遍历链表内容
key 已存在看是否覆盖value
key不存在头插法插入
尝试获取锁失败
尝试自旋获取锁
循环式的判断对象锁是否能够被成功获取,直到获取到锁才会退出循环,防止执行 put 操作的线程频繁阻塞
重试的次数达到一定次数改为阻塞锁
ConcurrentHashMap 的并发度是什么?
程序不产生锁竞争的最大线程数
JDK7里是segment的数量
过大,可以在一个segment内访问,变成两个segment,命中效率降低
过小,容易产生锁竞争
JDK8里并发度等于数组的大小
JDK8中的ConcurrentHashMap
采用更细粒度的锁,锁住桶数组中桶节点
使用CAS+Synchronized方式,保证线程并发安全
JDK8对Sychronized做大量优化后相对ReentrantLock可以减少内存开销 。
因为只对桶下标进行上锁。ReentrantLock对每个节点都需要通过继承 AQS 来获得同步支持
底层使用数组+链表+红黑树实现
提升查询效率
数据添加:由JDK7的头插法改为尾插法
尾插法的优势
初始化:由饿汉式改为懒汉式
饿汉式创建非常消耗资源
get操作
get,无锁操作(仅需要保证可见性,除Treebin的读写锁),扩容过程中 get 操作拿到的是 ForwardingNode 它会让 get 操作在新table 进行搜索
get 流程
根据 key 计算出 hash 值,判断哈希表table为空
返回null
如果读取的bin头节点为null
返回null
如果是链表结构,就到链表中查询
如果是红黑树结构,就从红黑树里面查询
读操作get为什么是线程安全的
读操作一般不加锁(TreeBin的读写锁除外),读写操作可并行;因为C13Map的写操作都要获取bin头部的syncronized互斥锁,能保证最多只有一个线程在做更新,单线程写、多线程读的并发安全性的问题。
如果读取的bin是一个链表,
头节点是个普通Node
头节点是个普通Node
如果正在发生链表向红黑树的treeify工作,因为treeify本身并不破坏旧的链表bin的结构,
只是在全部treeify完成后将头节点一次性替换为新创建的TreeBin,可以放心读取。
只是在全部treeify完成后将头节点一次性替换为新创建的TreeBin,可以放心读取。
如果正在发生resize且当前bin正在被transfer,因为transfer本身并不破坏旧的链表bin的结构,
只是在全部transfer完成后将头节点一次性替换为ForwardingNode,可以放心读取。
只是在全部transfer完成后将头节点一次性替换为ForwardingNode,可以放心读取。
扩容期间hash桶查询数据会发生什么?
扩容前,扩容中可以访问原数组
正在迁移的hash桶
迁移形成的链是复制的,而非移动,复制不影响原数组的遍历,不会阻塞get操作
扩容完成的
头结点的hash 为负数表示整个数组在扩容
头结点标记为ForwardNode
转发到新数组进行查询
如果其它线程正在操作链表,在当前线程遍历链表的任意一个时间点,
都有可能同时在发生add/replace/remove操作。
ConcurrentHashMap 弱一致性
都有可能同时在发生add/replace/remove操作。
ConcurrentHashMap 弱一致性
如果是add操作,因为链表的节点新增从JDK8以后都采用了尾插法,会多遍历或者少遍历一个tailNode。
如果是remove操作,存在遍历到某个Node时,正好有其它线程将其remove,导致其孤立于整个链表之外;
但因为其next引用未变,整个链表并没有断开,还是可以照常遍历链表直到tailNode。
但因为其next引用未变,整个链表并没有断开,还是可以照常遍历链表直到tailNode。
如果是replace操作,链表的结构未变,只是某个Node的value发生了变化,没有安全问题。
不能保证happen before
结论:链表线性数据结构,单线程写且插入操作尾插法,并发读取是安全的;
不会存在误读、链表断开导致的漏读、读到环状链表等问题。
不会存在误读、链表断开导致的漏读、读到环状链表等问题。
如果读取的bin是一个红黑树,
说明头节点是TreeBin节点。
说明头节点是TreeBin节点。
如果正在发生红黑树向链表的untreeify操作,因为untreeify本身并不破坏旧的红黑树结构,
只是在全部untreeify完成后将头节点一次性替换为新创建的普通Node,可以放心读取。
只是在全部untreeify完成后将头节点一次性替换为新创建的普通Node,可以放心读取。
如果正在发生resize且当前bin正在被transfer,因为transfer本身并不破坏旧的红黑树结构,
只是在全部transfer完成后将头节点一次性替换为ForwardingNode,可以放心读取。
只是在全部transfer完成后将头节点一次性替换为ForwardingNode,可以放心读取。
如果其他线程在操作红黑树,在当前线程遍历红黑树的任意一个时间点,都可能有单个的其它线程发生add/replace/remove/红黑树的翻转等操作,参考红黑树的读写锁实现。
put流程
首先会判断 key、value 是否为空,如果为空就抛异常!
null区别
ConCurrentHashMap:if (key == null || value == null) throw new NullPointerException();
如果key或者value其中一个null,就报空指针异常
如果key或者value其中一个null,就报空指针异常
HashMap:(key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16)
HashMap允许存入key为null
HashMap允许存入key为null
为什么ConcurrentHashMap不能存null
在并发的情况下带来二义性
HashMap中
map.get(key)方法得到值为null
map.contains(key)方法来判断不存在还是值为null
ConcurrentHashMap中
两次调用值可能被改变
计算key的hash值
spread(key.hashCode())保证为正数
判断桶数组为空,创建桶数组
当前线程正在创建另一个线程等待
创建方式区别
JDK8仅计算size容量,懒惰式创建
JDK7直接创建数组,占用内存
判断当前数组下标是否第一次插入,为空,使用casTabAt创建Node头结点
要创建链表头节点
cas操作保证线程安全,若同时有两个线程进行同一个位置的数据添加,只有一个会成功,失败的线程进入下一次for循环。
取代分段锁思想
更细粒度的锁,cas只会对hash表的一个链表头结点上锁,不影响其他链表头的操作(一个位置一把锁)
扩容期间hash桶插入数据会发生什么?
已扩容
hash值为-1,说明为ForwardingNode,有线程正在扩容,当前线程帮忙扩容
正在扩容的头结点
synchronized锁住桶下标,doubleCheck检测是否变化
是链表,用链表方式插入
找到相同的 key
如果允许更新,进行value值的更新
没有找到
链表尾部进行添加操作(JDK8的尾插法)
链表长度大于8且table长度大于64,进行树化操作
是红黑树,用红黑树方式插入
其他线程无法获得锁,被阻塞,不能操作正在迁移的hash桶
未扩容
可直接插入
最后判断是否进行扩容操作
什么时候触发扩容?(transfer)
addCount:在调用 addCount 方法增加集合元素计数后发现当前集合元素个数到达扩容阈值时就会触发扩容
helpTransfer:扩容状态下其他线程对集合进行插入、修改、删除、合并、compute 等操作时遇到 ForwardingNode 节点会触发扩容
tryPresize:putAll 批量插入或者插入节点后发现存在链表长度达到 8 个或以上,但数组长度为 64 以下时会触发扩容
ForwardingNode
hash值MOVED=-1 ,ForwardingNode存在表示集合正在扩容状态,nextTable 指向扩容后的数组。
扩容时如果table某个bin(头结点)迁移完毕, 用 ForwardingNode 作为旧 table bin 的头结点(占位功能)
get遍历到旧table,如果头结点标识为ForwardingNode,则到新表中get遍历,不会阻塞查询操作(转发功能)
sizeCtl 属性在各个阶段的作用
新建而未初始化时
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) +1));
this.sizeCtl = cap;
this.sizeCtl = cap;
sizeCtl值为2的幂次
记录集合在实际创建时应该使用的大小
初始化过程中
U.compareAndSwapInt(this, SIZECTL, sc, -1)
将 sizeCtl 值设置为 -1 表示集合正在初始化
其他线程发现该值为 -1 时会让出CPU资源以便初始化操作尽快完成 。
初始化完成后
sc = n - (n >>> 2);
sizeCtl = sc;
sizeCtl = sc;
记录下一次扩容的阈值
正在扩容时
U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)
第一条扩容线程设置的某个特定基数
U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)
后续线程加入扩容大军时每次加 1
U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)
线程扩容完毕退出扩容操作时每次减 1
sizeCtl 用于记录当前扩容的并发线程数情况
此时 sizeCtl 的值为:((rs << RESIZE_STAMP_SHIFT) + 2) + (正在扩容的线程数) ,并且该状态下 sizeCtl < 0 。
BlockingQueue
LinkedBlockingQueue 原理
基本的入队出队
初始化链表 last = head = new Node<E>(null); Dummy 节点用来占位,item 为 null
last:尾指针
head:头指针
Dummy
哑元节点、哨兵节点
当一个节点入队 last = last.next = node;
入队方法:node节点添加到last节点的next,同时将当前节点赋值给last
再来一个节点入队 last = last.next = node;
出队
h = head
first = h.next
h.next = h
出队的节点执行自己,保证安全的被垃圾回收
head = first
Dummy内的item为空,真正要出队的是当前first节点的item,将当前节点置null,变成Dummy节点
E x = first.item;
first.item = null;
return x;
first.item = null;
return x;
加锁分析
【高明之处】用了两把锁和 dummy 节点
dummy节点防止队列剩一个节点两把锁锁住同一个对象
用一把锁,同一时刻,最多只允许有一个线程(生产者或消费者,二选一)执行
用两把锁,同一时刻,可以允许两个线程同时(一个生产者与一个消费者)执行
消费者与消费者线程仍然串行
生产者与生产者线程仍然串行
队列,出队入队两把锁提升效率
线程安全分析
当节点总数大于 2 时(包括 dummy 节点),putLock 保证的是 last 节点的线程安全,
takeLock 保证的是head 节点的线程安全。两把锁保证了入队和出队没有竞争
takeLock 保证的是head 节点的线程安全。两把锁保证了入队和出队没有竞争
当节点总数等于 2 时(即一个 dummy 节点,一个正常节点)这时候,仍然是两把锁锁两个对象,不会竞争
当节点总数等于 1 时(就一个 dummy 节点)这时 take 线程会被 notEmpty 条件阻塞,有竞争,会阻塞
性能比较
主要列举 LinkedBlockingQueue 与 ArrayBlockingQueue 的性能比较
Linked 支持有界,Array 强制有界
Linked 实现是链表,Array 实现是数组
Linked 是懒惰的,而 Array 需要提前初始化 Node 数组
Linked 每次入队会生成新 Node,而 Array 的 Node 是提前创建好的
Linked 两把锁,Array 一把锁
ConcurrentLinkedQueue
ConcurrentLinkedQueue 的设计与 LinkedBlockingQueue 非常像
两把【锁】,同一时刻,可以允许两个线程同时(一个生产者与一个消费者)执行
dummy 节点的引入让两把【锁】将来锁住的是不同对象,避免竞争
只是这【锁】使用了 cas 来实现
ConcurrentLinkedQueue 应用还是非常广泛的
Tomcat 的 Connector 结构时,Acceptor 作为生产者向 Poller 消费者传递事件信息时,
正是采用了ConcurrentLinkedQueue 将 SocketChannel 给 Poller 使用
正是采用了ConcurrentLinkedQueue 将 SocketChannel 给 Poller 使用
Acceptor :处理请求接收连接,生产SocketChannel对象
poller:处理读写事件的,监控SocketChannel对象的读写生产者消费者的关系
ConcurrentLinkedQueue(CAS性能更高)将SocketChannel包装为事件对象
说说线程安全的list
Vector
Collections装饰的 sychronizedList
CopyOnWriteArrayList
CopyOnWriteArraySet 是它的马甲 底层实现采用了 写入时拷贝 的思想
增删改操作会将底层数组拷贝一份,更改操作在新数组上执行,这时不影响其它线程的并发读,实现读写分离,读写并发。
读写锁只是读读并发
适合『读多写少』的应用场景
新增add
synchronized (lock)
保证与其他写互斥
这里的源码版本是 Java 11,在 Java 1.8 中使用的是可重入锁而不是 synchronized
Object[] es = getArray()
获取旧的数组
es = Arrays.copyOf(es, len + 1);
拷贝新的数组(这里是比较耗时的操作,但不影响其它读线程)
拷贝,利用空间换取线程安全
当前读的可能是旧的数组
其它读操作并未加锁
get 弱一致性
Concurrent集合类、CopyOnWrite集合类都有get弱一致性问题
不要觉得弱一致性就不好
数据库的 MVCC (多版本并发控制)都是弱一致性的表现
并发高和一致性是矛盾的,需要权衡
迭代器弱一致性
0 条评论
下一页