CS 702 - Operating Systems - Fall 2007
Producer/Consumer in C
Loyola College >
Department of Computer Science >
Dr. James Glenn >
CS 702 >
Examples and Lecture Notes >
Producer/Consumer in C
buffer.h
#ifndef __BUFFER_H__
#define __BUFFER_H__
/**
* A bounded buffer.
*
* @param Item the type of item thse buffers hold; must have an assignment
* operator, a copy constructor, and a zero argument constructor
* @param size the maximum number of items in these buffers; must be positive
*
* @author Jim Glenn
* @version 0.1 2/17/2004
*/
template < class Item >
class Buffer
{
public:
/**
* Creates an empty buffer that can hold up to the given number of items.
*
* @param max the maximum number of items this buffer can hold
*/
Buffer(int max);
/**
* Destroys this buffer.
*/
~Buffer();
/**
* Adds the given item to this buffer. If this buffer is full, the
* calling thread is blocked until it is not full. This method will
* not return until the item has been successfully added.
*
* @param toAdd the item to add
* @param id the id of the calling thread (for debugging)
*/
void add(const Item& toAdd, int id);
/**
* Removes one item from this buffer. If this buffer is empty, the
* calling thread is blocked until one is available or the buffer is closed.
*
* @param removed the item removed from this buffer
* @param id the id of the calling thread (for debugging)
* @return true if an item was removed; false if the buffer was closed
*/
bool remove(Item& removed, int id);
void close();
private:
/**
* Copying is disallowed.
*/
Buffer(const Buffer&);
/**
* Assignment is disallowed.
*/
void operator = (const Buffer &);
/**
* Adds the given item to the array used to implement this buffer.
* There must be room in the buffer for the new item.
* This is the part of add that does not have to do with mutual exclusion
* and synchronization; it should be called from the public <CODE>add</CODE>
* method only after mutual exclusion has been assured and only when
* there is room in the buffer.
*
* @param toAdd the item to add
*/
void addToArray(const Item& toAdd);
/**
* Removes one item from the array used to implement this buffer.
* There must be at least one item in the array.
* This is the part of remove that does not have to do with mutual exclusion
* and synchronization; it should be called from the public
* <CODE>remove</CODE> method only after mutual exclusion has been assured
* and only when the buffer is nonempty.
*
* @return the item removed
*/
Item removeFromArray();
/**
* The array that holds the items in this buffer. Works like a
* FIFO queue with wraparound.
*/
Item *items;
/**
* The size of the <CODE>items</CODE> array.
*/
int size;
/**
* The index of the front item in the queue.
*/
int front;
/**
* The index of the last item in the queue.
*/
int back;
/**
* The number of items in the queue.
*/
int count;
static const pthread_cond_t DEFAULT_COND;
static const pthread_mutex_t DEFAULT_MUTEX;
pthread_cond_t notFull;
pthread_cond_t notEmpty;
pthread_mutex_t mutex;
bool closed;
int consumersWaiting;
int producersWaiting;
int spacesReserved;
int itemsReserved;
};
#include "buffer.cpp"
#endif
buffer.cpp
#ifndef __BUFFER_CPP__
#define __BUFFER_CPP__
#include <cassert>
#include <iostream>
#include "buffer.h"
template <class Item >
const pthread_cond_t Buffer< Item >::DEFAULT_COND = PTHREAD_COND_INITIALIZER;
template <class Item >
const pthread_mutex_t Buffer< Item >::DEFAULT_MUTEX = PTHREAD_MUTEX_INITIALIZER;
template < class Item >
Buffer< Item >::Buffer(int max) :
notFull(DEFAULT_COND),
notEmpty(DEFAULT_COND),
mutex(DEFAULT_MUTEX)
{
size = max;
count = 0;
front = 0;
back = size - 1;
closed = false;
consumersWaiting = 0;
producersWaiting = 0;
spacesReserved = 0;
itemsReserved = 0;
// create the array
items = new Item[max];
}
template < class Item >
Buffer< Item >::~Buffer()
{
delete[] items;
}
template < class Item >
void Buffer< Item >::add(const Item& toAdd, int id)
{
pthread_mutex_lock(&mutex);
if (count + spacesReserved == size)
{
producersWaiting++;
pthread_cond_wait(¬Full, &mutex);
spacesReserved--;
}
addToArray(toAdd);
if (consumersWaiting > 0)
{
consumersWaiting--;
itemsReserved++;
pthread_cond_signal(¬Empty);
}
pthread_mutex_unlock(&mutex);
}
template < class Item >
bool Buffer< Item >::remove(Item& removed, int id)
{
pthread_mutex_lock(&mutex);
bool gotItem = false;
if (count - itemsReserved == 0 && !closed)
{
consumersWaiting++;
pthread_cond_wait(¬Empty, &mutex);
if (!closed || (closed && itemsReserved > 0))
{
itemsReserved--;
gotItem = true;
}
}
else if (count > 0)
{
gotItem = true;
}
if (gotItem)
removed = removeFromArray();
if (producersWaiting > 0)
{
producersWaiting--;
spacesReserved++;
pthread_cond_signal(¬Full);
}
pthread_mutex_unlock(&mutex);
return gotItem;
}
template < class Item >
void Buffer< Item >::close()
{
pthread_mutex_lock(&mutex);
closed = true;
pthread_cond_broadcast(¬Empty);
pthread_mutex_unlock(&mutex);
}
template < class Item >
Buffer< Item >::Buffer(const Buffer&)
{
assert(false);
}
template < class Item >
void Buffer< Item >::operator = (const Buffer &)
{
assert(false);
}
template < class Item >
void Buffer< Item >::addToArray(const Item& toAdd)
{
assert(count < size);
// update index of back, accounting for wraparound
back = (back + 1) % size;
// update number of items in queue
count++;
// put item in array
items[back] = toAdd;
}
template < class Item >
Item Buffer< Item >::removeFromArray()
{
assert(count > 0);
// save index of the item we'll need to return
int oldFront = front;
// update front, accounting for wraparound
front = (front + 1) % size;
// update number of items
count--;
// return item that used to be at the front
return items[oldFront];
}
#endif
producer_consumer.cpp
#include <iostream>
#include <sstream>
#include <map>
#include <vector>
#include <cstdlib>
#include <unistd.h>
#include <time.h>
#include "buffer.h"
/**
* The entry point for producer threads.
*
* @param args a pointer to a producer_args_t structure
* @return <CODE>NULL</CODE>
*/
void *producer(void *args);
/**
* The entry point for consumer threads.
*
* @param args a pointer to a consumer_args_t structure
*/
void *consumer(void *args);
/**
* Arguments for producer threads.
*/
struct producer_args_t
{
/**
* An identifier.
*/
int id;
/**
* A pointer to the buffer to write to.
*/
Buffer< std::pair< int, int > > *buf;
/**
* The number of items to add.
*/
int count;
};
/**
* Arguments for consumer threads.
*/
struct consumer_args_t
{
/**
* An identifier.
*/
int id;
/**
* A pointer to the buffer to read from.
*/
Buffer< std::pair< int, int > > *buf;
};
/**
* Simulates the producer/consumer problem. Creates a number of
* producers and consumers (determined by command line arguments)
* and a bounded buffer for them to share. The size also determines
* the number of items the procuders will add to the buffer -- each
* producer adds twice as many items as there are spaces in the buffer.
*
* @param argc the number of command line arguments; should be 4
* @param argv an array of strings with the number of producers,
* consumers, and size of the bounded buffer in elements 1 through 3
*/
int main(int argc, char **argv)
{
// parse and validate command line arguments
if (argc != 4)
{
std::cerr << "USAGE: " << argv[0] << " producers consumers buffer-size"
<< std::endl;
return 1;
}
std::stringstream args;
for (int a = 1; a < argc; a++)
args << argv[a] << ' ';
int producers, consumers, size;
args >> producers >> consumers >> size;
if (producers <= 0 || consumers <= 0 || size <= 0)
{
std::cerr << argv[0] << " all argument must be positive"
<< std::endl;
return 1;
}
// create the buffer
Buffer< std::pair< int, int > > b(size);
// randomize
srandom(time(NULL));
// start the consumers
for (int c = 0; c < consumers; c++)
{
pthread_t id;
consumer_args_t *cArgs = new consumer_args_t();
cArgs->id = c;
cArgs->buf = &b;
pthread_create(&id, NULL, consumer, cArgs);
}
// start the producers
pthread_t producerID[producers];
for (int p = 0; p < producers; p++)
{
pthread_t id;
producer_args_t *pArgs = new producer_args_t();
pArgs->id = p;
pArgs->buf = &b;
pArgs->count = size * 2;
pthread_create(&id, NULL, producer, pArgs);
producerID[p] = id;
}
// wait for producers to finish
for (int p = 0; p < producers; p++)
pthread_join(producerID[p], NULL);
b.close();
// terminate main
pthread_exit(NULL);
// won't get here
return 0;
}
void *producer(void *args)
{
producer_args_t *pArgs = reinterpret_cast< producer_args_t * >(args);
for (int item = 0; item < pArgs->count; item++)
{
pArgs->buf->add(std::pair< int, int >(pArgs->id, item), pArgs->id);
std::cout << "Producer " << pArgs->id << " added " << item
<< std::endl;
if (random() % 2 == 1)
sleep(1);
}
// wait for all producers to finish, then close the buffer
delete pArgs;
pthread_exit(NULL);
}
void *consumer(void *args)
{
consumer_args_t *cArgs = reinterpret_cast< consumer_args_t * >(args);
std::pair< int, int > item;
while (cArgs->buf->remove(item, cArgs->id))
{
std::cout << "Consumer " << cArgs->id << " removed " << item.second
<< " added by producer " << item.first
<< std::endl;
sleep(random() % 2 + 1);
}
std::cout << "Consumer " << cArgs->id << " terminating"
<< std::endl;
delete cArgs;
pthread_exit(NULL);
}
This code can also be downloaded from the files
buffer.h,
buffer.cpp,
and producer_consumer.cpp.