By using this site, you agree to our updated Privacy Policy and our Terms of Use. Manage your Cookies Settings.
458,165 Members | 1,323 Online
Bytes IT Community
+ Ask a Question
Need help? Post your question and get tips & solutions from a community of 458,165 IT Pros & Developers. It's quick & easy.

I need help on this THREAD problem !

P: n/a

Hello,

I am trying to write a multithread program that simulates producers and
consumers. My program can have many producers and many consumers (each in a
separate thread). It has a storage place (buffer) with "n" capacity. The
user provides these parameters on startup. The buffer works with a circular
queue.

It's a console application. My implementation is working with Semaphores
to synchornize everything. I think it's synchornized OK, but the problem is:
I tell it to produce 10 products, and it produces 10 products, but the
consumers consume only 8 (they should consume all the products). I can't
find out why this happens.

Any help on this will be appreciated.

Thanks in advance!!!

Leonardo Hyppolito
here is the code:

It's also published (with syntax highlighting) here:

http://www.rafb.net/paste/results/u2946385.html
----

using System;
using System.Threading;

namespace TesteProdCons
{
public class Semaphore
{
private int counter;

// constructor
public Semaphore(int c)
{
counter = c;
}

public void p()
{
lock(this)
{
while(counter <= 0)
{
Monitor.Wait(this, Timeout.Infinite);
}

// we have control, decrement the counter
counter--;
}
}

public void v()
{
lock(this)
{
counter++;
Monitor.Pulse(this);
}
}
}

// can hold 1 product at a time (buffer of 1 position)
public class Storage
{
// semaphores used for synchronization
private Semaphore notFull;
private Semaphore notEmpty;
private Semaphore exclusive;

// prodBox is the buffer that keeps the products
private int[] prodBox;
private int readIndex;
private int writeIndex;
private int capacity;

// actual product number
private static int n;

// max number of products to be produced
private static int nMax;

// property for n
public int Number
{
get {return n;}
set {n = value;}
}

// property for nMax
public int MaxNumber
{
get {return nMax;}
set {nMax = value;}
}

// writes a string representation of the storage
public override string ToString()
{
string ret = "";
for (int i = 0; i < capacity; i++)
{
ret = ret + prodBox[i] + " ";
}
ret = ret + "readIndex=" + readIndex + " writeIndex=" + writeIndex;
return ret;
}
// constructor
public Storage(int max, int cap)
{
n = 0;
nMax = max;
notFull = new Semaphore(cap);
notEmpty = new Semaphore(0);
exclusive = new Semaphore(1);
prodBox = new int[cap];
capacity = cap;
readIndex = 0;
writeIndex = 0;
}

public int Product
{
get
{
// wait for a nonEmpty resource
notEmpty.p();

// wait for exclusive access
exclusive.p();

// read the product
int ret = prodBox[readIndex];

Console.WriteLine(" " + Thread.CurrentThread.Name + " reads " + ret);

// set the new readIndex
readIndex = (readIndex + 1) % capacity;

// for debug
//Console.WriteLine(" " + Thread.CurrentThread.Name + "--> " + this);

// release the exclusive access
exclusive.v();

// release a notFull resource
notFull.v();

return ret;
}
set
{
// wait for a notFull resource
notFull.p();

// wait for exclusive access
exclusive.p();

// if already reached maximum number of productions,
// abort the producer thread

if(n >= nMax)
{
Console.WriteLine("Aborting " + Thread.CurrentThread.Name +
" (reached max number of productions)");
Thread.CurrentThread.Abort();
}

// store the product
prodBox[writeIndex] = value;

// set the new writeIndex
writeIndex = (writeIndex + 1) % capacity;

// increment the actual product number
n = n + 1;

// write information on screen
Console.WriteLine("++ [" + Thread.CurrentThread.Name + "] produces n." +
Number + ": " + value);
//Console.WriteLine(" " + Thread.CurrentThread.Name + "--> " + this);

// release the exclusive access
exclusive.v();

// release a notEmpty resource
notEmpty.v();
}
}
}

public class Producer
{
// a reference to the storage
private Storage a;

// seed for the random
private int seed;

// constructor
public Producer(Storage arm, int s)
{
a = arm;
seed = s;
}

// entry point for the producer thread
public void produce()
{
Random r = new Random(seed);
while(a.Number <= a.MaxNumber)
{
// get a random integer (from 0 to 2 seconds)
int prod = r.Next(2001);

// store the product
a.Product = prod;

// rest for a while...
Thread.Sleep(200);
}
}
}

public class Consumer
{
// a reference to the storage
private Storage a;

// constructor
public Consumer(Storage arm)
{
a = arm;
}

// entry point for the consumer thread
public void consume()
{
while(true)
{
// get the product value
int prod = a.Product;

// "consume" the product (pauses for the product's miliseconds)
Thread.Sleep(prod);
Console.WriteLine("-- [" + Thread.CurrentThread.Name + "] consumed " +
prod);

}
}
}

class TesteProdCons
{
static void Main(string[] args)
{
int numProducts;
int numProducers;
int numConsumers;
int capacity;

// read information from the user
Console.Write("Max number of products: ");
numProducts = Int32.Parse(Console.ReadLine());
Console.Write("Number of producers: ");
numProducers = Int32.Parse(Console.ReadLine());
Console.Write("Numero of consumers: ");
numConsumers = Int32.Parse(Console.ReadLine());
Console.Write("Capacity of the storage buffer: ");
capacity = Int32.Parse(Console.ReadLine());

// a single shared storage place
Storage a = new Storage(numProducts, capacity);

// one producer object for each producer thread,
// to have different random generators
Producer[] prod = new Producer[numProducers];
Thread[] tProd = new Thread[numProducers];

// only one consumer object for all consumer threads
Consumer c = new Consumer(a);
Thread[] tCons = new Thread[numConsumers];

Random r = new Random();

// create the producers objects and threads
for(int i = 0; i < numProducers; i++)
{
prod[i] = new Producer(a, r.Next(999999));
tProd[i] = new Thread(new ThreadStart(prod[i].produce));
tProd[i].Name = "p" + (i+1);
}

// create the consumers threads
for(int i = 0; i < numConsumers; i++)
{
tCons[i] = new Thread(new ThreadStart(c.consume));
tCons[i].Name = "c" + (i+1);
}

// start the threads
foreach (Thread t in tProd) {t.Start();}
foreach (Thread t in tCons) {t.Start();}

Console.WriteLine("Storage, producers and consumers created!");
}
}
}
Nov 15 '05 #1
Share this Question
Share on Google+
4 Replies


P: n/a

That's not a lot of information to go on. Have you checked in the debugger
to see if there are still active producer threads after the time when you
check? Try and look at those threads in VS in thread view and see what code
they are sitting in. Maybe they're deadlocked? Also check your consumers and
make sure that they're not skipping events..

+++ Rick ---

--

Rick Strahl
West Wind Technologies
http://www.west-wind.com/
http://www.west-wind.com/webblog/
----------------------------------
Making waves on the Web
"Leonardo Hyppolito" <le**********@uol.com.br> wrote in message
news:uH**************@TK2MSFTNGP09.phx.gbl...

Hello,

I am trying to write a multithread program that simulates producers and consumers. My program can have many producers and many consumers (each in a separate thread). It has a storage place (buffer) with "n" capacity. The
user provides these parameters on startup. The buffer works with a circular queue.

It's a console application. My implementation is working with Semaphores to synchornize everything. I think it's synchornized OK, but the problem is: I tell it to produce 10 products, and it produces 10 products, but the
consumers consume only 8 (they should consume all the products). I can't
find out why this happens.

Any help on this will be appreciated.

Thanks in advance!!!

Leonardo Hyppolito
here is the code:

It's also published (with syntax highlighting) here:

http://www.rafb.net/paste/results/u2946385.html
----

using System;
using System.Threading;

namespace TesteProdCons
{
public class Semaphore
{
private int counter;

// constructor
public Semaphore(int c)
{
counter = c;
}

public void p()
{
lock(this)
{
while(counter <= 0)
{
Monitor.Wait(this, Timeout.Infinite);
}

// we have control, decrement the counter
counter--;
}
}

public void v()
{
lock(this)
{
counter++;
Monitor.Pulse(this);
}
}
}

// can hold 1 product at a time (buffer of 1 position)
public class Storage
{
// semaphores used for synchronization
private Semaphore notFull;
private Semaphore notEmpty;
private Semaphore exclusive;

// prodBox is the buffer that keeps the products
private int[] prodBox;
private int readIndex;
private int writeIndex;
private int capacity;

// actual product number
private static int n;

// max number of products to be produced
private static int nMax;

// property for n
public int Number
{
get {return n;}
set {n = value;}
}

// property for nMax
public int MaxNumber
{
get {return nMax;}
set {nMax = value;}
}

// writes a string representation of the storage
public override string ToString()
{
string ret = "";
for (int i = 0; i < capacity; i++)
{
ret = ret + prodBox[i] + " ";
}
ret = ret + "readIndex=" + readIndex + " writeIndex=" + writeIndex;
return ret;
}
// constructor
public Storage(int max, int cap)
{
n = 0;
nMax = max;
notFull = new Semaphore(cap);
notEmpty = new Semaphore(0);
exclusive = new Semaphore(1);
prodBox = new int[cap];
capacity = cap;
readIndex = 0;
writeIndex = 0;
}

public int Product
{
get
{
// wait for a nonEmpty resource
notEmpty.p();

// wait for exclusive access
exclusive.p();

// read the product
int ret = prodBox[readIndex];

Console.WriteLine(" " + Thread.CurrentThread.Name + " reads " + ret);
// set the new readIndex
readIndex = (readIndex + 1) % capacity;

// for debug
//Console.WriteLine(" " + Thread.CurrentThread.Name + "--> " + this);
// release the exclusive access
exclusive.v();

// release a notFull resource
notFull.v();

return ret;
}
set
{
// wait for a notFull resource
notFull.p();

// wait for exclusive access
exclusive.p();

// if already reached maximum number of productions,
// abort the producer thread

if(n >= nMax)
{
Console.WriteLine("Aborting " + Thread.CurrentThread.Name +
" (reached max number of productions)");
Thread.CurrentThread.Abort();
}

// store the product
prodBox[writeIndex] = value;

// set the new writeIndex
writeIndex = (writeIndex + 1) % capacity;

// increment the actual product number
n = n + 1;

// write information on screen
Console.WriteLine("++ [" + Thread.CurrentThread.Name + "] produces n." + Number + ": " + value);
//Console.WriteLine(" " + Thread.CurrentThread.Name + "--> " + this);
// release the exclusive access
exclusive.v();

// release a notEmpty resource
notEmpty.v();
}
}
}

public class Producer
{
// a reference to the storage
private Storage a;

// seed for the random
private int seed;

// constructor
public Producer(Storage arm, int s)
{
a = arm;
seed = s;
}

// entry point for the producer thread
public void produce()
{
Random r = new Random(seed);
while(a.Number <= a.MaxNumber)
{
// get a random integer (from 0 to 2 seconds)
int prod = r.Next(2001);

// store the product
a.Product = prod;

// rest for a while...
Thread.Sleep(200);
}
}
}

public class Consumer
{
// a reference to the storage
private Storage a;

// constructor
public Consumer(Storage arm)
{
a = arm;
}

// entry point for the consumer thread
public void consume()
{
while(true)
{
// get the product value
int prod = a.Product;

// "consume" the product (pauses for the product's miliseconds)
Thread.Sleep(prod);
Console.WriteLine("-- [" + Thread.CurrentThread.Name + "] consumed " +
prod);

}
}
}

class TesteProdCons
{
static void Main(string[] args)
{
int numProducts;
int numProducers;
int numConsumers;
int capacity;

// read information from the user
Console.Write("Max number of products: ");
numProducts = Int32.Parse(Console.ReadLine());
Console.Write("Number of producers: ");
numProducers = Int32.Parse(Console.ReadLine());
Console.Write("Numero of consumers: ");
numConsumers = Int32.Parse(Console.ReadLine());
Console.Write("Capacity of the storage buffer: ");
capacity = Int32.Parse(Console.ReadLine());

// a single shared storage place
Storage a = new Storage(numProducts, capacity);

// one producer object for each producer thread,
// to have different random generators
Producer[] prod = new Producer[numProducers];
Thread[] tProd = new Thread[numProducers];

// only one consumer object for all consumer threads
Consumer c = new Consumer(a);
Thread[] tCons = new Thread[numConsumers];

Random r = new Random();

// create the producers objects and threads
for(int i = 0; i < numProducers; i++)
{
prod[i] = new Producer(a, r.Next(999999));
tProd[i] = new Thread(new ThreadStart(prod[i].produce));
tProd[i].Name = "p" + (i+1);
}

// create the consumers threads
for(int i = 0; i < numConsumers; i++)
{
tCons[i] = new Thread(new ThreadStart(c.consume));
tCons[i].Name = "c" + (i+1);
}

// start the threads
foreach (Thread t in tProd) {t.Start();}
foreach (Thread t in tCons) {t.Start();}

Console.WriteLine("Storage, producers and consumers created!");
}
}
}

Nov 15 '05 #2

P: n/a
> That's not a lot of information to go on. Have you checked in the debugger
to see if there are still active producer threads after the time when you
check? Try and look at those threads in VS in thread view and see what code they are sitting in. Maybe they're deadlocked? Also check your consumers and make sure that they're not skipping events..


Thanks for answering Rick. I am trying to debug it, though it's not easy
to debug thread applications. It switches from threads and it's difficult to
keep track. But I am trying to find out.

[]'s

Leonardo Hyppolito
Nov 15 '05 #3

P: n/a
Hi,

In the Storage.Product setter, you don't release the
semaphore when you reached the maximum number of
productions (where you test for n >= nMax).
The 'exclusive' semaphore counter will stay at 0, locking
everyone else out. I haven't tried it, but I'm pretty
sure that's the problem. So just call exclusive.v()
before aborting the producer threads. Keep me informed ?

regards,

Bram.

-----Original Message-----

Hello,

I am trying to write a multithread program that simulates producers andconsumers. My program can have many producers and many consumers (each in aseparate thread). It has a storage place (buffer) with "n" capacity. Theuser provides these parameters on startup. The buffer works with a circularqueue.

It's a console application. My implementation is working with Semaphoresto synchornize everything. I think it's synchornized OK, but the problem is:I tell it to produce 10 products, and it produces 10 products, but theconsumers consume only 8 (they should consume all the products). I can'tfind out why this happens.

Any help on this will be appreciated.

Thanks in advance!!!

Leonardo Hyppolito
here is the code:

It's also published (with syntax highlighting) here:

http://www.rafb.net/paste/results/u2946385.html
----

using System;
using System.Threading;

namespace TesteProdCons
{
public class Semaphore
{
private int counter;

// constructor
public Semaphore(int c)
{
counter = c;
}

public void p()
{
lock(this)
{
while(counter <= 0)
{
Monitor.Wait(this, Timeout.Infinite);
}

// we have control, decrement the counter
counter--;
}
}

public void v()
{
lock(this)
{
counter++;
Monitor.Pulse(this);
}
}
}

// can hold 1 product at a time (buffer of 1 position)
public class Storage
{
// semaphores used for synchronization
private Semaphore notFull;
private Semaphore notEmpty;
private Semaphore exclusive;

// prodBox is the buffer that keeps the products
private int[] prodBox;
private int readIndex;
private int writeIndex;
private int capacity;

// actual product number
private static int n;

// max number of products to be produced
private static int nMax;

// property for n
public int Number
{
get {return n;}
set {n = value;}
}

// property for nMax
public int MaxNumber
{
get {return nMax;}
set {nMax = value;}
}

// writes a string representation of the storage
public override string ToString()
{
string ret = "";
for (int i = 0; i < capacity; i++)
{
ret = ret + prodBox[i] + " ";
}
ret = ret + "readIndex=" + readIndex + " writeIndex=" + writeIndex; return ret;
}
// constructor
public Storage(int max, int cap)
{
n = 0;
nMax = max;
notFull = new Semaphore(cap);
notEmpty = new Semaphore(0);
exclusive = new Semaphore(1);
prodBox = new int[cap];
capacity = cap;
readIndex = 0;
writeIndex = 0;
}

public int Product
{
get
{
// wait for a nonEmpty resource
notEmpty.p();

// wait for exclusive access
exclusive.p();

// read the product
int ret = prodBox[readIndex];

Console.WriteLine(" " + Thread.CurrentThread.Name + " reads " + ret);
// set the new readIndex
readIndex = (readIndex + 1) % capacity;

// for debug
//Console.WriteLine(" " + Thread.CurrentThread.Name + "--> " + this);
// release the exclusive access
exclusive.v();

// release a notFull resource
notFull.v();

return ret;
}
set
{
// wait for a notFull resource
notFull.p();

// wait for exclusive access
exclusive.p();

// if already reached maximum number of productions,
// abort the producer thread

if(n >= nMax)
{
Console.WriteLine("Aborting " + Thread.CurrentThread.Name + " (reached max number of productions)");
Thread.CurrentThread.Abort();
}

// store the product
prodBox[writeIndex] = value;

// set the new writeIndex
writeIndex = (writeIndex + 1) % capacity;

// increment the actual product number
n = n + 1;

// write information on screen
Console.WriteLine("++ [" + Thread.CurrentThread.Name + "] produces n." +Number + ": " + value);
//Console.WriteLine(" " + Thread.CurrentThread.Name + "--> " + this);
// release the exclusive access
exclusive.v();

// release a notEmpty resource
notEmpty.v();
}
}
}

public class Producer
{
// a reference to the storage
private Storage a;

// seed for the random
private int seed;

// constructor
public Producer(Storage arm, int s)
{
a = arm;
seed = s;
}

// entry point for the producer thread
public void produce()
{
Random r = new Random(seed);
while(a.Number <= a.MaxNumber)
{
// get a random integer (from 0 to 2 seconds)
int prod = r.Next(2001);

// store the product
a.Product = prod;

// rest for a while...
Thread.Sleep(200);
}
}
}

public class Consumer
{
// a reference to the storage
private Storage a;

// constructor
public Consumer(Storage arm)
{
a = arm;
}

// entry point for the consumer thread
public void consume()
{
while(true)
{
// get the product value
int prod = a.Product;

// "consume" the product (pauses for the product's miliseconds) Thread.Sleep(prod);
Console.WriteLine("-- [" + Thread.CurrentThread.Name + "] consumed " +prod);

}
}
}

class TesteProdCons
{
static void Main(string[] args)
{
int numProducts;
int numProducers;
int numConsumers;
int capacity;

// read information from the user
Console.Write("Max number of products: ");
numProducts = Int32.Parse(Console.ReadLine());
Console.Write("Number of producers: ");
numProducers = Int32.Parse(Console.ReadLine());
Console.Write("Numero of consumers: ");
numConsumers = Int32.Parse(Console.ReadLine());
Console.Write("Capacity of the storage buffer: ");
capacity = Int32.Parse(Console.ReadLine());

// a single shared storage place
Storage a = new Storage(numProducts, capacity);

// one producer object for each producer thread,
// to have different random generators
Producer[] prod = new Producer[numProducers];
Thread[] tProd = new Thread[numProducers];

// only one consumer object for all consumer threads
Consumer c = new Consumer(a);
Thread[] tCons = new Thread[numConsumers];

Random r = new Random();

// create the producers objects and threads
for(int i = 0; i < numProducers; i++)
{
prod[i] = new Producer(a, r.Next(999999));
tProd[i] = new Thread(new ThreadStart(prod [i].produce)); tProd[i].Name = "p" + (i+1);
}

// create the consumers threads
for(int i = 0; i < numConsumers; i++)
{
tCons[i] = new Thread(new ThreadStart(c.consume));
tCons[i].Name = "c" + (i+1);
}

// start the threads
foreach (Thread t in tProd) {t.Start();}
foreach (Thread t in tCons) {t.Start();}

Console.WriteLine("Storage, producers and consumers created!"); }
}
}
.

Nov 15 '05 #4

P: n/a
> In the Storage.Product setter, you don't release the
semaphore when you reached the maximum number of
productions (where you test for n >= nMax).
The 'exclusive' semaphore counter will stay at 0, locking
everyone else out. I haven't tried it, but I'm pretty
sure that's the problem. So just call exclusive.v()
before aborting the producer threads. Keep me informed ?


Bram, you are right!

I added a line to release to exclusive semaphore, in that aborting
section, and everything worked as expected.

Thanks a lot for the help :)

Cheers,

Leonardo Hyppolito
Nov 15 '05 #5

This discussion thread is closed

Replies have been disabled for this discussion.