Image for post
Image for post
Photo by Dustin Humes on Unsplash

Java barrier synchronizers (CountDownLatch, CyclicBarrier, Phaser)

Introduction

The CountDownLatch class

Threads registration

Threads waiting

Threads arrival

Latch monitoring

Example

private static final int PARTIES = 3;public static void main(String[] args) throws InterruptedException {
CountDownLatch entryBarrier = new CountDownLatch(1);
CountDownLatch exitBarrier = new CountDownLatch(PARTIES);
for (int p = 0; p < PARTIES; p++) {
int delay = p + 1;
Runnable task = new Worker(delay, entryBarrier, exitBarrier);
new Thread(task).start();
}
logger.info("all threads waiting to start");
sleep(1);
entryBarrier.countDown();
logger.info("all threads started");
exitBarrier.await();
logger.info("all threads finished");
}
private static class Worker implements Runnable { private final int delay;
private final CountDownLatch entryBarrier;
private final CountDownLatch exitBarrier;
Worker(int delay, CountDownLatch entryBarrier, CountDownLatch exitBarrier) {
this.delay = delay;
this.entryBarrier = entryBarrier;
this.exitBarrier = exitBarrier;
}
@Override
public void run() {
try {
entryBarrier.await();
work();
exitBarrier.countDown();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private void work() {
logger.info("work {} started", delay);
sleep(delay);
logger.info("work {} finished", delay);
}
}

The CyclicBarrier class

Threads registration

Threads arrival and waiting

Barrier reset

Barrier monitoring

Example

private static final int PARTIES = 3;
private static final int ITERATIONS = 3;
public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
CyclicBarrier entryBarrier = new CyclicBarrier(PARTIES + 1, () -> logger.info("iteration started"));
CyclicBarrier exitBarrier = new CyclicBarrier(PARTIES + 1, () -> logger.info("iteration finished"));
for (int i = 0; i < ITERATIONS; i++) {
for (int p = 0; p < PARTIES; p++) {
int delay = p + 1;
Runnable task = new Worker(delay, entryBarrier, exitBarrier);
new Thread(task).start();
}
logger.info("all threads waiting to start: iteration {}", i);
sleep(1);
entryBarrier.await();
logger.info("all threads started: iteration {}", i);
exitBarrier.await();
logger.info("all threads finished: iteration {}", i);
}
}
private static class Worker implements Runnable { private final int delay;
private final CyclicBarrier entryBarrier;
private final CyclicBarrier exitBarrier;
Worker(int delay, CyclicBarrier entryBarrier, CyclicBarrier exitBarrier) {
this.delay = delay;
this.entryBarrier = entryBarrier;
this.exitBarrier = exitBarrier;
}
@Override
public void run() {
try {
entryBarrier.await();
work();
exitBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
}
private void work() {
logger.info("work {} started", delay);
sleep(delay);
logger.info("work {} finished", delay);
}
}

The Phaser class

Parties registration

Parties synchronization

Phases iterations

protected boolean onAdvance(int phase, int registeredParties) {
return registeredParties == 0;
}
@Override
protected boolean onAdvance(int phase, int registeredParties) {
return true;
}
@Override
protected boolean onAdvance(int phase, int registeredParties) {
return false;
}
@Override
protected boolean onAdvance(int phase, int registeredParties) {
return (phase >= maxPhase - 1) || (registeredParties == 0);
}

Phaser termination

Phaser monitoring

Examples

public static void main(String[] args) {
Phaser phaser = new Phaser(3) {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
log("inside onAdvance", this);
return true;
}
};
log("after constructor", phaser);
phaser.register();
log("after register", phaser);
phaser.arrive();
log("after arrive", phaser);
Thread thread = new Thread() {
@Override
public void run() {
log("before arriveAndAwaitAdvance", phaser);
phaser.arriveAndAwaitAdvance();
log("after arriveAndAwaitAdvance", phaser);
}
};
thread.start();
phaser.arrive();
log("after arrive", phaser);
phaser.arriveAndDeregister();
log("after arriveAndDeregister", phaser);
}
private static final int PARTIES = 3;public static void main(String[] args) {
Phaser phaser = new Phaser(1);
log("after constructor", phaser);
for (int p = 0; p < PARTIES; p++) {
int delay = p + 1;
Runnable task = new Worker(delay, phaser);
new Thread(task).start();
}
log("all threads waiting to start", phaser);
sleep(1);
log("before all threads started", phaser);
phaser.arriveAndDeregister();
log("after all threads started", phaser);
sleep(10);
log("all threads finished", phaser);
}
private static class Worker implements Runnable { private final int delay;
private final Phaser phaser;
Worker(int delay, Phaser phaser) {
phaser.register();
this.delay = delay;
this.phaser = phaser;
}
@Override
public void run() {
phaser.arriveAndAwaitAdvance();
work();
}
private void work() {
logger.info("work {} started", delay);
sleep(delay);
logger.info("work {} finished", delay);
}
}
private static final int PARTIES = 3;public static void main(String[] args) {
Phaser phaser = new Phaser(1);
log("after constructor", phaser);
for (int p = 0; p < PARTIES; p++) {
int delay = p + 1;
Runnable task = new Worker(delay, phaser);
new Thread(task).start();
}
log("all threads waiting to start", phaser);
sleep(1);
log("before all threads started", phaser);
phaser.arriveAndDeregister();
log("after all threads started", phaser);
phaser.register();
while (!phaser.isTerminated()) {
phaser.arriveAndAwaitAdvance();
phaser.arriveAndDeregister();
}
log("all threads finished", phaser);
}
private static class Worker implements Runnable { private final int delay;
private final Phaser phaser;
Worker(int delay, Phaser phaser) {
phaser.register();
this.delay = delay;
this.phaser = phaser;
}
@Override
public void run() {
phaser.arriveAndAwaitAdvance();
work();
phaser.arriveAndDeregister();
}
private void work() {
logger.info("work {} started", delay);
sleep(delay);
logger.info("work {} finished", delay);
}
}
private static final int PARTIES = 3;
private static final int ITERATIONS = 3;
public static void main(String[] args) {
Phaser phaser = new Phaser(1) {
final private int maxPhase = ITERATIONS;
@Override
protected boolean onAdvance(int phase, int registeredParties) {
return (phase >= maxPhase - 1) || (registeredParties == 0);
}
};
log("after constructor", phaser);
for (int p = 0; p < PARTIES; p++) {
int delay = p + 1;
Runnable task = new Worker(delay, phaser);
new Thread(task).start();
}
log("all threads waiting to start", phaser);
sleep(1);
log("before all threads started", phaser);
phaser.arriveAndDeregister();
log("after all threads started", phaser);
phaser.register();
while (!phaser.isTerminated()) {
phaser.arriveAndAwaitAdvance();
}
log("all threads finished", phaser);
}
private static class Worker implements Runnable { private final int delay;
private final Phaser phaser;
Worker(int delay, Phaser phaser) {
phaser.register();
this.delay = delay;
this.phaser = phaser;
}
@Override
public void run() {
do {
work();
phaser.arriveAndAwaitAdvance();
} while (!phaser.isTerminated());
}
void work() {
logger.info("work {} started", delay);
sleep(delay);
logger.info("work {} finished", delay);
}
}

Conclusion

Written by

Senior Software Engineer

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store