I have this class that provides a disk-based queue for storing various
items. As items are queued the event delegate is called and the consumer
dequeues the object and processes it. This seems to work off and on.
Sometimes the queue does not pick up a queued item until another item is
queued. Can anyone see a problem with this class?
***
Here's the interface and EventArgs...
***
/// <summary>
/// Event called when items are queued.
/// </summary>
public delegate void ItemQueuedEventHandler(object sender,
ItemQueuedEventArgs e);
/// <summary>
/// Item queued event data.
/// </summary>
public class ItemQueuedEventArgs : System.EventArgs
{
internal ItemQueuedEventArgs(int count) : base()
{
this._count = count;
}
/// <summary>
/// Count of items in the queue.
/// </summary>
public int Count
{
get { return this._count; }
} int _count = 0;
}
/// <summary>
/// Base definition of queue types.
/// </summary>
public interface IQueue : IDisposable
{
/// <summary>
/// Current count of all items in the queue.
/// </summary>
int Count { get; }
/// <summary>
/// Removes all objects in the queue.
/// </summary>
void Clear();
/// <summary>
/// Adds an object to the queue.
/// </summary>
/// <param name="obj">The object to add.</param>
void Enqueue(object obj);
/// <summary>
/// Removed an item from the queue.
/// </summary>
/// <returns>The first object from the top of the queue.</returns>
object Dequeue();
/// <summary>
/// Event called when items are queued.
/// </summary>
event ItemQueuedEventHandler ItemQueued;
}
***
Here's the class...
***
using System;
/// <summary>
/// Provides an event enabled disk-based queue.
/// </summary>
public class DiskQueue : IQueue
{
/// <summary>
/// Event called when new items are queued.
/// </summary>
public event ItemQueuedEventHandler ItemQueued;
/// <summary>
/// Extension appended to all files in the queue.
/// </summary>
public readonly string Extension = ".q";
System.IO.FileSystemWatcher _fsw = null;
static readonly object diskQueueLock = new object();
/// <summary>
/// Creates a new instance of the DiskQueue class.
/// </summary>
/// <remarks>
/// All items queued will be serialized with a binary formatter.
/// You can retrieve the queue location from the Path property.
/// </remarks>
public DiskQueue()
{
this.Path = this._path;
}
/// <summary>
/// Creates a new instance of the DiskQueue class.
/// </summary>
/// <param name="path">The path to place queued items.</param>
/// <remarks>All items queued will be serialized with a binary
formatter.</remarks>
public DiskQueue(string path)
{
this.Path = path;
}
/// <summary>
/// Creates a new instance of the DiskQueue class.
/// </summary>
/// <param name="formatter">The formatter to use for serialization.</param>
/// <param name="path">The path to place queued items.</param>
public DiskQueue(string path, System.Runtime.Serialization.IFormatter
formatter)
{
this.Formatter = formatter;
this.Path = path;
}
/// <summary>
/// Gets an object to synchronize access to the collection.
/// </summary>
public object SyncRoot
{
get { return diskQueueLock; }
}
/// <summary>
/// Gets or sets the formatter used to serialize queued items.
/// </summary>
/// <exception cref="System.ArgumentNullException">Formatter is
null.</exception>
/// <remarks>Be careful changing the formatter after instantiation. If there
are items
/// left in the queue that were formatted with the former they will throw
exceptions
/// on Dequeue.</remarks>
public System.Runtime.Serialization.IFormatter Formatter
{
get { return this._formatter; }
set
{
if (value == null)
throw new ArgumentNullException("Formatter");
this._formatter = value;
}
} System.Runtime.Serialization.IFormatter _formatter = new
System.Runtime.Serialization.Formatters.Binary.Bin aryFormatter();
/// <summary>
/// Gets or sets the path to the queue folder.
/// </summary>
/// <exception cref="System.ArgumentNullException">Path is null.</exception>
public string Path
{
get { return this._path; }
set
{
if (value == null)
throw new ArgumentNullException("Path");
string path = value;
if (path.EndsWith("\\")) path = path.Substring(0, path.Length - 1);
this._path = path;
if (!System.IO.Directory.Exists(this.Path))
System.IO.Directory.CreateDirectory(this.Path);
if (this._fsw != null)
this._fsw.Path = path;
}
} string _path =
System.Environment.GetFolderPath(System.Environmen t.SpecialFolder.CommonAppl
icationData) + Guid.NewGuid().ToString();
/// <summary>
/// Get the number of items in the queue.
/// </summary>
public int Count
{
get
{
return System.IO.Directory.GetFiles(this.Path, "*" + this.Extension).Length;
}
}
/// <summary>
/// Begins watching the folder for queued items and raises an event when
items are queued.
/// </summary>
public void Start()
{
this._fsw = new System.IO.FileSystemWatcher(this.Path, "*" +
this.Extension);
this._fsw.EnableRaisingEvents = true;
this._fsw.Created += new System.IO.FileSystemEventHandler(_fsw_Created);
this._fsw.Changed += new System.IO.FileSystemEventHandler(_fsw_Created);
this._fsw.Renamed += new System.IO.RenamedEventHandler(_fsw_Renamed);
}
/// <summary>
/// Adds an item to the queue.
/// </summary>
/// <param name="graph">The object to queue.</param>
/// <exception cref="System.Exception">Could not enqueue the
item.</exception>
/// <exception
cref="System.Runtime.Serialization.SerializationEx ception">Error serializing
the item.</exception>
/// <exception cref="System.ArgumentNullException"><c>graph</c> was
null.</exception>
public void Enqueue(object graph)
{
if (graph == null)
throw new ArgumentNullException("graph");
Exception e = null;
System.Threading.Thread.Sleep(10);
lock (diskQueueLock)
{
try
{
string file = this.Path + "\\" + DateTime.Now.Ticks.ToString();
System.IO.FileStream fs = System.IO.File.Create(file);
this.Formatter.Serialize(fs, graph);
fs.Close();
System.IO.File.Move(file, file + this.Extension);
}
catch (System.Runtime.Serialization.SerializationExcepti on sex)
{
e = sex;
}
catch (Exception ex)
{
e = ex;
}
}
if (e != null)
throw e;
}
/// <summary>
/// Removes an item from the queue.
/// </summary>
/// <returns>The deserialized object.</returns>
/// <exception cref="System.Exception">Could not dequeue the
item.</exception>
/// <exception
cref="System.Runtime.Serialization.SerializationEx ception">Error
deserializing the item.</exception>
/// <exception cref="System.NullReferenceException">No items
queued.</exception>
public object Dequeue()
{
if (this.Count < 1)
throw new NullReferenceException("No items queued!");
Exception e = null;
object o = null;
lock (diskQueueLock)
{
try
{
string file = System.IO.Directory.GetFiles(this.Path, "*" +
this.Extension)[0];
System.IO.FileStream fs = System.IO.File.Open(file, System.IO.FileMode.Open,
System.IO.FileAccess.Read, System.IO.FileShare.Read);
o = this.Formatter.Deserialize(fs);
fs.Close();
System.IO.File.Delete(file);
}
catch (System.Runtime.Serialization.SerializationExcepti on sex)
{
e = sex;
}
catch (Exception ex)
{
e = ex;
}
}
if (e != null)
throw e;
else
return o;
}
/// <summary>
/// Clears all items in the queue.
/// </summary>
/// <exception cref="System.Exception">Error clearing the queue.</exception>
public void Clear()
{
Exception iex = null;
Exception e = null;
lock (diskQueueLock)
{
try
{
foreach (string file in System.IO.Directory.GetFiles(this.Path, "*" +
this.Extension))
{
try { System.IO.File.Delete(file); }
catch (Exception ex) { iex = ex; }
}
if (iex != null)
e = iex;
}
catch (Exception ex)
{
e = ex;
}
}
if (e != null)
throw e;
}
private void _fsw_Created(object sender, System.IO.FileSystemEventArgs e)
{
if (this.ItemQueued != null)
this.ItemQueued(typeof(DiskQueue).ToString(), new
ItemQueuedEventArgs(this.Count));
}
private void _fsw_Renamed(object sender, System.IO.RenamedEventArgs e)
{
this._fsw_Created(sender, e);
}
#region IDisposable Members
/// <summary>
/// Destroys this instance and releases all resources.
/// </summary>
public void Dispose()
{
try
{
this._formatter = null;
//this.Clear();
if (this._fsw != null)
{
this._fsw.Created -= new System.IO.FileSystemEventHandler(_fsw_Created);
this._fsw.Renamed -= new System.IO.RenamedEventHandler(_fsw_Renamed);
this._fsw.Dispose();
}
}
catch { /* ignored */ }
}
#endregion
}
}