ForkJoinPool is an ExecutorService implementation that uses a work-stealing algorithm: idle worker threads in the pool look for work and execute tasks and subtasks queued by other threads. ForkJoinPool improves performance when tasks spawn subtasks or when many small tasks are submitted to it.
In addition to Runnable and Callable tasks, a ForkJoinPool runs ForkJoinTasks. A ForkJoinTask is a special kind of task that allows work to be split into subtasks.
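As a minimal sketch of the point above, the following runs one Runnable and one Callable on a ForkJoinPool. The class name PlainTasksDemo and the method runPlainTasks are made up for illustration; submit() and join() are the standard ForkJoinPool and ForkJoinTask methods.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original article.
class PlainTasksDemo {

    // Runs a Runnable and a Callable on a ForkJoinPool and returns the Callable's result.
    static int runPlainTasks(AtomicInteger counter) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            Runnable increment = counter::incrementAndGet;
            Callable<Integer> answer = () -> 6 * 7;

            // submit() wraps plain tasks and hands back a ForkJoinTask
            ForkJoinTask<?> runnableDone = pool.submit(increment);
            ForkJoinTask<Integer> callableResult = pool.submit(answer);

            runnableDone.join();          // wait for the Runnable to finish
            return callableResult.join(); // wait for and return the Callable's value
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();
        System.out.println(runPlainTasks(counter)); // 42
        System.out.println(counter.get());          // 1
    }
}
```

Using join() rather than get() keeps the sketch free of checked exceptions; get() would work as well.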
If you create a ForkJoinPool with the no-argument constructor, it sets parallelism to the number of available processors and uses the default thread factory. ForkJoinPool provides other constructors which allow you to specify the parallelism, the thread factory and the uncaught exception handler.
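The two ways of constructing a pool might look like the sketch below. PoolConstructionDemo and customPool are hypothetical names, and the handler that only prints to System.err is an assumption chosen to keep the example short.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical demo class, not part of the original article.
class PoolConstructionDemo {

    // Builds a pool with explicit parallelism, thread factory and exception handler.
    static ForkJoinPool customPool(int parallelism) {
        Thread.UncaughtExceptionHandler handler =
                (thread, error) -> System.err.println(thread.getName() + " failed: " + error);
        return new ForkJoinPool(
                parallelism,
                ForkJoinPool.defaultForkJoinWorkerThreadFactory, // the default factory
                handler,
                false); // asyncMode; false is what the no-argument constructor uses
    }

    public static void main(String[] args) {
        ForkJoinPool defaults = new ForkJoinPool(); // parallelism = available processors
        ForkJoinPool custom = customPool(2);
        System.out.println("default parallelism: " + defaults.getParallelism());
        System.out.println("custom parallelism: " + custom.getParallelism());
        defaults.shutdown();
        custom.shutdown();
    }
}
```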
To use the fork/join features of ForkJoinPool, the task submitted to it needs to be a subclass of ForkJoinTask. ForkJoinTask allows you to define a task which can be split into subtasks that run on different threads. A ForkJoinTask is a thread-like entity that is much lighter than a normal thread.
To create a task, extend one of the subclasses of ForkJoinTask, such as RecursiveAction or RecursiveTask, and implement the compute() method, in which you add the logic to fork and join subtasks. The difference between the two is that RecursiveTask returns a result and RecursiveAction does not.
To submit a ForkJoinTask to a ForkJoinPool and wait for it to complete, either call invoke() on the pool, or call submit() or execute() on the pool and then join() on the task.
Inside compute(), a ForkJoinTask can fork and join its subtasks using the fork() and join() methods, or the invoke() and invokeAll() methods.
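To make the steps above concrete, here is a small sketch of a RecursiveTask that sums a range of numbers. RangeSumTask, its THRESHOLD value and the sum() helper are assumptions made for this illustration; only RecursiveTask, fork(), join() and ForkJoinPool.invoke() come from the JDK.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical task: sums the integers in the inclusive range [from, to].
class RangeSumTask extends RecursiveTask<Long> {
    private static final long serialVersionUID = 1L;
    private static final int THRESHOLD = 1_000; // assumed cut-off below which we stop splitting
    private final long from;
    private final long to;

    RangeSumTask(long from, long to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        // small enough: compute directly
        if (to - from < THRESHOLD) {
            long sum = 0;
            for (long i = from; i <= to; i++) {
                sum += i;
            }
            return sum;
        }
        // otherwise split the range into two halves
        long mid = from + (to - from) / 2;
        RangeSumTask left = new RangeSumTask(from, mid);
        RangeSumTask right = new RangeSumTask(mid + 1, to);
        left.fork();                       // schedule the left half asynchronously
        long rightSum = right.compute();   // work on the right half in this thread
        return left.join() + rightSum;     // wait for the left half and combine
    }

    // Submits the task with invoke(), which blocks until the result is ready.
    static long sum(long from, long to) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new RangeSumTask(from, to));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sum(1, 100_000)); // 5000050000
    }
}
```

Forking one half and computing the other in the current thread avoids leaving the current worker idle while it waits.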
The following example shows how to use ForkJoinPool to invoke a RecursiveAction task. The task is given a list of integers and, for each number n in the list, calculates the sum of the consecutive numbers from 1 to n. In the RecursiveAction's compute() method, one subtask is created for the last number of the source list and a second subtask processes the rest of the list.
import java.util.List;
import java.util.concurrent.RecursiveAction;

public class ConsecutiveSumTask extends RecursiveAction {
    private static final long serialVersionUID = 1L;
    private final List<Integer> numbers;

    public ConsecutiveSumTask(List<Integer> numbers) {
        this.numbers = numbers;
    }

    @Override
    protected void compute() {
        // nothing to do for a missing or empty list
        if (numbers == null || numbers.isEmpty()) {
            return;
        }
        // create sub tasks if supplied list has more than one element
        if (numbers.size() > 1) {
            int last = numbers.size() - 1;
            // subList() returns views, so the caller's list is not modified
            ConsecutiveSumTask subTaskOne = new ConsecutiveSumTask(numbers.subList(last, numbers.size()));
            ConsecutiveSumTask subTaskTwo = new ConsecutiveSumTask(numbers.subList(0, last));
            // fork and join two sub tasks
            invokeAll(subTaskOne, subTaskTwo);
        } else {
            // calculate sum of consecutive numbers
            consecutiveNumbersSum(numbers.get(0));
        }
    }

    void consecutiveNumbersSum(int num) {
        // long arithmetic avoids int overflow for large numbers
        long sum = (long) num * (num + 1L) / 2;
        System.out.println("Thread " + Thread.currentThread().getId()
                + " Number " + num
                + " Consecutive numbers sum " + sum);
    }
}
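The task is submitted to the fork/join pool as follows.
// needs java.util.ArrayList, java.util.List and java.util.concurrent.ForkJoinPool
ForkJoinPool fjp = new ForkJoinPool();
List<Integer> numLst = new ArrayList<Integer>();
numLst.add(3);
numLst.add(5);
numLst.add(9);
numLst.add(4);
numLst.add(20);
ConsecutiveSumTask cst = new ConsecutiveSumTask(numLst);
// invoke() blocks until the task and all of its subtasks have completed
fjp.invoke(cst);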
Output (thread IDs and the order of the lines can differ between runs)
Thread 10 Number 20 Consecutive numbers sum 210
Thread 11 Number 4 Consecutive numbers sum 10
Thread 10 Number 5 Consecutive numbers sum 15
Thread 11 Number 9 Consecutive numbers sum 45
Thread 10 Number 3 Consecutive numbers sum 6
You can also submit the task to the fork/join pool using the execute() or submit() methods, but then you need to call join() on the task so that the main thread waits for the task to complete.
fjp.execute(cst);
cst.join();
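The execute() and submit() routes described above can be sketched end to end as follows. AsyncSubmitDemo and FlagTask are hypothetical names; the task only records that it ran, so that the effect of join() is easy to check.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of the original article.
class AsyncSubmitDemo {

    // Trivial task that only records that compute() was called.
    static class FlagTask extends RecursiveAction {
        private static final long serialVersionUID = 1L;
        final AtomicBoolean ran = new AtomicBoolean();

        @Override
        protected void compute() {
            ran.set(true);
        }
    }

    static boolean viaExecute(ForkJoinPool pool) {
        FlagTask task = new FlagTask();
        pool.execute(task); // returns immediately, the task runs asynchronously
        task.join();        // block until the task has completed
        return task.ran.get();
    }

    static boolean viaSubmit(ForkJoinPool pool) {
        FlagTask task = new FlagTask();
        ForkJoinTask<Void> handle = pool.submit(task); // submit() returns the task
        handle.join();                                 // block until the task has completed
        return task.ran.get() && handle.isDone();
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        System.out.println(viaExecute(pool)); // true
        System.out.println(viaSubmit(pool));  // true
        pool.shutdown();
    }
}
```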
Inside the task, instead of calling invokeAll(), you can also call fork() and join() on each subtask.
subTaskOne.fork();
subTaskTwo.fork();
subTaskTwo.join();
subTaskOne.join();
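To show that the two styles are interchangeable, the sketch below runs the same hypothetical task once with invokeAll() and once with explicit fork() and join() calls. SquareTask, its useInvokeAll flag and the squares() helper are invented for this illustration.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical task: squares every element of data[lo, hi) in place.
class SquareTask extends RecursiveAction {
    private static final long serialVersionUID = 1L;
    private final long[] data;
    private final int lo;
    private final int hi;
    private final boolean useInvokeAll;

    SquareTask(long[] data, int lo, int hi, boolean useInvokeAll) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
        this.useInvokeAll = useInvokeAll;
    }

    @Override
    protected void compute() {
        if (hi - lo <= 1) {
            // zero or one element: do the work directly
            if (hi > lo) {
                data[lo] = data[lo] * data[lo];
            }
            return;
        }
        int mid = lo + (hi - lo) / 2;
        SquareTask left = new SquareTask(data, lo, mid, useInvokeAll);
        SquareTask right = new SquareTask(data, mid, hi, useInvokeAll);
        if (useInvokeAll) {
            // forks both subtasks and waits for both
            invokeAll(left, right);
        } else {
            // same effect with explicit calls; join in reverse order of forking
            left.fork();
            right.fork();
            right.join();
            left.join();
        }
    }

    // Returns a squared copy of the input, leaving the input untouched.
    static long[] squares(long[] input, boolean useInvokeAll) {
        long[] copy = input.clone();
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.invoke(new SquareTask(copy, 0, copy.length, useInvokeAll));
        } finally {
            pool.shutdown();
        }
        return copy;
    }

    public static void main(String[] args) {
        long[] input = {1, 2, 3, 4, 5};
        System.out.println(Arrays.toString(squares(input, true)));  // [1, 4, 9, 16, 25]
        System.out.println(Arrays.toString(squares(input, false))); // [1, 4, 9, 16, 25]
    }
}
```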
The following example shows how to create a RecursiveTask and submit it to a ForkJoinPool. A RecursiveTask returns a result, meaning that its compute() method returns a value. In the example, the task takes a list of stores, fetches a coupon for each store, and combines and returns the results.
import java.util.List;
import java.util.concurrent.RecursiveTask;

public class CouponTask extends RecursiveTask<String> {
    private static final long serialVersionUID = 1L;
    private final List<String> stores;

    public CouponTask(List<String> stores) {
        this.stores = stores;
    }

    @Override
    protected String compute() {
        if (stores == null || stores.size() < 1) {
            return "";
        } else if (stores.size() == 1) {
            return getCoupon(stores.get(0));
        }
        // split the list of stores into two halves
        List<String> oneStores = stores.subList(0, stores.size() / 2);
        List<String> twoStores = stores.subList(stores.size() / 2,
                stores.size());
        // run the first half asynchronously
        CouponTask cTaskOne = new CouponTask(oneStores);
        cTaskOne.fork();
        // process the second half in the current thread, then join the first half
        CouponTask cTaskTwo = new CouponTask(twoStores);
        return cTaskTwo.compute() + "," + cTaskOne.join();
    }

    String getCoupon(String store) {
        // random discount between 0 and 89 percent
        int discount = (int) (Math.random() * 90);
        return "get up to " + discount + "% off at " + store;
    }
}
The recursive task is submitted to the fork/join pool using the invoke() method, which returns the final result.
// needs java.util.ArrayList, java.util.List and java.util.concurrent.ForkJoinPool
public static void main(String[] args) {
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    List<String> stores = new ArrayList<String>();
    stores.add("sears");
    stores.add("gap");
    stores.add("nordstorm");
    stores.add("jcpenny");
    CouponTask ctask = new CouponTask(stores);
    String coupons = forkJoinPool.invoke(ctask);
    System.out.println(coupons);
}