// I think I got all the regions opened up, so all the code should be there.
// Stephen Toub
//
// st***@microsoft.com
//
// ManagedThreadPool.cs
// ThreadPool written in 100% managed code. Mimics the core functionality of
// the System.Threading.ThreadPool class.
//
// HISTORY:
// v1.0.1 - Disposes of items remaining in queue when the queue is emptied
//        - Catches errors thrown during execution of delegates
//        - Added reset to semaphore, called during empty queue
//        - Catches errors when unable to dequeue delegates
// v1.0.0 - Original version
//
// August 27, 2002
// v1.0.1
//
// http://www.gotdotnet.com/community/u...agedThreadPool
#region Namespaces
using System;
using System.Collections;
using System.Threading;
#endregion
namespace Toub.Threading
{
/// <summary>
/// Implementation of Dijkstra's PV semaphore based on the Monitor class.
/// </summary>
public class Semaphore
{
    #region Member Variables
    /// <summary>The number of units currently available from this semaphore.</summary>
    private int _count;

    /// <summary>
    /// Private object used for all locking, waiting and pulsing. A dedicated
    /// object is used instead of <c>this</c> so that code outside this class
    /// cannot take the same lock and stall or confuse the semaphore.
    /// </summary>
    private readonly object _gate = new object();
    #endregion

    #region Construction
    /// <summary>Initialize the semaphore as a binary semaphore (one unit available).</summary>
    public Semaphore() : this(1)
    {
    }

    /// <summary>Initialize the semaphore as a counting semaphore.</summary>
    /// <param name="count">
    /// Initial number of units that can be taken out of this semaphore. Zero is
    /// allowed: every call to <see cref="P"/> then blocks until <see cref="V"/> is called.
    /// </param>
    /// <exception cref="ArgumentException">Thrown if the count argument is less than 0.</exception>
    public Semaphore(int count)
    {
        if (count < 0)
        {
            throw new ArgumentException("Semaphore must have a count of at least 0.", "count");
        }

        _count = count;
    }
    #endregion

    #region Synchronization Operations
    /// <summary>V the semaphore (add 1 unit to it).</summary>
    public void AddOne()
    {
        V();
    }

    /// <summary>P the semaphore (take out 1 unit from it).</summary>
    public void WaitOne()
    {
        P();
    }

    /// <summary>
    /// P the semaphore (take out 1 unit from it). Blocks the calling thread
    /// until a unit is available.
    /// </summary>
    public void P()
    {
        lock (_gate)
        {
            // Wait in a loop rather than a single 'if': another thread may take
            // the unit between the pulse and this thread re-acquiring the lock,
            // and Reset() wakes every waiter at once.
            while (_count <= 0)
            {
                Monitor.Wait(_gate);
            }

            _count--;
        }
    }

    /// <summary>V the semaphore (add 1 unit to it) and wake one waiting thread.</summary>
    public void V()
    {
        lock (_gate)
        {
            // Release our hold on the unit, then tell one waiter that a unit
            // is now available.
            _count++;
            Monitor.Pulse(_gate);
        }
    }

    /// <summary>
    /// Resets the semaphore to the specified count. Should be used cautiously.
    /// </summary>
    /// <param name="count">
    /// The new number of available units. The value is not validated; a
    /// non-positive value makes subsequent calls to <see cref="P"/> block.
    /// </param>
    public void Reset(int count)
    {
        lock (_gate)
        {
            _count = count;

            // If units just became available, wake the threads already blocked
            // in P(); otherwise they would sleep until some later V() even
            // though the count is positive. Each woken thread re-checks the
            // count, so waking more threads than there are units is safe.
            if (count > 0)
            {
                Monitor.PulseAll(_gate);
            }
        }
    }
    #endregion
}
/// <summary>
/// Managed thread pool. A fixed set of background worker threads is started
/// when the type is first used; the workers execute queued callbacks in FIFO order.
/// </summary>
public class ManagedThreadPool
{
    #region Constants
    /// <summary>Maximum number of threads the thread pool has at its disposal.</summary>
    private const int _maxWorkerThreads = 15;
    #endregion

    #region Member Variables
    /// <summary>
    /// Queue of all the callbacks waiting to be executed. Every access is made
    /// while holding the lock on the queue's SyncRoot.
    /// </summary>
    private static readonly Queue _waitingCallbacks;

    /// <summary>
    /// Used to signal that a worker thread is needed for processing. Note that
    /// multiple threads may be needed simultaneously and as such we use a
    /// semaphore instead of an auto reset event.
    /// </summary>
    private static readonly Semaphore _workerThreadNeeded;

    /// <summary>List of all worker threads at the disposal of the thread pool.</summary>
    private static readonly ArrayList _workerThreads;

    /// <summary>Number of threads currently executing a callback.</summary>
    private static int _inUseThreads;
    #endregion

    #region Construction
    /// <summary>Initialize the thread pool and start all of its worker threads.</summary>
    static ManagedThreadPool()
    {
        // Create our thread stores; we handle synchronization ourselves as we
        // may run into situations where multiple operations need to be atomic.
        // We keep track of the threads we've created just for good measure;
        // the list is not needed for any core functionality.
        _waitingCallbacks = new Queue();
        _workerThreads = new ArrayList();
        _inUseThreads = 0;

        // Create our "thread needed" signal with no units available, so the
        // workers sleep until the first item is queued.
        _workerThreadNeeded = new Semaphore(0);

        // Create all of the worker threads.
        for (int i = 0; i < _maxWorkerThreads; i++)
        {
            // Create a new thread and add it to the list of threads.
            Thread newThread = new Thread(new ThreadStart(ProcessQueuedItems));
            _workerThreads.Add(newThread);

            // Configure the new thread and start it. Background threads do not
            // keep the process alive on their own.
            newThread.Name = "ManagedPoolThread #" + i.ToString();
            newThread.IsBackground = true;
            newThread.Start();
        }
    }
    #endregion

    #region Public Methods
    /// <summary>Queues a user work item to the thread pool.</summary>
    /// <param name="callback">
    /// A WaitCallback representing the delegate to invoke when a thread in the
    /// thread pool picks up the work item.
    /// </param>
    /// <exception cref="ArgumentNullException">Thrown if <paramref name="callback"/> is null.</exception>
    public static void QueueUserWorkItem(WaitCallback callback)
    {
        // Queue the delegate with no state.
        QueueUserWorkItem(callback, null);
    }

    /// <summary>Queues a user work item to the thread pool.</summary>
    /// <param name="callback">
    /// A WaitCallback representing the delegate to invoke when a thread in the
    /// thread pool picks up the work item.
    /// </param>
    /// <param name="state">
    /// The object that is passed to the delegate when serviced from the thread pool.
    /// </param>
    /// <exception cref="ArgumentNullException">Thrown if <paramref name="callback"/> is null.</exception>
    public static void QueueUserWorkItem(WaitCallback callback, object state)
    {
        // Reject a null delegate up front. Otherwise it would be queued and
        // then fail on a worker thread, where the error is swallowed and the
        // caller never finds out.
        if (callback == null)
        {
            throw new ArgumentNullException("callback");
        }

        // Create a waiting callback that contains the delegate and its state.
        // Add it to the processing queue, and signal that data is waiting.
        WaitingCallback waiting = new WaitingCallback(callback, state);
        lock (_waitingCallbacks.SyncRoot)
        {
            _waitingCallbacks.Enqueue(waiting);
        }

        _workerThreadNeeded.AddOne();
    }

    /// <summary>
    /// Empties the work queue of any queued work items. State objects that
    /// implement IDisposable are disposed. Callbacks that are already
    /// executing are not affected.
    /// </summary>
    public static void EmptyQueue()
    {
        lock (_waitingCallbacks.SyncRoot)
        {
            // Try to dispose of all remaining state. Each item is guarded
            // separately so that one failing Dispose() does not prevent the
            // remaining items from being disposed.
            foreach (object obj in _waitingCallbacks)
            {
                try
                {
                    WaitingCallback callback = (WaitingCallback)obj;
                    IDisposable disposable = callback.State as IDisposable;
                    if (disposable != null)
                    {
                        disposable.Dispose();
                    }
                }
                catch
                {
                    // Best effort: make sure an error isn't thrown to the caller.
                }
            }

            // Clear all waiting items and reset the number of worker threads
            // currently needed to be 0 (there is nothing for threads to do).
            _waitingCallbacks.Clear();
            _workerThreadNeeded.Reset(0);
        }
    }
    #endregion

    #region Properties
    /// <summary>Gets the number of threads at the disposal of the thread pool.</summary>
    public static int MaxThreads
    {
        get { return _maxWorkerThreads; }
    }

    /// <summary>Gets the number of currently active threads in the thread pool.</summary>
    public static int ActiveThreads
    {
        get { return _inUseThreads; }
    }

    /// <summary>Gets the number of callback delegates currently waiting in the thread pool.</summary>
    public static int WaitingCallbacks
    {
        get
        {
            lock (_waitingCallbacks.SyncRoot)
            {
                return _waitingCallbacks.Count;
            }
        }
    }
    #endregion

    #region Thread Processing
    /// <summary>A thread worker function that processes items from the work queue.</summary>
    private static void ProcessQueuedItems()
    {
        // Process indefinitely.
        while (true)
        {
            // Get the next item in the queue. If there is nothing there, go to
            // sleep until we're woken up when a callback is waiting.
            WaitingCallback callback = null;
            while (callback == null)
            {
                // Try to get the next callback available. We need to lock on
                // the queue in order to make our count check and retrieval atomic.
                lock (_waitingCallbacks.SyncRoot)
                {
                    if (_waitingCallbacks.Count > 0)
                    {
                        try
                        {
                            callback = (WaitingCallback)_waitingCallbacks.Dequeue();
                        }
                        catch
                        {
                            // Defensive: a worker thread must never die here.
                        }
                    }
                }

                // If we can't get one, go to sleep. A wake-up does not
                // guarantee an item is available (another worker may have
                // taken it, or the queue may have been emptied), which is why
                // the queue is re-checked in a loop.
                if (callback == null)
                {
                    _workerThreadNeeded.WaitOne();
                }
            }

            // We now have a callback. Execute it. Make sure to accurately
            // record how many callbacks are currently executing.
            Interlocked.Increment(ref _inUseThreads);
            try
            {
                callback.Callback(callback.State);
            }
            catch
            {
                // Make sure we don't throw here: an exception from user code
                // must not take down the worker thread.
            }
            finally
            {
                Interlocked.Decrement(ref _inUseThreads);
            }
        }
    }
    #endregion

    /// <summary>Used to hold a callback delegate and the state for that delegate.</summary>
    private class WaitingCallback
    {
        #region Member Variables
        /// <summary>Callback delegate for the callback.</summary>
        private readonly WaitCallback _callback;

        /// <summary>State with which to call the callback delegate.</summary>
        private readonly object _state;
        #endregion

        #region Construction
        /// <summary>Initialize the callback holding object.</summary>
        /// <param name="callback">Callback delegate for the callback.</param>
        /// <param name="state">State with which to call the callback delegate.</param>
        public WaitingCallback(WaitCallback callback, object state)
        {
            _callback = callback;
            _state = state;
        }
        #endregion

        #region Properties
        /// <summary>Gets the callback delegate for the callback.</summary>
        public WaitCallback Callback
        {
            get { return _callback; }
        }

        /// <summary>Gets the state with which to call the callback delegate.</summary>
        public object State
        {
            get { return _state; }
        }
        #endregion
    }
}
}