LinkedTransferQueue also supports the standard Queue, BlockingQueue, and Collection operations.
A producer can add an element to a LinkedTransferQueue and wait until a consumer receives it by using transfer(), or wait for a limited time by using the timed form of tryTransfer(). The following program blocks forever, because transfer() waits for a consumer and no consumer thread ever takes from the queue.
LinkedTransferQueue<String> tQueue = new LinkedTransferQueue<String>();
try {
System.out.println("transferring an element");
tQueue.transfer("brand");
System.out.println("element transfer is complete");
} catch (InterruptedException e1) {
e1.printStackTrace();
}
Output
transferring an element
The following example shows that transfer() returns once a consumer takes the element.
LinkedTransferQueue<String> tQueue = new LinkedTransferQueue<String>();
new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("transferring an element");
tQueue.transfer("brand");
System.out.println("element transfer is complete");
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}).start();
try {
System.out.println("transfer queue received element: "+tQueue.take());
} catch (Exception e) {
e.printStackTrace();
}
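Output (the last two lines may appear in either order, because both threads continue concurrently after the handoff)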
transferring an element
element transfer is complete
transfer queue received element: brand
The untimed tryTransfer() hands the given element to a consumer only if one is already waiting for elements. Otherwise it returns false immediately without adding the element to the queue.
LinkedTransferQueue<String> tQueue = new LinkedTransferQueue<String>();
tQueue.tryTransfer("brand");
System.out.println("size of transfer queue: "+tQueue.size());
Output
size of transfer queue: 0
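The return value of tryTransfer() tells the producer whether the handoff happened. The sketch below contrasts the two cases. The class and method names are made up for illustration, and the busy-wait on hasWaitingConsumer() is only a simple way to make sure the consumer is already blocked in take() before the producer tries the handoff.

```java
import java.util.concurrent.LinkedTransferQueue;

final class TryTransferDemo {

    // No consumer is waiting, so tryTransfer() returns false
    // and the element is not enqueued.
    static boolean transferWithoutConsumer(LinkedTransferQueue<String> queue) {
        return queue.tryTransfer("brand");
    }

    // Starts a consumer that blocks in take(), waits until the queue
    // reports a waiting consumer, then hands the element over.
    static boolean transferWithConsumer(LinkedTransferQueue<String> queue) {
        Thread consumer = new Thread(() -> {
            try {
                queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true);
        consumer.start();
        while (!queue.hasWaitingConsumer()) {
            Thread.yield(); // illustration only: spin until the consumer is waiting
        }
        return queue.tryTransfer("brand");
    }

    public static void main(String[] args) {
        LinkedTransferQueue<String> tQueue = new LinkedTransferQueue<>();
        System.out.println("without consumer: " + transferWithoutConsumer(tQueue));
        System.out.println("with consumer: " + transferWithConsumer(tQueue));
    }
}
```

The first call reports false and leaves the queue empty; the second reports true because a consumer was already waiting.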
You can make tryTransfer() wait for a limited time when there is no consumer by passing a timeout to the operation. If no consumer arrives before the timeout elapses, it returns false and the element is not left in the queue.
try {
tQueue.tryTransfer("brand", 2000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e2) {
e2.printStackTrace();
}
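A small sketch of the timeout case, assuming no consumer ever shows up. The helper name offerWithTimeout and the 200 millisecond timeout are illustrative choices, not part of the LinkedTransferQueue API.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

final class TimedTryTransferDemo {

    // Waits up to timeoutMillis for a consumer and reports whether
    // the element was handed over.
    static boolean offerWithTimeout(LinkedTransferQueue<String> queue,
                                    String element, long timeoutMillis) {
        try {
            return queue.tryTransfer(element, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // restore the interrupt flag and treat it as "not delivered"
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        LinkedTransferQueue<String> tQueue = new LinkedTransferQueue<>();
        boolean delivered = offerWithTimeout(tQueue, "brand", 200);
        System.out.println("delivered: " + delivered);
        System.out.println("size of transfer queue: " + tQueue.size());
    }
}
```

With no consumer, the call returns false after the timeout and the queue size stays 0, so the producer can decide whether to retry, drop the element, or fall back to put().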
In addition to transfers in which the producer waits for a consumer, LinkedTransferQueue can be used as a normal queue or blocking queue. Because the queue is unbounded, put() never blocks.
LinkedTransferQueue<String> tQueue = new LinkedTransferQueue<String>();
tQueue.put("apple");
System.out.println("element from queue: "+tQueue.remove());
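To illustrate the plain queue behavior a little further, the following sketch mixes put(), offer() and add() and then drains the queue with poll(). The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedTransferQueue;

final class PlainQueueDemo {

    // Adds three elements without waiting for any consumer,
    // then removes them in FIFO order.
    static List<String> fillAndDrain() {
        LinkedTransferQueue<String> tQueue = new LinkedTransferQueue<>();
        tQueue.put("apple");    // returns immediately, the queue is unbounded
        tQueue.offer("banana"); // always succeeds for the same reason
        tQueue.add("cherry");

        List<String> drained = new ArrayList<>();
        String element;
        // poll() returns null once the queue is empty
        while ((element = tQueue.poll()) != null) {
            drained.add(element);
        }
        return drained;
    }

    public static void main(String[] args) {
        System.out.println("elements from queue: " + fillAndDrain());
    }
}
```

The elements come back in insertion order: apple, banana, cherry.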
The following, more realistic example shows how to use LinkedTransferQueue. It models the order processing module of an ecommerce application, in which order producers transfer orders through a LinkedTransferQueue to order consumers, and the consumers process the orders.
The example uses thread pool executor services to simulate multiple producers and consumers that run in different threads and add to or remove from the same queue.
public class Order {
private String orderId;
private double orderAmount;
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public double getOrderAmount() {
return orderAmount;
}
public void setOrderAmount(double orderAmount) {
this.orderAmount = orderAmount;
}
}
The producer starts three threads. Each thread creates order objects and transfers them to the queue, waiting for a consumer each time, until the termination signal is received.
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;
public class OrderProducer {
private LinkedTransferQueue<Order> transferQueue;
private ExecutorService producer;
private int orderIdGen = 1000;
private Object lock = new Object();
private volatile boolean terminate;
public OrderProducer(LinkedTransferQueue<Order> transferQueue) {
this.transferQueue = transferQueue;
producer = Executors.newFixedThreadPool(3);
}
public void produceOrderes() {
//producer on thread one
producer.execute(addOrder);
//producer on thread two
producer.execute(addOrder);
//producer on thread three
producer.execute(addOrder);
}
Runnable addOrder = new Runnable() {
@Override
public void run() {
long tId = Thread.currentThread().getId();
//post orders until it needs to be shutdown
while(!terminate) {
try {
Order order = getOrder();
System.out.println("thread: "+tId
+" adding order for precessing: "
+order.getOrderId());
//add order object to queue and wait for consumer
transferQueue.transfer(order);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("producer thread: "+tId+" stopped");
}
};
public void clearWaitingConsumers() {
//to keep consumers from blocking forever in take(),
//post a dummy order while there are still waiting consumers before exiting
while(transferQueue.hasWaitingConsumer()) {
transferQueue.put(new Order());
}
}
public void shutDownExecutor() {
producer.shutdown();
}
public void setTerminate(boolean terminate) {
this.terminate = terminate;
}
public Order getOrder() {
int orderId;
synchronized(lock) {
orderId = orderIdGen++;
}
Order order = new Order();
order.setOrderId(""+orderId);
order.setOrderAmount(getOrderAmt());
return order;
}
public int getOrderAmt() {
Random random = new Random();
int rval = random.nextInt((10 - 1) + 1) + 1;
return rval*123;
}
}
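The getOrder() method above guards orderIdGen++ with a lock object. As a possible alternative, not the approach used in this example, the same unique ID generation can be sketched with an AtomicInteger. The class name OrderIdGenerator is hypothetical.

```java
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class OrderIdGenerator {
    private final AtomicInteger next;

    OrderIdGenerator(int firstId) {
        this.next = new AtomicInteger(firstId);
    }

    // getAndIncrement() is atomic, so no explicit lock is needed
    String nextId() {
        return String.valueOf(next.getAndIncrement());
    }

    public static void main(String[] args) {
        OrderIdGenerator generator = new OrderIdGenerator(1000);
        // request IDs from several threads at once via a parallel stream
        Set<String> ids = IntStream.range(0, 3000)
                .parallel()
                .mapToObj(i -> generator.nextId())
                .collect(Collectors.toSet());
        System.out.println("unique ids generated: " + ids.size());
    }
}
```

Every caller receives a distinct ID even when several producer threads request one at the same time.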
The consumer starts three threads. Each thread repeatedly executes take() on the queue until the termination signal is received.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;
public class OrderConsumer {
private LinkedTransferQueue<Order> transferQueue;
private ExecutorService consumer;
private volatile boolean terminate;
public OrderConsumer(LinkedTransferQueue<Order> transferQueue){
this.transferQueue = transferQueue;
consumer = Executors.newFixedThreadPool(3);
}
public void consumeOrderes() {
//consumer on thread one
consumer.execute(takeOrder);
//consumer on thread two
consumer.execute(takeOrder);
//consumer on thread three
consumer.execute(takeOrder);
}
Runnable takeOrder = new Runnable() {
@Override
public void run() {
long tId = Thread.currentThread().getId();
//consume orders until it needs to be shutdown
while(!terminate) {
try {
Order order = transferQueue.take();
System.out.println("thread: "+tId
+" received and processing order: "
+order.getOrderId());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("consumer thread: "+tId+" stopped");
}
};
public void shutDownExecutor() {
consumer.shutdown();
}
public void setTerminate(boolean terminate) {
this.terminate = terminate;
}
}
The following class instantiates the LinkedTransferQueue and the order producer and consumer objects, starts order processing, and stops it after it has run for some time.
import java.util.concurrent.LinkedTransferQueue;
public class OrderManger {
private LinkedTransferQueue<Order> transferQueue;
public static void main(String[] args) {
OrderManger om = new OrderManger();
om.startOrderManger();
}
public OrderManger() {
transferQueue = new LinkedTransferQueue<Order>();
}
public void startOrderManger() {
//start order producer
OrderProducer op = new OrderProducer(transferQueue);
op.produceOrderes();
//start order consumer
OrderConsumer oc = new OrderConsumer(transferQueue);
oc.consumeOrderes();
System.out.println("Order processing started");
try {
Thread.sleep(40);
} catch (InterruptedException e) {
e.printStackTrace();
}
//ask producer threads to stop
op.setTerminate(true);
//wait before asking consumer threads to stop, so that producers
//still blocked in transfer() can hand off their last order
try {
Thread.sleep(40);
} catch (InterruptedException e) {
e.printStackTrace();
}
//ask consumer threads to stop
oc.setTerminate(true);
System.out.println("Order processing stopping after 80 milliseconds");
//wait for the remaining work to complete
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
//shutdown producer thread executor
op.shutDownExecutor();
//clear any blocking consumer with dummy orders
op.clearWaitingConsumers();
//shutdown consumer thread executor
oc.shutDownExecutor();
}
}
Output
thread: 12 adding order for precessing: 1001
thread: 11 adding order for precessing: 1002
thread: 10 adding order for precessing: 1000
........
thread: 13 received and processing order: 1762
thread: 14 received and processing order: 1759
thread: 15 received and processing order: 1761
thread: 11 adding order for precessing: 1763
producer thread: 11 stopped
producer thread: 12 stopped
thread: 13 received and processing order: 1763
Order processing stopping after 80 milliseconds
thread: 15 received and processing order: null
consumer thread: 15 stopped
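The "order: null" line in the output comes from a dummy order posted to release a consumer blocked in take(). One possible alternative, sketched below under simplifying assumptions, is for the consumer to use the timed poll() so that it rechecks the terminate flag regularly and needs no dummy order. The class PollingConsumer, the 50 millisecond timeout, and the use of String instead of Order are all illustrative choices.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class PollingConsumer implements Runnable {
    private final LinkedTransferQueue<String> queue;
    private final AtomicInteger processed = new AtomicInteger();
    private volatile boolean terminate;

    PollingConsumer(LinkedTransferQueue<String> queue) {
        this.queue = queue;
    }

    void setTerminate(boolean terminate) {
        this.terminate = terminate;
    }

    int processedCount() {
        return processed.get();
    }

    @Override
    public void run() {
        while (!terminate) {
            try {
                // wait briefly, then loop again to recheck the terminate flag
                String order = queue.poll(50, TimeUnit.MILLISECONDS);
                if (order != null) {
                    processed.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Transfers three orders to one consumer, then stops it without a dummy order.
    static int demo() {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
        PollingConsumer consumer = new PollingConsumer(queue);
        Thread worker = new Thread(consumer);
        worker.start();
        try {
            for (int id = 1000; id < 1003; id++) {
                queue.transfer(String.valueOf(id)); // returns once the consumer has the order
            }
            consumer.setTerminate(true);
            worker.join(); // the consumer exits after at most one more poll timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumer.processedCount();
    }

    public static void main(String[] args) {
        System.out.println("orders processed: " + demo());
    }
}
```

The trade-off is that a stopping consumer may take up to one poll timeout to notice the flag, whereas the dummy order approach wakes it immediately.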