Fork/Join并行框架

发布于:2021-10-20 22:48:46

JDK1.5加入了java.util.concurrent包实现粗粒度的并发(宏观上的同时执行)框架,由于多核时代的到来,JDK1.7加入了


java.util.concurrent.forkjoin包,对该框架进行扩充,通过利用fork/join模式可以轻松地利用多核资源来协作完成一个复杂的计算任务。fork/join模式是处理并行编程的一种经典方法。


fork/join框架是ExecutorService接口的一种具体实现,目的是为了帮助你更好地利用多处理器带来的好处。它是为那些能够被
递归地拆解成子任务的工作类型量身设计的。










Fork/Join框架的主要类






RecursiveAction供不需要返回值的任务继续。


RecursiveTask通过泛型参数设置计算的返回值类型。


ForkJoinPool提供了一系列的submit方法,计算任务。ForkJoinPool默认的线程数通过Runtime.availableProcessors()获得,因为在计算密集型的任务中,获得多于处理性核心数的线程并不能获得更多性能提升。


public ForkJoinTask submit(ForkJoinTask task) {
??? doSubmit(task);
??? return task;
}


sumit方法返回了task本身,ForkJoinTask实现了Future接口,所以可以通过它等待获得结果。






利用RecursiveTask(带返回值)实现累加




import?java.util.concurrent.ForkJoinPool;


import?java.util.concurrent.Future;


import?java.util.concurrent.RecursiveTask;


publicclass?Calculator?extends?RecursiveTask {


???privatestaticfinalintTHRESHOLD?= 100;//每个计算任务计算100个,超过则分割任务


????privateintstart;


????privateintend;


????public?Calculator(int?start,?int?end) {


????????this.start?= start;


????????this.end?= end;


????}


????@Override


????protected?Integer compute() {


????????int?sum = 0;


????????if((end-start)


????????????for(int?i =?start; i


????????????????sum += i;


????????????}


????????}else{


????????????int?middle = (start?+?end) /2;


????????????Calculator left =?new?Calculator(start, middle+1);


????????????Calculator right =?new?Calculator(middle + 1,?end);


????????????left.fork();


????????????right.fork();


????????????sum = left.join() + right.join();


????????}


????????return?sum;


????}


????publicstaticvoid?main(String[]args)?throws?Exception{


????????ForkJoinPool forkJoinPool =?new?ForkJoinPool();


????????Future result = forkJoinPool.submit(new?Calculator(0, 10000));


????????System.out.println(result.get());


????}


}







利用RecursiveAction(不带返回值)实现排序:




import?java.util.Arrays;


import?java.util.Random;


import?java.util.concurrent.ForkJoinPool;


import?java.util.concurrent.RecursiveAction;


import?java.util.concurrent.TimeUnit;


publicclassSortTaskextends?RecursiveAction {


????finallong[]?array;


????finalintstart;


????finalintend;


????privateintTHRESHOLD?= 100;


????public?SortTask(long[] array) {


????????this.array?= array;


????????this.start?= 0;


????????this.end?= array.length?- 1;


????}


????public?SortTask(long[] array,?int?start,?int?end) {


????????this.array?= array;


????????this.start?= start;


????????this.end?= end;


????}


????@Override


????protectedvoid?compute() {


????????if?(end?-?start?

????????????sequentiallySort(array,?start,?end);


????????else?{


????????????int?pivot = partition(array,?start,?end);


????????????new?SortTask(array,?start, pivot - 1).fork();


????????????new?SortTask(array, pivot + 1,?end).fork();


????????}


????}


????privateint?partition(long[] array,?int?start,?int?end) {


????????long?x = array[end];


????????int?i = start - 1;


????????for?(int?j = start; j < end; j++) {


????????????if?(array[j] <= x) {


????????????????i++;


????????????????swap(array, i, j);


????????????}


????????}


????????swap(array, i + 1, end);


????????return?i + 1;


????}


????privatevoid?swap(long[] array,?int?i,?int?j) {


????????if?(i != j) {


????????????long?temp = array[i];


????????????array[i] = array[j];


????????????array[j] = temp;


????????}


????}


????privatevoid?sequentiallySort(long[] array,?int?lo,?int?hi) {


????????Arrays.sort(array, lo, hi + 1);


????}


????publicstaticvoid?main(String[]args)?throws?InterruptedException{


???????int?size = 10000;


????????ForkJoinPool forkJoinPool =?new?ForkJoinPool();?


????????Random rnd =?new?Random();?


????????long[] array =?newlong[size];?


????????for?(int?i = 0; i < size; i++) {?


????????????array[i] = rnd.nextInt();?


????????}?


????????forkJoinPool.submit(new?SortTask(array));


????????forkJoinPool.shutdown();?


????????forkJoinPool.awaitTermination(1000, TimeUnit.SECONDS);


????????for?(int?i = 1; i < size; i++) {


????????????if(array[i - 1] > array[i])


??????????????System.out.println("排序不正确");


????????}


????}


}








Fork/Join框架的异常处理




ForkJoinTask在执行的时候可能会抛出异常,但是我们没办法在主线程里直接捕获异常,所以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过ForkJoinTask的getException方法获取异常。使用如下代码:



if(task.isCompletedAbnormally())
{
System.out.println(task.getException());
}

这里的task也即上面的Caculator、SortTask实例。



getException方法返回Throwable对象,如果任务被取消了则返回CancellationException。如果任务没有完成或者没有抛出异常则返回null。





























































相关推荐

最新更新

猜你喜欢