CS 702 - Operating Systems - Spring 2008
Producer/Consumer in Java


Loyola College > Department of Computer Science > Dr. James Glenn > CS 702 > Examples and Lecture Notes > Producer/Consumer in Java

ProducerConsumer_Java5.java

import java.util.*;
import java.util.concurrent.locks.*;

/**
 * Demonstration of the producer/consumer problem in Java.  In this
 * example one producer adds items to a fixed-length queue.  Multiple
 * consumers remove items from the queue.  When the producer is finished
 * adding items it closes the queue; each consumer then drains whatever
 * is left, receives <CODE>null</CODE> from the queue, and terminates.
 *
 * @author Jim Glenn
 * @version 0.1 2/10/2004
 */

public class ProducerConsumer_Java5
{
    /**
     * The fixed-length queue for this demonstration.
     */

    private final Buffer b;

    /**
     * Encapsulates the data and operations on the fixed-length queue.
     * All methods acquire the same lock, so a single buffer may be
     * shared by any number of producer and consumer threads.
     *
     * <P>Because {@link #remove()} returns <CODE>null</CODE> to report
     * that the buffer is closed and empty, a <CODE>null</CODE> item
     * added to the queue cannot be told apart from that signal.
     */

    public class Buffer
    {
        /**
         * Storage for items on this queue, oldest item first.
         */

        private final LinkedList<Object> buf;

        /**
         * The maximum number of items that can be on this queue.
         */

        private final int size;

        /**
         * The lock for this buffer's methods.
         */

        private final Lock lock;

        /**
         * The condition for the producer(s) to wait on.
         */

        private final Condition notFull;

        /**
         * The condition for the consumers to wait on.
         */

        private final Condition notEmpty;

        /**
         * Whether this buffer has been closed, i.e. whether the producer
         * has announced that no more items are coming.
         */

        private boolean closed;

        /**
         * Creates a queue that can hold up to the given number of items.
         *
         * @param n the capacity of the new queue; must be at least 1
         * @throws IllegalArgumentException if <CODE>n</CODE> is less than 1
         */

        public Buffer(int n)
        {
            // A capacity of zero would make every add() block forever and
            // a negative capacity would silently make the queue unbounded.
            if (n < 1)
            {
                throw new IllegalArgumentException
                    ("buffer capacity must be at least 1: " + n);
            }

            buf = new LinkedList<Object>();
            size = n;
            closed = false;

            lock = new ReentrantLock();
            notFull = lock.newCondition();
            notEmpty = lock.newCondition();
        }

        /**
         * Removes an item from this queue.  If no item is available,
         * the calling thread sleeps until one is available or until
         * this buffer is closed.
         *
         * @return the item removed, or <CODE>null</CODE> if this buffer
         * is closed and empty
         */

        public Object remove()
        {
            lock.lock();

            try
            {
                // Re-test after every wakeup: the wakeup may be spurious
                // or another consumer may already have taken the item.
                while (buf.isEmpty() && !closed)
                {
                    notEmpty.awaitUninterruptibly();
                }

                if (buf.isEmpty())
                {
                    // the loop only exits on an empty queue when closed
                    return null;
                }

                Object result = buf.removeFirst();

                // A slot just opened up.  Signalling a condition nobody
                // is waiting on does nothing, so no waiter count is kept.
                notFull.signal();

                return result;
            }
            finally
            {
                lock.unlock();
            }
        }

        /**
         * Adds an item to this queue.  If there is no more room on the
         * queue, the calling thread sleeps until a consumer makes room.
         *
         * @param item the item to add
         */

        public void add(Object item)
        {
            lock.lock();

            try
            {
                while (buf.size() >= size)
                {
                    notFull.awaitUninterruptibly();
                }

                buf.addLast(item);

                // wake one consumer (a no-op if none is waiting)
                notEmpty.signal();
            }
            finally
            {
                lock.unlock();
            }
        }

        /**
         * Closes this buffer.  This will cause any waiting consumers
         * or any consumers that arrive after the buffer is emptied
         * to receive <CODE>null</CODE>.
         */

        public void close()
        {
            lock.lock();

            try
            {
                closed = true;

                // every waiting consumer must re-check the closed flag
                notEmpty.signalAll();
            }
            finally
            {
                lock.unlock();
            }
        }
    }


    /**
     * Threads that add items to the queue.
     */

    public class Producer extends Thread
    {
        /**
         * The number of items for this thread to add.
         */

        private final int itemsToAdd;

        /**
         * Creates a producer that will add the given number of items
         * to the queue.  The items will be numbered 0,...,<CODE>n</CODE>-1
         *
         * @param n the number of items to add
         */

        public Producer(int n)
        {
            itemsToAdd = n;
        }

        /**
         * Adds items to the queue, pausing for a short random time
         * before each one, and then closes the queue.
         */

        @Override
        public void run()
        {
            try
            {
                for (int i = 0; i < itemsToAdd; i++)
                {
                    try
                    {
                        sleep((int)(Math.random() * 10));
                    }
                    catch (InterruptedException e)
                    {
                        // Keep producing, but leave the interrupt flag
                        // set so the request is not silently lost.
                        Thread.currentThread().interrupt();
                    }

                    b.add(Integer.valueOf(i));
                    System.out.println("Producer added " + i);
                }
            }
            finally
            {
                // Always close, even on an unexpected failure; otherwise
                // the consumers would wait forever for more items.
                b.close();
            }
            System.out.println("Producer terminated");
        }
    }

    /**
     * Threads that remove items from the queue.
     */

    public class Consumer extends Thread
    {
        /**
         * An identifier for this consumer.
         */

        private final int id;

        /**
         * Creates a consumer thread with the given id.
         *
         * @param i the id of the new consumer
         */

        public Consumer(int i)
        {
            id = i;
        }

        /**
         * Causes this consumer to start removing things from the queue.
         * The consumer pauses for a random time after each item and
         * stops when the queue returns <CODE>null</CODE>.
         */

        @Override
        public void run()
        {
            Integer item = (Integer)(b.remove());
            while (item != null)
            {
                System.out.println("Consumer " + id + " removed " + item);

                try
                {
                    sleep((int)(Math.random() * 10) * 100);
                }
                catch (InterruptedException e)
                {
                    // Keep consuming, but leave the interrupt flag
                    // set so the request is not silently lost.
                    Thread.currentThread().interrupt();
                }

                item = (Integer)(b.remove());
            }
            System.out.println("Consumer " + id + " terminated");
        }
    }

    /**
     * Creates a new example of the producer/consumer problem and
     * starts all of its threads.
     * The arguments specify the number of items the producer will add,
     * the size of the queue that will hold the items, and the number
     * of consumers that access the queue.
     *
     * @param items the number of items for the producer to add
     * @param bufferSize the size of the fixed-length queue; must be
     * at least 1
     * @param consumers the number of consumers to create
     * @throws IllegalArgumentException if <CODE>bufferSize</CODE> is
     * less than 1
     */

    public ProducerConsumer_Java5(int items, int bufferSize, int consumers)
    {
        b = new Buffer(bufferSize);

        for (int id = 0; id < consumers; id++)
        {
            Thread t = new Consumer(id);
            t.start();
        }

        (new Producer(items)).start();
    }

    /**
     * Creates an instance of the producer/consumer problem given
     * parameters specified on the command line.  Prints a message to
     * standard error and exits with status 1 if the arguments are
     * missing, are not integers, or give a buffer size below 1.
     *
     * @param args an array containing the decimal representation of the
     * number of items to add, the size of the queue, and the number
     * of consumers in its first three elements
     */

    public static void main(String[] args)
    {
        final String usage =
            "USAGE: java ProducerConsumer_Java5 items buffer-size consumers";

        if (args.length < 3)
        {
            System.err.println(usage);
            System.exit(1);
        }

        int items;
        int bufferSize;
        int consumers;

        try
        {
            items = Integer.parseInt(args[0]);
            bufferSize = Integer.parseInt(args[1]);
            consumers = Integer.parseInt(args[2]);
        }
        catch (NumberFormatException e)
        {
            System.err.println("Arguments must be integers: " + e.getMessage());
            System.err.println(usage);
            System.exit(1);
            return; // unreachable; satisfies definite assignment
        }

        if (bufferSize < 1)
        {
            System.err.println("buffer-size must be at least 1: " + bufferSize);
            System.err.println(usage);
            System.exit(1);
        }

        new ProducerConsumer_Java5(items, bufferSize, consumers);
    }
}
This code can also be downloaded from the file ProducerConsumer_Java5.java.