Fixed size queue which automatically dequeues old values upon new enqueues
For what it's worth, here's a lightweight circular buffer with some methods marked for safe and unsafe use.
using System;
using System.Collections.Generic;

public class CircularBuffer<T> : IEnumerable<T>
{
    readonly int size;
    readonly object locker;

    int count;
    int head;
    int rear;
    T[] values;

    public CircularBuffer(int max)
    {
        this.size = max;
        locker = new object();
        count = 0;
        head = 0;
        rear = 0;
        values = new T[size];
    }

    static int Incr(int index, int size)
    {
        return (index + 1) % size;
    }

    private void UnsafeEnsureQueueNotEmpty()
    {
        if (count == 0)
            throw new Exception("Empty queue");
    }

    public int Size { get { return size; } }
    public object SyncRoot { get { return locker; } }

    #region Count

    public int Count { get { return UnsafeCount; } }
    public int SafeCount { get { lock (locker) { return UnsafeCount; } } }
    public int UnsafeCount { get { return count; } }

    #endregion

    #region Enqueue

    public void Enqueue(T obj)
    {
        UnsafeEnqueue(obj);
    }

    public void SafeEnqueue(T obj)
    {
        lock (locker) { UnsafeEnqueue(obj); }
    }

    public void UnsafeEnqueue(T obj)
    {
        values[rear] = obj;

        if (Count == Size)
            head = Incr(head, Size); // buffer full: the oldest value was just overwritten, so advance head
        rear = Incr(rear, Size);
        count = Math.Min(count + 1, Size);
    }

    #endregion

    #region Dequeue

    public T Dequeue()
    {
        return UnsafeDequeue();
    }

    public T SafeDequeue()
    {
        lock (locker) { return UnsafeDequeue(); }
    }

    public T UnsafeDequeue()
    {
        UnsafeEnsureQueueNotEmpty();

        T res = values[head];
        values[head] = default(T); // clear the slot so the GC can reclaim the reference
        head = Incr(head, Size);
        count--;

        return res;
    }

    #endregion

    #region Peek

    public T Peek()
    {
        return UnsafePeek();
    }

    public T SafePeek()
    {
        lock (locker) { return UnsafePeek(); }
    }

    public T UnsafePeek()
    {
        UnsafeEnsureQueueNotEmpty();

        return values[head];
    }

    #endregion

    #region GetEnumerator

    public IEnumerator<T> GetEnumerator()
    {
        return UnsafeGetEnumerator();
    }

    public IEnumerator<T> SafeGetEnumerator()
    {
        lock (locker)
        {
            // Snapshot the contents inside the lock, then return an enumerator over the copy.
            List<T> res = new List<T>(count);
            var enumerator = UnsafeGetEnumerator();
            while (enumerator.MoveNext())
                res.Add(enumerator.Current);
            return res.GetEnumerator();
        }
    }

    public IEnumerator<T> UnsafeGetEnumerator()
    {
        int index = head;
        for (int i = 0; i < count; i++)
        {
            yield return values[index];
            index = Incr(index, size);
        }
    }

    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
        return this.GetEnumerator();
    }

    #endregion
}
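Here's a quick usage sketch (my example, not part of the original answer) showing the overwrite behaviour with a buffer of size 3:

using System;

var buffer = new CircularBuffer<int>(3);
buffer.Enqueue(1);
buffer.Enqueue(2);
buffer.Enqueue(3);
buffer.Enqueue(4);                              // buffer is full, so 1 is overwritten
Console.WriteLine(string.Join(", ", buffer));   // 2, 3, 4
Console.WriteLine(buffer.Dequeue());            // 2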
I like to use the Foo()/SafeFoo()/UnsafeFoo() convention:

- Foo methods call UnsafeFoo as a default.
- UnsafeFoo methods modify state freely without a lock; they should only call other unsafe methods.
- SafeFoo methods call UnsafeFoo methods inside a lock.

It's a little verbose, but it makes errors, like calling unsafe methods outside a lock in a method that is supposed to be thread-safe, more apparent.
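As a minimal sketch of the convention on its own, here it is applied to a hypothetical counter class rather than the buffer above:

public class Counter
{
    readonly object locker = new object();
    int value;

    // Default: delegate to the unsafe version; the caller decides about locking.
    public void Increment() { UnsafeIncrement(); }

    // Safe: take the lock, then call the unsafe version.
    public void SafeIncrement() { lock (locker) { UnsafeIncrement(); } }

    // Unsafe: mutate state freely; only call this while holding the lock
    // (or when no synchronization is needed).
    public void UnsafeIncrement() { value++; }
}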
I would write a wrapper class that, on Enqueue, checks the Count and dequeues when the count exceeds the limit.
using System.Collections.Concurrent;

public class FixedSizedQueue<T>
{
    ConcurrentQueue<T> q = new ConcurrentQueue<T>();
    private object lockObject = new object();

    public int Limit { get; set; }

    public void Enqueue(T obj)
    {
        q.Enqueue(obj);
        lock (lockObject)
        {
            T overflow;
            while (q.Count > Limit && q.TryDequeue(out overflow)) ;
        }
    }
}
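Note that Limit is an ordinary settable property, so it needs to be assigned (for example via an object initializer) before anything is enqueued; otherwise it defaults to 0 and every item is trimmed away immediately. A quick usage sketch (my example):

var q = new FixedSizedQueue<string> { Limit = 3 };
q.Enqueue("a");
q.Enqueue("b");
q.Enqueue("c");
q.Enqueue("d");   // "a" is dequeued inside Enqueue to keep the count at Limit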
For anyone who finds it useful, here is some working code based on Richard Schneider's answer above:
using System.Collections.Concurrent;

public class FixedSizedQueue<T>
{
    readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    public int Size { get; private set; }

    public FixedSizedQueue(int size)
    {
        Size = size;
    }

    public void Enqueue(T obj)
    {
        queue.Enqueue(obj);

        while (queue.Count > Size)
        {
            T outObj;
            queue.TryDequeue(out outObj);  // discard the overflow item
        }
    }
}
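A quick usage sketch (my example, assuming the class above):

var recent = new FixedSizedQueue<string>(2);
recent.Enqueue("a");
recent.Enqueue("b");
recent.Enqueue("c");   // "a" is dequeued automatically to stay within Size
// the queue now holds "b" and "c"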
I'd go for a slight variant... extend ConcurrentQueue so as to be able to use LINQ extensions on FixedSizedQueue:
using System.Collections.Concurrent;

public class FixedSizedQueue<T> : ConcurrentQueue<T>
{
    private readonly object syncObject = new object();

    public int Size { get; private set; }

    public FixedSizedQueue(int size)
    {
        Size = size;
    }

    public new void Enqueue(T obj)
    {
        base.Enqueue(obj);
        lock (syncObject)
        {
            while (base.Count > Size)
            {
                T outObj;
                base.TryDequeue(out outObj);
            }
        }
    }
}
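And a usage sketch of the LINQ angle (my example, not from the original answer):

using System;
using System.Linq;

var q = new FixedSizedQueue<int>(5);
for (int i = 0; i < 10; i++)
    q.Enqueue(i);

// Because the class derives from ConcurrentQueue<T>, LINQ extensions work directly.
// Note: Enqueue is hidden with 'new', so trimming only happens when the variable's
// static type is FixedSizedQueue<T> (as it is here with var).
Console.WriteLine(q.Count);                                      // 5
Console.WriteLine(q.Max());                                      // 9
Console.WriteLine(string.Join(",", q.Where(x => x % 2 == 0)));   // 6,8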