ZOFTINO.COM android and web dev tutorials

Java LinkedTransferQueue Example

LinkedTransferQueue is an unbound blocking queue. It implements TransferQueue interface which allows producer to wait for consumer to receive elements. For this it provides transfer and tryTransfer operations.

LinkedTransferQueue supports all other queue and blocking queue and collection operations.

LinkedTransferQueue Transfer

Producer can add element to LinkedTransferQueue and wait till the element is received by the consumer by using transfer() or tryTransfer() methods. If you run the following program, it blocks forever as transfer() waits for a consumer of the LinkedTransferQueue.

	LinkedTransferQueue<String> tQueue = new LinkedTransferQueue<String>();		
	
	try {
		System.out.println("transferring an element");
		tQueue.transfer("brand");
		System.out.println("element transfer is complete");
	} catch (InterruptedException e1) {
		e1.printStackTrace();
	}

Output

transferring an element

Following example shows that transfer operation exits after consumer tries to take the element.

	LinkedTransferQueue<String> tQueue = new LinkedTransferQueue<String>();		
	
	new Thread(new Runnable() {
		@Override
		public void run() {		
			try {
				System.out.println("transferring an element");
				tQueue.transfer("brand");
				System.out.println("element transfer is complete");
			} catch (InterruptedException e1) {
				e1.printStackTrace();
			}
			
		}			
	}).start();	
	
	try {
		System.out.println("transfer queue received element: "+tQueue.take());
	} catch (Exception e) {
		e.printStackTrace();
	}

Output

transferring an element
element transfer is complete
transfer queue received element: brand

Operation tryTransfer adds the given element to the transfer queue if there is a consumer waiting for elements, otherwise it exits without adding the element to the queue.

	LinkedTransferQueue<String> tQueue = new LinkedTransferQueue<String>();		

	tQueue.tryTransfer("brand");	
	System.out.println("size of transfer queue: "+tQueue.size());

Output

size of transfer queue: 0

You can make tryTransfer operation wait for certain amount of time before it exits when there is no consumer by passing wait time to the operation.

	try {
		tQueue.tryTransfer("brand", 2000, TimeUnit.MILLISECONDS);
	} catch (InterruptedException e2) {
		e2.printStackTrace();
	}

LinkedTransferQueue as Normal Queue

In addition to transferring elements where producer waits for consumer, LinkedTransferQueue can be used as normal queue or blocking queue.

	LinkedTransferQueue<String> tQueue = new LinkedTransferQueue<String>();		
	tQueue.put("apple");
	
	System.out.println("element from queue: "+tQueue.remove());

LinkedTransferQueue Example

Following real example shows how to use LinkedTransferQueue. The example uses order processing module of an ecommerce application in which order producers transfer orders via LinkedTransferQueue to order consumers and order consumers processes the orders.

The example uses thread pool executor service to simulate multiple producers and consumers which run in different threads and add to or remove from the same queue.

Order Object

public class Order {
	
	private String orderId;
	private double orderAmount;
	
	public String getOrderId() {
		return orderId;
	}
	public void setOrderId(String orderId) {
		this.orderId = orderId;
	}
	public double getOrderAmount() {
		return orderAmount;
	}
	public void setOrderAmount(double orderAmount) {
		this.orderAmount = orderAmount;
	}	
}

Order Producer

Producer starts three threads and each thread executes transfer operation by creating order objects and transferring them to queue till termination signal is received.

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;

public class OrderProducer {

	private LinkedTransferQueue<Order> transferQueue;
	private ExecutorService producer;
	
	private int orderIdGen = 1000;
	private Object lock = new Object();
	private volatile boolean terminate;
	
	public OrderProducer(LinkedTransferQueue<Order> transferQueue) {
		this.transferQueue = transferQueue;
		producer = Executors.newFixedThreadPool(3);
	}
	public void produceOrderes() {
		//producer on thread one
		producer.execute(addOrder);
		//producer on thread two
		producer.execute(addOrder);
		//producer on thread thread
		producer.execute(addOrder);
	}

	Runnable addOrder = new Runnable() {
		@Override
		public void run() {
			long tId = Thread.currentThread().getId();
			
			//post orders until it needs to be shutdown
			while(!terminate) {
				try {
					Order order = getOrder();
					System.out.println("thread: "+tId
							+" adding order for precessing: "
							+order.getOrderId());
					//add order object to queue and wait for consumer
					transferQueue.transfer(order);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		
			System.out.println("producer thread: "+tId+" stopped");
		}		
	};
	public void clearWaitingConsumers() {
		//to prevent blocking consumers
		//before existing post dummy order if there are waiting consumers
		while(transferQueue.hasWaitingConsumer()) {
			transferQueue.put(new Order());
		}	
	}
	public void shutDownExecutor() {
		producer.shutdown();
	}
	public void setTerminate(boolean terminate) {
		this.terminate = terminate;
	}	
	
	public Order getOrder() {
		int orderId;
		synchronized(lock) {
			orderId = orderIdGen++;
		}
		
		Order order = new Order();
		order.setOrderId(""+orderId);
		order.setOrderAmount(getOrderAmt());
		
		return order;
	}
	public int getOrderAmt() {
		Random random = new Random();
        int rval = random.nextInt((10 - 1) + 1) + 1;
        return rval*123;
	}
}

Order Consumer

Consumer starts three threads and executes take operation on the queue till termination signal is received.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;

public class OrderConsumer {

	private LinkedTransferQueue<Order> transferQueue;
	private ExecutorService consumer;
	private volatile boolean terminate;

	public OrderConsumer(LinkedTransferQueue<Order> transferQueue){
		this.transferQueue = transferQueue;
		consumer = Executors.newFixedThreadPool(3);	
	}

	public void consumeOrderes() {
		//consumer on thread one
		consumer.execute(takeOrder);
		//consumer on thread two
		consumer.execute(takeOrder);
		//consumer on thread thread
		consumer.execute(takeOrder);
	}

	Runnable takeOrder = new Runnable() {
		@Override
		public void run() {
			long tId = Thread.currentThread().getId();
			//consume orders until it needs to be shutdown
			while(!terminate) {
				try {
					Order order = transferQueue.take();
					System.out.println("thread: "+tId
							+" received and processing order: "
							+order.getOrderId());
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			
			System.out.println("consumer thread: "+tId+" stopped");
		}		
	};
	
	public void shutDownExecutor() {
		consumer.shutdown();
	}
	
	public void setTerminate(boolean terminate) {
		this.terminate = terminate;
	}	
}

OrderManger

This class instantiates LinkedTransferQueue, element producer and consumer objects, starts order processing and stops it after running it for some time.

public class OrderManger {

	private LinkedTransferQueue<Order> transferQueue;	
	
	public static void main(String[] args) {
		OrderManger om = new OrderManger();
		
		om.startOrderManger();		
	}
	
	public OrderManger() {		
		transferQueue = new LinkedTransferQueue<Order>();
	}
	
	public void startOrderManger() {
		//start order producer
		OrderProducer op = new OrderProducer(transferQueue);
		op.produceOrderes();
		
		//start order consumer
		OrderConsumer oc = new OrderConsumer(transferQueue);
		oc.consumeOrderes();
		
		System.out.println("Order processing started");
		
		try {
			Thread.sleep(40);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}	
		
		//ask producer threads to stop
		op.setTerminate(true);
		
		//wait before asking consumer threads to stop
		//to prevent blocking producers issue
		try {
			Thread.sleep(40);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}	
	
		//ask consumer threads to stop
		oc.setTerminate(true);
		
		System.out.println("Order processing stopping after 80 milliseconds");
		
		//wait for the remaining work to complete
		try {
			Thread.sleep(200);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		//shutdown producer thread executor
		op.shutDownExecutor();
		//clear any blocking consumer with dummy orders
		op.clearWaitingConsumers();
		//shutdown consumer thread executor
		oc.shutDownExecutor();
	}
}

Output

thread: 12 adding order for precessing: 1001
thread: 11 adding order for precessing: 1002
thread: 10 adding order for precessing: 1000
........
thread: 13 received and processing order: 1762
thread: 14 received and processing order: 1759
thread: 15 received and processing order: 1761
thread: 11 adding order for precessing: 1763
producer thread: 11 stopped
producer thread: 12 stopped
thread: 13 received and processing order: 1763
Order processing stopping after 80 milliseconds
thread: 15 received and processing order: null
consumer thread: 15 stopped