- snip -
Also, DeveloperX, I will be out on Monday as well, so if you had an example
on Tuesday I'd really really love to see it! I'm not the best at threads, so
any help is appreciated.
-Dave
Here we go. Just paste it into a new console app and it should compile,
as long as you watch out for line wrap.
In a nutshell, there's a Job class, an Executive class and a demo
class. The executive is given a bunch of jobs to process and then told
to start five of them. Each time one completes the executive is
notified and the next is kicked off.
The main demo program just runs for 15 seconds using a sleep and then
safely waits for any remaining threads to complete before shutting
down.
There are a few comments, and also various lines commented out. For
example, in Job.AddToQueue you can see how to start a job either
through the ThreadPool or via a Thread object.
Finally, there's some balancing in there. You can specify the maximum
number of threads it can consume, and it will then try to stay at that
level for as long as possible.
Shout if anything doesn't make sense.
using System;
//using System.Collections.Generic;
using System.Collections;
using System.Text;
using System.Threading;
namespace RunLotsOfThreads2
{
#region test program
//Demo entry point: hands five jobs to an Executive, lets them cycle for
//15 seconds, then asks the Executive to shut down and waits for it.
class Program
{
    static void Main(string[] args)
    {
        Executive exec = new Executive();
        //new job: name and time to run (the job passes this to Thread.Sleep)
        exec.AddJob(new Job("Job 1", 1300));
        exec.AddJob(new Job("Job 2", 1500));
        exec.AddJob(new Job("Job 3", 1700));
        exec.AddJob(new Job("Job 4", 1200));
        exec.AddJob(new Job("Job 5", 200));
        //SetMaxThreads is 2.0 only
        //ThreadPool.SetMaxThreads(3, 3); //Only got five jobs, so making pool 3 big.
        exec.ProcessStart(5);
        //Run for n secs then close down gracefully.
        System.Threading.Thread.Sleep(15000);
        //CloseAll does not return until the jobs still in flight have completed.
        exec.CloseAll();
        Console.WriteLine("done");
        Console.Read();
    }
}
#endregion
#region JobComplete delegate
//Delegate used by the job-completed event. (I've broken with convention;
//normally it would be sender and e params.)
//pReturns: message text supplied by whoever raises the event.
public delegate void JobComplete(string pReturns);
#endregion
#region Executive class
//The control class. Holds a list of jobs, starts them round-robin, and is
//notified through each job's JobCompleted event so it can start more work.
//Completion callbacks run on the thread that executed the job, so all shared
//counters are only touched while holding lockme.
class Executive
{
    private ArrayList _jobs = new ArrayList();
    //private List<Job> _jobs = new List<Job>(); // dn 2 version
    private int _current = 0; //index into _jobs of the next job to consider
    private bool _running = false; //are we running?
    private int _runningJobs = 0; //how many jobs have been queued and not yet completed
    private object lockme = new object(); //guards _current, _runningJobs, _closing and job start-up
    //Used in the tidy shut down: set by the last job to complete after CloseAll was requested.
    private ManualResetEvent _mre = new ManualResetEvent(false);
    private bool _closing = false; //closing down flag
    private int _maxThreads = 5;

    //Stops new jobs from being started, waits for the jobs that are already
    //in flight to complete, then unhooks from every job's completion event.
    public void CloseAll()
    {
        bool ready;
        Console.WriteLine("Closing down");
        lock (lockme)
        {
            _closing = true;
            //it is possible there are no jobs in flight, in which case nobody would ever set _mre
            ready = (_runningJobs == 0);
        }
        if (!ready)
        {
            WaitForThreads();
        }
        foreach (Job j in _jobs)
        {
            //unhook. There are better ways of doing this including using IDispose.
            j.JobCompleted -= new JobComplete(pJob_JobCompleted);
        }
    }

    //Blocks until pJob_JobCompleted signals that the last in-flight job is done.
    private void WaitForThreads()
    {
        //A plain WaitOne is equivalent to polling with a timeout and retrying forever.
        _mre.WaitOne();
    }

    //Registers a job with the executive.
    //pJob: the job to add; its JobCompleted event is hooked to this executive.
    public void AddJob(Job pJob)
    {
        //hook up to its completion notification and add to list.
        pJob.JobCompleted += new JobComplete(pJob_JobCompleted);
        _jobs.Add(pJob);
    }

    //Marks the executive as running, rewinds to the first job and starts work.
    //pCount: how many list entries to visit looking for startable jobs.
    public void ProcessStart(int pCount)
    {
        lock (lockme)
        {
            _running = true;
            _current = 0;
        }
        ProcessNext(pCount);
    }

    //Visits up to pCount entries of the job list (wrapping round at the end)
    //and queues each one that is Pending or Finished.
    //pCount: number of list entries to visit; zero or negative does nothing.
    public void ProcessNext(int pCount)
    {
        for (int c = 0; c < pCount; c++)
        {
            //Selecting the job, testing its status, queuing it and counting it
            //happen under one lock so that two completion callbacks cannot both
            //queue the same job, and so that no job can be started once
            //CloseAll has set _closing.
            lock (lockme)
            {
                if (!_running || _closing || _jobs.Count == 0)
                {
                    return;
                }
                Job j = (Job)_jobs[_current];
                _current = (_current >= _jobs.Count - 1 ? 0 : _current + 1);
                if (j.Status == JobStatus.Pending || j.Status == JobStatus.Finished)
                {
                    j.AddToQueue();
                    //The job's completion callback needs lockme to decrement,
                    //so it cannot run ahead of this increment.
                    _runningJobs++;
                }
            }
        }
    }

    //Completion handler hooked to every job. Updates the in-flight count,
    //signals CloseAll when the last job finishes during shutdown, otherwise
    //tops the number of in-flight jobs back up towards the maximum.
    //pReturns: message from the job (not used here).
    private void pJob_JobCompleted(string pReturns)
    {
        int numberToStart;
        //don't want two threads fighting for _runningJobs.
        lock (lockme)
        {
            _runningJobs--;
            //Aim for the smaller of the thread limit and the number of jobs.
            numberToStart = Math.Min(_maxThreads, _jobs.Count) - _runningJobs;
            if (_closing)
            {
                if (_runningJobs == 0)
                {
                    _mre.Set(); //Inform CloseAll we're safe to finish.
                    return;
                }
            }
        }
        Console.WriteLine("Balancing {0}", numberToStart);
        ProcessNext(numberToStart);
    }

    //Upper limit on in-flight jobs that the balancing in pJob_JobCompleted aims for.
    public int MaxThreads
    {
        get
        {
            return _maxThreads;
        }
        set
        {
            _maxThreads = value;
        }
    }

    //Whether the executive may start jobs; ProcessNext does nothing while false.
    public bool Running
    {
        get
        {
            return _running;
        }
        set
        {
            _running = value;
        }
    }
}
#endregion
#region JobStatus enum
//States a job can report. Not every member is used by this sample,
//but you'll get the idea.
[Flags]
public enum JobStatus
{
    None      = 0,
    Pending   = 1 << 0,
    Queued    = 1 << 1,
    Running   = 1 << 2,
    Finished  = 1 << 3,
    Suspended = 1 << 4
}
#endregion
#region Job class
//The job class. A unit of work that runs on another thread (here the work is
//just a sleep) and raises JobCompleted when it has finished.
class Job
{
    private JobStatus _status = JobStatus.Pending;
    private string _name;
    private int _duration = 0; //how long this will take (work is sleep in this class)
    //private Thread _thread; //used if you reinstate the non threadpool code.
    //something to lock on, for the thread safe event pattern.
    private object _eventLock = new object();
    //Backing delegate for the public JobCompleted event; only touched under _eventLock.
    private JobComplete _jobCompleted;

    //Raised at the end of OnComplete, on the thread that ran the job.
    public event JobComplete JobCompleted
    {
        add
        {
            lock (_eventLock) //handlers may be added while a job thread is raising the event
            {
                _jobCompleted += value;
            }
        }
        remove
        {
            lock (_eventLock)
            {
                _jobCompleted -= value;
            }
        }
    }

    //pName: name shown in console output and in the completion message.
    //pDuration: value passed to Thread.Sleep to simulate the work.
    public Job(string pName, int pDuration)
    {
        _name = pName;
        _duration = pDuration;
    }

    //This is our threaded method used with QueueUserWorkItem
    //pO: state object required by WaitCallback; not used.
    private void ThreadedJobMethod(object pO)
    {
        _status = JobStatus.Running;
        Console.WriteLine("Running {0}", _name);
        //do some work in this case sleep
        Thread.Sleep(_duration);
        //Fire event using overridable OnComplete method.
        OnComplete();
    }

    //This is our threaded method used with Thread and ThreadStart
    private void ThreadedJobMethod2()
    {
        ThreadedJobMethod(null);
    }

    //Start the thread (add to pool)
    public void AddToQueue()
    {
        _status = JobStatus.Queued;
        Console.WriteLine("Queuing {0}", _name);
        //Start in thread pool
        ThreadPool.QueueUserWorkItem(new WaitCallback(ThreadedJobMethod));
        //Or rem out above and unrem the below to run outside of pool
        //new Thread(new ThreadStart(ThreadedJobMethod2)).Start();
    }

    //Current state of the job as last written by AddToQueue, the worker method or OnComplete.
    public JobStatus Status
    {
        get
        {
            return _status;
        }
    }

    //Marks the job Finished and raises JobCompleted.
    //if used as a base class, this can be overriden for extra funkiness
    protected virtual void OnComplete()
    {
        Console.WriteLine("Completed {0}", _name);
        _status = JobStatus.Finished;
        //Take a snapshot under the lock: a handler removed by another thread
        //between a null check and the call would otherwise cause a
        //NullReferenceException.
        JobComplete handler;
        lock (_eventLock)
        {
            handler = _jobCompleted;
        }
        if (handler != null) //don't fire event if no one is listening.
        {
            handler(_name + " fired OnComplete"); //fire event
        }
    }

    //Name given to the job at construction.
    public string Name
    {
        get
        {
            return _name;
        }
    }
}
#endregion
}