大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。
Jetbrains全系列IDE使用 1年只要46元 售后保障 童叟无欺
ForkJoin之RecursiveAction和RecursiveTask的简单使用
Java提供的多线程可以提高CPU的利用率,现在的CPU都是多核多线程,如果不好好利用,只运行单线程程序,会使得CPU的其他核心空闲,浪费宝贵的计算机资源。
编写好的程序不容易,编写好的多线程程序更难。
JDK1.7开始,Java提供了Fork/Join框架,可以方便的将一个大任务拆分成多个小任务,利用多个线程去并行执行,提高程序的性能,更好的利用CPU资源。
基本思想
分而治之
将一个规模为N的问题分解为K个规模较小的子问题(K <= N),这些子问题相互独立且与原问题性质相同,求出子问题的解,就可以求出原问题的解。
分而治之有两个前提条件:
- 子问题相互独立
- 子问题与原问题性质相同
举个栗子
有一亩稻田,种着10排水稻,丰收季节要对水稻进行收割。
大任务:收割10排水稻。
小任务:收割1排水稻。
将大任务进行拆分,大小任务之间性质相同,且小任务之间相互独立。
单线程程序
一位工人,串行收割10排水稻。
多线程程序
10位工人,并行每人收割1排水稻。
工作窃取
如上述,10位工人不可能每个人干活的速度都一样,收割有快有慢,提前结束的要去帮忙收割慢的,榨干工人资源。
将一个大任务拆分给10个线程去执行,10个线程不可能同时执行完毕,肯定有快有慢。
执行快的线程必须等待慢的线程,因为最后必须汇总所有的结果才是最终结果。
执行快的线程等待:其实还是浪费了CPU的资源。
于是就有了“工作窃取”的概念。
每个线程都为分配给他的任务保存一个双向的链式队列,每完成一个任务,就会队列头上取出下一个任务开始执行。
有些线程可能早早地完成了分配给他的任务,即他的队列已经空了,但其他的线程还是很忙。这个时候队列已经空了的线程并不会闲置下来,而是随机选择一个其他的线程从队列的尾巴上“偷走”一个任务。这个过程会一直继续下去,直到所有的任务都执行完毕,所有的队列都清空。
简单示例
ForkJoinTask下有两个常用的类:RecursiveAction和RecursiveTask。
RecursiveAction不支持返回值,RecursiveTask支持返回值。
有点类似于:Runnable和Callable。
RecursiveAction
递归目录,找出zip文件。
FileTask
public class FileTask extends RecursiveAction {
private File file;
public FileTask(File file) {
this.file = file;
}
@Override
protected void compute() {
if (file.isDirectory()) {
List<RecursiveAction> list = new ArrayList<>();
for (File item : file.listFiles()) {
list.add(new FileTask(item));
}
for (RecursiveAction action : invokeAll(list)) {
action.join();
}
}
if (file.getName().endsWith("zip")){
System.out.println(file.getAbsolutePath());
}
}
}
Client
public class Client {
public static void main(String[] args) {
// 构建ForkJoin线程池
ForkJoinPool pool = new ForkJoinPool();
//构建一个FileTask,交给pool执行并获取处理结果。
File file = new File("/Users/panchanghe/temp");
FileTask task = new FileTask(file);
pool.invoke(task);
}
}
输出:
/Users/panchanghe/temp/37010119000084.zip
/Users/panchanghe/temp/a.zip
/Users/panchanghe/temp/问题图片.zip
/Users/panchanghe/temp/ext-6.6.0/packages/exporter/src/exporter/file/zip
/Users/panchanghe/temp/ext-6.6.0/build/packages/exporter/src/exporter/file/zip
RecursiveTask
计算一个30000长度的int数组之和,每个task计算10000个。
ArrayTask
public class ArrayTask extends RecursiveTask<Integer> {
//单个任务处理的阈值
private final static int THRESHOLD = 10000;
private int start;
private int end;
private int[] arr;
public ArrayTask(int start, int end, int[] arr) {
this.start = start;
this.end = end;
this.arr = arr;
}
@Override
protected Integer compute() {
if (end - start >= THRESHOLD) {
System.out.println("任务拆分...");
//对任务进行切分
ArrayTask task1 = new ArrayTask(start, (end + start) / 2, arr);
ArrayTask task2 = new ArrayTask((end + start) / 2+1, end, arr);
invokeAll(task1, task2);
return task1.join() + task2.join();
}
//直接处理 返回结果
int sum = 0;
for (int i = start; i <= end; i++) {
sum += arr[i];
}
System.out.println(Thread.currentThread().getName()+"线程执行情况:"+start+"~"+end+":"+sum);
return sum;
}
}
Client
public class Client {
public static void main(String[] args){
// 构建ForkJoin线程池
ForkJoinPool pool = new ForkJoinPool();
//构建一个 1000000 长度的数组,元素为100内随机数
Random random = new Random();
int[] arr = IntStream.range(0, 30000).map(i -> random.nextInt(100)).toArray();
//构建一个ArrayTask,交给pool执行并获取处理结果。
ArrayTask task = new ArrayTask(0, arr.length-1, arr);
Integer result = pool.invoke(task);
System.out.println("最终结果:"+result);
}
}
输出:
任务拆分...
任务拆分...
任务拆分...
ForkJoinPool-1-worker-1线程执行情况:0~7499:375503
ForkJoinPool-1-worker-3线程执行情况:7500~14999:374008
ForkJoinPool-1-worker-2线程执行情况:15000~22499:373373
ForkJoinPool-1-worker-4线程执行情况:22500~29999:374858
最终结果:1497742
大任务被拆分了三次,ForkJoinPool用了4个线程去处理,每个线程的单独执行结果和最终汇总的结果已经打印在控制台。
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/193444.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...