Photo by Dustin Humes on Unsplash

Java barrier synchronizers (CountDownLatch, CyclicBarrier, Phaser)

Introduction

Barrier synchronizers (barriers) are a kind of synchronizer that ensures that any threads must stop at a certain point and cannot proceed further until all other threads reach this point.

  • exit barriers, that waiting for all threads to finish processing

The CountDownLatch class

The CountDownLatch class is a one-time barrier that allows threads to wait until the given count of operations is performed in other threads.

CountDownLatch class diagram
CountDownLatch class diagram
CountDownLatch sequence diagram
CountDownLatch sequence diagram

Threads registration

The CountDownLatch(int count) constructor creates a latch with the given count. The current count cannot be reset without recreating a new latch object.

Threads waiting

The void await() method causes the current thread to wait until one of the events occurs:

  • the thread is interrupted
  • the latch has counted down to 0 due to calls of the countDown() method
  • the thread is interrupted

Threads arrival

The countDown() method decrements the current count, releasing all waiting threads if the count reaches 0. If the current count equals 0 then nothing happens.

Latch monitoring

The long getCount() method returns the current count of the latch.

Example

In the example are used 2 latches: first as a one-time entry barrier, second as a one-time exit barrier.

private static final int PARTIES = 3;public static void main(String[] args) throws InterruptedException {
CountDownLatch entryBarrier = new CountDownLatch(1);
CountDownLatch exitBarrier = new CountDownLatch(PARTIES);
for (int p = 0; p < PARTIES; p++) {
int delay = p + 1;
Runnable task = new Worker(delay, entryBarrier, exitBarrier);
new Thread(task).start();
}
logger.info("all threads waiting to start");
sleep(1);
entryBarrier.countDown();
logger.info("all threads started");
exitBarrier.await();
logger.info("all threads finished");
}
private static class Worker implements Runnable { private final int delay;
private final CountDownLatch entryBarrier;
private final CountDownLatch exitBarrier;
Worker(int delay, CountDownLatch entryBarrier, CountDownLatch exitBarrier) {
this.delay = delay;
this.entryBarrier = entryBarrier;
this.exitBarrier = exitBarrier;
}
@Override
public void run() {
try {
entryBarrier.await();
work();
exitBarrier.countDown();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private void work() {
logger.info("work {} started", delay);
sleep(delay);
logger.info("work {} finished", delay);
}
}

The CyclicBarrier class

The CyclicBarrier class is a reusable synchronization barrier that allows threads to wait for each other at a certain point.

CyclicBarrier class diagram
CyclicBarrier class diagram
CyclicBarrier sequence diagram
CyclicBarrier sequence diagram

Threads registration

The CyclicBarrier(int parties) constructor creates a new barrier that will trip when the given number of threads are waiting upon it.

Threads arrival and waiting

The int await() method causes the current thread to wait until one of the events occurs:

  • the barrier is broken (by the reasons described below)
  • the last thread arrives at the barrier
  • the barrier is broken (by the reasons described below)

Barrier reset

The void reset() method resets the barrier to its initial state. If any threads are waiting at the barrier on the await methods, the methods will throw a BrokenBarrierException.

Barrier monitoring

The int getParties() method returns the number of parties required to trip the barrier.

  • timeout elapsing
  • calling the reset() method
  • the barrier action failure due to an exception

Example

In the example are used 2 barriers: first as a cyclic entry barrier, second as a cyclic exit barrier.

private static final int PARTIES = 3;
private static final int ITERATIONS = 3;
public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
CyclicBarrier entryBarrier = new CyclicBarrier(PARTIES + 1, () -> logger.info("iteration started"));
CyclicBarrier exitBarrier = new CyclicBarrier(PARTIES + 1, () -> logger.info("iteration finished"));
for (int i = 0; i < ITERATIONS; i++) {
for (int p = 0; p < PARTIES; p++) {
int delay = p + 1;
Runnable task = new Worker(delay, entryBarrier, exitBarrier);
new Thread(task).start();
}
logger.info("all threads waiting to start: iteration {}", i);
sleep(1);
entryBarrier.await();
logger.info("all threads started: iteration {}", i);
exitBarrier.await();
logger.info("all threads finished: iteration {}", i);
}
}
private static class Worker implements Runnable { private final int delay;
private final CyclicBarrier entryBarrier;
private final CyclicBarrier exitBarrier;
Worker(int delay, CyclicBarrier entryBarrier, CyclicBarrier exitBarrier) {
this.delay = delay;
this.entryBarrier = entryBarrier;
this.exitBarrier = exitBarrier;
}
@Override
public void run() {
try {
entryBarrier.await();
work();
exitBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
}
private void work() {
logger.info("work {} started", delay);
sleep(delay);
logger.info("work {} finished", delay);
}
}

The Phaser class

The Phaser class is a reusable barrier that allows a variable number of parties/threads. Because of this, it’s more flexible, however much more complicated.

Phaser class diagram
Phaser class diagram
Phaser sequence diagram
Phaser sequence diagram

Parties registration

The Phaser() constructor creates a phaser with initial phase number 0 and no registered parties (phase=0, registered=0). The Phaser(int parties) constructor creates a phaser with initial phase number 0 and the given number of registered parties (phase=0, registered=parties).

Parties synchronization

The int arrive() method marks a party arriving at the phaser, without waiting for other parties to arrive (arrived++, unarrived — ).

Phases iterations

The current phase is finished when all registered parties arrive (registered==arrived, unarrived==0). To decide whether to start the next phase or to terminate the phaser is used the protected boolean onAdvance(int phase, int registeredParties) method.

protected boolean onAdvance(int phase, int registeredParties) {
return registeredParties == 0;
}
@Override
protected boolean onAdvance(int phase, int registeredParties) {
return true;
}
@Override
protected boolean onAdvance(int phase, int registeredParties) {
return false;
}
@Override
protected boolean onAdvance(int phase, int registeredParties) {
return (phase >= maxPhase - 1) || (registeredParties == 0);
}

Phaser termination

Phaser is terminated automatically when the onAdvance method returns true. It’s possible to terminate the phaser manually by calling the forceTermination() method.

Phaser monitoring

The methods to monitor parties numbers:

  • int getArrivedParties() — returns the number of registered parties that have arrived at the current phase of the phaser
  • int getUnarrivedParties() — returns the number of registered parties that have not yet arrived at the current phase of the phaser

Examples

In the example are used the basic phaser methods.

public static void main(String[] args) {
Phaser phaser = new Phaser(3) {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
log("inside onAdvance()", this);
return true;
}
};
log("after constructor", phaser);
phaser.register();
log("after register()", phaser);
phaser.arrive();
log("after arrive()", phaser);
Thread thread = new Thread() {
@Override
public void run() {
log("before arriveAndAwaitAdvance()", phaser);
phaser.arriveAndAwaitAdvance();
log("after arriveAndAwaitAdvance()", phaser);
}
};
thread.start();
phaser.arrive();
log("after arrive()", phaser);
phaser.arriveAndDeregister();
log("after arriveAndDeregister()", phaser);
}
private static final int PARTIES = 3;public static void main(String[] args) {
Phaser phaser = new Phaser(1);
log("after constructor", phaser);
for (int p = 0; p < PARTIES; p++) {
int delay = p + 1;
Runnable task = new Worker(delay, phaser);
new Thread(task).start();
}
log("all threads waiting to start", phaser);
sleep(1);
log("before all threads started", phaser);
phaser.arriveAndDeregister();
log("after all threads started", phaser);
sleep(10);
log("all threads finished", phaser);
}
private static class Worker implements Runnable { private final int delay;
private final Phaser phaser;
Worker(int delay, Phaser phaser) {
phaser.register();
this.delay = delay;
this.phaser = phaser;
}
@Override
public void run() {
phaser.arriveAndAwaitAdvance();
work();
}
private void work() {
logger.info("work {} started", delay);
sleep(delay);
logger.info("work {} finished", delay);
}
}
private static final int PARTIES = 3;public static void main(String[] args) {
Phaser phaser = new Phaser(1);
log("after constructor", phaser);
for (int p = 0; p < PARTIES; p++) {
int delay = p + 1;
Runnable task = new Worker(delay, phaser);
new Thread(task).start();
}
log("all threads waiting to start", phaser);
sleep(1);
log("before all threads started", phaser);
phaser.arriveAndDeregister();
log("after all threads started", phaser);
phaser.register();
while (!phaser.isTerminated()) {
phaser.arriveAndAwaitAdvance();
phaser.arriveAndDeregister();
}
log("all threads finished", phaser);
}
private static class Worker implements Runnable { private final int delay;
private final Phaser phaser;
Worker(int delay, Phaser phaser) {
phaser.register();
this.delay = delay;
this.phaser = phaser;
}
@Override
public void run() {
phaser.arriveAndAwaitAdvance();
work();
phaser.arriveAndDeregister();
}
private void work() {
logger.info("work {} started", delay);
sleep(delay);
logger.info("work {} finished", delay);
}
}
private static final int PARTIES = 3;
private static final int ITERATIONS = 3;
public static void main(String[] args) {
Phaser phaser = new Phaser(1) {
final private int maxPhase = ITERATIONS;
@Override
protected boolean onAdvance(int phase, int registeredParties) {
return (phase >= maxPhase - 1) || (registeredParties == 0);
}
};
log("after constructor", phaser);
for (int p = 0; p < PARTIES; p++) {
int delay = p + 1;
Runnable task = new Worker(delay, phaser);
new Thread(task).start();
}
log("all threads waiting to start", phaser);
sleep(1);
log("before all threads started", phaser);
phaser.arriveAndDeregister();
log("after all threads started", phaser);
phaser.register();
while (!phaser.isTerminated()) {
phaser.arriveAndAwaitAdvance();
}
log("all threads finished", phaser);
}
private static class Worker implements Runnable { private final int delay;
private final Phaser phaser;
Worker(int delay, Phaser phaser) {
phaser.register();
this.delay = delay;
this.phaser = phaser;
}
@Override
public void run() {
do {
work();
phaser.arriveAndAwaitAdvance();
} while (!phaser.isTerminated());
}
void work() {
logger.info("work {} started", delay);
sleep(delay);
logger.info("work {} finished", delay);
}
}

Conclusion

The CountDownLatch class is suitable for one-time iteration with a fixed number of parties.

Senior Software Engineer

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store