We begin with the following code for add and remove.
/**
 * First version of add: waits if the buffer is full, stores the item,
 * then signals one thread waiting for the buffer to become non-empty.
 *
 * NOTE: the full-buffer test is a plain `if`, so it is not rechecked after
 * pthread_cond_wait returns; this is the weakness the surrounding text
 * goes on to discuss.
 *
 * @param toAdd the item to add
 * @param id the id of the calling thread (unused here)
 */
template < class Item >
void Buffer< Item >::add(const Item& toAdd, int id)
{
  pthread_mutex_lock(&mutex);
  if (count == size)
  {
    // releases the mutex while blocked and reacquires it before returning
    pthread_cond_wait(&notFull, &mutex);
  }
  addToArray(toAdd);
  pthread_cond_signal(&notEmpty);
  pthread_mutex_unlock(&mutex);
}
/**
 * First version of remove: waits if the buffer is empty, takes the front
 * item, then signals one thread waiting for the buffer to become non-full.
 *
 * NOTE: the empty-buffer test is a plain `if`, so it is not rechecked after
 * pthread_cond_wait returns; this is the weakness the surrounding text
 * goes on to discuss.
 *
 * @param removed receives the item removed
 * @param id the id of the calling thread (unused here)
 * @return always true
 */
template < class Item >
bool Buffer< Item >::remove(Item& removed, int id)
{
  pthread_mutex_lock(&mutex);
  if (count == 0)
  {
    // releases the mutex while blocked and reacquires it before returning
    pthread_cond_wait(&notEmpty, &mutex);
  }
  removed = removeFromArray();
  pthread_cond_signal(&notFull);
  pthread_mutex_unlock(&mutex);
  return true;
}
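We have a problem because when a thread receives a signal it is not necessarily the first one that gets scheduled when the signalling thread unlocks the mutex. For example, with an initially empty buffer we could have the following series of operations: consumer C1 calls remove, finds the buffer empty, and blocks on notEmpty; producer P calls add, stores an item, signals notEmpty, and unlocks the mutex; before C1 is scheduled, consumer C2 calls remove, acquires the mutex, sees count == 1, and takes the item; C1 then reacquires the mutex, does not recheck the condition, and tries to remove an item from an empty buffer.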
The standard way to fix this problem is to change the ifs into whiles.
/**
 * Adds an item, blocking while the buffer is full. The full-buffer
 * condition is rechecked in a loop every time the wait returns, so the
 * item is only stored once there is really room for it.
 *
 * @param toAdd the item to add
 * @param id the id of the calling thread (unused here)
 */
template < class Item >
void Buffer< Item >::add(const Item& toAdd, int id)
{
  pthread_mutex_lock(&mutex);
  while (count == size)
  {
    // releases the mutex while blocked and reacquires it before returning
    pthread_cond_wait(&notFull, &mutex);
  }
  addToArray(toAdd);
  pthread_cond_signal(&notEmpty);
  pthread_mutex_unlock(&mutex);
}
/**
 * Removes the front item, blocking while the buffer is empty. The
 * empty-buffer condition is rechecked in a loop every time the wait
 * returns, so an item is only taken once one is really present.
 *
 * @param removed receives the item removed
 * @param id the id of the calling thread (unused here)
 * @return always true
 */
template < class Item >
bool Buffer< Item >::remove(Item& removed, int id)
{
  pthread_mutex_lock(&mutex);
  while (count == 0)
  {
    // releases the mutex while blocked and reacquires it before returning
    pthread_cond_wait(&notEmpty, &mutex);
  }
  removed = removeFromArray();
  pthread_cond_signal(&notFull);
  pthread_mutex_unlock(&mutex);
  return true;
}
#endif
In this case, we can go back to an if with some extra bookkeeping. We keep track of the number of producers waiting, the number of consumers waiting, the number of items reserved for consumers that have been unblocked but haven't been scheduled yet, and the number of empty spaces reserved for producers that have been unblocked but haven't been scheduled yet.
/**
 * Version of add with reservation bookkeeping. A producer must wait when
 * every space is either occupied or reserved for an already-signalled
 * producer. After storing its item it reserves that item for a waiting
 * consumer (if any) and signals it.
 *
 * NOTE: consumersWaiting is only decremented by the consumer itself once
 * it runs again, so several adds in a row can each count the same waiting
 * consumer and reserve more items than there are consumers to claim them.
 *
 * @param toAdd the item to add
 * @param id the id of the calling thread (unused here)
 */
template < class Item >
void Buffer< Item >::add(const Item& toAdd, int id)
{
  pthread_mutex_lock(&mutex);
  if (count + spacesReserved == size)
  {
    producersWaiting++;
    pthread_cond_wait(&notFull, &mutex);
    // claim the space the signalling consumer reserved for us
    spacesReserved--;
    producersWaiting--;
  }
  addToArray(toAdd);
  if (consumersWaiting > 0)
  {
    itemsReserved++;
    pthread_cond_signal(&notEmpty);
  }
  pthread_mutex_unlock(&mutex);
}
/**
 * Version of remove with reservation bookkeeping. A consumer must wait
 * when every item present is reserved for an already-signalled consumer.
 * After taking its item it reserves the freed space for a waiting
 * producer (if any) and signals it.
 *
 * NOTE: producersWaiting is only decremented by the producer itself once
 * it runs again, so several removes in a row can each count the same
 * waiting producer and reserve more spaces than there are producers to
 * claim them.
 *
 * @param removed receives the item removed
 * @param id the id of the calling thread (unused here)
 * @return always true
 */
template < class Item >
bool Buffer< Item >::remove(Item& removed, int id)
{
  pthread_mutex_lock(&mutex);
  if (count - itemsReserved == 0)
  {
    consumersWaiting++;
    pthread_cond_wait(&notEmpty, &mutex);
    // claim the item the signalling producer reserved for us
    itemsReserved--;
    consumersWaiting--;
  }
  removed = removeFromArray();
  if (producersWaiting > 0)
  {
    spacesReserved++;
    pthread_cond_signal(&notFull);
  }
  pthread_mutex_unlock(&mutex);
  return true;
}
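Even this is not quite correct, as the following sequence of operations illustrates: a single consumer C is blocked in remove (consumersWaiting == 1); producer P1 adds an item, sees consumersWaiting == 1, reserves the item and signals; before C is scheduled (and decrements consumersWaiting), producer P2 adds a second item, still sees consumersWaiting == 1, reserves that item too and signals again, although no other consumer is waiting. C eventually runs and releases only one reservation, so itemsReserved stays at 1 forever: the next consumer finds count - itemsReserved == 0 and blocks even though an item is sitting in the buffer.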
Doing some bookkeeping on behalf of the thread you are unblocking fixes the problem.
/**
 * Version of add in which the signalling thread does the bookkeeping on
 * behalf of the thread it unblocks: the consumer that frees a space
 * decrements producersWaiting and increments spacesReserved, so a woken
 * producer only has to claim its reserved space. (Decrementing
 * producersWaiting here as well would count each wake-up twice.)
 *
 * @param toAdd the item to add
 * @param id the id of the calling thread (unused here)
 */
template < class Item >
void Buffer< Item >::add(const Item& toAdd, int id)
{
  pthread_mutex_lock(&mutex);
  if (count + spacesReserved == size)
  {
    producersWaiting++;
    // pthread_cond_wait may return spuriously; only proceed once some
    // consumer has actually reserved a space for a waiting producer
    do
    {
      pthread_cond_wait(&notFull, &mutex);
    } while (spacesReserved == 0);
    spacesReserved--;
  }
  addToArray(toAdd);
  if (consumersWaiting > 0)
  {
    // bookkeeping on behalf of the consumer being unblocked
    consumersWaiting--;
    itemsReserved++;
    pthread_cond_signal(&notEmpty);
  }
  pthread_mutex_unlock(&mutex);
}
/**
 * Version of remove in which the signalling thread does the bookkeeping
 * on behalf of the thread it unblocks: the producer that adds an item
 * decrements consumersWaiting and increments itemsReserved, so a woken
 * consumer only has to claim its reserved item. (Decrementing
 * consumersWaiting here as well would count each wake-up twice.)
 *
 * @param removed receives the item removed
 * @param id the id of the calling thread (unused here)
 * @return always true
 */
template < class Item >
bool Buffer< Item >::remove(Item& removed, int id)
{
  pthread_mutex_lock(&mutex);
  if (count - itemsReserved == 0)
  {
    consumersWaiting++;
    // pthread_cond_wait may return spuriously; only proceed once some
    // producer has actually reserved an item for a waiting consumer
    do
    {
      pthread_cond_wait(&notEmpty, &mutex);
    } while (itemsReserved == 0);
    itemsReserved--;
  }
  removed = removeFromArray();
  if (producersWaiting > 0)
  {
    // bookkeeping on behalf of the producer being unblocked
    producersWaiting--;
    spacesReserved++;
    pthread_cond_signal(&notFull);
  }
  pthread_mutex_unlock(&mutex);
  return true;
}
Finally, we resolve the issue with consumers waiting forever in remove once the producers have all terminated. We do so with a closed flag that is set when the last producer terminates. When the flag is set, a broadcast is sent to the consumers so that any that awake and see an empty buffer know that there will never be any more items and so they should terminate.
/**
 * Adds the given item, blocking while every space is occupied or reserved
 * for an already-signalled producer. The consumer that frees a space does
 * the producersWaiting/spacesReserved bookkeeping on the producer's behalf.
 *
 * @param toAdd the item to add
 * @param id the id of the calling thread (unused here)
 */
template < class Item >
void Buffer< Item >::add(const Item& toAdd, int id)
{
  pthread_mutex_lock(&mutex);
  if (count + spacesReserved == size)
  {
    producersWaiting++;
    // pthread_cond_wait may return spuriously; only proceed once some
    // consumer has actually reserved a space for a waiting producer
    do
    {
      pthread_cond_wait(&notFull, &mutex);
    } while (spacesReserved == 0);
    spacesReserved--;
  }
  addToArray(toAdd);
  if (consumersWaiting > 0)
  {
    // bookkeeping on behalf of the consumer being unblocked
    consumersWaiting--;
    itemsReserved++;
    pthread_cond_signal(&notEmpty);
  }
  pthread_mutex_unlock(&mutex);
}
/**
 * Removes one item, blocking while no unreserved item is available and
 * the buffer has not been closed.
 *
 * @param removed receives the item removed (untouched when false is returned)
 * @param id the id of the calling thread (unused here)
 * @return true if an item was removed; false if the buffer is closed and
 * no item is available to this caller
 */
template < class Item >
bool Buffer< Item >::remove(Item& removed, int id)
{
  pthread_mutex_lock(&mutex);
  bool gotItem = false;
  if (count - itemsReserved == 0 && !closed)
  {
    consumersWaiting++;
    // pthread_cond_wait may return spuriously; keep waiting until an item
    // has been reserved for a waiting consumer or the buffer is closed
    do
    {
      pthread_cond_wait(&notEmpty, &mutex);
    } while (itemsReserved == 0 && !closed);
    if (itemsReserved > 0)
    {
      itemsReserved--;
      gotItem = true;
    }
  }
  else if (count - itemsReserved > 0)
  {
    // only take an item that is not reserved for a consumer that has been
    // signalled but has not run yet (matters once the buffer is closed)
    gotItem = true;
  }
  if (gotItem)
  {
    removed = removeFromArray();
    // a space was freed: hand it to a waiting producer, if any
    if (producersWaiting > 0)
    {
      producersWaiting--;
      spacesReserved++;
      pthread_cond_signal(&notFull);
    }
  }
  pthread_mutex_unlock(&mutex);
  return gotItem;
}
/**
 * Records that one producer has finished. Once as many calls have been
 * made as there are producers, the buffer is marked closed and every
 * consumer blocked in remove is woken up.
 *
 * @param n the total number of producers; only the value passed to the
 * first call is recorded
 */
template < class Item >
void Buffer< Item >::close(int n)
{
  pthread_mutex_lock(&mutex);
  // get totalProducers from 1st call
  if (totalProducers == -1)
    totalProducers = n;
  producersFinished++;
  if (producersFinished == totalProducers)
  {
    closed = true;
    pthread_cond_broadcast(&notEmpty);
    // every waiting consumer has just been woken
    consumersWaiting = 0;
  }
  pthread_mutex_unlock(&mutex);
}
Complete code follows.
#ifndef __BUFFER_H__
#define __BUFFER_H__
#include <pthread.h>
/**
 * A bounded buffer shared between producer and consumer threads.
 * The capacity is fixed by the constructor argument.
 *
 * @param Item the type of item these buffers hold; must have an assignment
 * operator, a copy constructor, and a zero argument constructor
 *
 * @author Jim Glenn
 * @version 0.1 2/17/2004
 */
template < class Item >
class Buffer
{
public:
  /**
   * Creates an empty buffer that can hold up to the given number of items.
   *
   * @param max the maximum number of items this buffer can hold; must be
   * positive
   */
  Buffer(int max);

  /**
   * Destroys this buffer.
   */
  ~Buffer();

  /**
   * Adds the given item to this buffer. If this buffer is full, the
   * calling thread is blocked until it is not full. This method will
   * not return until the item has been successfully added.
   *
   * @param toAdd the item to add
   * @param id the id of the calling thread (for debugging)
   */
  void add(const Item& toAdd, int id);

  /**
   * Removes one item from this buffer. If this buffer is empty, the
   * calling thread is blocked until one is available or the buffer is closed.
   *
   * @param removed the item removed from this buffer
   * @param id the id of the calling thread (for debugging)
   * @return true if an item was removed; false if the buffer was closed
   */
  bool remove(Item& removed, int id);

  /**
   * Records that one producer has finished adding items. When the number
   * of calls reaches the number of producers, the buffer is closed and
   * blocked consumers are woken.
   *
   * @param n the total number of producers (taken from the first call)
   */
  void close(int);

private:
  /**
   * Copying is disallowed.
   */
  Buffer(const Buffer&);

  /**
   * Assignment is disallowed.
   */
  void operator = (const Buffer &);

  /**
   * Adds the given item to the array used to implement this buffer.
   * There must be room in the buffer for the new item.
   * This is the part of add that does not have to do with mutual exclusion
   * and synchronization; it should be called from the public <CODE>add</CODE>
   * method only after mutual exclusion has been assured and only when
   * there is room in the buffer.
   *
   * @param toAdd the item to add
   */
  void addToArray(const Item& toAdd);

  /**
   * Removes one item from the array used to implement this buffer.
   * There must be at least one item in the array.
   * This is the part of remove that does not have to do with mutual exclusion
   * and synchronization; it should be called from the public
   * <CODE>remove</CODE> method only after mutual exclusion has been assured
   * and only when the buffer is nonempty.
   *
   * @return the item removed
   */
  Item removeFromArray();

  /**
   * The array that holds the items in this buffer. Works like a
   * FIFO queue with wraparound.
   */
  Item *items;

  /**
   * The size of the <CODE>items</CODE> array.
   */
  int size;

  /**
   * The index of the front item in the queue.
   */
  int front;

  /**
   * The index of the last item in the queue.
   */
  int back;

  /**
   * The number of items in the queue.
   */
  int count;

  /**
   * Condition producers wait on when there is no room for another item.
   */
  pthread_cond_t notFull;

  /**
   * Condition consumers wait on when there is no item for them to take.
   */
  pthread_cond_t notEmpty;

  /**
   * Protects every other data member of this buffer.
   */
  pthread_mutex_t mutex;

  /**
   * Set once the last producer has called <CODE>close</CODE>.
   */
  bool closed;

  /**
   * The number of consumers blocked and not yet signalled.
   */
  int consumersWaiting;

  /**
   * The number of producers blocked and not yet signalled.
   */
  int producersWaiting;

  /**
   * The number of empty spaces set aside for producers that have been
   * signalled but have not added their item yet.
   */
  int spacesReserved;

  /**
   * The number of items set aside for consumers that have been
   * signalled but have not removed their item yet.
   */
  int itemsReserved;

  /**
   * The total number of producers, or -1 until the first call to
   * <CODE>close</CODE> supplies it.
   */
  int totalProducers;

  /**
   * The number of producers that have called <CODE>close</CODE>.
   */
  int producersFinished;
};
#include "buffer.cpp"
#endif
#ifndef __BUFFER_CPP__
#define __BUFFER_CPP__
#include <cassert>
#include <iostream>
#include "buffer.h"
/**
 * Creates an empty, open buffer with room for max items.
 *
 * @param max the capacity of the buffer; must be positive (the wraparound
 * arithmetic divides by it)
 */
template < class Item >
Buffer< Item >::Buffer(int max)
{
  assert(max > 0);

  // create the array first, so that nothing has to be undone if
  // allocation throws
  items = new Item[max];

  size = max;
  count = 0;
  front = 0;
  // addToArray advances back before storing, so the first item lands at 0
  back = size - 1;

  int rc = pthread_mutex_init(&mutex, NULL);
  assert(rc == 0);
  rc = pthread_cond_init(&notFull, NULL);
  assert(rc == 0);
  rc = pthread_cond_init(&notEmpty, NULL);
  assert(rc == 0);
  (void) rc; // only inspected when assertions are enabled

  closed = false;
  consumersWaiting = 0;
  producersWaiting = 0;
  spacesReserved = 0;
  itemsReserved = 0;
  totalProducers = -1; // we don't know yet
  producersFinished = 0;
}
/**
 * Destroys this buffer, releasing the synchronization objects and the
 * item array. No thread may still be using the buffer when it is destroyed.
 */
template < class Item >
Buffer< Item >::~Buffer()
{
  pthread_cond_destroy(&notEmpty);
  pthread_cond_destroy(&notFull);
  pthread_mutex_destroy(&mutex);
  delete[] items;
}
/**
 * Adds the given item, blocking while every space is occupied or reserved
 * for an already-signalled producer. The consumer that frees a space does
 * the producersWaiting/spacesReserved bookkeeping on the producer's behalf.
 *
 * @param toAdd the item to add
 * @param id the id of the calling thread (unused here)
 */
template < class Item >
void Buffer< Item >::add(const Item& toAdd, int id)
{
  pthread_mutex_lock(&mutex);
  if (count + spacesReserved == size)
  {
    producersWaiting++;
    // pthread_cond_wait may return spuriously; only proceed once some
    // consumer has actually reserved a space for a waiting producer
    do
    {
      pthread_cond_wait(&notFull, &mutex);
    } while (spacesReserved == 0);
    spacesReserved--;
  }
  addToArray(toAdd);
  if (consumersWaiting > 0)
  {
    // bookkeeping on behalf of the consumer being unblocked
    consumersWaiting--;
    itemsReserved++;
    pthread_cond_signal(&notEmpty);
  }
  pthread_mutex_unlock(&mutex);
}
/**
 * Removes one item, blocking while no unreserved item is available and
 * the buffer has not been closed.
 *
 * @param removed receives the item removed (untouched when false is returned)
 * @param id the id of the calling thread (unused here)
 * @return true if an item was removed; false if the buffer is closed and
 * no item is available to this caller
 */
template < class Item >
bool Buffer< Item >::remove(Item& removed, int id)
{
  pthread_mutex_lock(&mutex);
  bool gotItem = false;
  if (count - itemsReserved == 0 && !closed)
  {
    consumersWaiting++;
    // pthread_cond_wait may return spuriously; keep waiting until an item
    // has been reserved for a waiting consumer or the buffer is closed
    do
    {
      pthread_cond_wait(&notEmpty, &mutex);
    } while (itemsReserved == 0 && !closed);
    if (itemsReserved > 0)
    {
      itemsReserved--;
      gotItem = true;
    }
  }
  else if (count - itemsReserved > 0)
  {
    // only take an item that is not reserved for a consumer that has been
    // signalled but has not run yet (matters once the buffer is closed)
    gotItem = true;
  }
  if (gotItem)
  {
    removed = removeFromArray();
    // a space was freed: hand it to a waiting producer, if any
    if (producersWaiting > 0)
    {
      producersWaiting--;
      spacesReserved++;
      pthread_cond_signal(&notFull);
    }
  }
  pthread_mutex_unlock(&mutex);
  return gotItem;
}
/**
 * Records that one producer has finished. Once as many calls have been
 * made as there are producers, the buffer is marked closed and every
 * consumer blocked in remove is woken up.
 *
 * @param n the total number of producers; only the value passed to the
 * first call is recorded
 */
template < class Item >
void Buffer< Item >::close(int n)
{
  pthread_mutex_lock(&mutex);
  // get totalProducers from 1st call
  if (totalProducers == -1)
    totalProducers = n;
  producersFinished++;
  if (producersFinished == totalProducers)
  {
    closed = true;
    pthread_cond_broadcast(&notEmpty);
    // every waiting consumer has just been woken
    consumersWaiting = 0;
  }
  pthread_mutex_unlock(&mutex);
}
/**
 * Copy constructor. It is declared private in the class because copying
 * is disallowed; should it ever be invoked, the assertion fails (when
 * assertions are enabled).
 */
template < class Item >
Buffer< Item >::Buffer(const Buffer&)
{
assert(false);
}
/**
 * Assignment operator. It is declared private in the class because
 * assignment is disallowed; should it ever be invoked, the assertion
 * fails (when assertions are enabled).
 */
template < class Item >
void Buffer< Item >::operator = (const Buffer &)
{
assert(false);
}
/**
 * Stores the given item just past the current back of the circular
 * queue. The caller must hold the mutex and there must be room.
 *
 * @param toAdd the item to add
 */
template < class Item >
void Buffer< Item >::addToArray(const Item& toAdd)
{
  assert(count < size);
  // index of the new back, accounting for wraparound
  const int newBack = (back + 1) % size;
  // copy the item first: if the assignment throws, the queue's indices
  // and count are left untouched
  items[newBack] = toAdd;
  back = newBack;
  count++;
}
/**
 * Takes the item at the front of the circular queue. The caller must
 * hold the mutex and the queue must be nonempty.
 *
 * @return the item that was at the front
 */
template < class Item >
Item Buffer< Item >::removeFromArray()
{
  assert(count > 0);
  // copy the item first: if the copy throws, the queue's indices and
  // count are left untouched
  Item result = items[front];
  // update front, accounting for wraparound
  front = (front + 1) % size;
  // update number of items
  count--;
  return result;
}
#endif
#include <pthread.h>
#include <time.h>
#include <unistd.h>
#include <cstdlib>
#include <iostream>
#include <map>
#include <sstream>
#include <vector>
#include "buffer.h"
/**
 * The entry point for producer threads. Adds the requested number of
 * items to the buffer named in the arguments, then reports to the buffer
 * that this producer has finished. Takes ownership of the argument
 * structure and deletes it.
 *
 * @param args a pointer to a heap-allocated producer_args_t structure
 * @return <CODE>NULL</CODE>
 */
void *producer(void *args);
/**
 * The entry point for consumer threads. Removes items from the buffer
 * named in the arguments until remove reports that no more items will
 * arrive. Takes ownership of the argument structure and deletes it.
 *
 * @param args a pointer to a heap-allocated consumer_args_t structure
 * @return <CODE>NULL</CODE>
 */
void *consumer(void *args);
/**
 * Arguments for producer threads. One instance is allocated per thread
 * by main and deleted by the producer when it finishes.
 */
struct producer_args_t
{
/**
* An identifier; stored as the first element of each item this producer adds.
*/
int id;
/**
* A pointer to the buffer to write to.
*/
Buffer< std::pair< int, int > > *buf;
/**
* The number of items to add.
*/
int count;
/**
* The number of producers; passed to the buffer's close method.
*/
int numProducers;
};
/**
 * Arguments for consumer threads. One instance is allocated per thread
 * by main and deleted by the consumer when it finishes.
 */
struct consumer_args_t
{
/**
* An identifier; used in the consumer's output messages.
*/
int id;
/**
* A pointer to the buffer to read from.
*/
Buffer< std::pair< int, int > > *buf;
};
/**
* Simulates the producer/consumer problem. Creates a number of
* producers and consumers (determined by command line arguments)
* and a bounded buffer for them to share. The size also determines
* the number of items the procuders will add to the buffer -- each
* producer adds twice as many items as there are spaces in the buffer.
*
* @param argc the number of command line arguments; should be 4
* @param argv an array of strings with the number of producers,
* consumers, and size of the bounded buffer in elements 1 through 3
*/
int main(int argc, char **argv)
{
// parse and validate command line arguments
if (argc != 4)
{
std::cerr << "USAGE: " << argv[0] << " producers consumers buffer-size"
<< std::endl;
return 1;
}
std::stringstream args;
for (int a = 1; a < argc; a++)
args << argv[a] << ' ';
int producers, consumers, size;
args >> producers >> consumers >> size;
if (producers <= 0 || consumers <= 0 || size <= 0)
{
std::cerr << argv[0] << " all argument must be positive"
<< std::endl;
return 1;
}
// create the buffer
Buffer< std::pair< int, int > > b(size);
// randomize
srandom(time(NULL));
// start the consumers
for (int c = 0; c < consumers; c++)
{
pthread_t id;
consumer_args_t *cArgs = new consumer_args_t();
cArgs->id = c;
cArgs->buf = &b;
pthread_create(&id, NULL, consumer, cArgs);
}
// start the producers
pthread_t producerID[producers];
for (int p = 0; p < producers; p++)
{
pthread_t id;
producer_args_t *pArgs = new producer_args_t();
pArgs->id = p;
pArgs->buf = &b;
pArgs->count = size * 2;
pArgs->numProducers = producers;
pthread_create(&id, NULL, producer, pArgs);
producerID[p] = id;
}
// terminate main
pthread_exit(NULL);
// won't get here
return 0;
}
/**
 * Thread body for a producer: adds pArgs->count items, each a pair of
 * (producer id, item number), pausing one second after roughly half of
 * them, then tells the buffer this producer is done.
 *
 * @param args a pointer to a heap-allocated producer_args_t; deleted here
 * @return NULL
 */
void *producer(void *args)
{
  // args arrived as void*, so static_cast is the appropriate conversion
  producer_args_t *pArgs = static_cast< producer_args_t * >(args);
  for (int item = 0; item < pArgs->count; item++)
  {
    pArgs->buf->add(std::pair< int, int >(pArgs->id, item), pArgs->id);
    std::cout << "Producer " << pArgs->id << " added " << item
              << std::endl;
    if (random() % 2 == 1)
      sleep(1);
  }
  pArgs->buf->close(pArgs->numProducers);
  delete pArgs;
  // returning from the start routine ends the thread with this exit value
  return NULL;
}
/**
 * Thread body for a consumer: keeps removing items (pausing one or two
 * seconds after each) until remove returns false, then announces that it
 * is terminating.
 *
 * @param args a pointer to a heap-allocated consumer_args_t; deleted here
 * @return NULL
 */
void *consumer(void *args)
{
  // args arrived as void*, so static_cast is the appropriate conversion
  consumer_args_t *cArgs = static_cast< consumer_args_t * >(args);
  std::pair< int, int > item;
  while (cArgs->buf->remove(item, cArgs->id))
  {
    std::cout << "Consumer " << cArgs->id << " removed " << item.second
              << " added by producer " << item.first
              << std::endl;
    sleep(random() % 2 + 1);
  }
  std::cout << "Consumer " << cArgs->id << " terminating"
            << std::endl;
  delete cArgs;
  // returning from the start routine ends the thread with this exit value
  return NULL;
}
This code can also be downloaded from the files
buffer.h,
buffer.cpp,
and producer_consumer.cpp.