Thread-safe circular buffer in Java
// Wraps a new CircularFifoBuffer via BufferUtils.synchronizedBuffer.
// NOTE(review): these look like the Apache Commons Collections types - confirm against the project's imports.
Buffer fifo = BufferUtils.synchronizedBuffer(new CircularFifoBuffer());
Here's a lock-free ring buffer implementation. It implements a fixed-size buffer; there is no FIFO functionality. I would suggest you store a Collection
of requests for each server instead. That way your report can do the filtering, rather than relying on your data structure to filter.
/**
 * Container
 * ---------
 *
 * A fixed-capacity container built on a pre-allocated ring of nodes.
 *
 * Nodes are claimed and released with compare-and-set on a per-node "free"
 * flag, so add/remove take no locks. An add scans forward from a moving head
 * pointer for the first free node; that is one step when the next node is
 * free and at most {@code capacity} steps otherwise.
 *
 * There is no FIFO ordering. A node that is in use but holds {@code null} is
 * treated as empty by both iteration and {@link #toString()}.
 *
 * {@link #clear()} is NOT thread safe.
 */
public class Container<T> implements Iterable<T> {
    // The capacity of the container (the number of nodes in the ring).
    final int capacity;
    // Where the next search for a free node starts.
    final AtomicReference<Node<T>> head = new AtomicReference<Node<T>>();

    // TESTING {
    // Successful adds, successful removes and nodes stepped over while searching.
    final AtomicLong totalAdded = new AtomicLong(0);
    final AtomicLong totalFreed = new AtomicLong(0);
    final AtomicLong totalSkipped = new AtomicLong(0);

    private void resetStats() {
        totalAdded.set(0);
        totalFreed.set(0);
        totalSkipped.set(0);
    }
    // TESTING }

    /**
     * Builds the ring of nodes up front.
     *
     * @param capacity the number of elements the container can hold at once
     */
    public Container(int capacity) {
        this.capacity = capacity;
        // Construct the list.
        Node<T> h = new Node<T>();
        Node<T> it = h;
        // One created, now add (capacity - 1) more.
        for (int i = 0; i < capacity - 1; i++) {
            it.next = new Node<T>();
            it = it.next;
        }
        // Make it a ring.
        it.next = h;
        // Install it.
        head.set(h);
    }

    /**
     * Empties every node and resets the statistics. NOT thread safe.
     */
    public void clear() {
        Node<T> it = head.get();
        for (int i = 0; i < capacity; i++) {
            // Trash the element.
            it.element = null;
            // Mark it free.
            it.free.set(true);
            it = it.next;
        }
        // Clear stats.
        resetStats();
    }

    /**
     * Stores an element in a free node.
     *
     * @param element the element to store
     * @return the node now holding the element; pass it back to {@link #remove}
     * @throws IllegalStateException if no free node was found
     */
    public Node<T> add(T element) {
        Node<T> node = getFree().attach(element);
        // Count only adds that actually succeeded.
        totalAdded.incrementAndGet();
        return node;
    }

    // Find the next free node and mark it not free.
    private Node<T> getFree() {
        Node<T> freeNode = head.get();
        int skipped = 0;
        // Stop when we have looked at every node
        // ... or we successfully transit a node from free to not-free.
        while (skipped < capacity && !freeNode.free.compareAndSet(true, false)) {
            skipped += 1;
            freeNode = freeNode.next;
        }
        // Keep count of skipped.
        totalSkipped.addAndGet(skipped);
        if (skipped >= capacity) {
            // We went all the way round! No more free nodes.
            throw new IllegalStateException("Capacity exhausted.");
        }
        // Start the next search just after the node we claimed. This is a plain
        // overwrite: if another thread stores a different head at the same time
        // the last writer wins, which only changes where the next scan begins.
        head.set(freeNode.next);
        return freeNode;
    }

    /**
     * Detaches the element from its node and marks the node free.
     *
     * @param it      the node returned by {@link #add}
     * @param element the element that was stored in that node (compared by identity)
     * @throws IllegalArgumentException if the node does not hold {@code element}
     * @throws IllegalStateException    if the node was already free
     */
    public void remove(Node<T> it, T element) {
        // Remove the element first so a free node never exposes a stale element.
        it.detach(element);
        // Mark it as free.
        if (!it.free.compareAndSet(false, true)) {
            throw new IllegalStateException("Freeing a freed node.");
        }
        // Count only removals that actually succeeded.
        totalFreed.incrementAndGet();
    }

    // The Node class. It is static so needs the <T> repeated.
    public static class Node<T> {
        // The element in the node. Volatile because iterators and toString()
        // read it from threads other than the one that attached it.
        private volatile T element;
        // Are we free?
        private final AtomicBoolean free = new AtomicBoolean(true);
        // The next reference in whatever list I am in.
        private Node<T> next;

        // Construct a node of the list.
        private Node() {
            // Start empty.
            element = null;
        }

        /**
         * Attaches the element.
         *
         * @return this node, for chaining
         * @throws IllegalArgumentException if an element is already attached
         */
        public Node<T> attach(T element) {
            // Sanity check.
            if (this.element != null) {
                throw new IllegalArgumentException("There is already an element attached.");
            }
            this.element = element;
            return this;
        }

        /**
         * Detaches the element.
         *
         * @return this node, for chaining
         * @throws IllegalArgumentException if a different element (by identity) is attached
         */
        public Node<T> detach(T element) {
            // Sanity check.
            if (this.element != element) {
                throw new IllegalArgumentException("Removal of wrong element.");
            }
            this.element = null;
            return this;
        }

        public T get() {
            return element;
        }

        @Override
        public String toString() {
            // Read once: the element may be detached between two reads.
            T e = element;
            return e != null ? e.toString() : "null";
        }
    }

    // Provides an iterator across all items in the container.
    @Override
    public Iterator<T> iterator() {
        return new UsedNodesIterator<T>(this);
    }

    // Iterates across used nodes.
    private static class UsedNodesIterator<T> implements Iterator<T> {
        // Where next to look for the next used node.
        private Node<T> it;
        // How many nodes are still to be visited.
        private int limit = 0;
        // The element found by hasNext() and not yet handed out by next().
        private T next = null;

        public UsedNodesIterator(Container<T> c) {
            // Snapshot the head node at this time.
            it = c.head.get();
            limit = c.capacity;
        }

        @Override
        public boolean hasNext() {
            while (next == null && limit > 0) {
                if (!it.free.get()) {
                    next = it.element;
                }
                if (next == null) {
                    // Either the node is free, or it is in use but holds no
                    // element (null was added, or an add/remove is part-way
                    // through). Step past it; staying put would spin forever.
                    it = it.next;
                    limit -= 1;
                }
            }
            return next != null;
        }

        @Override
        public T next() {
            if (!hasNext()) {
                // Not there!!
                throw new NoSuchElementException();
            }
            // Give it to them.
            T n = next;
            next = null;
            // Step forward.
            it = it.next;
            limit -= 1;
            return n;
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException("Not supported.");
        }
    }

    /**
     * Renders the used elements followed by the counts, as "[a,b]used+free".
     * A trailing "?" is appended if the two counts do not add up to the capacity.
     */
    @Override
    public String toString() {
        StringBuilder s = new StringBuilder();
        // Keep counts too.
        int usedCount = 0;
        int freeCount = 0;
        // I will iterate the list myself as I want to count free nodes too.
        Node<T> it = head.get();
        s.append("[");
        for (int count = 0; count < capacity; count++) {
            // Grab the element of an in-use node; a free node counts as empty.
            T e = it.free.get() ? null : it.element;
            if (e != null) {
                // Good element: comma-separate everything after the first.
                if (usedCount > 0) {
                    s.append(",");
                }
                s.append(e.toString());
                usedCount += 1;
            } else {
                // Free, or in use with no element - probably became free while
                // I was traversing, because the element is detached before the
                // entry is marked free.
                freeCount += 1;
            }
            it = it.next;
        }
        // Decorate with counts "]used+free".
        s.append("]").append(usedCount).append("+").append(freeCount);
        if (usedCount + freeCount != capacity) {
            // Perhaps something was added/freed while we were iterating.
            s.append("?");
        }
        return s.toString();
    }
}
Note that this is close to O(1) for put and get. A Separator
just emits "" the first time around and then its parameter from then on.
Edit: Added test methods.
// ***** Following only needed for testing. *****
// Master switch for the log(...) helpers: they do nothing while this is false. main() turns it on.
private static boolean Debug = false;
// The name handed to the NamedFileOutput calls (writeLn/delete) for this test's log.
private final static String logName = "Container.log";
// The log sink. NOTE(review): the constructor argument is presumably the output folder;
// NamedFileOutput is not shown here - confirm. The Windows path is hard-coded.
private final static NamedFileOutput log = new NamedFileOutput("C:\\Junk\\");
// Logs a line to the log file, echoing it to stdout first when asked to.
// Does nothing unless Debug is switched on.
private static synchronized void log(boolean toStdoutToo, String s) {
    if (!Debug) {
        return;
    }
    if (toStdoutToo) {
        System.out.println(s);
    }
    log(s);
}
// Writes a line to the log file. Does nothing unless Debug is switched on.
// An IOException from the write is reported on stderr and otherwise ignored.
private static synchronized void log(String s) {
    if (!Debug) {
        return;
    }
    try {
        log.writeLn(logName, s);
    } catch (IOException ex) {
        ex.printStackTrace();
    }
}
// Run flag polled by every Tester thread's loop; main() clears it to stop them,
// and a Tester clears it too if it is interrupted while pausing.
static volatile boolean testing = true;
// Tester object to exercise the container: repeatedly adds its own name to
// the container and removes it again until the shared `testing` flag is cleared.
static class Tester<T> implements Runnable {
    // My name. It is also the element I add and remove.
    private final T me;
    // The container I am testing.
    private final Container<T> c;

    public Tester(Container<T> container, T name) {
        c = container;
        me = name;
    }

    // Give other threads a chance to run. An interrupt stops the whole test.
    private void pause() {
        try {
            Thread.sleep(0);
        } catch (InterruptedException ex) {
            testing = false;
            // Restore the interrupt status so whoever owns this thread can still see it.
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void run() {
        // Spin on add/remove until stopped.
        while (testing) {
            // Add it.
            Node<T> n = c.add(me);
            log("Added " + me + ": " + c.toString());
            pause();
            // Remove it - always, even if we were told to stop during the pause,
            // so the node is not left claimed.
            c.remove(n, me);
            log("Removed " + me + ": " + c.toString());
            pause();
        }
    }
}
// Element values used by the tests in main(); each Tester thread is also given one as its name.
static final String[] strings = {
"One", "Two", "Three", "Four", "Five",
"Six", "Seven", "Eight", "Nine", "Ten"
};
// Number of Tester threads main() starts: 10, or fewer if there are fewer strings.
static final int TEST_THREADS = Math.min(10, strings.length);
/**
 * Test driver. Runs, in order: a single add/remove, a fill-to-capacity test,
 * an iterator test, a sequential multi add/remove test and finally a
 * ten-second multi-threaded add/remove test, then logs the statistics.
 *
 * @param args not used
 * @throws InterruptedException if interrupted while waiting for the threaded test
 */
public static void main(String[] args) throws InterruptedException {
    Debug = true;
    log.delete(logName);
    Container<String> c = new Container<String>(10);

    // Simple add/remove
    log(true, "Simple test");
    Node<String> it = c.add(strings[0]);
    log("Added " + c.toString());
    c.remove(it, strings[0]);
    log("Removed " + c.toString());

    // Capacity test.
    log(true, "Capacity test");
    ArrayList<Node<String>> nodes = new ArrayList<Node<String>>(strings.length);
    // Fill it.
    for (int i = 0; i < strings.length; i++) {
        nodes.add(c.add(strings[i]));
        log("Added " + strings[i] + " " + c.toString());
    }
    // Add one more.
    try {
        c.add("Wafer thin mint!");
    } catch (IllegalStateException ise) {
        log("Full!");
    }
    c.clear();
    log("Empty: " + c.toString());

    // Iterate test.
    log(true, "Iterator test");
    // Start from an empty list each phase so nodes.get(i) is always the node
    // that currently holds strings[i] and stale nodes do not pile up.
    nodes.clear();
    for (int i = 0; i < strings.length; i++) {
        nodes.add(c.add(strings[i]));
    }
    StringBuilder all = new StringBuilder();
    Separator sep = new Separator(",");
    for (String s : c) {
        all.append(sep.sep()).append(s);
    }
    log("All: " + all);
    for (int i = 0; i < strings.length; i++) {
        c.remove(nodes.get(i), strings[i]);
    }
    sep.reset();
    all.setLength(0);
    for (String s : c) {
        all.append(sep.sep()).append(s);
    }
    log("None: " + all.toString());

    // Multiple add/remove
    log(true, "Multi test");
    nodes.clear();
    for (int i = 0; i < strings.length; i++) {
        nodes.add(c.add(strings[i]));
        log("Added " + strings[i] + " " + c.toString());
    }
    log("Filled " + c.toString());
    for (int i = 0; i < strings.length - 1; i++) {
        c.remove(nodes.get(i), strings[i]);
        log("Removed " + strings[i] + " " + c.toString());
    }
    c.remove(nodes.get(strings.length - 1), strings[strings.length - 1]);
    log("Empty " + c.toString());

    // Multi-threaded add/remove
    log(true, "Threads test");
    c.clear();
    Thread[] testers = new Thread[TEST_THREADS];
    for (int i = 0; i < TEST_THREADS; i++) {
        Thread t = new Thread(new Tester<String>(c, strings[i]));
        t.setName("Tester " + strings[i]);
        log("Starting " + t.getName());
        t.start();
        testers[i] = t;
    }
    // Wait for 10 seconds.
    long stop = System.currentTimeMillis() + 10 * 1000;
    while (System.currentTimeMillis() < stop) {
        Thread.sleep(100);
    }
    // Stop the testers.
    testing = false;
    // Wait until every tester has really finished, so the counters below are
    // final rather than whatever they happened to be after a fixed delay.
    for (Thread t : testers) {
        t.join();
    }
    // Get stats.
    double added = c.totalAdded.doubleValue();
    double skipped = c.totalSkipped.doubleValue();
    log(true, "Stats: added=" + c.totalAdded + ",freed=" + c.totalFreed + ",skipped=" + c.totalSkipped + ",O(" + ((added + skipped) / added) + ")");
}