并发编程实战
2019-08-13 10:23:42 0 举报
AI智能生成
并发编程实战
作者其他创作
大纲/内容
并发设计模式
Immutability模式
类和属性都是final的
所有方法均是只读的
需要修改时,必须创建另一个不可变对象
享元模式
Copy on Write
CopyOnWriteArrayList
CoppyOnWriteArraySet
线程本地存储
ThreadLocal
Guarded Suspension
Balking
Thread Per Message
Worker Thread
两阶段终止
生产者-消费者
并发工具
Lock/Condition
为什么不沿用synchronized而重复造轮子
性能问题可以通过优化解决
synchronized无法解决死锁的不可抢占问题
如何解决
能够响应中断
支持超时
非阻塞地获取锁
可重入锁
公平锁与非公平锁
用锁的最佳实践
Doug Lea <Java并发编程:设计原则与模式>
永远只在更新对象的成员变量时加锁
永远只在访问可变的成员变量时加锁
永远不再调用其他对象的方法时加锁
其他方法里面有线程sleep,也可能有较慢的I/O操作会影响性能
其他类的方法可能也会加锁,可能导致死锁
Condition实现了管程里的条件变量
Java内置的管程里只有一个条件变量
Lock&Condition实现的管程支持多个条件变量
如何利用两个条件变量实现阻塞队列
队列不空
队列不满
同步与异步
调用方是否需要等待结果,如果需要就是同步,不需要就是异步
Dubbo如何实现异步转同步
Semaphore
信号量模型
一个计数器
一个等待队列
三个方法
init
up
down
简单限流器
class ObjPool<T, R> {
final List<T> pool;
// 用信号量实现限流器
final Semaphore sem;
// 构造函数
ObjPool(int size, T t){
pool = new Vector<T>(){};
for(int i=0; i<size; i++){
pool.add(t);
}
sem = new Semaphore(size);
}
// 利用对象池的对象,调用 func
R exec(Function<T,R> func) {
T t = null;
sem.acquire();
try {
t = pool.remove(0);
return func.apply(t);
} finally {
pool.add(t);
sem.release();
}
}
}
// 创建对象池
ObjPool<Long, String> pool =
new ObjPool<Long, String>(10, 2);
// 通过对象池获取 t,之后执行
pool.exec(t -> {
System.out.println(t);
return t.toString();
});
ReadWriteLock
读写锁
允许多个线程同时读共享变量
只允许一个线程写共享变量
如果一个写线程正在执行写操作,禁止读线程读共享变量
不支持锁升级,支持锁降级
// 读缓存
r.lock(); ①
try {
v = m.get(key); ②
if (v == null) {
w.lock();
try {
// 再次验证并更新缓存
// 省略详细代码
} finally{
w.unlock();
}
}
} finally{
r.unlock(); ③
}
锁升级不支持
支持锁降级
class CachedData {
Object data;
volatile boolean cacheValid;
final ReadWriteLock rwl =
new ReentrantReadWriteLock();
// 读锁
final Lock r = rwl.readLock();
// 写锁
final Lock w = rwl.writeLock();
void processCachedData() {
// 获取读锁
r.lock();
if (!cacheValid) {
// 释放读锁,因为不允许读锁的升级
r.unlock();
// 获取写锁
w.lock();
try {
// 再次检查状态
if (!cacheValid) {
data = ...
cacheValid = true;
}
// 释放写锁前,降级为读锁
// 降级是可以的
r.lock(); ①
} finally {
// 释放写锁
w.unlock();
}
}
// 此处仍然持有读锁
try {use(data);}
finally {r.unlock();}
}
}
支持公平与非公平模式
支持lock,trylock,lockInterruptibly
读锁不支持condition,写锁支持
读写锁实现简单缓存
class Cache<K,V> {
final Map<K, V> m =
new HashMap<>();
final ReadWriteLock rwl =
new ReentrantReadWriteLock();
final Lock r = rwl.readLock();
final Lock w = rwl.writeLock();
V get(K key) {
V v = null;
// 读缓存
r.lock(); ①
try {
v = m.get(key); ②
} finally{
r.unlock(); ③
}
// 缓存中存在,返回
if(v != null) { ④
return v;
}
// 缓存中不存在,查询数据库
w.lock(); ⑤
try {
// 再次验证
// 其他线程可能已经查询过数据库
v = m.get(key); ⑥
if(v == null){ ⑦
// 查询数据库
v= 省略代码无数
m.put(key, v);
}
} finally{
w.unlock();
}
return v;
}
}
可重入
StampedLock
支持两种模式
写锁
final StampedLock sl =
new StampedLock();
// 获取 / 释放悲观读锁示意代码
long stamp = sl.readLock();
try {
// 省略业务相关代码
} finally {
sl.unlockRead(stamp);
}
// 获取 / 释放写锁示意代码
long stamp = sl.writeLock();
try {
// 省略业务相关代码
} finally {
sl.unlockWrite(stamp);
}
悲观读锁
乐观读
无锁状态
乐观读升级悲观读锁
class Point {
private int x, y;
final StampedLock sl =
new StampedLock();
// 计算到原点的距离
int distanceFromOrigin() {
// 乐观读
long stamp =
sl.tryOptimisticRead();
// 读入局部变量,
// 读的过程数据可能被修改
int curX = x, curY = y;
// 判断执行读操作期间,
// 是否存在写操作,如果存在,
// 则 sl.validate 返回 false
if (!sl.validate(stamp)){
// 升级为悲观读锁
stamp = sl.readLock();
try {
curX = x;
curY = y;
} finally {
// 释放悲观读锁
sl.unlockRead(stamp);
}
}
return Math.sqrt(
curX * curX + curY * curY);
}
}
不可重入
不支持条件变量
中断问题
final StampedLock lock
= new StampedLock();
Thread T1 = new Thread(()->{
// 获取写锁
lock.writeLock();
// 永远阻塞在此处,不释放写锁
LockSupport.park();
});
T1.start();
// 保证 T1 获取写锁
Thread.sleep(100);
Thread T2 = new Thread(()->
// 阻塞在悲观读锁
lock.readLock()
);
T2.start();
// 保证 T2 阻塞在读锁
Thread.sleep(100);
// 中断线程 T2
// 会导致线程 T2 所在 CPU 飙升
T2.interrupt();
T2.join();
正确示范
读
final StampedLock sl =
new StampedLock();
// 乐观读
long stamp =
sl.tryOptimisticRead();
// 读入方法局部变量
......
// 校验 stamp
if (!sl.validate(stamp)){
// 升级为悲观读锁
stamp = sl.readLock();
try {
// 读入方法局部变量
.....
} finally {
// 释放悲观读锁
sl.unlockRead(stamp);
}
}
// 使用方法局部变量执行业务操作
......
写
long stamp = sl.writeLock();
try {
// 写共享变量
......
} finally {
sl.unlockWrite(stamp);
}
支持锁的升级与降级
tryConvertToReadLock
tryConvertToWriteLock
CountDownLatch与CyclicBarrier
CountDown适用于一个线程等待多个线程的场景,如果自己创建线程,用join可以实现相同效果,但是如果用线程池则不适用join
对账优化示例
原逻辑
while(存在未对账订单){
// 查询未对账订单
pos = getPOrders();
// 查询派送单
dos = getDOrders();
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}
并行优化
while(存在未对账订单){
// 查询未对账订单
Thread T1 = new Thread(()->{
pos = getPOrders();
});
T1.start();
// 查询派送单
Thread T2 = new Thread(()->{
dos = getDOrders();
});
T2.start();
// 等待 T1、T2 结束
T1.join();
T2.join();
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}
池化资源
// 创建 2 个线程的线程池
Executor executor =
Executors.newFixedThreadPool(2);
while(存在未对账订单){
// 查询未对账订单
executor.execute(()-> {
pos = getPOrders();
});
// 查询派送单
executor.execute(()-> {
dos = getDOrders();
});
/* ??如何实现等待??*/
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}
CountDownLatch
// 创建 2 个线程的线程池
Executor executor =
Executors.newFixedThreadPool(2);
while(存在未对账订单){
// 计数器初始化为 2
CountDownLatch latch =
new CountDownLatch(2);
// 查询未对账订单
executor.execute(()-> {
pos = getPOrders();
latch.countDown();
});
// 查询派送单
executor.execute(()-> {
dos = getDOrders();
latch.countDown();
});
// 等待两个查询操作结束
latch.await();
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}
进一步优化
// 订单队列
Vector<P> pos;
// 派送单队列
Vector<D> dos;
// 执行回调的线程池
Executor executor =
Executors.newFixedThreadPool(1);
final CyclicBarrier barrier =
new CyclicBarrier(2, ()->{
executor.execute(()->check());
});
void check(){
P p = pos.remove(0);
D d = dos.remove(0);
// 执行对账操作
diff = check(p, d);
// 差异写入差异库
save(diff);
}
void checkAll(){
// 循环查询订单库
Thread T1 = new Thread(()->{
while(存在未对账订单){
// 查询订单库
pos.add(getPOrders());
// 等待
barrier.await();
}
});
T1.start();
// 循环查询运单库
Thread T2 = new Thread(()->{
while(存在未对账订单){
// 查询运单库
dos.add(getDOrders());
// 等待
barrier.await();
}
});
T2.start();
}
CountDown的初始值减为0后不会重置
CyclicBarrier使用与多个线程间相互等待的场景,既A线程需等待B,C线程都执行完成后执行,同时B,C需要相互等待对方执行完成
CyclicBarrier初始值减为0后会重置,可以重复使用
并发容器
同步容器
将不安全容器包装后用synchronized同步
需要注意竞态条件问题
注意迭代器
如collections.synchronizedList
其他非包装但使用synchronized
Stack
HashTable
Vector
并发容器
java1.5以后
List
CopyOnWriteArrayList
CopyOnWriteArrayList 仅适用于写操作非常少的场景,而且能够容忍读写的短暂不一致
CopyOnWriteArrayList 迭代器是只读的,不支持增删改
Map
ConcurrentHashMap
key不允许为null
ConcurrentSkipListMap
key有序,key不允许为null
Set
ConcurrentSkipListSet
CopyOnWriteArraySet
Queue
BlockingDeque
LinkedBlockingDeque
BlockingQueue
ArrayBlockingQueue
LinkedBlockingQueue
SynchronousQueue
LinkedTransferQueue
PriorityBlockingQueue
DelayQueue
ConcurrentLinkedQueue
ConcurrentLinkedDeque
分类
单端与双端
Queue
Deque
阻塞与非阻塞
BlockingQueue
ConcurrentLinkedQueue/ConcurrentLinkedDeque
是否有界
有界
ArrayBlockingQueue/LinkedBlockingQueue
无界
....
原子类
基本数组类型
AtomicBoolean
AtomicInteger
AtomicLong
数组
AtomicIntegerArray
AtomicLongArray
AtomicReferenceArray
累加器
DoubleAccumulator
DoubleAdder
LongAccumulator
LongAdder
引用类型
AtomicReference
AtomicStampedReference
AtomicMarkableRefrence
对象属性更新器
AtomicIntegerFiledUpdater
AtomicFieldUpdater
AtomicReferenceFieldUpdater
线程池
Excutors
多使用无界队列,不推荐使用
ThreadPoolExecutor
corePoolSize核心线程数
maximumPoolSize最大线程数
keepAliveTime & unit线程空闲时间及单位,当线程数大于核心线程数且有线程空闲超过该时间会销毁线程,保留核心线程
workQueue任务队列
threadFactory定制线程创建过程,比如可以设置线程名
handler拒绝策略
CallerRunsPolicy若工作队列已满,则提交任务的线程自己执行该任务
AbortPolicy丢弃任务,并抛出 RejectedException,默认策略
DiscardPolicy直接丢弃任务,不抛出异常
DiscardOldestPolicy丢弃最老的任务
注意事项
使用有界队列
拒绝策略
Future
CompletionService
Fork/Join
理论基础
并发编程Bug的源头
可见性
缓存导致
原子性
线程切换
一个或者多个操作在 CPU 执行的过程中不被中断的特性称为原子性
CPU 能保证的原子操作是 CPU 指令级别的,而不是高级语的操作符
保证中间状态对外不见
有序性
编译优化
单例双重检查仍然有可能存在实例未初始化完成就被使用的情况
内存模型
Java 内存模型规范了 JVM 如何提供按需禁用缓存和编译优化
volatile
synchronized
final
六项 Happens-Before 规则
程序的顺序性规则
前面一个操作的结果对后续操作是可见的
volatile 变量规则
对一个 volatile 变量的写操作, Happens-Before 于后续对这个 volatile 变量的读操作。
传递性
A happens before B,B happens before C, then A happens before C.
管程中锁的规则
对一个锁的解锁 Happens-Before 于后续对这个锁的加锁
线程 start() 规则
主线程 A 启动子线程 B 后,子线程 B 能够看到主线程在启动子线程B之前的操作
线程 join() 规则
如果在线程 A 中,调用线程 B 的 join() 并成功返回,那么线程 B 中的任意操作 Happens-Befor于该 join() 操作的返回
互斥锁
解决原子性问题
用一把锁保护多个资源
死锁
死锁的四个条件
互斥,共享资源 X 和 Y 只能被一个线程占用
占有且等待,线程 T1 已经取得共享资源 X,在等待共享资源Y 的时候,不释放共享资源 X
不可抢占,其他线程不能强行抢占线程 T1 占有的资源
循环等待,线程 T1 等待线程 T2 占有的资源,线程 T2等待线程 T1 占有的资源,就是循环等待
安全性、活跃性及性能问题
安全性问题
存在共享数据并且该数据会发生变化,通俗地讲就是有多个线程会同时读写同一数据
数据竞争(Data Race)
竞态条件(Race Condition)
竞态条件,指的是程序的执行结果依赖线程执行的顺序
活跃性问题
死锁
活锁
相互谦让
两个人相向而行,相互谦让
随机等待可解决
饥饿
保证资源充足
公平地分配资源
公平锁
避免持有锁的线程长时间执行
性能问题
阿姆达尔定律
使用无锁的算法和数据结构
减少锁持有的时间
指标
吞吐量
延迟
并发量
管程Monitor
管程和信号量是等价的,所谓等价指的是用管程能够实现信号量,也能用信号量实现管程
所谓管程,指的是管理共享变量以及对共享变量的操作过程,让他们支持并发
MESA 模型
Java采用的模型
Hasen 模型
Hoare 模型
如何并发两大核心问题
互斥
将共享变量及其对共享变量的操作统一封装起来
同步
每个条件变量都对应有一个等待队列
线程生命周期
NEW初始化状态
RUNNABLE可运行/运行状态
BLOCKED阻塞状态
WAITING无时限等待
TIMED_WAITING有时限等待
TERMINATED终止状态
创建多少线程
cpu密集型
核数+1
IO密集型
CPU 核数 * [ 1 +(I/O 耗时 / CPU 耗时)]
为什么局部变量是线程安全的
线程封闭
用面向对象思想写好并发程序
封装共享变量
识别共享变量间的约束条件
制定并发访问策略
避免共享
不变模式
管程及其他同步工具
原则
有限使用成熟的工具类
迫不得已时才使用低级的同步原语
避免过早优化
问题
分工
Executor与线程池
Fork/Join
Future
Guarded Suspension
Balking
Thread-Per-Message
生产者-消费者模式
Worker-Thread模式
两阶段终止模式
协作
信号量
管程
Lock&Condition
synchronized
CountDownLatch
CyclicBarrier
Phaser
Exchanger
互斥
无锁
不变模式
线程本地存储
CAS
Copy-on-Write
原子类
互斥锁
synchronized
Lock
读写锁
0 条评论
下一页