ZOFTINO.COM android and web dev tutorials

Java Concurrency Fork Join Executor Examples

Fork join executor is similar to other executors provided in Java concurrency frame work in that it runs tasks on worker threads in a thread pool. It differs with other executors by providing a feature which utilizes multiple processors to improve the performance of applications by breaking work into smaller tasks. Fork join executor employs work-steal algorithm in which a thread which is free steals sub task from a busy thread which is executing a task.

ForkJoinPool

ForkJoinPool is an implementation of ExecutorService with work-stealing algorithm which makes threads in a pool to find work and execute tasks and subtasks. ForkJoinPool improves performance in situations where task spawns subtasks or multiple smaller tasks are submitted to it.

In addition to running Runnable and Callable tasks, ForkJoinPool executor runs ForkJoinTasks. ForkJoinTasks is a special task that allows work to be split into sub tasks.

If you instantiate ForkJoinPool using no argument constructor, it sets parallelism to number of processors and uses default thread factory. ForkJoinPool provides other constructors which allow you to specify parallelism, thread factory and exception handler.

ForkJoinTask

To utilize the features of ForkJoinPool, the task which is submitted to ForkJoinPool needs to be subclass of ForkJoinTask. ForkJoinTask allows you to define a task which can be split into subtasks to be run on different threads. ForkJoinTask is a thread like entity.

To create a task, you need to extend sub classes of ForkJoinTask such as RecursiveAction and RecursiveTask and implement compute() method in which you add logic to fork and join tasks. The difference between RecursiveAction and RecursiveTask is that RecursiveTask returns results and RecursiveAction doesn’t.

To submit a ForkJoinTask to ForkJoinPool and wait for it to complete, you need to call either invoke() method on ForkJoinPool object or submit()/execute() on ForkJoinPool object and join() method on ForkJoinTask object.

ForkJoinTask can fork and join subtasks using fork() and join() methods or invoke() or invokeAll() methods.

Steps to Use ForkJoinPool and ForkJoinTask

  • First create ForkJoinTask by extending RecursiveAction and RecursiveTask and implementing compute() method.
  • In compute method, perform work and create subtasks by calling fork, join, invoke or invokeAll methods of ForkJoinTask.
  • Instantiate ForkJoinPool and instantiate ForkJoinTask.
  • Submit ForkJoinTask task to ForkJoinPool by calling invoke() method.

ForkJoinPool RecursiveAction Example

Following example shows how to use ForkJoinPool and invoke RecursiveAction task. The task is supplied with list of integers. It calculates addition of consecutive numbers starting from 1 till the number for each number. In the RecursiveAction’s compute method, one sub task is created for last number of the source array and second sub task processes the rest of the source array.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveAction;

public class ConsecutiveSumTask extends RecursiveAction{
	private static final long serialVersionUID = 1L;
	
	private List<Integer> numbers;
	
	public ConsecutiveSumTask(List<Integer> numbers) {
		this.numbers = numbers;
	}

	@Override
	protected void compute() {

		//create sub tasks if supplied array has more than one element
		if(numbers != null && numbers.size() > 1) {
			List<Integer> oneNum = new ArrayList<Integer>();
			oneNum.add(numbers.remove(numbers.size()-1));
			
			ConsecutiveSumTask subTaskOne = new ConsecutiveSumTask(oneNum);
			ConsecutiveSumTask subTaskTwo = new ConsecutiveSumTask(numbers);
			
			//fork and join two sub tasks
			invokeAll(subTaskOne, subTaskTwo);			

		}else {
			//calculate sum of consecutive numbers
			consecutiveNumbersSum(numbers.get(0));
		}
	}

	void consecutiveNumbersSum(int num) {
		int sum = num*(1+num)/2;
		System.out.println("Thread "+Thread.currentThread().getId()
								+" Number "+num
								+" Consecutive numbers sum "+sum);
	}
}

Submitting the task to fork join pool.

		ForkJoinPool fjp = new ForkJoinPool();
		
		List<Integer> numLst = new ArrayList<Integer>();
		numLst.add(3);
		numLst.add(5);
		numLst.add(9);
		numLst.add(4);
		numLst.add(20);
		
		ConsecutiveSumTask cst = new ConsecutiveSumTask(numLst);
		
		fjp.invoke(cst);

Output

Thread 10 Number 20 Consecutive numbers sum 210
Thread 11 Number 4 Consecutive numbers sum 10
Thread 10 Number 5 Consecutive numbers sum 15
Thread 11 Number 9 Consecutive numbers sum 45
Thread 10 Number 3 Consecutive numbers sum 6

You can submit the task to fork join pool using execute() or submit() methods, but you need to call join() on the task in order for the main thread to wait for the task to complete.

		fjp.execute(cst);
		cst.join();

In the task also, instead of calling invokeAll(), you can call fork() and join() on each sub task.

			subTaskOne.fork();
			subTaskTwo.fork();
			subTaskTwo.join();
			subTaskOne.join();

ForkJoinPool RecursiveTaskExample

Following example shows how to create RecursiveTask and submit it to ForkJoinPool . RecursiveTask returns result meaning compute method of RecursiveTask returns result. In the example, the task uses the given list of stores to fetch coupons for each store and combine and returns results.

import java.util.List;
import java.util.concurrent.RecursiveTask;

public class CouponTask extends RecursiveTask<String>{
	private static final long serialVersionUID = 1L;

	private List<String> stores;

	public CouponTask(List<String> stores) {
		this.stores = stores;
	}

	@Override
	protected String compute() {
		
		if(stores == null || stores.size() < 1) {
			return "";
		}else if(stores.size() == 1) {			
			return getCoupon(stores.get(0));
		}

		List<String> oneStores = stores.subList(0, stores.size()/2);
		List<String> twoStores = stores.subList(stores.size()/2, 
			stores.size());	
		
		CouponTask cTaskOne = new CouponTask(oneStores);
		cTaskOne.fork();
		
		CouponTask cTaskTwo = new CouponTask(twoStores);

		return cTaskTwo.compute()+"," + cTaskOne.join();

	}

	String getCoupon(String store) {
		int dicount = (int )(Math.random() * 90);		
		return "get upto "+dicount+ "% off at "+store;
	}
}

Recursive task is submitted to fork join pool using invoke method. Invoke method returns final result.

	public static void main(String[] args) {
		
		ForkJoinPool forkJoinPool = new ForkJoinPool();
		
		List<String> stores = new ArrayList<String>();
		stores.add("sears");
		stores.add("gap");
		stores.add("nordstorm");
		stores.add("jcpenny");
		
		CouponTask ctask = new CouponTask(stores);
		
		String coupons = forkJoinPool.invoke(ctask);
		System.out.println(coupons);
		
	}