CS 702 - Operating Systems - Spring 2008
Producer/Consumer with POSIX Mutexes and Condition Variables


Loyola College > Department of Computer Science > Dr. James Glenn > CS 702 > Examples and Lecture Notes > Producer/Consumer with POSIX Mutexes and Condition Variables

We begin with the following code for add and remove.

template < class Item >
void Buffer< Item >::add(const Item& toAdd, int id)
{
  pthread_mutex_lock(&mutex);

  if (count == size)
    {
      pthread_cond_wait(&notFull, &mutex);
    }

  addToArray(toAdd);

  pthread_cond_signal(&notEmpty);

  pthread_mutex_unlock(&mutex);
}

template < class Item >
bool Buffer< Item >::remove(Item& removed, int id)
{
  pthread_mutex_lock(&mutex);

  if (count == 0)
    pthread_cond_wait(&notEmpty, &mutex);

  removed = removeFromArray();
  
  pthread_cond_signal(&notFull);

  pthread_mutex_unlock(&mutex);
  
  return true;
}

We have a problem because when a thread receives a signal it is not necessarily the first one that gets scheduled when the signalling thread unlocks the mutex. For example, with an initially empty buffer we could have the following series of operations.

The standard way to fix this problem is to change the ifs into whiles.

template < class Item >
void Buffer< Item >::add(const Item& toAdd, int id)
{
  pthread_mutex_lock(&mutex);

  while (count == size)
    {
      pthread_cond_wait(&notFull, &mutex);
    }

  addToArray(toAdd);

  pthread_cond_signal(&notEmpty);

  pthread_mutex_unlock(&mutex);
}

template < class Item >
bool Buffer< Item >::remove(Item& removed, int id)
{
  pthread_mutex_lock(&mutex);

  while (count == 0)
    pthread_cond_wait(&notEmpty, &mutex);

  removed = removeFromArray();
  
  pthread_cond_signal(&notFull);

  pthread_mutex_unlock(&mutex);
  
  return true;
}
#endif

In this case, we can go back to an if with some extra bookkeeping. We keep track of the number of producers waiting, the number of consumers waiting, the number of items reserved for consumers that have been unblocked but haven't been scheduled yet, and the number of empty spaces reserved for producers unblocks but not yet scheduled.

template < class Item >
void Buffer< Item >::add(const Item& toAdd, int id)
{
  pthread_mutex_lock(&mutex);

  if (count + spacesReserved == size)
    {
      producersWaiting++;
      pthread_cond_wait(&notFull, &mutex);
      spacesReserved--;
      producersWaiting--;
    }

  addToArray(toAdd);

  if (consumersWaiting > 0)
    {
      itemsReserved++;
      pthread_cond_signal(&notEmpty);
    }

  pthread_mutex_unlock(&mutex);
}

template < class Item >
bool Buffer< Item >::remove(Item& removed, int id)
{
  pthread_mutex_lock(&mutex);

  if (count - itemsReserved == 0)
    {
      consumersWaiting++;
      pthread_cond_wait(&notEmpty, &mutex);

      itemsReserved--;
      consumersWaiting--;
    }

  removed = removeFromArray();
  
  if (producersWaiting > 0)
    {
      spacesReserved++;
      pthread_cond_signal(&notFull);
    }

  pthread_mutex_unlock(&mutex);
  
  return true;
}

Even this is not quite correct, as the following sequence of operations illustrates.

Doing some bookkeeping on behalf of the thread you are unblocking fixes the problem.

template < class Item >
void Buffer< Item >::add(const Item& toAdd, int id)
{
  pthread_mutex_lock(&mutex);

  if (count + spacesReserved == size)
    {
      producersWaiting++;
      pthread_cond_wait(&notFull, &mutex);
      spacesReserved--;
      producersWaiting--;
    }

  addToArray(toAdd);

  if (consumersWaiting > 0)
    {
      consumersWaiting--;
      itemsReserved++;
      pthread_cond_signal(&notEmpty);
    }

  pthread_mutex_unlock(&mutex);
}

template < class Item >
bool Buffer< Item >::remove(Item& removed, int id)
{
  pthread_mutex_lock(&mutex);

  if (count - itemsReserved == 0)
    {
      consumersWaiting++;
      pthread_cond_wait(&notEmpty, &mutex);
      itemsReserved--;
      consumersWaiting--;
    }

  removed = removeFromArray();
  
  if (producersWaiting > 0)
    {
      producersWaiting--;
      spacesReserved++;
      pthread_cond_signal(&notFull);
    }

  pthread_mutex_unlock(&mutex);
  
  return true;
}

Finally, we resolve the issue with consumers waiting forever in remove once the producers have all terminated. We do so with a closed flag that is set when the last producer terminates. When the flag is set, a broadcast is sent to the consumers so that any that awake and see an empty buffer know that there will never be any more items and so they should terminate.

template < class Item >
void Buffer< Item >::add(const Item& toAdd, int id)
{
  pthread_mutex_lock(&mutex);

  if (count + spacesReserved == size)
    {
      producersWaiting++;
      pthread_cond_wait(&notFull, &mutex);
      spacesReserved--;
    }

  addToArray(toAdd);

  if (consumersWaiting > 0)
    {
      consumersWaiting--;
      itemsReserved++;
      pthread_cond_signal(&notEmpty);
    }

  pthread_mutex_unlock(&mutex);
}

template < class Item >
bool Buffer< Item >::remove(Item& removed, int id)
{
  pthread_mutex_lock(&mutex);

  bool gotItem = false;

  if (count - itemsReserved == 0 && !closed)
    {
      consumersWaiting++;
      pthread_cond_wait(&notEmpty, &mutex);

      if (!closed || (closed && itemsReserved > 0))
	{
	  itemsReserved--;
	  gotItem = true;
	}
    }
  else if (count > 0)
    {
      gotItem = true;
    }

  if (gotItem)
    removed = removeFromArray();
  
  if (producersWaiting > 0)
    {
      producersWaiting--;
      spacesReserved++;
      pthread_cond_signal(&notFull);
    }

  pthread_mutex_unlock(&mutex);
  
  return gotItem;
}


template < class Item >
void Buffer< Item >::close(int n)
{
  pthread_mutex_lock(&mutex);

  // get totalProducers from 1st call

  if (totalProducers == -1)
    totalProducers = n;

  producersFinished++;

  if (producersFinished == totalProducers)
    {
      closed = true;
      pthread_cond_broadcast(&notEmpty);
      consumersWaiting = 0;
    }

  pthread_mutex_unlock(&mutex);
}

Complete code follows.

buffer.h

#ifndef __BUFFER_H__
#define __BUFFER_H__

/**
 * A bounded buffer.
 *
 * @param Item the type of item thse buffers hold; must have an assignment
 * operator, a copy constructor, and a zero argument constructor
 * @param size the maximum number of items in these buffers; must be positive
 *
 * @author Jim Glenn
 * @version 0.1 2/17/2004
 */

template < class Item >
class Buffer
{
 public:
  /**
   * Creates an empty buffer that can hold up to the given number of items.
   *
   * @param max the maximum number of items this buffer can hold
   */

  Buffer(int max);

  /**
   * Destroys this buffer.
   */

  ~Buffer();

  /**
   * Adds the given item to this buffer.  If this buffer is full, the
   * calling thread is blocked until it is not full.  This method will
   * not return until the item has been successfully added.
   *
   * @param toAdd the item to add
   * @param id the id of the calling thread (for debugging)
   */

  void add(const Item& toAdd, int id);

  /**
   * Removes one item from this buffer.  If this buffer is empty, the
   * calling thread is blocked until one is available or the buffer is closed.
   *
   * @param removed the item removed from this buffer
   * @param id the id of the calling thread (for debugging)
   * @return true if an item was removed; false if the buffer was closed
   */

  bool remove(Item& removed, int id);

  void close(int);

 private:
  /**
   * Copying is disallowed.
   */

  Buffer(const Buffer&);

  /**
   * Assignment is disallowed.
   */

  void operator = (const Buffer &);

  /**
   * Adds the given item to the array used to implement this buffer.
   * There must be room in the buffer for the new item.
   * This is the part of add that does not have to do with mutual exclusion
   * and synchronization; it should be called from the public <CODE>add</CODE>
   * method only after mutual exclusion has been assured and only when
   * there is room in the buffer.
   *
   * @param toAdd the item to add
   */

  void addToArray(const Item& toAdd);

  /**
   * Removes one item from the array used to implement this buffer.
   * There must be at least one item in the array.
   * This is the part of remove that does not have to do with mutual exclusion
   * and synchronization; it should be called from the public
   * <CODE>remove</CODE> method only after mutual exclusion has been assured
   * and only when the buffer is nonempty.
   *
   * @return the item removed
   */

  Item removeFromArray();

  /**
   * The array that holds the items in this buffer.  Works like a
   * FIFO queue with wraparound.
   */

  Item *items;

  /**
   * The size of the <CODE>items</CODE> array.
   */

  int size;

  /**
   * The index of the front item in the queue.
   */

  int front;

  /**
   * The index of the last item in the queue.
   */

  int back;

  /**
   * The number of items in the queue.
   */

  int count;

  pthread_cond_t notFull;
  pthread_cond_t notEmpty;

  pthread_mutex_t mutex;

  bool closed;
  int consumersWaiting;
  int producersWaiting;
  int spacesReserved;
  int itemsReserved;
  int totalProducers;
  int producersFinished;
};

#include "buffer.cpp"

#endif

buffer.cpp

#ifndef __BUFFER_CPP__
#define __BUFFER_CPP__

#include <cassert>
#include <iostream>

#include "buffer.h"

template < class Item >
Buffer< Item >::Buffer(int max)
{
  size = max;
  count = 0;
  front = 0;
  back = size - 1;

  pthread_mutex_init(&mutex, NULL);
  pthread_cond_init(&notFull, NULL);
  pthread_cond_init(&notEmpty, NULL);

  closed = false;
  consumersWaiting = 0;
  producersWaiting = 0;
  spacesReserved = 0;
  itemsReserved = 0;
  totalProducers = -1; // we don't know yet
  producersFinished = 0;

  // create the array

  items = new Item[max];
}

template < class Item >
Buffer< Item >::~Buffer()
{
  delete[] items;
}

template < class Item >
void Buffer< Item >::add(const Item& toAdd, int id)
{
  pthread_mutex_lock(&mutex);

  if (count + spacesReserved == size)
    {
      producersWaiting++;
      pthread_cond_wait(&notFull, &mutex);
      spacesReserved--;
    }

  addToArray(toAdd);

  if (consumersWaiting > 0)
    {
      consumersWaiting--;
      itemsReserved++;
      pthread_cond_signal(&notEmpty);
    }

  pthread_mutex_unlock(&mutex);
}

template < class Item >
bool Buffer< Item >::remove(Item& removed, int id)
{
  pthread_mutex_lock(&mutex);

  bool gotItem = false;

  if (count - itemsReserved == 0 && !closed)
    {
      consumersWaiting++;
      pthread_cond_wait(&notEmpty, &mutex);

      if (!closed || (closed && itemsReserved > 0))
	{
	  itemsReserved--;
	  gotItem = true;
	}
    }
  else if (count > 0)
    {
      gotItem = true;
    }

  if (gotItem)
    removed = removeFromArray();
  
  if (producersWaiting > 0)
    {
      producersWaiting--;
      spacesReserved++;
      pthread_cond_signal(&notFull);
    }

  pthread_mutex_unlock(&mutex);
  
  return gotItem;
}

template < class Item >
void Buffer< Item >::close(int n)
{
  pthread_mutex_lock(&mutex);

  // get totalProducers from 1st call

  if (totalProducers == -1)
    totalProducers = n;

  producersFinished++;

  if (producersFinished == totalProducers)
    {
      closed = true;
      pthread_cond_broadcast(&notEmpty);
      consumersWaiting = 0;
    }

  pthread_mutex_unlock(&mutex);
}

template < class Item >
Buffer< Item >::Buffer(const Buffer&)
{
  assert(false);
}

template < class Item >
void Buffer< Item >::operator = (const Buffer &)
{
  assert(false);
}

template < class Item >
void Buffer< Item >::addToArray(const Item& toAdd)
{
  assert(count < size);

  // update index of back, accounting for wraparound

  back = (back + 1) % size;

  // update number of items in queue

  count++;

  // put item in array

  items[back] = toAdd;
}

template < class Item >
Item Buffer< Item >::removeFromArray()
{
  assert(count > 0);

  // save index of the item we'll need to return

  int oldFront = front;

  // update front, accounting for wraparound

  front = (front + 1) % size;

  // update number of items

  count--;

  // return item that used to be at the front

  return items[oldFront];
}

#endif

producer_consumer.cpp

#include <iostream>
#include <sstream>
#include <map>
#include <vector>
#include <cstdlib>

#include <unistd.h>
#include <time.h>

#include "buffer.h"

/**
 * The entry point for producer threads.
 *
 * @param args a pointer to a producer_args_t structure
 * @return <CODE>NULL</CODE>
 */

void *producer(void *args);

/**
 * The entry point for consumer threads.
 *
 * @param args a pointer to a consumer_args_t structure
 */

void *consumer(void *args);

/**
 * Arguments for producer threads.
 */

struct producer_args_t
{
  /**
   * An identifier.
   */

  int id;

  /**
   * A pointer to the buffer to write to.
   */

  Buffer< std::pair< int, int > > *buf;

  /**
   * The number of items to add.
   */

  int count;

  /**
   * The number of producers.
   */

  int numProducers;
};

/**
 * Arguments for consumer threads.
 */

struct consumer_args_t
{
  /**
   * An identifier.
   */

  int id;

  /**
   * A pointer to the buffer to read from.
   */

  Buffer< std::pair< int, int > > *buf;
};

/**
 * Simulates the producer/consumer problem.  Creates a number of
 * producers and consumers (determined by command line arguments)
 * and a bounded buffer for them to share.  The size also determines
 * the number of items the procuders will add to the buffer -- each
 * producer adds twice as many items as there are spaces in the buffer.
 *
 * @param argc the number of command line arguments; should be 4
 * @param argv an array of strings with the number of producers,
 * consumers, and size of the bounded buffer in elements 1 through 3
 */

int main(int argc, char **argv)
{
  // parse and validate command line arguments

  if (argc != 4)
    {
      std::cerr << "USAGE: " << argv[0] << " producers consumers buffer-size"
		<< std::endl;
      return 1;
    }

  std::stringstream args;
  for (int a = 1; a < argc; a++)
    args << argv[a] << ' ';

  int producers, consumers, size;
  args >> producers >> consumers >> size;

  if (producers <= 0 || consumers <= 0 || size <= 0)
    {
      std::cerr << argv[0] << " all argument must be positive"
		<< std::endl;
      return 1;
    }

  // create the buffer

  Buffer< std::pair< int, int > > b(size);

  // randomize

  srandom(time(NULL));

  // start the consumers

  for (int c = 0; c < consumers; c++)
    {
      pthread_t id;
      consumer_args_t *cArgs = new consumer_args_t();
      cArgs->id = c;
      cArgs->buf = &b;

      pthread_create(&id, NULL, consumer, cArgs);
    }
      
  // start the producers

  pthread_t producerID[producers];

  for (int p = 0; p < producers; p++)
    {
      pthread_t id;
      producer_args_t *pArgs = new producer_args_t();
      pArgs->id = p;
      pArgs->buf = &b;
      pArgs->count = size * 2;
      pArgs->numProducers = producers;

      pthread_create(&id, NULL, producer, pArgs);
      producerID[p] = id;
    }

  // terminate main

  pthread_exit(NULL);

  // won't get here

  return 0;
}

void *producer(void *args)
{
  producer_args_t *pArgs = reinterpret_cast< producer_args_t * >(args);

  for (int item = 0; item < pArgs->count; item++)
    {
      pArgs->buf->add(std::pair< int, int >(pArgs->id, item), pArgs->id);
      std::cout << "Producer " << pArgs->id << " added " << item
		<< std::endl;

      if (random() % 2 == 1)
	sleep(1);
    }

  pArgs->buf->close(pArgs->numProducers);

  delete pArgs;

  pthread_exit(NULL);
}

void *consumer(void *args)
{
  consumer_args_t *cArgs = reinterpret_cast< consumer_args_t * >(args);

  std::pair< int, int > item;

  while (cArgs->buf->remove(item, cArgs->id))
    {
      std::cout << "Consumer " << cArgs->id << " removed " << item.second
		<< " added by producer " << item.first
		<< std::endl;
      sleep(random() % 2 + 1);
    }

  std::cout << "Consumer " << cArgs->id << " terminating"
	    << std::endl;

  delete cArgs;

  pthread_exit(NULL);
}

  

This code can also be downloaded from the files buffer.h, buffer.cpp, and producer_consumer.cpp.