Latch that can be incremented

For those needing a solution based on AQS (AbstractQueuedSynchronizer), here's one that worked for me:

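import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class CountLatch {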

    private class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1L;

        public Sync() {
        }

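        @Override
        protected int tryAcquireShared(int arg) {
            // Succeed only while the count sits exactly at the release value
            return count.get() == releaseValue ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            // Always wake queued waiters; they re-check the count themselves
            return true;
        }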
    }

    private final Sync sync;
    private final AtomicLong count;
    private volatile long releaseValue;

    public CountLatch(final long initial, final long releaseValue) {
        this.releaseValue = releaseValue;
        this.count = new AtomicLong(initial);
        this.sync = new Sync();
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public long countUp() {
        final long current = count.incrementAndGet();
        if (current == releaseValue) {
            sync.releaseShared(0);
        }
        return current;
    }

    public long countDown() {
        final long current = count.decrementAndGet();
        if (current == releaseValue) {
            sync.releaseShared(0);
        }
        return current;
    }

    public long getCount() {
        return count.get();
    }
}

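You initialize the synchronizer with an initial value and a target value. Once the count reaches the target (by counting up and/or down), the waiting threads are released. Note that await() only passes while the count equals the target, so a thread that calls await() after the count has moved past the target blocks until the count returns to it.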
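As a rough usage sketch (not part of the original answer): the snippet repeats a condensed copy of CountLatch so that it compiles on its own, then starts the count at 1, counts up once, and lets a worker thread count down to the target of 0. CountLatchDemo, run() and signalIfReached() are names made up for this illustration.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Condensed copy of the CountLatch above so this sketch compiles on its own.
class CountLatch {
    private final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1L;

        @Override
        protected int tryAcquireShared(int arg) {
            return count.get() == releaseValue ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            return true;
        }
    }

    private final Sync sync = new Sync();
    private final AtomicLong count;
    private final long releaseValue;

    CountLatch(long initial, long releaseValue) {
        this.releaseValue = releaseValue;
        this.count = new AtomicLong(initial);
    }

    void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
    long countUp() { return signalIfReached(count.incrementAndGet()); }
    long countDown() { return signalIfReached(count.decrementAndGet()); }
    long getCount() { return count.get(); }

    // Hypothetical helper: wake the waiters when the target value is hit.
    private long signalIfReached(long current) {
        if (current == releaseValue) {
            sync.releaseShared(0);
        }
        return current;
    }
}

class CountLatchDemo {
    // Runs the scenario and returns the count seen after await() returns.
    static long run() throws InterruptedException {
        CountLatch latch = new CountLatch(1, 0); // one unit pending, release at 0
        latch.countUp();                         // a second unit registers: 2
        Thread worker = new Thread(() -> {
            latch.countDown();                   // 1
            latch.countDown();                   // 0, waiters are released
        });
        worker.start();
        latch.await();                           // blocks until the count is 0
        worker.join();
        return latch.getCount();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("count after await: " + run());
    }
}
```

Nothing changes the count after the worker finishes, so await() returns once the second countDown() has run.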


You could also use a Phaser (java.util.concurrent.Phaser):

final Phaser phaser = new Phaser(1); // register self
while (/* some condition */) {
    phaser.register(); // Equivalent to countUp
    // do some work asynchronously, invoking
    // phaser.arriveAndDeregister() (equiv to countDown) in a finally block
}
phaser.arriveAndAwaitAdvance(); // await any async tasks to complete
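A fuller, runnable version of the same idea might look like the sketch below. The fixed thread pool, the task count, and the names PhaserLatchDemo and runTasks are assumptions made for illustration; incrementing a counter stands in for the real asynchronous work.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

class PhaserLatchDemo {
    // Submits `tasks` jobs and returns how many had finished when the wait ended.
    static int runTasks(int tasks) {
        final Phaser phaser = new Phaser(1);          // register the coordinating thread
        final AtomicInteger completed = new AtomicInteger();
        final ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            for (int i = 0; i < tasks; i++) {
                phaser.register();                    // "count up" before handing off the task
                pool.execute(() -> {
                    try {
                        completed.incrementAndGet();  // stand-in for real work
                    } finally {
                        phaser.arriveAndDeregister(); // "count down", even if the work fails
                    }
                });
            }
            phaser.arriveAndAwaitAdvance();           // wait until every registered task has arrived
            return completed.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("completed: " + runTasks(10));
    }
}
```

Registering before the task is submitted matters: if the task registered itself, the coordinating thread could arrive first and advance the phase before the task was counted.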

I hope this helps.


java.util.concurrent.Semaphore seems to fit the bill.

  • acquire() or acquire(n)
  • also acquire() (not sure I understand what the difference is here) (*)
  • release() or release(n)

(*) Okay, there is no built-in method to wait until the semaphore becomes unavailable. I suppose you'd write your own wrapper for acquire that does a tryAcquire first and if that fails triggers your "busy event" (and continues using the normal acquire). Everyone would need to call your wrapper. Maybe subclass Semaphore?
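A minimal sketch of that wrapper idea, done by subclassing Semaphore. BusyAwareSemaphore and the onBusy callback are hypothetical names, and only the no-argument acquire() is overridden here, so acquire(n) and the other variants would bypass the callback unless they are overridden too.

```java
import java.util.concurrent.Semaphore;

class BusyAwareSemaphore extends Semaphore {
    private static final long serialVersionUID = 1L;

    // Hypothetical callback standing in for the "busy event"
    private final transient Runnable onBusy;

    BusyAwareSemaphore(int permits, Runnable onBusy) {
        super(permits);
        this.onBusy = onBusy;
    }

    @Override
    public void acquire() throws InterruptedException {
        if (tryAcquire()) {
            return;          // a permit was free, nothing to report
        }
        onBusy.run();        // no permit available: fire the busy event
        super.acquire();     // then block like a plain Semaphore
    }
}
```

The callback runs on the thread that failed to get a permit, before it blocks, so it should be quick and must not depend on that thread holding a permit.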


Instead of starting from AQS, you could use a simple implementation like the one below. It is somewhat naive (it uses synchronized blocks rather than AQS's lock-free algorithms), but unless you expect heavy contention it could be good enough.

import java.util.concurrent.CountDownLatch;

public class CountUpAndDownLatch {
    private CountDownLatch latch;
    private final Object lock = new Object();

    public CountUpAndDownLatch(int count) {
        this.latch = new CountDownLatch(count);
    }

    public void countDownOrWaitIfZero() throws InterruptedException {
        synchronized(lock) {
            while(latch.getCount() == 0) {
                lock.wait();
            }
            latch.countDown();
            lock.notifyAll();
        }
    }

    public void waitUntilZero() throws InterruptedException {
        synchronized(lock) {
            while(latch.getCount() != 0) {
                lock.wait();
            }
        }
    }

    public void countUp() { //should probably check for Integer.MAX_VALUE
        synchronized(lock) {
            latch = new CountDownLatch((int) latch.getCount() + 1);
            lock.notifyAll();
        }
    }

    public int getCount() {
        synchronized(lock) {
            return (int) latch.getCount();
        }
    }
}

Note: I have not tested it in depth but it seems to behave as expected:

public static void main(String[] args) throws InterruptedException {
    final CountUpAndDownLatch latch = new CountUpAndDownLatch(1);
    Runnable up = new Runnable() {
        @Override
        public void run() {
            // countUp() and getCount() do not throw InterruptedException,
            // so no try/catch is needed (catching it here would not compile)
            System.out.println("IN UP " + latch.getCount());
            latch.countUp();
            System.out.println("UP " + latch.getCount());
        }
    };

    Runnable downOrWait = new Runnable() {
        @Override
        public void run() {
            try {
                System.out.println("IN DOWN " + latch.getCount());
                latch.countDownOrWaitIfZero();
                System.out.println("DOWN " + latch.getCount());
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
        }
    };

    Runnable waitFor0 = new Runnable() {
        @Override
        public void run() {
            try {
                System.out.println("WAIT FOR ZERO " + latch.getCount());
                latch.waitUntilZero();
                System.out.println("ZERO " + latch.getCount());
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
        }
    };
    new Thread(waitFor0).start();
    up.run();
    downOrWait.run();
    Thread.sleep(100);
    downOrWait.run();
    new Thread(up).start();
    downOrWait.run();
}

Output (the exact interleaving may vary between runs):

IN UP 1
UP 2
WAIT FOR ZERO 1
IN DOWN 2
DOWN 1
IN DOWN 1
ZERO 0
DOWN 0
IN DOWN 0
IN UP 0
DOWN 0
UP 0