java并发编程
2021-04-04 17:28:47 1 举报
AI智能生成
java并发编程 juc 设计模式 jdk cas
作者其他创作
大纲/内容
书籍推荐
《Java 并发编程实战》
《Java 并发编程:设计原则与模式》
《Java 并发编程的艺术》
RxJava
《图解 Java 多线程设计模式》
《TCP/IP 网络编程》
bug的源头
cpu缓存导致的可见性问题
线程切换带来的原子性问题
编译优化带来的有序性问题
可见性和有序性问题的解决
java内存模型
Java 内存模型规范了 JVM 如何提供按需禁用缓存和编译优化的方法
方法包括 volatile、synchronized 和 final 三个关键字
六项 Happens-Before 规则
volatile
final
六项 Happens-Before 规则(可见性)
程序的顺序性规则
程序前面对某个变量的修改一定是对后续操作可见的
volatile 变量规则
volatile 变量的写操作相对于后续对这个 volatile 变量的读操作可见
传递性
如果 A Happens-Before B,且 B Happens-Before C,那么 A Happens-Before C
管程中锁的规则
管程是一种通用的同步原语,在 Java 中指的就是 synchronized,synchronized 是 Java 里对管程的实现
对一个锁的解锁 Happens-Before 于后续对这个锁的加锁
线程 start() 规则
主线程 A 启动子线程 B 后,子线程 B 能够看到主线程在启动子线程 B 前的操作
线程 join() 规则
主线程 A 等待子线程 B 完成(主线程 A 通过调用子线程 B 的 join() 方法实现),
当子线程 B 完成后(主线程 A 中 join() 方法返回),主线程能够看到子线程的操作
当子线程 B 完成后(主线程 A 中 join() 方法返回),主线程能够看到子线程的操作
互斥锁
原子性的本质
操作的中间状态对外不可见
解决原子性问题
是要保证中间状态对外不可见
synchronized
静态方法:类锁
实例方法:实例锁
代码块:指定对象锁
锁和受保护资源的关系
如果一个资源对应多个锁,没有形成互斥关系,可能会导致并发问题
受保护资源和锁之间合理的关联关系应该是 N:1 的关系
细粒度锁
用不同的锁对受保护资源进行精细化管理,能够提升性能
保护有关联关系的多个资源
使用同一把锁
锁能覆盖所有受保护资源
死锁
一组互相竞争资源的线程因互相等待,导致“永久”阻塞的现象
四个条件
互斥,共享资源 X 和 Y 只能被一个线程占用
占有且等待,线程 T1 已经取得共享资源 X,在等待共享资源 Y 的时候,不释放共享资源 X
不可抢占,其他线程不能强行抢占线程 T1 占有的资源
循环等待,线程 T1 等待线程 T2 占有的资源,线程 T2 等待线程 T1 占有的资源,就是循环等待
打破
破坏占用且等待条件
一次性申请全部资源
破坏不可抢占条件
破坏循环等待条件
按序申请资源
等待-通知机制
可以解决一直while循环导致的cpu消耗
synchronized+wait()+notifyAll()/notify()
wait和sleep的比较
共同点
都可以暂停线程的执行
不同点
wait、notify、notifyAll只能在同步代码块或同步方法中使用,而sleep不受限制
wait方法释放锁,而sleep没有
sleep来自Thread类,wait来自Object类
sleep必须捕捉异常,wait、notify、notifyAll不需要
安全性问题
正确性的含义就是程序按照我们期望的执行
场景
数据竞争(Data Race):存在共享数据并且该数据会发生变化,通俗地讲就是有多个线程会同时读写同一数据
竞态条件(Race Condition):程序的执行结果依赖线程执行的顺序
if (状态变量 满足 执行条件) {
执行操作
}
执行操作
}
当某个线程发现状态变量满足执行条件后,开始执行操作;
可是就在这个线程执行操作的时候,其他线程同时修改了状态变量,导致状态变量不满足执行条件了
可是就在这个线程执行操作的时候,其他线程同时修改了状态变量,导致状态变量不满足执行条件了
方案
互斥
锁
活跃性问题
死锁
活锁
有时线程虽然没有发生阻塞,但仍然会存在执行不下去的情况,这就是所谓的“活锁”
比如,甲乙同时相互谦让,结果又相撞
方案
谦让时等待一个随机时间
饥饿
所谓“饥饿”指的是线程因无法访问所需资源而无法执行下去的情况
方案
保证资源充足
公平分配资源(场景更多),公平锁
避免持有锁的线程长时间执行
性能问题
串行比
阿姆达尔(Amdahl)定律
减少锁带来的串行比
方案
使用无锁的算法和数据结构
线程本地存储 (Thread Local Storage, TLS)
写入时复制 (Copy-on-write)
乐观锁
Java 并发包里面的原子类
Disruptor 无锁的内存队列
减少锁持有时间
使用细粒度的锁
如1.8版本前的ConcurrentHashMap,分段锁
读写锁
指标
吞吐量:指的是单位时间内能处理的请求数量
延迟:指的是从发出请求到收到响应的时间
并发量
指的是能同时处理的请求数量,一般来说随着并发量的增加、延迟也会增加。
所以延迟这个指标,一般都会是基于并发量来说的
所以延迟这个指标,一般都会是基于并发量来说的
管程(Monitor)
管程,指的是管理共享变量以及对共享变量的操作过程,让他们支持并发
管程和信号量是等价的,所谓等价指的是用管程能够实现信号量,也能用信号量实现管程
管程模型
Hasen 模型
Hoare 模型
MESA 模型
MESA 模型
互斥
将共享变量及其对共享变量的操作统一封装起来
操作保证互斥性,只允许一个线程进入管程
同步
示意图
Java 内置的管程方案(synchronized)
synchronized 关键字修饰的代码块,
在编译期会自动生成相关加锁和解锁的代码,但是仅支持一个条件变量
在编译期会自动生成相关加锁和解锁的代码,但是仅支持一个条件变量
Java SDK 并发包实现的管程
支持多个条件变量,不过并发包里的锁,需要开发人员自己进行加锁和解锁操作
线程
生命周期
通用生命周期:五态模型
初始状态
指的是线程已经被创建,但是还不允许分配 CPU 执行
这里所谓的被创建,仅仅是在编程语言层面被创建,而在操作系统层面,真正的线程还没有创建
可执行状态
指的是线程可以分配 CPU 执行
在这种状态下,真正的操作系统线程已经被成功创建了,所以可以分配 CPU 执行
运行状态
被分配到 CPU 的线程的状态
休眠状态
运行状态的线程如果调用一个阻塞的 API(例如以阻塞方式读文件)
或者等待某个事件(例如条件变量),
那么线程的状态就会转换到休眠状态,同时释放 CPU 使用权
或者等待某个事件(例如条件变量),
那么线程的状态就会转换到休眠状态,同时释放 CPU 使用权
休眠状态的线程永远没有机会获得 CPU 使用权。
当等待的事件出现了,线程就会从休眠状态转换到可运行状态。
当等待的事件出现了,线程就会从休眠状态转换到可运行状态。
终止状态
线程执行完或者出现异常就会进入终止状态
进入终止状态也就意味着线程的生命周期结束
java中线程的生命周期
六态模型
NEW(初始化状态)
RUNNABLE(可运行 / 运行状态)
BLOCKED(阻塞状态)
WAITING(无时限等待)
TIMED_WAITING(有时限等待)
TERMINATED(终止状态)
在操作系统层面,
Java 线程中的 BLOCKED、WAITING、TIMED_WAITING 是一种状态,
即休眠状态
Java 线程中的 BLOCKED、WAITING、TIMED_WAITING 是一种状态,
即休眠状态
RUNNABLE 与 BLOCKED 的状态转换
只有一种场景会触发这种转换,就是线程等待 synchronized 的隐式锁
平时所谓的 Java 在调用阻塞式 API 时,线程会阻塞,
指的是操作系统线程的状态,并不是 Java 线程的状态(依然为RUNNABLE)
指的是操作系统线程的状态,并不是 Java 线程的状态(依然为RUNNABLE)
RUNNABLE 与 WAITING 的状态转换
获得 synchronized 隐式锁的线程,
调用无参数的 Object.wait() 方法
调用无参数的 Object.wait() 方法
调用无参数的 Thread.join() 方法
等待:LockSupport.park()
唤醒:LockSupport.unpark(Thread thread)
唤醒:LockSupport.unpark(Thread thread)
RUNNABLE 与 TIMED_WAITING 的状态转换
调用带超时参数的 Thread.sleep(long millis) 方法
获得 synchronized 隐式锁的线程,调用带超时参数的 Object.wait(long timeout) 方法
调用带超时参数的 Thread.join(long millis) 方法
调用带超时参数的 LockSupport.parkNanos(Object blocker, long deadline) 方法
调用带超时参数的 LockSupport.parkUntil(long deadline)
从 NEW 到 RUNNABLE 状态
Java 刚创建出来的 Thread 对象就是 NEW 状态
继承 Thread 对象,重写 run() 方法
实现 Runnable 接口,重写 run() 方法,并将该实现类作为创建 Thread 对象的参数
从 NEW 状态转换到 RUNNABLE 状态很简单,只要调用线程对象的 start() 方法
从 RUNNABLE 到 TERMINATED 状态
run()方法执行完
run()方法抛出异常
interrupt()方法
stop() 和 interrupt() 方法的主要区别
stop() 方法会真的杀死线程,不给线程喘息的机会,
如果线程持有 ReentrantLock 锁,
被 stop() 的线程并不会自动调用 ReentrantLock 的 unlock() 去释放锁,
那其他线程就再也没机会获得 ReentrantLock 锁
如果线程持有 ReentrantLock 锁,
被 stop() 的线程并不会自动调用 ReentrantLock 的 unlock() 去释放锁,
那其他线程就再也没机会获得 ReentrantLock 锁
interrupt() 方法仅仅是通知线程,
线程有机会执行一些后续操作,同时也可以无视这个通知
线程有机会执行一些后续操作,同时也可以无视这个通知
当线程处于 WAITING、TIMED_WAITING 状态,
或者阻塞在某个 I/O 操作上,
被中断的线程通过异常的方式获得通知
或者阻塞在某个 I/O 操作上,
被中断的线程通过异常的方式获得通知
通过isInterrupted()方法进行主动探知
分析:jstack 命令或者Java VisualVM这个可视化工具
线程数量选择
目的:提升程序性能,主要是降低延迟,提高吞吐量
一个方向是优化算法
另一个方向是将硬件的性能发挥到极致
在并发编程领域,提升性能本质上就是提升硬件的利用率,
再具体点来说,就是提升 I/O 的利用率和 CPU 的利用率
再具体点来说,就是提升 I/O 的利用率和 CPU 的利用率
我们需要解决 CPU 和 I/O 设备综合利用率的问题
CPU 密集型的计算场景
线程的数量一般会设置为“CPU 核数 +1”
I/O 密集型计算场景
最佳线程数 =CPU 核数 * [ 1 +(I/O 耗时 / CPU 耗时)]
线程安全的局部变量
每个方法在调用栈里都有自己的独立空间,称为栈帧,
每个栈帧里都有对应方法需要的参数和返回地址
每个栈帧里都有对应方法需要的参数和返回地址
栈帧和方法是同生共死的
局部变量就是放到了调用栈里
每个线程都有自己独立的调用栈
线程封闭:仅在单线程内访问数据
例如从数据库连接池里获取的连接 Connection
递归与栈溢出
原因分析
每调用一个方法就会在栈上创建一个栈帧
递归调用的特点是每递归一次,就要创建一个新的栈帧,而且还要保留之前的环境(栈帧),直到遇到结束条件
栈的大小不是无限的,所以递归调用次数过多的话就会导致栈溢出
所有的递归算法都可以用非递归算法实现?
如何用面向对象思想写好并发程序
封装共享变量
将属性和实现细节封装在对象内部,外界对象只能通过目标对象提供的公共方法来间接访问这些内部属性
将共享变量作为对象属性封装在内部,对所有公共方法制定并发访问策略
对于这些不会发生变化的共享变量,建议你用 final 关键字来修饰
识别共享变量间的约束条件
一定要识别出所有共享变量之间的约束条件,
如果约束条件识别不足,很可能导致制定的并发访问策略南辕北辙
如果约束条件识别不足,很可能导致制定的并发访问策略南辕北辙
一定要特别注意竞态条件
制定并发访问策略
三件事
避免共享:避免共享的技术主要是利于线程本地存储以及为每个任务分配独立的线程
不变模式:这个在 Java 领域应用的很少,但在其他领域却有着广泛的应用,
例如 Actor 模式、CSP 模式以及函数式编程的基础都是不变模式
例如 Actor 模式、CSP 模式以及函数式编程的基础都是不变模式
管程及其他同步工具:Java 领域万能的解决方案是管程,
但是对于很多特定场景,使用 Java 并发包提供的读写锁、并发容器等同步工具会更好
但是对于很多特定场景,使用 Java 并发包提供的读写锁、并发容器等同步工具会更好
三条原则
优先使用成熟的工具类
迫不得已时才使用低级的同步原语
避免过早优化:安全第一,并发程序首先要保证安全,出现性能瓶颈后再优化
理论基础模块总结
用锁的最佳实践
一个合理的受保护资源与锁之间的关联关系应该是 N:1
锁,应是私有的、不可变的、不可重用的
// 普通对象锁
private final Object lock = new Object();
// 静态对象锁
private static final Object lock = new Object();
private final Object lock = new Object();
// 静态对象锁
private static final Object lock = new Object();
锁的性能要看场景
竞态条件需要格外关注
方法调用是先计算参数
logger.debug("The var1:" +
var1 + ", var2:" + var2);
var1 + ", var2:" + var2);
如果日志级别设置为 INFO,虽然这行代码不会写日志,
但是会计算"The var1:" + var1 + ", var2:" + var2的值,
因为方法调用前会先计算参数
但是会计算"The var1:" + var1 + ", var2:" + var2的值,
因为方法调用前会先计算参数
使用{}占位符是写日志的一个良好习惯
InterruptedException 异常处理需小心
在触发 InterruptedException 异常的同时,JVM 会同时把线程的中断标志位清除
理论值 or 经验值
经验值为“最佳线程 =2 * CPU 的核数 + 1”
最佳线程数最终还是靠压测来确定的,
实际工作中大家面临的系统,“I/O 耗时 / CPU 耗时”往往都大于 1,
所以基本上都是在这个初始值的基础上增加
实际工作中大家面临的系统,“I/O 耗时 / CPU 耗时”往往都大于 1,
所以基本上都是在这个初始值的基础上增加
一般来讲,随着线程数的增加,吞吐量会增加,延迟也会缓慢增加;
但是当线程数增加到一定程度,吞吐量就会开始下降,延迟会迅速增加。
这个时候基本上就是线程能够设置的最大值了
但是当线程数增加到一定程度,吞吐量就会开始下降,延迟会迅速增加。
这个时候基本上就是线程能够设置的最大值了
不同的 I/O 模型对最佳线程数的影响非常大
例如大名鼎鼎的 Nginx 用的是非阻塞 I/O,
采用的是多进程单线程结构,
Nginx 本来是一个 I/O 密集型系统,
但是最佳进程数设置的却是 CPU 的核数,
完全参考的是 CPU 密集型的算法
例如大名鼎鼎的 Nginx 用的是非阻塞 I/O,
采用的是多进程单线程结构,
Nginx 本来是一个 I/O 密集型系统,
但是最佳进程数设置的却是 CPU 的核数,
完全参考的是 CPU 密集型的算法
Lock和Condition
隐藏在并发包中的管程
Java SDK 并发包通过 Lock 和 Condition 两个接口来实现管程,
其中 Lock 用于解决互斥问题,Condition 用于解决同步问题
其中 Lock 用于解决互斥问题,Condition 用于解决同步问题
破坏死锁中的不可抢占条件的方案
能够响应中断
支持超时
非阻塞地获取锁
Lock接口
// 支持中断的 API
void lockInterruptibly()
throws InterruptedException;
// 支持超时的 API
boolean tryLock(long time, TimeUnit unit)
throws InterruptedException;
// 支持非阻塞获取锁的 API
boolean tryLock();
void lockInterruptibly()
throws InterruptedException;
// 支持超时的 API
boolean tryLock(long time, TimeUnit unit)
throws InterruptedException;
// 支持非阻塞获取锁的 API
boolean tryLock();
Lock如何保证可见性
利用了 volatile 相关的 Happens-Before 规则
Java SDK 里面的 ReentrantLock,内部持有一个 volatile 的成员变量 state,
加锁或解锁的时候,会读写 state 的值
加锁或解锁的时候,会读写 state 的值
顺序性规则:对于线程 T1,value+=1 Happens-Before 释放锁的操作 unlock();
volatile 变量规则:由于 state = 1 会先读取 state,所以线程 T1 的 unlock() 操作 Happens-Before 线程 T2 的 lock() 操作;
传递性规则:线程 T1 的 value+=1 Happens-Before 线程 T2 的 lock() 操作
volatile 变量规则:由于 state = 1 会先读取 state,所以线程 T1 的 unlock() 操作 Happens-Before 线程 T2 的 lock() 操作;
传递性规则:线程 T1 的 value+=1 Happens-Before 线程 T2 的 lock() 操作
可重入锁-ReentrantLock
指的是线程可以重复获取同一把锁
可重入函数
指的是多个线程可以同时调用该函数,每个线程都能得到正确结果
同时在一个线程内支持线程切换,无论被切换多少次,结果都是正确的
线程安全
公平锁与非公平锁
// 无参构造函数:默认非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
// 根据公平策略参数创建锁
public ReentrantLock(boolean fair){
sync = fair ? new FairSync()
: new NonfairSync();
}
public ReentrantLock() {
sync = new NonfairSync();
}
// 根据公平策略参数创建锁
public ReentrantLock(boolean fair){
sync = fair ? new FairSync()
: new NonfairSync();
}
用锁的最佳实践
永远只在更新对象的成员变量时加锁
永远只在访问可变的成员变量时加锁
永远不在调用其他对象的方法时加锁
因为调用其他对象的方法,实在是太不安全了,
也许“其他”方法里面有线程 sleep() 的调用,
也可能会有奇慢无比的 I/O 操作,这些都会严重影响性能。
更可怕的是,“其他”类的方法可能也会加锁,然后双重加锁就可能导致死锁
也许“其他”方法里面有线程 sleep() 的调用,
也可能会有奇慢无比的 I/O 操作,这些都会严重影响性能。
更可怕的是,“其他”类的方法可能也会加锁,然后双重加锁就可能导致死锁
Dubbo如何用管程实现异步转同步
Condition 实现了管程模型里面的条件变量
利用两个条件变量快速实现阻塞队列
final Lock lock =
new ReentrantLock();
// 条件变量:队列不满
final Condition notFull =
lock.newCondition();
// 条件变量:队列不空
final Condition notEmpty =
lock.newCondition();
new ReentrantLock();
// 条件变量:队列不满
final Condition notFull =
lock.newCondition();
// 条件变量:队列不空
final Condition notEmpty =
lock.newCondition();
Lock 和 Condition 实现的管程,
线程等待和通知需要调用 await()、signal()、signalAll()
类似于synchronized中的wait()、notify()、notifyAll()
线程等待和通知需要调用 await()、signal()、signalAll()
类似于synchronized中的wait()、notify()、notifyAll()
同步与异步
通俗点来讲就是调用方是否需要等待结果,
如果需要等待结果,就是同步;
如果不需要等待结果,就是异步
如果需要等待结果,就是同步;
如果不需要等待结果,就是异步
异步的实现方法
调用方创建一个子线程,在子线程中执行方法调用,这种调用我们称为异步调用
方法实现的时候,创建一个新的线程执行主要逻辑,主线程直接 return,这种方法我们一般称为异步方法
Dubbo 源码分析
异步的场景
TCP 协议本身就是异步的
常用到的 RPC 调用,在 TCP 协议层面,
发送完 RPC 请求后,线程是不会等待 RPC 的响应结果的
发送完 RPC 请求后,线程是不会等待 RPC 的响应结果的
DefaultFuture.get()
while,编程范式
Semaphore:如何快速实现一个限流器
信号量模型
一个计数器,一个等待队列,三个方法
init():设置计数器的初始值
down():计数器的值减 1;如果此时计数器的值小于 0,则当前线程将被阻塞,否则当前线程可以继续执行
up():计数器的值加 1;如果此时计数器的值小于或者等于 0,则唤醒等待队列中的一个线程,并将其从等待队列中移除
down()、up() 这两个操作历史上最早称为 P 操作和 V 操作,所以信号量模型也被称为 PV 原语
Semaphore
acquire()
release()
限流器的实现
Semaphore 可以允许多个线程访问一个临界区
对象池
N个对象集合
初始化信号量的计数器为N
限制不允许多于N个线程同时进入临界区
ReadWriteLock:如何快速实现一个完备的缓存
读多写少场景
读写锁的三条基本原则
允许多个线程同时读共享变量
只允许一个线程写共享变量
如果一个写线程正在执行写操作,此时禁止读线程读共享变量
获取写锁的前提是读锁和写锁均未被占用
快速实现一个缓存
ReentrantReadWriteLock
final Map<K, V> m =
new HashMap<>();
final ReadWriteLock rwl =
new ReentrantReadWriteLock();
// 读锁
final Lock r = rwl.readLock();
// 写锁
final Lock w = rwl.writeLock();
new HashMap<>();
final ReadWriteLock rwl =
new ReentrantReadWriteLock();
// 读锁
final Lock r = rwl.readLock();
// 写锁
final Lock w = rwl.writeLock();
实现缓存的按需加载
当获取不到缓存时,从数据库获取
// 读锁查缓存
// 判断缓存为null
// 写锁
// 然后再次检查缓存是否为null
// 查数据库并设置
// 判断缓存为null
// 写锁
// 然后再次检查缓存是否为null
// 查数据库并设置
锁的升级和降级
升级
先是获取读锁,然后再升级为写锁
ReadWriteLock 并不支持这种升级
降级
支持
只有写锁支持条件变量,读锁是不支持条件变量的
缓存的数据同步问题
超时机制
StampedLock:有没有比读写锁更快的锁
StampedLock 支持的三种锁模式
java1.8引入
java1.8引入
写锁
悲观读锁
乐观读
写锁、悲观读锁的语义和 ReadWriteLock 的写锁、读锁的语义非常类似
StampedLock 里的写锁和悲观读锁加锁成功之后,都会返回一个 stamp;
然后解锁的时候,需要传入这个 stamp
然后解锁的时候,需要传入这个 stamp
乐观读
“乐观读”这个词,而不是“乐观读锁”,是要提醒你,乐观读这个操作是无锁的
StampedLock 提供的乐观读,是允许一个线程获取写锁的,也就是说不是所有的写操作都被阻塞
tryOptimisticRead(),获取乐观读,返回stamp
validate(stamp),检验stamp是否被改动(期间是否存在写操作)
如果存在写操作,升级为悲观读锁
和数据库的乐观锁有异曲同工之妙
StampedLock 使用注意事项
StampedLock 不支持重入
StampedLock 的悲观读锁、写锁都不支持条件变量
如果线程阻塞在 StampedLock 的 readLock() 或者 writeLock() 上时,
此时调用该阻塞线程的 interrupt() 方法,会导致 CPU 飙升
此时调用该阻塞线程的 interrupt() 方法,会导致 CPU 飙升
使用 StampedLock 一定不要调用中断操作,
如果需要支持中断功能,
一定使用可中断的悲观读锁 readLockInterruptibly() 和写锁 writeLockInterruptibly()
如果需要支持中断功能,
一定使用可中断的悲观读锁 readLockInterruptibly() 和写锁 writeLockInterruptibly()
CountDownLatch和CyclicBarrier:如何让多线程步调一致
例子:对账流程
getPOrders()/getDOrders(),查询订单和派送单
check(),对账
优化一:getPOrders()/getDOrders()并行
Thread
new Thread()
join()
优化二:使用线程池
join()方法失效
CountDownLatch
CountDownLatch latch =
new CountDownLatch(2);
new CountDownLatch(2);
latch.countDown();
// 等待两个查询操作结束
latch.await();
latch.await();
优化三:将两次查询和对账并行
对账的时候,可以同时查询下一批数据
查询相当于数据的生产者,对账是消费者
创建两个队列分别存储订单和派送单
T1(查询订单)和T2(查询psd)要保持步调一致,
获得数据后,告知T3(对账)
获得数据后,告知T3(对账)
难点
一个是线程 T1 和 T2 要做到步调一致
另一个是要能够通知到线程 T3
依然可以使用计数器来实现,但有更优雅可靠的实现
用 CyclicBarrier 实现线程同步
final CyclicBarrier barrier =
new CyclicBarrier(2, ()->{
executor.execute(()->check());
});
new CyclicBarrier(2, ()->{
executor.execute(()->check());
});
// 等待
barrier.await();
barrier.await();
总结
CountDownLatch 主要用来解决一个线程等待多个线程的场景
CyclicBarrier 是一组线程之间互相等待
CountDownLatch 的计数器是不能循环利用的,也就是说一旦计数器减到 0,再有线程调用 await(),该线程会直接通过
CyclicBarrier 的计数器是可以循环利用的,而且具备自动重置的功能,一旦计数器减到 0 会自动重置到你设置的初始值。
除此之外,CyclicBarrier 还可以设置回调函数,可以说是功能丰富
除此之外,CyclicBarrier 还可以设置回调函数,可以说是功能丰富
并发容器:都有哪些“坑”
同步容器:synchronized
Colletions类中的线程安全包装方法
Vector
Stack
HashTable
并发容器
List
CopyOnWriteArrayList
写的时候会将共享变量新复制一份出来
CopyOnWriteArrayList 仅适用于写操作非常少的场景
而且能够容忍读写的短暂不一致
迭代器是只读的,不支持增删改
写操作互斥
Map
ConcurrentHashMap
key、value不能为null
ConcurrentSkipListMap
SkipList 本身就是一种数据结构,中文一般都翻译为“跳表”
有序
key、value不能为null
跳表插入、删除、查询操作平均的时间复杂度是 O(log n),
理论上和并发线程数没有关系,并发度高时,性能更优
理论上和并发线程数没有关系,并发度高时,性能更优
Set
CopyOnWriteArraySet
ConcurrentSkipListSet
Queue
单端阻塞
ArrayBlockingQueue
持有数组队列
LinkedBlockingQueue
持有链表队列
SynchronousQueue
不持有队列,此时生产者线程的入队操作必须等待消费者线程的出队操作
LinkedTransferQueue
融合 LinkedBlockingQueue 和 SynchronousQueue 的功能,
性能比 LinkedBlockingQueue 更好
性能比 LinkedBlockingQueue 更好
PriorityBlockingQueue
支持按照优先级出队
DelayQueue
支持延时出队
双端阻塞
LinkedBlockingDeque
单端非阻塞
ConcurrentLinkedQueue
双端非阻塞
ConcurrentLinkedDeque
注意事项
使用队列时,需要格外注意队列是否支持有界(所谓有界指的是内部的队列是否有容量限制)
一般都不建议使用无界的队列,因为数据量大了之后很容易导致 OOM
上述中只有ArrayBlockingQueue 和 LinkedBlockingQueue 是支持有界的,
所以在使用其他无界队列时,一定要充分考虑是否存在导致 OOM 的隐患
所以在使用其他无界队列时,一定要充分考虑是否存在导致 OOM 的隐患
组合操作需要注意竞态条件问题
容易被忽视的“坑”是用迭代器遍历容器
Java 容器的快速失败机制(Fail-Fast)
原子类:无锁工具类的典范
相对互斥锁方案,最大的好处就是性能
加锁、解锁操作本身就消耗性能
同时拿不到锁的线程还会进入阻塞状态,
进而触发线程切换,线程切换对性能的消耗也很大
进而触发线程切换,线程切换对性能的消耗也很大
无锁方案的实现原理:CAS+自旋
CPU 为了解决并发问题,提供了 CAS 指令
(CAS,全称是 Compare And Swap,即“比较并交换”)
(CAS,全称是 Compare And Swap,即“比较并交换”)
CAS 指令包含 3 个参数:共享变量的内存地址 A、用于比较的值 B 和共享变量的新值 C;
并且只有当内存中地址 A 处的值等于 B 时,才能将内存中地址 A 处的值更新为新值 C
并且只有当内存中地址 A 处的值等于 B 时,才能将内存中地址 A 处的值更新为新值 C
作为一条 CPU 指令,CAS 指令本身是能够保证原子性的
只有当目前 count 的值和期望值 expect 相等时,才会将 count 更新为 newValue
所谓自旋,其实就是循环尝试
while(count != cas(count,newValue))
ABA问题
cas(count,newValue) 返回的值等于count,
但不能够认为 count 的值没有被其他线程更新过
但不能够认为 count 的值没有被其他线程更新过
大多数情况下我们并不关心 ABA 问题,例如数值的原子递增
原子化的更新对象很可能就需要关心 ABA 问题,因为两个 A 虽然相等,但是第二个 A 的属性可能已经发生变化
使用 CAS 方案的时候,一定要考虑ABA问题
Java 如何实现原子化的 count += 1
AtomicLong.getAndIncrement()
转调unsafe.getAndAddLong(
Object o, long offset, long delta)
Object o, long offset, long delta)
final long getAndIncrement() {
return unsafe.getAndAddLong(
this, valueOffset, 1L);
}
return unsafe.getAndAddLong(
this, valueOffset, 1L);
}
public final long getAndAddLong(
Object o, long offset, long delta){
long v;
do {
// 读取内存中的值
v = getLongVolatile(o, offset);
} while (!compareAndSwapLong(
o, offset, v, v + delta));
return v;
}
Object o, long offset, long delta){
long v;
do {
// 读取内存中的值
v = getLongVolatile(o, offset);
} while (!compareAndSwapLong(
o, offset, v, v + delta));
return v;
}
do {
// 获取当前值
oldV = xxxx;
// 根据当前值计算新值
newV = ...oldV...
}while(!compareAndSet(oldV,newV);
// 获取当前值
oldV = xxxx;
// 根据当前值计算新值
newV = ...oldV...
}while(!compareAndSet(oldV,newV);
原子类概览
基本数据类型
AtomicBoolean
AtomicInteger
AtomicLong
引用类型
AtomicReference
需要重点关注 ABA 问题
AtomicStampedReference
可以解决 ABA 问题
AtomicMarkableReference
可以解决 ABA 问题
数组
AtomicIntegerArray
AtomicLongArray
AtomicReferenceArray
原子化地更新数组里面的每一个元素
对象属性更新器
AtomicIntegerFieldUpdater
AtomicLongFieldUpdater
AtomicReferenceFieldUpdaer
原子化地更新对象的属性
利用反射机制实现
利用反射机制实现
对象属性必须是 volatile 类型的,只有这样才能保证可见性
如果对象属性不是 volatile 类型的,newUpdater() 方法会抛出 IllegalArgumentException 这个运行时异常
如果对象属性不是 volatile 类型的,newUpdater() 方法会抛出 IllegalArgumentException 这个运行时异常
累加器
DoubleAccumulator
AoubleAdder
LongAccumulator
LongAdder
仅仅用来执行累加操作,相比原子化的基本数据类型,速度更快,
但是不支持 compareAndSet() 方法
但是不支持 compareAndSet() 方法
总结
首先性能好,其次是基本不会出现死锁问题
(但可能出现饥饿和活锁问题,因为自旋会反复重试)
(但可能出现饥饿和活锁问题,因为自旋会反复重试)
原子类的方法都是针对一个共享变量
原子类虽好,但使用要慎之又慎
Executor与线程池:如何创建正确的线程池
线程池是一种生产者 - 消费者模式
为什么不采用一般的池化资源的设计方法
Thread 对象的所有方法,都不存在类似 execute(Runnable target) 这样的公共方法
如何使用 Java 中的线程池
ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
ThreadPoolExecutor
corePoolSize:表示线程池保有的最小线程数
maximumPoolSize:表示线程池创建的最大线程数
keepAliveTime & unit:如果一个线程空闲了keepAliveTime & unit这么久,
而且线程池的线程数大于 corePoolSize ,那么这个空闲的线程就要被回收
而且线程池的线程数大于 corePoolSize ,那么这个空闲的线程就要被回收
workQueue:工作队列
threadFactory:通过这个参数你可以自定义如何创建线程,
例如你可以给线程指定一个有意义的名字
例如你可以给线程指定一个有意义的名字
handler:通过这个参数你可以自定义任务的拒绝策略,
如果线程池中所有的线程都在忙碌,
并且工作队列也满了(前提是工作队列是有界队列),
那么此时提交任务,线程池就会拒绝接收
如果线程池中所有的线程都在忙碌,
并且工作队列也满了(前提是工作队列是有界队列),
那么此时提交任务,线程池就会拒绝接收
CallerRunsPolicy:提交任务的线程自己去执行该任务
AbortPolicy:默认的拒绝策略,会 throws RejectedExecutionException
DiscardPolicy:直接丢弃任务,没有任何异常抛出
DiscardOldestPolicy:丢弃最老的任务,
其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列
其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列
Java 在 1.6 版本还增加了 allowCoreThreadTimeOut(boolean value) 方法,
它可以让所有线程都支持超时,这意味着如果项目很闲,就会将项目组的成员都撤走
它可以让所有线程都支持超时,这意味着如果项目很闲,就会将项目组的成员都撤走
使用线程池要注意些什么
不建议使用 Executors
最重要的原因是:Executors 提供的很多方法默认使用的都是无界的 LinkedBlockingQueue
强烈建议使用有界队列
默认拒绝策略要慎重使用
线程池默认的拒绝策略会 throw RejectedExecutionException 这是个运行时异常,
对于运行时异常编译器并不强制 catch 它,所以开发人员很容易忽略
对于运行时异常编译器并不强制 catch 它,所以开发人员很容易忽略
注意异常处理的问题
通过 ThreadPoolExecutor 对象的 execute() 方法提交任务时,
如果任务在执行的过程中出现运行时异常,会导致执行任务的线程终止;
不过,最致命的是任务虽然异常了,但是你却获取不到任何通知
如果任务在执行的过程中出现运行时异常,会导致执行任务的线程终止;
不过,最致命的是任务虽然异常了,但是你却获取不到任何通知
虽然线程池提供了很多用于异常处理的方法,
但是最稳妥和简单的方案还是捕获所有异常并按需处理
但是最稳妥和简单的方案还是捕获所有异常并按需处理
Future:如何用多线程实现最优的“烧水泡茶”程序
如何获取任务执行结果
ThreadPoolExecutor 提供的 3 个 submit() 方法
// 提交 Runnable 任务
Future<?>
submit(Runnable task);
// 提交 Callable 任务
<T> Future<T>
submit(Callable<T> task);
// 提交 Runnable 任务及结果引用
<T> Future<T>
submit(Runnable task, T result);
Future<?>
submit(Runnable task);
// 提交 Callable 任务
<T> Future<T>
submit(Callable<T> task);
// 提交 Runnable 任务及结果引用
<T> Future<T>
submit(Runnable task, T result);
Future接口
// 取消任务
boolean cancel(
boolean mayInterruptIfRunning);
// 判断任务是否已取消
boolean isCancelled();
// 判断任务是否已结束
boolean isDone();
// 获得任务执行结果
get();
// 获得任务执行结果,支持超时
get(long timeout, TimeUnit unit);
boolean cancel(
boolean mayInterruptIfRunning);
// 判断任务是否已取消
boolean isCancelled();
// 判断任务是否已结束
boolean isDone();
// 获得任务执行结果
get();
// 获得任务执行结果,支持超时
get(long timeout, TimeUnit unit);
FutureTask
FutureTask 实现了 Runnable 和 Future 接口
FutureTask(Callable<V> callable);
FutureTask(Runnable runnable, V result);
FutureTask(Runnable runnable, V result);
由于实现了 Runnable 接口,所以可以将 FutureTask 对象作为任务提交给 ThreadPoolExecutor 去执行,也可以直接被 Thread 执行;
又因为实现了 Future 接口,所以也能用来获得任务的执行结果
又因为实现了 Future 接口,所以也能用来获得任务的执行结果
使用方式
将 FutureTask 对象提交给 ThreadPoolExecutor 去执行
FutureTask 对象直接被 Thread 执行
获取结果,通过FutureTask对象可以获取
实现最优的“烧水泡茶”程序
编写并发程序,首先要做的就是分工,
所谓分工指的是如何高效地拆解任务并分配给线程
所谓分工指的是如何高效地拆解任务并分配给线程
用两个线程 T1 和 T2 来完成烧水泡茶程序,
T1 负责洗水壶、烧开水、泡茶这三道工序,
T2 负责洗茶壶、洗茶杯、拿茶叶三道工序,
其中 T1 在执行泡茶这道工序时需要等待 T2 完成拿茶叶的工序
T1 负责洗水壶、烧开水、泡茶这三道工序,
T2 负责洗茶壶、洗茶杯、拿茶叶三道工序,
其中 T1 在执行泡茶这道工序时需要等待 T2 完成拿茶叶的工序
总结
任务之间有依赖关系,
比如当前任务依赖前一个任务的执行结果,
这种问题基本上都可以用 Future 来解决
比如当前任务依赖前一个任务的执行结果,
这种问题基本上都可以用 Future 来解决
在分析这种问题的过程中,
建议你用有向图描述一下任务之间的依赖关系,
同时将线程的分工也做好
建议你用有向图描述一下任务之间的依赖关系,
同时将线程的分工也做好
CompletableFuture:异步编程没那么难
CompletableFuture 的核心优势
无需手工维护线程
语义更清晰
代码更简练并且专注于业务逻辑
创建 CompletableFuture 对象
// 使用默认线程池
static CompletableFuture<Void> runAsync(Runnable runnable);
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
// 可以指定线程池
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) ;
static CompletableFuture<Void> runAsync(Runnable runnable);
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
// 可以指定线程池
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) ;
CompletionStage 接口
CompletionStage 接口可以清晰地描述任务之间的这种时序关系
描述串行关系
CompletionStage<R> thenApply(fn);
CompletionStage<R> thenApplyAsync(fn);
CompletionStage<Void> thenAccept(consumer);
CompletionStage<Void> thenAcceptAsync(consumer);
CompletionStage<Void> thenRun(action);
CompletionStage<Void> thenRunAsync(action);
CompletionStage<R> thenCompose(fn);
CompletionStage<R> thenComposeAsync(fn);
CompletionStage<R> thenApplyAsync(fn);
CompletionStage<Void> thenAccept(consumer);
CompletionStage<Void> thenAcceptAsync(consumer);
CompletionStage<Void> thenRun(action);
CompletionStage<Void> thenRunAsync(action);
CompletionStage<R> thenCompose(fn);
CompletionStage<R> thenComposeAsync(fn);
描述 AND 汇聚关系
CompletionStage<R> thenCombine(other, fn);
CompletionStage<R> thenCombineAsync(other, fn);
CompletionStage<Void> thenAcceptBoth(other, consumer);
CompletionStage<Void> thenAcceptBothAsync(other, consumer);
CompletionStage<Void> runAfterBoth(other, action);
CompletionStage<Void> runAfterBothAsync(other, action);
CompletionStage<R> thenCombineAsync(other, fn);
CompletionStage<Void> thenAcceptBoth(other, consumer);
CompletionStage<Void> thenAcceptBothAsync(other, consumer);
CompletionStage<Void> runAfterBoth(other, action);
CompletionStage<Void> runAfterBothAsync(other, action);
描述 OR 汇聚关系
CompletionStage applyToEither(other, fn);
CompletionStage applyToEitherAsync(other, fn);
CompletionStage acceptEither(other, consumer);
CompletionStage acceptEitherAsync(other, consumer);
CompletionStage runAfterEither(other, action);
CompletionStage runAfterEitherAsync(other, action);
CompletionStage applyToEitherAsync(other, fn);
CompletionStage acceptEither(other, consumer);
CompletionStage acceptEitherAsync(other, consumer);
CompletionStage runAfterEither(other, action);
CompletionStage runAfterEitherAsync(other, action);
异常处理
虽然上面我们提到的 fn、consumer、action 它们的核心方法都不允许抛出可检查异常,
但是却无法限制它们抛出运行时异常
但是却无法限制它们抛出运行时异常
在异步编程时,很有必要对运行时异常进行处理
catch异常的方法
CompletionStage exceptionally(fn);
finally的方法
CompletionStage<R> whenComplete(consumer);
CompletionStage<R> whenCompleteAsync(consumer);
CompletionStage<R> handle(fn);
CompletionStage<R> handleAsync(fn);
CompletionStage<R> whenCompleteAsync(consumer);
CompletionStage<R> handle(fn);
CompletionStage<R> handleAsync(fn);
总结
ReactiveX的发展(Java 语言的实现版本是 RxJava),
解决“回调地狱”的问题
解决“回调地狱”的问题
注意捕获异常
CompletionService:如何批量执行异步任务
利用 CompletionService 实现询价系统
解决先获取到的报价先保存到数据库的问题
创建 CompletionService
ExecutorCompletionService(Executor executor);
ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue)
ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue)
CompletionService 接口说明
Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
// 如果阻塞队列是空的,那么调用 take() 方法的线程会被阻塞
Future<V> take() throws InterruptedException;
// 如果阻塞队列是空的,那么调用 poll() 方法会返回 null 值
Future<V> poll();
// 支持以超时的方式获取并移除阻塞队列头部的一个元素,
//如果等待了 timeout unit 时间,阻塞队列还是空的,那么该方法会返回 null 值
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
Future<V> submit(Runnable task, V result);
// 如果阻塞队列是空的,那么调用 take() 方法的线程会被阻塞
Future<V> take() throws InterruptedException;
// 如果阻塞队列是空的,那么调用 poll() 方法会返回 null 值
Future<V> poll();
// 支持以超时的方式获取并移除阻塞队列头部的一个元素,
//如果等待了 timeout unit 时间,阻塞队列还是空的,那么该方法会返回 null 值
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
实现 Dubbo 中的 Forking Cluster
Forking 的集群模式,支持并行地调用多个查询服务,
只要有一个成功返回结果,整个服务就可以返回了
只要有一个成功返回结果,整个服务就可以返回了
// 创建线程池
// 创建 CompletionService
// 用于保存 Future 对象
List<Future<Integer>> futures =new ArrayList<>(3);
// 提交异步任务,并保存 future 到 futures
// 获取最快返回的任务执行结果
Integer r = 0;
try {
// 只要有一个成功返回,则 break
for (int i = 0; i < 3; ++i) {
r = cs.take().get();
// 简单地通过判空来检查是否成功返回
if (r != null) {
break;
}
}
} finally {
// 取消所有任务
for(Future<Integer> f : futures)
f.cancel(true);
}
// 返回结果
return r;
// 创建 CompletionService
// 用于保存 Future 对象
List<Future<Integer>> futures =new ArrayList<>(3);
// 提交异步任务,并保存 future 到 futures
// 获取最快返回的任务执行结果
Integer r = 0;
try {
// 只要有一个成功返回,则 break
for (int i = 0; i < 3; ++i) {
r = cs.take().get();
// 简单地通过判空来检查是否成功返回
if (r != null) {
break;
}
}
} finally {
// 取消所有任务
for(Future<Integer> f : futures)
f.cancel(true);
}
// 返回结果
return r;
总结
当需要批量提交异步任务的时候建议使用 CompletionService
CompletionService 将线程池 Executor 和阻塞队列 BlockingQueue 的功能融合在了一起,
能够让批量异步任务的管理更简单
能够让批量异步任务的管理更简单
CompletionService 能够让异步任务的执行结果有序化,先执行完的先进入阻塞队列
Fork/Join:单机版的MapReduce
对于简单的并行任务,你可以通过“线程池 +Future”的方案来解决;
如果任务之间有聚合关系,无论是 AND 聚合还是 OR 聚合,都可以通过 CompletableFuture 来解决;
而批量的并行任务,则可以通过 CompletionService 来解决
如果任务之间有聚合关系,无论是 AND 聚合还是 OR 聚合,都可以通过 CompletableFuture 来解决;
而批量的并行任务,则可以通过 CompletionService 来解决
Fork/Join 的使用
Fork/Join 是一个并行计算的框架,主要就是用来支持分治任务模型的
Fork 对应的是分治任务模型里的任务分解,
Join 对应的是结果合并
Join 对应的是结果合并
Fork/Join 计算框架主要包含两部分,
一部分是分治任务的线程池 ForkJoinPool,
另一部分是分治任务 ForkJoinTask
一部分是分治任务的线程池 ForkJoinPool,
另一部分是分治任务 ForkJoinTask
ForkJoinTask
RecursiveAction
RecursiveTask
注意事项
compute方法需要合并结果时,
推荐使用task2.fork() ->task1.compute() ;task2.join()
也可以(慎用,注意调用顺序)task1.fork();task2.fork() -> task2.join();task1.join();
推荐使用task2.fork() ->task1.compute() ;task2.join()
也可以(慎用,注意调用顺序)task1.fork();task2.fork() -> task2.join();task1.join();
ForkJoinPool 工作原理
生产者 - 消费者模式
多个任务队列
当我们通过 ForkJoinPool 的 invoke() 或者 submit() 方法提交任务时,
ForkJoinPool 根据一定的路由规则把任务提交到一个任务队列中,
如果任务在执行过程中会创建出子任务,
那么子任务会提交到工作线程对应的任务队列中
ForkJoinPool 根据一定的路由规则把任务提交到一个任务队列中,
如果任务在执行过程中会创建出子任务,
那么子任务会提交到工作线程对应的任务队列中
支持一种叫做“任务窃取”的机制
如果工作线程空闲了,
那它可以“窃取”其他工作任务队列里的任务
那它可以“窃取”其他工作任务队列里的任务
任务队列采用的是双端队列,
工作线程正常获取任务和“窃取任务”分别是从任务队列不同的端消费,
这样能避免很多不必要的数据竞争
工作线程正常获取任务和“窃取任务”分别是从任务队列不同的端消费,
这样能避免很多不必要的数据竞争
模拟 MapReduce 统计单词数量
分治、递归,
注意控制基准条件,
避免调用次数过多,
导致栈溢出
注意控制基准条件,
避免调用次数过多,
导致栈溢出
总结
Fork/Join 并行计算框架主要解决的是分治任务
compute方法(递归),注意栈溢出的问题
Java 1.8 提供的 Stream API 里面并行流也是以 ForkJoinPool 为基础的。
默认情况下所有的并行流计算都共享一个 ForkJoinPool
默认情况下所有的并行流计算都共享一个 ForkJoinPool
建议用不同的 ForkJoinPool 执行不同类型的计算任务
并发工具类模块热点问题
while(true) ,注意退出条件
活锁,随机等待一小段时间
相对signal(),signalAll() 总让人省心
Semaphore允许多个线程同时访问临界区,需要锁中锁
锁的申请和释放要成对出现,
最佳实践,就是使用try{}finally{}
最佳实践,就是使用try{}finally{}
回调总要关心执行线程是谁,
执行回调的动作往往是同步的,
思考回调的方法是否要异步化
执行回调的动作往往是同步的,
思考回调的方法是否要异步化
执行回调函数的线程是哪一个?这个在多线程场景下非常重要。
因为不同线程 ThreadLocal 里的数据是不同的,
有些框架比如 Spring 就用 ThreadLocal 来管理事务,
如果不清楚回调函数用的是哪个线程,
很可能会导致错误的事务管理,并最终导致数据不一致
因为不同线程 ThreadLocal 里的数据是不同的,
有些框架比如 Spring 就用 ThreadLocal 来管理事务,
如果不清楚回调函数用的是哪个线程,
很可能会导致错误的事务管理,并最终导致数据不一致
共享线程池:有福同享就要有难同当,
合理隔离线程池
合理隔离线程池
线上问题定位的利器:线程栈 dump,
给线程合理命名
给线程合理命名
Immutability模式:如何利用不变性解决并发问题
解决并发问题,其实最简单的办法就是让共享变量只有读操作,而没有写操作
不变性(Immutability)模式
所谓不变性,简单来讲,就是对象一旦被创建之后,状态就不再发生变化
快速实现具备不可变性的类
类final
属性final
属性类型是不变性类
只允许只读方法
或者写方法返回新对象
或者写方法返回新对象
享元模式(Flyweight Pattern)
利用享元模式可以减少创建对象的数量,从而减少内存占用
Java 语言里面 Long、Integer、Short、Byte 等这些基本数据类型的包装类都用到了享元模式
只缓存范围:-128,127
只缓存范围:-128,127
享元模式本质上其实就是一个对象池
创建之前,首先去对象池里看看是不是存在;
如果已经存在,就利用对象池里的对象;
如果不存在,就会新创建一个对象,并且把这个新创建出来的对象放进对象池里
如果已经存在,就利用对象池里的对象;
如果不存在,就会新创建一个对象,并且把这个新创建出来的对象放进对象池里
使用 Immutability 模式的注意事项
对象的所有属性都是 final 的,并不能保证不可变性
不可变对象也需要正确发布
在使用 Immutability 模式的时候一定要确认保持不变性的边界在哪里,
是否要求属性对象也具备不可变性
是否要求属性对象也具备不可变性
还有一种更简单的不变性对象,那就是无状态
Copy-on-Write模式
写时复制
读多写少的场景
使用 Copy-on-Write 更多地体现的是一种延时策略,
只有在真正需要复制的时候才复制,而不是提前复制好
只有在真正需要复制的时候才复制,而不是提前复制好
Copy-on-Write 还支持按需复制,所以 Copy-on-Write 在操作系统领域是能够提升性能的
相比较而言,Java 提供的 Copy-on-Write 容器,由于在修改的同时会复制整个容器,
所以在提升读操作性能的同时,是以内存复制为代价的
所以在提升读操作性能的同时,是以内存复制为代价的
Copy-on-Write思想使用场景
最大应用领域:函数式编程领域
操作系统领域(父子进程fork)
java容器
Docker容器镜像设计
git设计思想
线程本地存储模式:没有共享,就没有伤害
ThreadLocal 的使用方法
static final AtomicLong nextId=new AtomicLong(0);
// 定义 ThreadLocal 变量
static final ThreadLocal<Long> tl=ThreadLocal.withInitial(
()->nextId.getAndIncrement());
// 此方法会为每个线程分配一个唯一的 Id
static long get(){
return tl.get();
}
// 定义 ThreadLocal 变量
static final ThreadLocal<Long> tl=ThreadLocal.withInitial(
()->nextId.getAndIncrement());
// 此方法会为每个线程分配一个唯一的 Id
static long get(){
return tl.get();
}
如何安全的使用一个线程不安全的类
可以通过ThreadLocal,让对象线程私有
ThreadLocal 的工作原理
Thread 这个类内部有一个私有属性 threadLocals,
其类型就是 ThreadLocalMap,
ThreadLocalMap 的 Key 是 ThreadLocal
其类型就是 ThreadLocalMap,
ThreadLocalMap 的 Key 是 ThreadLocal
ThreadLocal是代理类
ThreadLocal 与内存泄露
在线程池中使用 ThreadLocal可能导致内存泄露
线程池中线程的存活时间太长,往往都是和程序同生共死的,
这就意味着 Thread 持有的 ThreadLocalMap 一直都不会被回收
这就意味着 Thread 持有的 ThreadLocalMap 一直都不会被回收
再加上 ThreadLocalMap 中的 Entry 对 ThreadLocal 是弱引用(WeakReference),
所以只要 ThreadLocal 结束了自己的生命周期是可以被回收掉的
所以只要 ThreadLocal 结束了自己的生命周期是可以被回收掉的
但是 Entry 中的 Value 却是被 Entry 强引用的,
所以即便 Value 的生命周期结束了,
Value 也是无法被回收的,从而导致内存泄露
所以即便 Value 的生命周期结束了,
Value 也是无法被回收的,从而导致内存泄露
try{}finally{}方案,手动释放资源
InheritableThreadLocal 与继承性
线程池中慎用,警惕内存溢出风险
Guarded Suspension模式:等待唤醒机制的规范实现
所谓 Guarded Suspension,直译过来就是“保护性地暂停”
Guarded Suspension 模式也常被称作 Guarded Wait 模式、
Spin Lock 模式(因为使用了 while 循环去等待)
Spin Lock 模式(因为使用了 while 循环去等待)
简单Guarded Suspension 模式
管程的一个经典用法
ReentrantLock
get() 方法通过条件变量的 await() 方法实现等待,
onChanged() 方法通过条件变量的 signalAll() 方法实现唤醒功能
onChanged() 方法通过条件变量的 signalAll() 方法实现唤醒功能
扩展 Guarded Suspension 模式
GuardedObject 内部维护了一个 Map,
其 Key 是 MQ 消息 id,而 Value 是 GuardedObject 对象实例
其 Key 是 MQ 消息 id,而 Value 是 GuardedObject 对象实例
同时增加了静态方法 create() 和 fireEvent();
create() 方法用来创建一个 GuardedObject 对象实例,并根据 key 值将其加入到 Map 中,
而 fireEvent() 方法则是模拟的大堂经理根据包间找就餐人的逻辑
create() 方法用来创建一个 GuardedObject 对象实例,并根据 key 值将其加入到 Map 中,
而 fireEvent() 方法则是模拟的大堂经理根据包间找就餐人的逻辑
Balking模式:再谈线程安全的单例模式
当状态满足某个条件时,执行某个业务逻辑,其本质其实不过就是一个 if 而已,
放到多线程场景里,就是一种“多线程版本的 if”
放到多线程场景里,就是一种“多线程版本的 if”
这种“多线程版本的 if”的应用场景还是很多的,
所以也有人把它总结成了一种设计模式,叫做Balking 模式。
所以也有人把它总结成了一种设计模式,叫做Balking 模式。
Balking 模式的经典实现
synchronied
用 volatile 实现 Balking 模式
使用 volatile 的前提是对原子性没有要求
双重检查(Double Check)
Thread-Per-Message模式:最简单实用的分工方法
为每个任务分配一个独立的线程
难以应对高并发场景
频繁创建、销毁 Java 线程的成本有点高
而且无限制地创建线程还可能导致应用 OOM
OpenJDK 有个 Loom 项目
轻量级线程被叫做Fiber
Worker Thread模式:如何避免重复创建线程
Worker Thread 对应到现实世界里,其实指的就是车间里的工人
正确地创建线程池
用创建有界的队列来接收任务
在创建线程池时,清晰地指明拒绝策略
在实际工作中给线程赋予一个业务相关的名字
避免线程死锁
为不同的任务创建不同的线程池
提交到相同线程池中的任务一定是相互独立的,否则就一定要慎重
ThreadLocal 内存泄露问题
对于提交到线程池的任务,还要做好异常处理
两阶段终止模式:如何优雅地终止线程
普通线程
两阶段
发送终止指令
响应终止指令
interrupt() 方法和线程终止的标志位
线程池
shutdown()
线程池执行 shutdown() 后,就会拒绝接收新的任务,
但是会等待线程池中正在执行的任务和已经进入阻塞队列的任务都执行完之后才最终关闭线程池
但是会等待线程池中正在执行的任务和已经进入阻塞队列的任务都执行完之后才最终关闭线程池
shutdownNow()
线程池执行 shutdownNow() 后,会拒绝接收新的任务,
同时还会中断线程池中正在执行的任务,
已经进入阻塞队列的任务也被剥夺了执行的机会,
不过这些被剥夺执行机会的任务会作为 shutdownNow() 方法的返回值返回
同时还会中断线程池中正在执行的任务,
已经进入阻塞队列的任务也被剥夺了执行的机会,
不过这些被剥夺执行机会的任务会作为 shutdownNow() 方法的返回值返回
生产者-消费者模式:用流水线思想提高效率
生产者 - 消费者模式的优点
解耦
支持异步
并且能够平衡生产者和消费者的速度差异
支持批量执行以提升性能
批量执行任务
支持分阶段提交以提升性能
设计模式模块热点问题答疑
避免共享的设计模式
使用 Immutability 模式需要注意对象属性的不可变性
使用 Copy-on-Write 模式需要注意性能问题
使用线程本地存储模式需要注意异步执行问题
多线程版本 IF 的设计模式
Guarded Suspension 模式的经典实现是使用管程
Balking 模式最容易忽视的就是竞态条件问题
三种最简单的分工模式
Thread-Per-Message 模式在实现的时候需要注意是否存在线程的频繁创建、销毁以及是否可能导致 OOM
Worker Thread 模式的实现,需要注意潜在的线程死锁问题
生产者 - 消费者模式
优雅地终止线程:毒丸对象
两阶段终止模式
案例分析(一):高性能限流器Guava RateLimiter
经典限流算法:令牌桶算法
核心是要想通过限流器,必须拿到令牌。
也就是说,只要我们能够限制发放令牌的速率,
那么就能控制流速
也就是说,只要我们能够限制发放令牌的速率,
那么就能控制流速
1、令牌以固定的速率添加到令牌桶中,假设限流的速率是 r/ 秒,则令牌每 1/r 秒会添加一个;
2、假设令牌桶的容量是 b ,如果令牌桶已满,则新的令牌会被丢弃;
3、请求能够通过限流器的前提是令牌桶中有令牌。
2、假设令牌桶的容量是 b ,如果令牌桶已满,则新的令牌会被丢弃;
3、请求能够通过限流器的前提是令牌桶中有令牌。
b 其实是 burst 的简写,意义是限流器允许的最大突发流量
普通实现
生产者 - 消费者模式
一个生产者线程定时向阻塞队列中添加令牌,
而试图通过限流器的线程则作为消费者线程,
只有从阻塞队列中获取到令牌,才允许通过限流器
而试图通过限流器的线程则作为消费者线程,
只有从阻塞队列中获取到令牌,才允许通过限流器
高并发场景,而且系统压力已经临近极限了,此时这个实现就有问题
问题就出在定时器上,在高并发场景下,
当系统压力已经临近极限的时候,定时器的精度误差会非常大,
同时定时器本身会创建调度线程,也会对系统的性能产生影响
当系统压力已经临近极限的时候,定时器的精度误差会非常大,
同时定时器本身会创建调度线程,也会对系统的性能产生影响
Guava 如何实现令牌桶算法
只需要记录一个下一令牌产生的时间,
并动态更新它,就能够轻松完成限流功能
并动态更新它,就能够轻松完成限流功能
burst(最大突发流量/令牌桶的容量)为1的情况
burst(最大突发流量/令牌桶的容量)为n的情况
总结
经典的限流算法有两个,
一个是令牌桶算法(Token Bucket),
另一个是漏桶算法(Leaky Bucket)
一个是令牌桶算法(Token Bucket),
另一个是漏桶算法(Leaky Bucket)
案例分析(二):高性能网络应用框架Netty
网络编程性能的瓶颈
使用 BIO 模型,一般都会为每个 socket 分配一个独立的线程
Java 里还提供了非阻塞式(NIO)API,利用非阻塞式 API 就能够实现一个线程处理多个连接
Reactor 模式
Reactor模式类结构图
Reactor 模式的核心自然是Reactor 这个类,
其中 register_handler() 和 remove_handler() 这两个方法可以注册和删除一个事件处理器;
handle_events() 方式是核心,也是 Reactor 模式的发动机,这个方法的核心逻辑如下:
首先通过同步事件多路选择器提供的 select() 方法监听网络事件,
当有网络事件就绪后,就遍历事件处理器来处理该网络事件
其中 register_handler() 和 remove_handler() 这两个方法可以注册和删除一个事件处理器;
handle_events() 方式是核心,也是 Reactor 模式的发动机,这个方法的核心逻辑如下:
首先通过同步事件多路选择器提供的 select() 方法监听网络事件,
当有网络事件就绪后,就遍历事件处理器来处理该网络事件
Netty 中的线程模型
Netty 中最核心的概念是事件循环(EventLoop),
其实也就是 Reactor 模式中的 Reactor,
负责监听网络事件并调用事件处理器进行处理
其实也就是 Reactor 模式中的 Reactor,
负责监听网络事件并调用事件处理器进行处理
还有一个核心概念是EventLoopGroup
处理 TCP 连接请求和读写请求是通过两个不同的 socket 完成的
在 Netty 中,bossGroup 就用来处理连接请求的,
而 workerGroup 是用来处理读写请求的
而 workerGroup 是用来处理读写请求的
每个网络连接都关联到了一个线程上,这样做的好处是:
对于一个网络连接,读写操作都是单线程执行的,从而避免了并发程序的各种问题
对于一个网络连接,读写操作都是单线程执行的,从而避免了并发程序的各种问题
案例分析(三):高性能队列Disruptor
Disruptor 是一款高性能的有界内存队列
内存分配更加合理,使用 RingBuffer 数据结构,数组元素在初始化时一次性全部创建,提升缓存命中率;对象循环利用,避免频繁 GC。
能够避免伪共享,提升缓存利用率。
采用无锁算法,避免频繁加锁、解锁的性能消耗。
支持批量消费,消费者可以无锁方式消费多个消息。
能够避免伪共享,提升缓存利用率。
采用无锁算法,避免频繁加锁、解锁的性能消耗。
支持批量消费,消费者可以无锁方式消费多个消息。
Disruptor 如何使用
在 Disruptor 中,生产者生产的对象(也就是消费者消费的对象)称为 Event,
使用 Disruptor 必须自定义 Event,例如示例代码的自定义 Event 是 LongEvent;
使用 Disruptor 必须自定义 Event,例如示例代码的自定义 Event 是 LongEvent;
构建 Disruptor 对象除了要指定队列大小外,还需要传入一个 EventFactory,
示例代码中传入的是LongEvent::new;
示例代码中传入的是LongEvent::new;
消费 Disruptor 中的 Event 需要通过 handleEventsWith() 方法注册一个事件处理器,
发布 Event 则需要通过 publishEvent() 方法
发布 Event 则需要通过 publishEvent() 方法
RingBuffer 如何提升性能
Java SDK 中 ArrayBlockingQueue 使用数组作为底层的数据存储,
而 Disruptor 是使用RingBuffer作为数据存储
而 Disruptor 是使用RingBuffer作为数据存储
RingBuffer 本质上也是数组
程序的局部性原理指的是在一段时间内程序的执行会限定在一个局部范围内
时间局部性指的是程序中的某条指令一旦被执行,不久之后这条指令很可能再次被执行;
如果某条数据被访问,不久之后这条数据很可能再次被访问
如果某条数据被访问,不久之后这条数据很可能再次被访问
空间局部性是指某块内存一旦被访问,不久之后这块内存附近的内存也很可能被访问
CPU 从内存中加载数据 X 时,会将数据 X 缓存在高速缓存 Cache 中,
实际上 CPU 缓存 X 的同时,还缓存了 X 周围的数据,
因为根据程序具备局部性原理,X 周围的数据也很有可能被访问
实际上 CPU 缓存 X 的同时,还缓存了 X 周围的数据,
因为根据程序具备局部性原理,X 周围的数据也很有可能被访问
Disruptor 内部的 RingBuffer 也是用数组实现的,
但是这个数组中的所有元素在初始化时是一次性全部创建的,
所以这些元素的内存地址大概率是连续的
但是这个数组中的所有元素在初始化时是一次性全部创建的,
所以这些元素的内存地址大概率是连续的
消费者线程在消费的时候,是遵循空间局部性原理的,
消费完第 1 个元素,很快就会消费第 2 个元素;
当消费第 1 个元素 E1 的时候,CPU 会把内存中 E1 后面的数据也加载进 Cache,
如果 E1 和 E2 在内存中的地址是连续的,那么 E2 也就会被加载进 Cache 中,
然后当消费第 2 个元素的时候,由于 E2 已经在 Cache 中了,所以就不需要从内存中加载了,
这样就能大大提升性能
消费完第 1 个元素,很快就会消费第 2 个元素;
当消费第 1 个元素 E1 的时候,CPU 会把内存中 E1 后面的数据也加载进 Cache,
如果 E1 和 E2 在内存中的地址是连续的,那么 E2 也就会被加载进 Cache 中,
然后当消费第 2 个元素的时候,由于 E2 已经在 Cache 中了,所以就不需要从内存中加载了,
这样就能大大提升性能
除此之外,在 Disruptor 中,生产者线程通过 publishEvent() 发布 Event 的时候,
并不是创建一个新的 Event,而是通过 event.set() 方法修改 Event,
也就是说 RingBuffer 创建的 Event 是可以循环利用的,
这样还能避免频繁创建、删除 Event 导致的频繁 GC 问题
并不是创建一个新的 Event,而是通过 event.set() 方法修改 Event,
也就是说 RingBuffer 创建的 Event 是可以循环利用的,
这样还能避免频繁创建、删除 Event 导致的频繁 GC 问题
如何避免“伪共享”
伪共享和 CPU 内部的 Cache 有关,Cache 内部是按照缓存行(Cache Line)管理的,缓存行的大小通常是 64 个字节;
CPU 从内存中加载数据 X,会同时加载 X 后面(64-size(X))个字节的数据
CPU 从内存中加载数据 X,会同时加载 X 后面(64-size(X))个字节的数据
伪共享指的是由于共享缓存行导致缓存无效的场景
方案很简单,每个变量独占一个缓存行、不共享缓存行就可以了,具体技术是缓存行填充
Disruptor 中的无锁算法
Disruptor 中的 RingBuffer 维护了入队索引,但是并没有维护出队索引,
这是因为在 Disruptor 中多个消费者可以同时消费,每个消费者都会有一个出队索引,所以 RingBuffer 的出队索引是所有消费者里面最小的那一个
这是因为在 Disruptor 中多个消费者可以同时消费,每个消费者都会有一个出队索引,所以 RingBuffer 的出队索引是所有消费者里面最小的那一个
入队操作的逻辑:如果没有足够的空余位置,就出让 CPU 使用权,然后重新计算;反之则用 CAS 设置入队索引
案例分析(四):高性能数据库连接池HiKariCP
微观上 HiKariCP 程序编译出的字节码执行效率更高,站在字节码的角度去优化 Java 代码
宏观上主要是和两个数据结构有关,一个是 FastList,另一个是 ConcurrentBag
FastList
remove(Object element) 方法的查找顺序变成了逆序查找
get(int index) 方法没有对 index 参数进行越界检查,
HiKariCP 能保证不会越界,所以不用每次都进行越界检查
HiKariCP 能保证不会越界,所以不用每次都进行越界检查
ConcurrentBag
用于存储所有的数据库连接的共享队列 sharedList
线程本地存储 threadList
等待数据库连接的线程数 waiters
分配数据库连接的工具 handoffQueue
Actor模型:面向对象原生的并发模型
Actor 模型本质上是一种计算模型,基本的计算单元称为 Actor
在 Actor 模型中,所有的计算都是在 Actor 中执行的
Akka
消息机制
Actor 中的消息机制完全是异步的
发送消息和接收消息的 Actor 可以不在一个进程中,也可以不在同一台机器上
Actor 的规范化定义
处理能力,处理接收到的消息
存储能力,Actor 可以存储自己的内部状态,
并且内部状态在不同 Actor 之间是绝对隔离的
并且内部状态在不同 Actor 之间是绝对隔离的
通信能力,Actor 可以和其他 Actor 之间通信
软件事务内存:借鉴数据库的并发经验
软件事务内存(Software Transactional Memory,简称 STM)
第三方类库:Multiverse
协程:更轻量级的线程
Golang 中的协程
CSP模型:Golang的主力队员
0 条评论
下一页