fork/join框架就是在运行时,对要执行的人物进行判断,如果满足拆分的条件就进行拆分,最后在对拆分的任务运行的结果进行汇总
一个简单的例子
我们首先看一个简单的Fork/Join的任务定义。
- public class Calculator extends RecursiveTask<Integer> {
- private static final int THRESHOLD = 100;
- private int start;
- private int end;
- public Calculator(int start, int end) {
- this.start = start;
- this.end = end;
- }
- @Override
- protected Integer compute() {
- int sum = 0;
- if((start - end) < THRESHOLD){
- for(int i = start; i< end;i++){
- sum += i;
- }
- }else{
- int middle = (start + end) /2;
- Calculator left = new Calculator(start, middle);
- Calculator right = new Calculator(middle + 1, end);
- left.fork();
- right.fork();
- sum = left.join() + right.join();
- }
- return sum;
- }
- }
这段代码中,定义了一个累加的任务,在compute方法中,判断当前的计算范围是否小于一个值,如果是则计算,如果没有,就把任务拆分为连个子任务,并合并连个子任务的中间结果。程序递归的完成了任务拆分和计算。
任务定义之后就是执行任务,Fork/Join提供一个和Executor框架 的扩展线程池来执行任务。
- @Test
- public void run() throws Exception{
- ForkJoinPool forkJoinPool = new ForkJoinPool();
- Future<Integer> result = forkJoinPool.submit(new Calculator(0, 10000));
- assertEquals(new Integer(49995000), result.get());
- }
Fork/Join框架的主要类
RecursiveAction供不需要返回值的任务继续。
RecursiveTask通过泛型参数设置计算的返回值类型。
ForkJoinPool提供了一系列的submit方法,计算任务。ForkJoinPool默认的线程数通过Runtime.availableProcessors()获得,因为在计算密集型的任务中,获得多于处理性核心数的线程并不能获得更多性能提升。
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
doSubmit(task);
return task;
}
sumit方法返回了task本身,ForkJoinTask实现了Future接口,所以可以通过它等待获得结果。
另一例子
这个例子并行排序数组,不需要返回结果,所以继承了RecursiveAction。
- public class SortTask extends RecursiveAction {
- final long[] array;
- final int start;
- final int end;
- private int THRESHOLD = 100; //For demo only
- public SortTask(long[] array) {
- this.array = array;
- this.start = 0;
- this.end = array.length - 1;
- }
- public SortTask(long[] array, int start, int end) {
- this.array = array;
- this.start = start;
- this.end = end;
- }
- protected void compute() {
- if (end - start < THRESHOLD)
- sequentiallySort(array, start, end);
- else {
- int pivot = partition(array, start, end);
- new SortTask(array, start, pivot - 1).fork();
- new SortTask(array, pivot + 1, end).fork();
- }
- }
- private int partition(long[] array, int start, int end) {
- long x = array[end];
- int i = start - 1;
- for (int j = start; j < end; j++) {
- if (array[j] <= x) {
- i++;
- swap(array, i, j);
- }
- }
- swap(array, i + 1, end);
- return i + 1;
- }
- private void swap(long[] array, int i, int j) {
- if (i != j) {
- long temp = array[i];
- array[i] = array[j];
- array[j] = temp;
- }
- }
- private void sequentiallySort(long[] array, int lo, int hi) {
- Arrays.sort(array, lo, hi + 1);
- }
- }
- @Test
- public void run() throws InterruptedException {
- ForkJoinPool forkJoinPool = new ForkJoinPool();
- Random rnd = new Random();
- long[] array = new long[SIZE];
- for (int i = 0; i < SIZE; i++) {
- array[i] = rnd.nextInt();
- }
- forkJoinPool.submit(new SortTask(array));
- forkJoinPool.shutdown();
- forkJoinPool.awaitTermination(1000, TimeUnit.SECONDS);
- for (int i = 1; i < SIZE; i++) {
- assertTrue(array[i - 1] < array[i]);
- }
- }