CS 702 - Operating Systems - Spring 2008
Producer/Consumer in Java
Loyola College >
Department of Computer Science >
Dr. James Glenn >
CS 702 >
Examples and Lecture Notes >
Producer/Consumer in Java
ProducerConsumer_Java5.java
import java.util.*;
import java.util.concurrent.locks.*;
/**
* Demonstration of the producer/consumer problem in Java. In this
* example one producer adds items to a fixed-length queue. Multiple
* consumers remove items from the queue. When the producer is finished
* adding items the program hangs with all consumers waiting for items
* to be added (try to fix this!).
*
* @author Jim Glenn
* @version 0.1 2/10/2004
*/
public class ProducerConsumer_Java5
{
/**
* The fixed-length queue for this demonstration.
*/
private Buffer b;
/**
* Encapsulates the data and operations on the fixed-length queue.
*/
public class Buffer
{
/**
* Storage for items on this queue.
*/
private LinkedList buf;
/**
* The number of items currently on this queue.
*/
private int items;
/**
* The maximum number of items that can be on this queue.
*/
private int size;
/**
* The number of consumers waiting.
*/
private int consumersWaiting;
/**
* The number of producers waiting.
*/
private int producersWaiting;
/**
* The lock for this buffer's methods.
*/
private Lock lock;
/**
* The condition for the producer(s) to wait on.
*/
private Condition notFull;
/**
* The condition for the consumers to wait on.
*/
private Condition notEmpty;
/**
* Whether or not there is still a producer working with this buffer.
*/
private boolean closed;
/**
* Creates a queue that can hold up to the given number of items.
*
* @param n the capacity of the new queue
*/
public Buffer(int n)
{
buf = new LinkedList();
items = 0;
size = n;
producersWaiting = 0;
consumersWaiting = 0;
closed = false;
lock = new ReentrantLock();
notFull = lock.newCondition();
notEmpty = lock.newCondition();
}
/**
* Removes an item from this queue. If no item is available,
* the calling thread sleeps until one is available.
*
* @return the item removed
*/
public Object remove()
{
lock.lock();
try
{
Object result;
while (items == 0 && !closed)
{
consumersWaiting++;
notEmpty.awaitUninterruptibly();
}
if (items == 0 && closed)
{
return null;
}
else
{
result = buf.getFirst();
buf.remove(0);
items--;
if (producersWaiting > 0)
{
notFull.signal();
producersWaiting--;
}
return result;
}
}
finally
{
lock.unlock();
}
}
/**
* Adds an item to this queue. If there is no more room on the
* queue, the calling thread sleeps.
*
* @param item the item to add
*/
public void add(Object item)
{
lock.lock();
try
{
while (items == size)
{
producersWaiting++;
notFull.awaitUninterruptibly();
}
buf.add(item);
items++;
if (consumersWaiting > 0)
{
notEmpty.signal();
consumersWaiting--;
}
}
finally
{
lock.unlock();
}
}
/**
* Closes this buffer. This will cause any waiting consumers
* or any consumers that arrive after the buffer is emptied
* to receive <CODE>null</CODE>.
*/
public void close()
{
lock.lock();
try
{
closed = true;
if (consumersWaiting > 0)
{
notEmpty.signalAll();
consumersWaiting = 0;
}
}
finally
{
lock.unlock();
}
}
}
/**
* Threads that add items to the queue.
*/
public class Producer extends Thread
{
/**
* The number of items for this thread to add.
*/
private int itemsToAdd;
/**
* Creates a producer that will add the given number of items
* to the queue. The items will be numbered 0,...,<CODE>n</CODE>-1
*/
public Producer(int n)
{
itemsToAdd = n;
}
/**
* Adds items to the queue.
*/
public void run()
{
for (int i = 0; i < itemsToAdd; i++)
{
try
{
sleep((int)(Math.random() * 10));
}
catch (InterruptedException e)
{
}
b.add(new Integer(i));
System.out.println("Producer added " + i);
}
b.close();
System.out.println("Producer terminated");
}
}
/**
* Threads that remove items from the queue.
*/
public class Consumer extends Thread
{
/**
* An identifier for this consumer.
*/
private int id;
/**
* Creates a consumer thread with the given id.
*
* @param i the id of the new consumer
*/
public Consumer(int i)
{
id = i;
}
/**
* Causes this consumer to start removing things from the queue.
*/
public void run()
{
Integer item = (Integer)(b.remove());
while (item != null)
{
System.out.println("Consumer " + id + " removed " + item);
try
{
sleep((int)(Math.random() * 10) * 100);
}
catch (InterruptedException e)
{
}
item = (Integer)(b.remove());
}
System.out.println("Consumer " + id + " terminated");
}
}
/**
* Creates a new example of the producer/consumer problem.
* The arguments specify the number of items the producer will add,
* the size of the queue that will hold the items, and the number
* of consumers that access the queue.
*
* @param items the number of items for the producer to add
* @param bufferSize the size of the fixed-length queue
* @param consumers the number of consumers to create
*/
public ProducerConsumer_Java5(int items, int bufferSize, int consumers)
{
b = new Buffer(bufferSize);
for (int id = 0; id < consumers; id++)
{
Thread t = new Consumer(id);
t.start();
}
(new Producer(items)).start();
}
/**
* Creates an instance of the producer/consumer problem given
* parameters specified on the command line.
*
* @param args an array containing the decimal representation of the
* number of items to add, the size of the queue, and the number
* of consumers in its first three elements
*/
public static void main(String[] args)
{
if (args.length < 3)
{
System.err.println("USAGE: java ProducerConsumer items buffer-size consumers");
System.exit(1);
}
int items = Integer.parseInt(args[0]);
int bufferSize = Integer.parseInt(args[1]);
int consumers = Integer.parseInt(args[2]);
new ProducerConsumer_Java5(items, bufferSize, consumers);
}
}
This code can also be downloaded from the file
ProducerConsumer_Java5.java.