CS 702 - Operating Systems - Fall 2007
Producer/Consumer in C


Loyola College > Department of Computer Science > Dr. James Glenn > CS 702 > Examples and Lecture Notes > Producer/Consumer in C

buffer.h

#ifndef __BUFFER_H__
#define __BUFFER_H__

/**
 * A bounded buffer shared by producer and consumer threads.
 *
 * Items are stored in a fixed-size array used as a FIFO queue with
 * wraparound.  Access is serialized with a pthread mutex, and two
 * condition variables are used to block producers while the buffer is
 * full and consumers while it is empty.
 *
 * The capacity is not a template parameter; it is passed to the
 * constructor and must be positive.
 *
 * @param Item the type of item these buffers hold; must have an assignment
 * operator, a copy constructor, and a zero argument constructor
 *
 * @author Jim Glenn
 * @version 0.1 2/17/2004
 */

template < class Item >
class Buffer
{
 public:
  /**
   * Creates an empty buffer that can hold up to the given number of items.
   *
   * @param max the maximum number of items this buffer can hold;
   * must be positive
   */

  Buffer(int max);

  /**
   * Destroys this buffer, releasing the array that holds its items.
   */

  ~Buffer();

  /**
   * Adds the given item to this buffer.  If this buffer is full, the
   * calling thread is blocked until it is not full.  This method will
   * not return until the item has been successfully added.
   *
   * @param toAdd the item to add
   * @param id the id of the calling thread (for debugging; not used by
   * the implementation)
   */

  void add(const Item& toAdd, int id);

  /**
   * Removes one item from this buffer.  If this buffer is empty, the
   * calling thread is blocked until one is available or the buffer is closed.
   *
   * @param removed receives the item removed from this buffer; it is
   * assigned only when this method returns true
   * @param id the id of the calling thread (for debugging; not used by
   * the implementation)
   * @return true if an item was removed; false if the buffer was closed
   */

  bool remove(Item& removed, int id);

  /**
   * Marks this buffer as closed and wakes every consumer blocked in
   * <CODE>remove</CODE>, so that consumers can stop waiting for items
   * that will never arrive.
   */

  void close();

 private:
  /**
   * Copying is disallowed.
   */

  Buffer(const Buffer&);

  /**
   * Assignment is disallowed.
   */

  void operator = (const Buffer &);

  /**
   * Adds the given item to the array used to implement this buffer.
   * There must be room in the buffer for the new item.
   * This is the part of add that does not have to do with mutual exclusion
   * and synchronization; it should be called from the public <CODE>add</CODE>
   * method only after mutual exclusion has been assured and only when
   * there is room in the buffer.
   *
   * @param toAdd the item to add
   */

  void addToArray(const Item& toAdd);

  /**
   * Removes one item from the array used to implement this buffer.
   * There must be at least one item in the array.
   * This is the part of remove that does not have to do with mutual exclusion
   * and synchronization; it should be called from the public
   * <CODE>remove</CODE> method only after mutual exclusion has been assured
   * and only when the buffer is nonempty.
   *
   * @return the item removed
   */

  Item removeFromArray();

  /**
   * The array that holds the items in this buffer.  Works like a
   * FIFO queue with wraparound.
   */

  Item *items;

  /**
   * The size of the <CODE>items</CODE> array.
   */

  int size;

  /**
   * The index of the front item in the queue (the next one to remove).
   */

  int front;

  /**
   * The index of the last item in the queue (the most recently added).
   */

  int back;

  /**
   * The number of items in the queue.
   */

  int count;

  // Statically initialized condition variable and mutex values
  // (PTHREAD_COND_INITIALIZER and PTHREAD_MUTEX_INITIALIZER).
  static const pthread_cond_t DEFAULT_COND;
  static const pthread_mutex_t DEFAULT_MUTEX;

  // Producers blocked in add wait on notFull; consumers blocked in
  // remove wait on notEmpty.
  pthread_cond_t notFull;
  pthread_cond_t notEmpty;

  // Guards every other data member of this buffer.
  pthread_mutex_t mutex;

  // Set by close(); tells consumers to stop waiting for new items.
  bool closed;

  // Blocked consumers / producers that have not yet been signaled.
  int consumersWaiting;
  int producersWaiting;

  // Free slots set aside for producers that have been signaled but have
  // not yet added their item, and, symmetrically, stored items set aside
  // for consumers that have been signaled but have not yet removed one.
  int spacesReserved;
  int itemsReserved;
};

#include "buffer.cpp"

#endif

buffer.cpp

#ifndef __BUFFER_CPP__
#define __BUFFER_CPP__

#include <cassert>
#include <iostream>

#include "buffer.h"

// A condition variable value in its statically initialized state.
// NOTE(review): POSIX leaves the use of *copies* of a condition variable
// undefined, so this is only safe as a source of initial values on
// platforms where the initializer is a plain bit pattern -- confirm.
template <class Item >
const pthread_cond_t Buffer< Item >::DEFAULT_COND = PTHREAD_COND_INITIALIZER;

// A mutex value in its statically initialized (unlocked, default
// attributes) state; the same caveat about copies applies.
template <class Item >
const pthread_mutex_t Buffer< Item >::DEFAULT_MUTEX = PTHREAD_MUTEX_INITIALIZER;

/**
 * Creates an empty buffer that can hold up to the given number of items.
 *
 * The queue starts with <CODE>front</CODE> at 0 and <CODE>back</CODE> at
 * the last slot, so that the first item added wraps around to index 0.
 *
 * The mutex and condition variables are initialized in place with
 * default attributes rather than by copying an already-initialized
 * object, because POSIX leaves the behavior of copies undefined.
 *
 * @param max the maximum number of items this buffer can hold;
 * must be positive
 */

template < class Item >
Buffer< Item >::Buffer(int max) :
  items(0),
  size(max),
  front(0),
  back(max - 1),
  count(0),
  closed(false),
  consumersWaiting(0),
  producersWaiting(0),
  spacesReserved(0),
  itemsReserved(0)
{
  // a zero or negative capacity would make the wraparound arithmetic
  // (modulo size) meaningless
  assert(max > 0);

  // create the array first so that nothing needs to be undone if the
  // allocation throws

  items = new Item[max];

  // a null attribute pointer requests the default attributes; the calls
  // are kept outside assert() so they still happen when NDEBUG is defined

  int rc = pthread_cond_init(&notFull, 0);
  assert(rc == 0);

  rc = pthread_cond_init(&notEmpty, 0);
  assert(rc == 0);

  rc = pthread_mutex_init(&mutex, 0);
  assert(rc == 0);

  (void) rc;
}

/**
 * Destroys this buffer: releases the condition variables, the mutex,
 * and the array that holds the items.
 *
 * No thread may be using this buffer (in particular, none may be
 * blocked in <CODE>add</CODE> or <CODE>remove</CODE>) when it is destroyed.
 */

template < class Item >
Buffer< Item >::~Buffer()
{
  pthread_cond_destroy(&notFull);
  pthread_cond_destroy(&notEmpty);
  pthread_mutex_destroy(&mutex);

  delete[] items;
}

/**
 * Adds the given item to this buffer, blocking the caller while the
 * buffer is full.
 *
 * A slot counts as taken if it either holds an item or has been reserved
 * for a producer that was already signaled.  A producer that has to wait
 * proceeds only once a slot has been reserved for waiting producers, and
 * it consumes that reservation before adding.
 *
 * After adding, if a consumer is waiting, the new item is reserved for
 * it and it is signaled.
 *
 * @param toAdd the item to add
 * @param id the id of the calling thread (for debugging; currently unused)
 */

template < class Item >
void Buffer< Item >::add(const Item& toAdd, int id)
{
  pthread_mutex_lock(&mutex);

  if (count + spacesReserved == size)
    {
      producersWaiting++;

      // pthread_cond_wait may return without a matching signal, so keep
      // waiting until a slot really has been reserved for a producer

      do
	{
	  pthread_cond_wait(&notFull, &mutex);
	}
      while (spacesReserved == 0);

      spacesReserved--;
    }

  addToArray(toAdd);

  if (consumersWaiting > 0)
    {
      // hand the new item to one waiting consumer

      consumersWaiting--;
      itemsReserved++;
      pthread_cond_signal(&notEmpty);
    }

  pthread_mutex_unlock(&mutex);
}

/**
 * Removes one item from this buffer, blocking the caller while there is
 * no item for it and the buffer has not been closed.
 *
 * An item that has been reserved for an already-signaled consumer is not
 * available to other callers, even after the buffer has been closed.
 * A consumer that has to wait proceeds when an item has been reserved
 * for waiting consumers (it then consumes that reservation) or when the
 * buffer is closed.
 *
 * Only when an item was actually removed is the freed slot reserved for
 * a waiting producer, which is then signaled.
 *
 * @param removed receives the removed item; untouched if none was removed
 * @param id the id of the calling thread (for debugging; currently unused)
 * @return true if an item was removed; false if the buffer was closed
 * and no item was available for the caller
 */

template < class Item >
bool Buffer< Item >::remove(Item& removed, int id)
{
  pthread_mutex_lock(&mutex);

  bool gotItem = false;

  if (count - itemsReserved > 0)
    {
      // an item that nobody else has been promised is available
      gotItem = true;
    }
  else if (!closed)
    {
      consumersWaiting++;

      // pthread_cond_wait may return without a matching signal, so keep
      // waiting until an item has been reserved or the buffer is closed

      do
	{
	  pthread_cond_wait(&notEmpty, &mutex);
	}
      while (itemsReserved == 0 && !closed);

      if (itemsReserved > 0)
	{
	  itemsReserved--;
	  gotItem = true;
	}
      else
	{
	  // released by close() with nothing to take: this thread is no
	  // longer waiting, so it must not be offered a later item
	  consumersWaiting--;
	}
    }

  if (gotItem)
    {
      removed = removeFromArray();

      if (producersWaiting > 0)
	{
	  // hand the freed slot to one waiting producer

	  producersWaiting--;
	  spacesReserved++;
	  pthread_cond_signal(&notFull);
	}
    }

  pthread_mutex_unlock(&mutex);

  return gotItem;
}

/**
 * Marks this buffer as closed and wakes every consumer blocked in
 * <CODE>remove</CODE> so that each can notice the closed flag.
 *
 * The flag is set and the broadcast issued while holding the mutex, so
 * a consumer cannot miss the wakeup between checking the flag and
 * starting to wait.
 */

template < class Item >
void Buffer< Item >::close()
{
  pthread_mutex_lock(&mutex);

  closed = true;

  pthread_cond_broadcast(&notEmpty);

  pthread_mutex_unlock(&mutex);
}

// Copy construction is not supported.  The declaration is private, so
// only code inside the class could reach this; if it ever does, the
// assertion below fails.
template <class Item>
Buffer<Item>::Buffer(const Buffer&) {
  assert(false);
}

// Assignment is not supported.  The declaration is private, so only code
// inside the class could reach this; if it ever does, the assertion
// below fails.
template <class Item>
void Buffer<Item>::operator=(const Buffer&) {
  assert(false);
}

// Stores toAdd in the slot after the current back of the queue.
// The caller must already hold the mutex and must have made sure that
// the array is not full.
template <class Item>
void Buffer<Item>::addToArray(const Item& toAdd) {
  assert(count < size);

  // step back forward by one slot, wrapping to 0 past the end
  ++back;
  back %= size;

  // one more item is now stored
  count += 1;

  // copy the item into its slot
  items[back] = toAdd;
}

// Takes the item at the front of the queue out of the array and returns
// a copy of it.  The caller must already hold the mutex and must have
// made sure that the array is not empty.
template <class Item>
Item Buffer<Item>::removeFromArray() {
  assert(count > 0);

  // remember which slot is being vacated
  const int leaving = front;

  // the next slot (wrapping to 0 past the end) becomes the new front
  front = (leaving + 1) % size;

  // one fewer item is now stored
  count -= 1;

  return items[leaving];
}

#endif

producer_consumer.cpp

#include <iostream>
#include <sstream>
#include <map>
#include <vector>
#include <cstdlib>

#include <unistd.h>
#include <time.h>

#include "buffer.h"

/**
 * The entry point for producer threads.  Adds the requested number of
 * items to the buffer and then deletes its argument structure.
 *
 * @param args a pointer to a heap-allocated producer_args_t structure;
 * the thread deletes it before terminating
 * @return does not return normally; the thread terminates through
 * <CODE>pthread_exit(NULL)</CODE>, so its exit value is <CODE>NULL</CODE>
 */

void *producer(void *args);

/**
 * The entry point for consumer threads.  Removes items from the buffer
 * until <CODE>remove</CODE> reports that none was removed, then deletes
 * its argument structure.
 *
 * @param args a pointer to a heap-allocated consumer_args_t structure;
 * the thread deletes it before terminating
 * @return does not return normally; the thread terminates through
 * <CODE>pthread_exit(NULL)</CODE>, so its exit value is <CODE>NULL</CODE>
 */

void *consumer(void *args);

/**
 * Arguments for producer threads.  An instance is allocated by main for
 * each producer and deleted by that producer when it finishes.
 */

struct producer_args_t
{
  /**
   * An identifier; stored as the first component of every item this
   * producer adds.
   */

  int id;

  /**
   * A pointer to the buffer to write to.  The buffer is not owned by
   * this structure.
   */

  Buffer< std::pair< int, int > > *buf;

  /**
   * The number of items to add.
   */

  int count;
};

/**
 * Arguments for consumer threads.  An instance is allocated by main for
 * each consumer and deleted by that consumer when it finishes.
 */

struct consumer_args_t
{
  /**
   * An identifier; used in the messages this consumer prints.
   */

  int id;

  /**
   * A pointer to the buffer to read from.  The buffer is not owned by
   * this structure.
   */

  Buffer< std::pair< int, int > > *buf;
};

/**
 * Starts a thread, ending the program with an error message if the
 * thread cannot be created.
 *
 * @param id receives the identifier of the new thread
 * @param entry the function the new thread runs
 * @param arg the argument passed to <CODE>entry</CODE>
 */

static void startThread(pthread_t *id, void *(*entry)(void *), void *arg)
{
  const int rc = pthread_create(id, NULL, entry, arg);

  if (rc != 0)
    {
      std::cerr << "pthread_create failed with error " << rc << std::endl;

      // exit rather than return: other threads may already be using
      // objects that live on main's stack
      std::exit(1);
    }
}

/**
 * Simulates the producer/consumer problem.  Creates a number of
 * producers and consumers (determined by command line arguments)
 * and a bounded buffer for them to share.  The size also determines
 * the number of items the producers will add to the buffer -- each
 * producer adds twice as many items as there are spaces in the buffer.
 *
 * Once every producer has finished, the buffer is closed so that the
 * consumers stop after draining it.  Main then waits for every consumer
 * too: the buffer lives on main's stack, so it must outlive all threads
 * that use it.
 *
 * @param argc the number of command line arguments; should be 4
 * @param argv an array of strings with the number of producers,
 * consumers, and size of the bounded buffer in elements 1 through 3
 * @return 0 on success; 1 if the arguments are missing or invalid
 */

int main(int argc, char **argv)
{
  // parse and validate command line arguments

  if (argc != 4)
    {
      std::cerr << "USAGE: " << argv[0] << " producers consumers buffer-size"
		<< std::endl;
      return 1;
    }

  std::stringstream args;
  for (int a = 1; a < argc; a++)
    args << argv[a] << ' ';

  int producers = 0;
  int consumers = 0;
  int size = 0;

  // a failed extraction (non-numeric input) is rejected as well

  if (!(args >> producers >> consumers >> size)
      || producers <= 0 || consumers <= 0 || size <= 0)
    {
      std::cerr << argv[0] << " all argument must be positive"
		<< std::endl;
      return 1;
    }

  // create the buffer

  Buffer< std::pair< int, int > > b(size);

  // randomize

  srandom(time(NULL));

  // start the consumers

  std::vector< pthread_t > consumerID(consumers);

  for (int c = 0; c < consumers; c++)
    {
      consumer_args_t *cArgs = new consumer_args_t();
      cArgs->id = c;
      cArgs->buf = &b;

      startThread(&consumerID[c], consumer, cArgs);
    }

  // start the producers

  std::vector< pthread_t > producerID(producers);

  for (int p = 0; p < producers; p++)
    {
      producer_args_t *pArgs = new producer_args_t();
      pArgs->id = p;
      pArgs->buf = &b;
      pArgs->count = size * 2;

      startThread(&producerID[p], producer, pArgs);
    }

  // wait for producers to finish

  for (int p = 0; p < producers; p++)
    pthread_join(producerID[p], NULL);

  // no more items will arrive; let the consumers drain the buffer and stop

  b.close();

  // wait for consumers to finish before the buffer goes out of scope

  for (int c = 0; c < consumers; c++)
    pthread_join(consumerID[c], NULL);

  return 0;
}

void *producer(void *args)
{
  producer_args_t *pArgs = reinterpret_cast< producer_args_t * >(args);

  for (int item = 0; item < pArgs->count; item++)
    {
      pArgs->buf->add(std::pair< int, int >(pArgs->id, item), pArgs->id);
      std::cout << "Producer " << pArgs->id << " added " << item
		<< std::endl;

      if (random() % 2 == 1)
	sleep(1);
    }

  // wait for all producers to finish, then close the buffer

  delete pArgs;

  pthread_exit(NULL);
}

void *consumer(void *args)
{
  consumer_args_t *cArgs = reinterpret_cast< consumer_args_t * >(args);

  std::pair< int, int > item;

  while (cArgs->buf->remove(item, cArgs->id))
    {
      std::cout << "Consumer " << cArgs->id << " removed " << item.second
		<< " added by producer " << item.first
		<< std::endl;
      sleep(random() % 2 + 1);
    }

  std::cout << "Consumer " << cArgs->id << " terminating"
	    << std::endl;

  delete cArgs;

  pthread_exit(NULL);
}

  

This code can also be downloaded from the files buffer.h, buffer.cpp, and producer_consumer.cpp.