现在的位置: 首页 > 综合 > 正文

Java7 FORK/JOIN框架

2019年05月25日 ⁄ 综合 ⁄ 共 3635字 ⁄ 字号 评论关闭

fork/join框架就是在运行时,对要执行的人物进行判断,如果满足拆分的条件就进行拆分,最后在对拆分的任务运行的结果进行汇总

一个简单的例子

我们首先看一个简单的Fork/Join的任务定义。

Java代码  收藏代码
  1. public class Calculator extends RecursiveTask<Integer> {  
  2.   
  3.     private static final int THRESHOLD = 100;  
  4.     private int start;  
  5.     private int end;  
  6.   
  7.     public Calculator(int start, int end) {  
  8.         this.start = start;  
  9.         this.end = end;  
  10.     }  
  11.   
  12.     @Override  
  13.     protected Integer compute() {  
  14.         int sum = 0;  
  15.         if((start - end) < THRESHOLD){  
  16.             for(int i = start; i< end;i++){  
  17.                 sum += i;  
  18.             }  
  19.         }else{  
  20.             int middle = (start + end) /2;  
  21.             Calculator left = new Calculator(start, middle);  
  22.             Calculator right = new Calculator(middle + 1, end);  
  23.             left.fork();  
  24.             right.fork();  
  25.   
  26.             sum = left.join() + right.join();  
  27.         }  
  28.         return sum;  
  29.     }  
  30.   
  31. }  

 

    这段代码中,定义了一个累加的任务,在compute方法中,判断当前的计算范围是否小于一个值,如果是则计算,如果没有,就把任务拆分为连个子任务,并合并连个子任务的中间结果。程序递归的完成了任务拆分和计算。

    任务定义之后就是执行任务,Fork/Join提供一个和Executor框架 的扩展线程池来执行任务。

Java代码  收藏代码
  1. @Test  
  2. public void run() throws Exception{  
  3.     ForkJoinPool forkJoinPool = new ForkJoinPool();  
  4.     Future<Integer> result = forkJoinPool.submit(new Calculator(010000));  
  5.   
  6.     assertEquals(new Integer(49995000), result.get());  
  7. }  

 

Fork/Join框架的主要类


RecursiveAction供不需要返回值的任务继续。

RecursiveTask通过泛型参数设置计算的返回值类型。

ForkJoinPool提供了一系列的submit方法,计算任务。ForkJoinPool默认的线程数通过Runtime.availableProcessors()获得,因为在计算密集型的任务中,获得多于处理性核心数的线程并不能获得更多性能提升。

public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
    doSubmit(task);
    return task;
}

sumit方法返回了task本身,ForkJoinTask实现了Future接口,所以可以通过它等待获得结果。

另一例子

这个例子并行排序数组,不需要返回结果,所以继承了RecursiveAction。

Java代码  收藏代码
  1. public class SortTask extends RecursiveAction {  
  2.     final long[] array;  
  3.     final int start;  
  4.     final int end;  
  5.     private int THRESHOLD = 100//For demo only  
  6.   
  7.     public SortTask(long[] array) {  
  8.         this.array = array;  
  9.         this.start = 0;  
  10.         this.end = array.length - 1;  
  11.     }  
  12.   
  13.     public SortTask(long[] array, int start, int end) {  
  14.         this.array = array;  
  15.         this.start = start;  
  16.         this.end = end;  
  17.     }  
  18.   
  19.     protected void compute() {  
  20.         if (end - start < THRESHOLD)  
  21.             sequentiallySort(array, start, end);  
  22.         else {  
  23.             int pivot = partition(array, start, end);  
  24.             new SortTask(array, start, pivot - 1).fork();  
  25.             new SortTask(array, pivot + 1, end).fork();  
  26.         }  
  27.     }  
  28.   
  29.     private int partition(long[] array, int start, int end) {  
  30.         long x = array[end];  
  31.         int i = start - 1;  
  32.         for (int j = start; j < end; j++) {  
  33.             if (array[j] <= x) {  
  34.                 i++;  
  35.                 swap(array, i, j);  
  36.             }  
  37.         }  
  38.         swap(array, i + 1, end);  
  39.         return i + 1;  
  40.     }  
  41.   
  42.     private void swap(long[] array, int i, int j) {  
  43.         if (i != j) {  
  44.             long temp = array[i];  
  45.             array[i] = array[j];  
  46.             array[j] = temp;  
  47.         }  
  48.     }  
  49.   
  50.     private void sequentiallySort(long[] array, int lo, int hi) {  
  51.         Arrays.sort(array, lo, hi + 1);  
  52.     }  
  53. }  

 

Java代码  收藏代码
  1. @Test  
  2. public void run() throws InterruptedException {  
  3.     ForkJoinPool forkJoinPool = new ForkJoinPool();  
  4.     Random rnd = new Random();  
  5.     long[] array = new long[SIZE];  
  6.     for (int i = 0; i < SIZE; i++) {  
  7.         array[i] = rnd.nextInt();  
  8.     }  
  9.     forkJoinPool.submit(new SortTask(array));  
  10.   
  11.     forkJoinPool.shutdown();  
  12.     forkJoinPool.awaitTermination(1000, TimeUnit.SECONDS);  
  13.   
  14.     for (int i = 1; i < SIZE; i++) {  
  15.         assertTrue(array[i - 1] < array[i]);  
  16.     }  
  17. }  

 

抱歉!评论已关闭.