并发专题阶段性总结3
2023-03-21 10:29:36 0 举报
AI智能生成
并发,Fork/Join框架,Future,Callable
作者其他创作
大纲/内容
Future
对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果
主要功能
boolean cancel (boolean mayInterruptIfRunning) 取消任务的执行
参数指定是否立即中断任务执行,或者等等任务结束
boolean isCancelled () 任务是否已经取消,任务正常完成前将其取消,则返回 true
boolean isDone () 任务是否已经完成。需要注意的是如果任务正常终止、异常或取消,都将返回true
V get () throws InterruptedException, ExecutionException 等待任务执行结束,然后获得V类型的结果
FutureTask
Future实际采用FutureTask实现,利用 FutureTask 创建 Future
是消费者和生产者的桥梁
消费者
通过 FutureTask 存储任务的处理结果,更新任务的状态
生产者
可以阻塞式获取任务的处理结果,非阻塞式获取任务处理状
注意事项
当 for 循环批量获取 Future 的结果时容易 block
get 方法调用时应使用 timeout 限制
Future 的生命周期不能后退
一旦完成了任务,它就永久停在了“已完成”的状态
局限性
并发执行多任务---只提供了get()方法来获取结果,并且是阻塞的
无法对多个任务进行链式调用
无法组合多个任务
没有异常处理
Callable
线程的实现方式
本质上Java中实现线程只有一种方式,都是通过new Thread()创建线程
使用 Thread类或继承Thread类
实现 Runnable 接口配合Thread
使用有返回值的 Callable
使用 lambda
比较
直接继承Thread或者实现Runnable接口
没有返回值
不能获取执行完的结果
不能抛出 checked Exception
Callable接口
call方法可以有返回值
可以声明抛出异常
配合Future 类,通过 Future 可以了解任务执行情况
Callable 的功能要比 Runnable
Disruptor
初衷是为了解决高并发下列队锁的问题
核心设计原理
环形数组结构
为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好
元素位置定位
数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心index溢出的问题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完
无锁设计
每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据
数据结构
框架使用RingBuffer来作为队列的数据结构
,RingBuffer就是一个可自定义大小的环形数组
数组外还有一个序列号(sequence),用以指向下一个可用的元素,供生产者与消费者使用
概念与作用
RingBuffer——Disruptor底层数据结构实现
Sequencer——序号管理器
Sequence——序号
SequenceBarrier——序号栅栏
EventProcessor——事件处理器
EventHandler——业务处理器
Producer——生产者接口
等待策略
BlockingWaitStrategy
Disruptor的默认策略
使用锁和condition来控制线程的唤醒
最低效的策略
但其对CPU的消耗最小
SleepingWaitStrategy
性能表现跟 BlockingWaitStrategy 差不多,对 CPU 的消耗也类似,但其对生产者线程的影响最
通过使用LockSupport.parkNanos(1)来实现循环等待
YieldingWaitStrategy
可以使用在低延迟系统的策略之一
BusySpinWaitStrategy
性能最好,适合用于低延迟的系统
在要求极高性能且事件处理线程数小于CPU逻辑核心数的场景中,推荐使用此策略
PhasedBackoffWaitStrategy
自旋 + yield + 自定义策略
CPU资源紧缺,吞吐量和延迟并不重要的场景
Fork/Join框架
是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架
Fork/Jion特性
ForkJoinPool 不是为了替代 ExecutorService,而是它的补充,在某些应用场景下性能比 ExecutorService 更好
ForkJoinPool 主要用于实现“分而治之”的算法,特别是分治之后递归调用的函数
ForkJoinPool 最适合的是计算密集型的任务
工作窃取算法
指某个线程从其他队列里窃取任务来执行
fork/join的使用
首先创建一个 ForkJoin 任务
fork()
join()
继承子类
RecursiveAction
用于没有返回结果的任务
比如写数据到磁盘,然后就退出了。 一个RecursiveAction可以把自己的工作分割成更小的几块, 这样它们可以由独立的线程或者CPU执行。 我们可以通过继承来实现一个RecursiveAction
RecursiveTask
用于有返回结果的任务
可以将自己的工作分割为若干更小任务,并将这些子任务的执行合并到一个集体结果。 可以有几个水平的分割和合并
CountedCompleter
在任务完成执行后会触发执行一个自定义的钩子函数
框架原理
ForkJoinPool 的内部状态都是通过一个64位的 long 型 变量ctl来存储
AC: 正在运行工作线程数减去目标并行度,高16位
TC: 总工作线程数减去目标并行度,中高16位
SS: 栈顶等待线程的版本计数和状态,中低16位
ID: 栈顶 WorkQueue 在池中的索引(poolIndex),低16位
异常处理
ForkJoinTask 提供了 isCompletedAbnormally() 方法来检查任务是否已经抛出异常
通过 ForkJoinTask 的 getException 方法获取异常
构造函数
ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
int mode,
String workerNamePrefix)
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
int mode,
String workerNamePrefix)
parallelism:并行度( the parallelism level),默认情况下跟我们机器的cpu个数保持一致
factory:创建新线程的工厂
ForkJoinWorkerThreadFactory
ForkJoinWorkerThreadFactory
handler:线程异常情况下的处理器(Thread.UncaughtExceptionHandler handler)
处理器在线程执行任务时由于某些无法预料到的错误而导致任务线程中断时进行一些处理
asyncMode工作线程内的任务队列是采用何种方式进行调度
先进先出FIFO(true)
后进先出LIFO(false默认)
fork()
把任务推入当前工作线程的工作队列里
join()
join() 可以使得线程免于被阻塞的原因——不像同名的 Thread.join()
检查调用 join() 的线程是否是 ForkJoinThread 线程
查看任务的完成状态,如果已经完成,直接返回结果
如果任务尚未完成,但处于自己的工作队列内,则完成它
如果任务已经被其他的工作线程偷走,则窃取这个小偷的工作队列内的任务(以 FIFO 方式)
任务性质类型
CPU密集型(CPU-bound)
在多重程序系统中,大部份时间用来做计算、逻辑判断等CPU动作的程序称之CPU bound
CPU占用率相当高。这可能是因为任务本身不太需要访问I/O设备,也可能是因为程序是多线程实现因此屏蔽掉了等待I/O的时间
特点是需要大量耗费 CPU 资源的任务。对于这样的任务最佳的线程数为 CPU 核心数的 1~2 倍
场景
加密、解密、压缩、计算等一系列
一个计算圆周率至小数点一千位以下的程序
线程数 = CPU核数+1 (现代CPU支持超线程)
IO密集型(I/O bound)
I/O bound的程序一般在达到性能极限时,CPU占用率仍然较低。这可能是因为任务本身需要大量I/O操作,而pipeline做得不是很好,没有充分利用处理器能力
特点是并不会特别消耗 CPU 资源,但是 IO 操作很耗时
线程数 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目
CompletableFuture
CompletionService
原理
通过阻塞队列+FutureTask
内部有一个先进先出的阻塞队列,用于保存已经执行完成的Future
通过调用它的take方法或poll方法可以获取到一个已经执行完成的Future,进而通过调用Future接口实现类的get方法获取最终的结果
应用场景总结
当需要批量提交异步任务的时候建议你使用CompletionService
CompletionService能够让异步任务的执行结果有序化
线程池隔离
使用详解
CompletableFuture是Future接口的扩展和增强
CompletableFuture实现了对任务的编排能力
应用场景
描述依赖关系
thenApply() 把前面异步任务的结果,交给后面的Function
thenCompose()用来连接两个有依赖关系的任务,结果由第二个任务返回
描述and聚合关系
thenCombine:任务合并,有返回值
thenAccepetBoth:两个任务执行完成后,将结果交给thenAccepetBoth消耗,无返回值
runAfterBoth:两个任务都执行完成后,执行下一步操作(Runnable)
描述or聚合关系
applyToEither:两个任务谁执行的快,就使用那一个结果,有返回值
acceptEither: 两个任务谁执行的快,就消耗那一个结果,无返回值
runAfterEither: 任意一个任务执行完成,进行下一步操作(Runnable)
并行执行
CompletableFuture类自己也提供了anyOf()和allOf()用于支持多个CompletableFuture并行执行
创建异步操作
runAsync(Runnable runnable)
以Runnable函数式接口类型为参数,没有返回结果
没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码
CompletableFuture 会使用公共的 ForkJoinPool 线程池,这个线程池默认创建的线程数是 CPU 的核数
runAsync(Runnable runnable, Executor executor)
如果指定线程池,则使用指定的线程池运行
supplyAsync(Supplier<U> supplier)
supplyAsync 方法Supplier函数式接口类型为参数,返回结果类型为U
Supplier 接口的 get() 方法是有返回值的(会阻塞)
supplyAsync(Supplier<U> supplier, Executor executor)
0 条评论
下一页