Photo by Christian Holzinger on Unsplash

Java executors and thread pools

Introduction

In small applications to execute each task (Runnable object) is created a new thread (Thread object). When the task is completed, the thread is terminated as well. But in large applications overhead of threads creation and termination can be significant. Such overhead can be reduced by reusing the same threads for the execution of many tasks. For that purpose are used executors and thread pools. An executor is a design pattern that provides API for task executions and hides its implementation. A thread pool is one of the executor implementations that uses a pool of threads for task execution.

  • ExecutorService
  • ScheduledExecutorService
  • ThreadPoolExecutor
  • ScheduledThreadPoolExecutor
  • ForkJoinPool
  • Executors

The Thread class

In general, the workflow of task execution using thread consists of the following steps:

  1. task submission
  2. thread creation
  3. thread start
  4. task execution
  5. thread termination
Runnable runnable = new Runnable() { // task creation
@Override
public void run() {
logger.info("run..."); // task execution
}
};
Thread thread = new Thread(runnable); // task submission during thread creation
thread.start(); // thread start
thread.join(); // thread termination
Executor executor = createExecutor(); // thread creation and thread startRunnable task = createTask(); // task creation
Future future = executor.submit(task); // task submission
future.get(); // waiting until task is executed
executor.terminate(); // thread termination

Executors

Executors are interfaces that provide API for threads creation, utilization, and termination for the rest of the application.

  • ExecutorService, that has methods to execute tasks with the ability to control their completion, and methods to manage executor termination
  • ScheduledExecutorService, that has methods to execute tasks once after a given delay, and methods to execute tasks periodically

The Executor interface

The Executor interface is intended to decouple the interface of tasks submitted from the implementation of task execution. Indeed tasks can be executed asynchronously (in another thread) as well as synchronously (in the caller’s thread).

Runnable runnable = () -> logger.info("run...");Executor executor = new Executor() {
@Override
public void execute(Runnable command) {
command.run();
}
};
executor.execute(runnable);
Runnable runnable = () -> logger.info("run...");Executor executor = new Executor() {
@Override
public void execute(Runnable command) {
new Thread(command).start();
}
};
executor.execute(runnable);
Runnable runnable = () -> logger.info("run...");Executor executor = Executors.newSingleThreadExecutor();executor.execute(runnable);

The ExecutorService interface

The ExecutorService interface extends the Executor interface and additionally has methods to execute tasks with the ability to control their completion, and methods to manage executor termination.

  • <T> Future<T> submit(Runnable task, T result)
  • <T> Future<T> submit(Callable<T> task)
  • V get​(long timeout, TimeUnit unit) — waits for the task completion for the given timeout
  • boolean cancel​(boolean mayInterruptIfRunning) — attempts to cancel the task
  • boolean isDone​() — informs whether the task is completed or not
  • boolean isCancelled​() — informs whether the task was completed normally or canceled
  • <T> T invokeAny(Collection<? extends Callable<T>> tasks)
  • List<Runnable> shutdownNow()
  • boolean awaitTermination(long timeout, TimeUnit unit)
  • boolean isShutdown()
  • boolean isTerminated()
ExecutorService executorService = Executors.newSingleThreadExecutor();Runnable runnable = () -> System.out.println("run...");
Future<?> future = executorService.submit(runnable);
System.out.println("result: " + future.get()); // null
executorService.shutdown();
ExecutorService executorService = Executors.newSingleThreadExecutor();Runnable runnable = () -> System.out.println("run...");
Future<String> future = executorService.submit(runnable, "Alpha");
System.out.println("result: " + future.get());
executorService.shutdown();
ExecutorService executorService = Executors.newSingleThreadExecutor();Callable<String> callable = () -> "Bravo";
Future<String> future = executorService.submit(callable);
System.out.println("result: " + future.get());
executorService.shutdown();
ExecutorService executorService = Executors.newCachedThreadPool();List<Callable<String>> callables = Arrays.asList(
() -> sleepAndGet(2, "Bravo"),
() -> sleepAndGet(1, "Alpha"),
() -> sleepAndGet(3, "Charlie")
);
List<String> results = executorService.invokeAll(callables)
.stream()
.peek(future -> logger.info("is done: {}, is cancelled: {}",
future.isDone(),
future.isCancelled()))
.map(future -> {
try {
return future.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
})
.collect(Collectors.toList());
logger.info("results: {}", results);executorService.shutdown();
ExecutorService executorService = Executors.newCachedThreadPool();List<Callable<String>> callables = Arrays.asList(
() -> sleepAndGet(2, "Bravo"),
() -> sleepAndGet(1, "Alpha"),
() -> sleepAndGet(3, "Charlie")
);
String result = executorService.invokeAny(callables);
logger.info("result: {}", result);
executorService.shutdown();
ExecutorService executorService = Executors.newSingleThreadExecutor();executorService.submit(() -> sleepAndGet(1, "Alpha"));
executorService.submit(() -> sleepAndGet(1, "Bravo"));
executorService.submit(() -> sleepAndGet(1, "Charlie"));
logger.info("is shutdown: {}", executorService.isShutdown());logDuration(
"shutdown",
() -> executorService.shutdown()
);
logger.info("is shutdown: {}", executorService.isShutdown());
ExecutorService executorService = Executors.newSingleThreadExecutor();logger.info("is shutdown: {}", executorService.isShutdown());
executorService.shutdown();
logger.info("is shutdown: {}", executorService.isShutdown());
executorService.submit(() -> "Alpha"); // java.util.concurrent.RejectedExecutionException
ExecutorService executorService = Executors.newSingleThreadExecutor();executorService.submit(() -> sleepAndGet(1, "Alpha"));
executorService.submit(() -> sleepAndGet(1, "Bravo"));
executorService.submit(() -> sleepAndGet(1, "Charlie"));
logger.info("is terminated: {}", executorService.isTerminated());logDuration(
"shutdownNow",
() -> {
List<Runnable> skippedTasks = executorService.shutdownNow();
logger.info("count of tasks never commenced execution: {}", skippedTasks.size());
}
);
logger.info("is terminated: {}", executorService.isTerminated());
  • the timeout expires
  • the current thread is interrupted)
ExecutorService executorService = Executors.newSingleThreadExecutor();executorService.submit(() -> sleepAndGet(1, "Alpha"));
executorService.submit(() -> sleepAndGet(1, "Bravo"));
executorService.submit(() -> sleepAndGet(1, "Charlie"));
executorService.shutdown();logDuration(
"awaitTermination",
() -> executorService.awaitTermination(60, TimeUnit.SECONDS)
);
logger.debug("executor service: shutdown started");
executorService.shutdown();
try {
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
List<Runnable> skippedTasks = executorService.shutdownNow();
logger.error("count of tasks never commenced execution: {}", skippedTasks.size());
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
logger.error("executor service didn't terminate");
}
}
} catch (InterruptedException e) {
logger.error("executor service is interrupted", e);
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
logger.debug("executor service: shutdown finished");

The ScheduledExecutorService interface

The ScheduledExecutorService interface extends the ExecutorService interface and additionally has methods to execute tasks once after a delay, and methods to execute tasks periodically until canceled.

  • <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
  • ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
  • the task is canceled by the returned ScheduledFuture
  • execution of the task throws an exception
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);Runnable runnable = () -> logger.info("finished");logger.info("started");
ScheduledFuture<?> scheduledFuture = scheduledExecutorService.schedule(runnable, 3000, TimeUnit.MILLISECONDS);
TimeUnit.MILLISECONDS.sleep(1000);long remainingDelay = scheduledFuture.getDelay(TimeUnit.MILLISECONDS);
logger.info("remaining delay: {} millisecond(s)", remainingDelay);
logger.info("result: {}", scheduledFuture.get());shutdown(scheduledExecutorService);
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);Callable<String> callable = () -> {
logger.info("finished");
return "Alpha";
};
logger.info("started");
ScheduledFuture<String> scheduledFuture = scheduledExecutorService.schedule(callable, 3000, TimeUnit.MILLISECONDS);
TimeUnit.MILLISECONDS.sleep(1000);long remainingDelay = scheduledFuture.getDelay(TimeUnit.MILLISECONDS);
logger.info("remaining delay: {} millisecond(s)", remainingDelay);
logger.info("result: {}", scheduledFuture.get());shutdown(scheduledExecutorService);
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);LocalTime start = LocalTime.now();Runnable runnable = () -> logger.info("duration from start: {} second(s)", Duration.between(start, LocalTime.now()).getSeconds());
int initialDelay = 3;
int period = 1;
ScheduledFuture<?> scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(runnable, initialDelay, period, TimeUnit.SECONDS);
Runnable canceller = () -> {
scheduledFuture.cancel(true);
scheduledExecutorService.shutdown();
};
scheduledExecutorService.schedule(canceller, 10, TimeUnit.SECONDS);
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);LocalTime start = LocalTime.now();Runnable runnable = () -> {
sleep(2);
logger.info("duration from start: {} second(s)", Duration.between(start, LocalTime.now()).getSeconds());
};
int initialDelay = 3;
int delay = 1;
ScheduledFuture<?> scheduledFuture = scheduledExecutorService.scheduleWithFixedDelay(runnable, initialDelay, delay, TimeUnit.SECONDS);
Runnable canceller = () -> {
scheduledFuture.cancel(true);
scheduledExecutorService.shutdown();
};
scheduledExecutorService.schedule(canceller, 10, TimeUnit.SECONDS);

Thread pools

Thread pools are implementations that hide details of threads creation, utilization, and termination from the rest of the application.

  • ScheduledThreadPoolExecutor — an implementation of the ScheduledExecutorService interface
  • ForkJoinPool — a thread pool to execute tasks that can be recursively broken down into smaller subtasks

The ThreadPoolExecutor class

The ThreadPoolExecutor class is an implementation of the ExecutorService interface which consists of the following parts:

  • a queue to transfer submitted tasks to threads
  • a thread factory to create new threads
  • maximumPoolSize — the maximum number of threads to allow in the pool
  • keepAliveTime — when the number of threads is greater than corePoolSize, this is the maximum time that excess idle threads will wait for new tasks before terminating
  • unit — the time unit for the keepAliveTime parameter
  • workQueue — the queue to use for holding tasks before they are executed
  • handler — the handler to use when execution is blocked because the thread bounds and queue capacities are reached (if not specified, ThreadPoolExecutor.AbortPolicy is used)

Threads creation

Threads can be created beforehand during thread pool creation and on-demand during task submission.

  1. if the prestartCoreThread​ method is called, one core thread is started to wait for tasks
  2. if the prestartAllCoreThreads method is called, all core threads are started to wait for tasks
  1. else, if the queue is full and the number of threads is less than maximumPoolSize, then a new thread is created
  2. else, the task is rejected
  • if corePoolSize is equal to Integer.MAX_VALUE, then the thread pool is essentially unbounded

Threads termination

If a thread has been idle more than keepAliveTime, then the thread can be terminated:

  1. if allowCoreThreadTimeOut is true, then the thread is certainly terminated
  • if keepAliveTime is equal to Long.MAX_VALUE TimeUnit.NANOSECONDS then threads are effectively never terminated before the thread pool is shut down

Tasks queueing

Tasks are added to the queue according to the used BlockingQueue implementation:

  • else, the task is rejected
  • else, if the queue is full and the number of threads is less than maximumPoolSize, then a new thread is created
  • else, the task is rejected

Tasks rejection

A task can be rejected in one of the cases:

  • number of threads equals to maximumPoolSize
  • the queue is full (remainingCapacity is 0)
  • if the ThreadPoolExecutor.CallerRunsPolicy is used, the task is being run not in one of the thread pool threads, but in the caller’s thread
  • if the ThreadPoolExecutor.DiscardPolicy is used, the task is silently dropped
  • if the ThreadPoolExecutor.DiscardOldestPolicy is used, the oldest task is dropped (the task from the head of the queue)

The ThreadFactory interface

The ThreadFactory interface has a single method:

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
logger.info("core pool size: {}", threadPoolExecutor.getCorePoolSize()); // 2
logger.info("maximum pool size: {}", threadPoolExecutor.getMaximumPoolSize()); // 4
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 10,
1L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2));
for (int i = 0; i < 10; i++) {
threadPoolExecutor.submit(() -> sleep(1));
}
for (int i = 0; i < 10; i++) {
sleep(1);
logThreadPoolSize(threadPoolExecutor); if (threadPoolExecutor.isTerminated()) {
logThreadPoolSize(threadPoolExecutor);
break;
}
}
threadPoolExecutor.shutdown();
...
private static void logThreadPoolSize(ThreadPoolExecutor threadPoolExecutor) {
logger.info("thread pool size (active/current/maximum): {}/{}/{}",
threadPoolExecutor.getActiveCount(),
threadPoolExecutor.getPoolSize(),
threadPoolExecutor.getLargestPoolSize()
);
}
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
for (int i = 0; i < 10; i++) {
threadPoolExecutor.submit(() -> sleep(1));
}
for (int i = 0; i < 10; i++) {
sleep(1);
logTasksCount(threadPoolExecutor); if (threadPoolExecutor.isTerminated()) {
logTasksCount(threadPoolExecutor);
break;
}
}
threadPoolExecutor.shutdown();
...
private static void logTasksCount(ThreadPoolExecutor threadPoolExecutor) {
logger.info("tasks count (all/completed): {}/{}",
threadPoolExecutor.getTaskCount(),
threadPoolExecutor.getCompletedTaskCount());
}
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 10,
0L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(1));
for (int i = 0; i < 10; i++) {
threadPoolExecutor.submit(() -> sleep(1));
}
threadPoolExecutor.shutdown();for (int i = 0; i < 10; i++) {
sleep(1);
logThreadPoolSize(threadPoolExecutor); if (threadPoolExecutor.isTerminated()) {
logThreadPoolSize(threadPoolExecutor);
break;
}
}
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 10,
5L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(1));
for (int i = 0; i < 10; i++) {
threadPoolExecutor.submit(() -> sleep(1));
}
threadPoolExecutor.shutdown();for (int i = 0; i < 10; i++) {
sleep(1);
logThreadPoolSize(threadPoolExecutor); if (threadPoolExecutor.isTerminated()) {
logThreadPoolSize(threadPoolExecutor);
break;
}
}
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>());
System.out.println(threadPoolExecutor.getQueue().remainingCapacity());threadPoolExecutor.submit(() -> sleepAndGet(3, "Alpha"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Bravo"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Charlie"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Delta"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Echo"));
threadPoolExecutor.shutdown();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
System.out.println(threadPoolExecutor.getQueue().remainingCapacity());threadPoolExecutor.submit(() -> sleepAndGet(3, "Alpha"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Bravo"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Charlie"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Delta"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Echo"));
threadPoolExecutor.shutdown();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(3));
System.out.println(threadPoolExecutor.getQueue().remainingCapacity());threadPoolExecutor.submit(() -> sleepAndGet(3, "Alpha"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Bravo"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Charlie"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Delta"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Echo"));
threadPoolExecutor.shutdown();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1),
new ThreadPoolExecutor.AbortPolicy());
try {
threadPoolExecutor.submit(() -> sleepAndGet(3, "Alpha"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Bravo"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Charlie")); // java.util.concurrent.RejectedExecutionException
threadPoolExecutor.submit(() -> sleepAndGet(3, "Delta"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Echo"));
} finally {
threadPoolExecutor.shutdown();
}
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1),
new ThreadPoolExecutor.CallerRunsPolicy());
threadPoolExecutor.submit(() -> sleepAndGet(3, "Alpha"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Bravo"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Charlie"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Delta"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Echo"));
threadPoolExecutor.shutdown();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1),
new ThreadPoolExecutor.DiscardPolicy());
threadPoolExecutor.submit(() -> sleepAndGet(3, "Alpha"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Bravo"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Charlie"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Delta"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Echo"));
threadPoolExecutor.shutdown();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1),
new ThreadPoolExecutor.DiscardOldestPolicy());
threadPoolExecutor.submit(() -> sleepAndGet(3, "Alpha"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Bravo"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Charlie"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Delta"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Echo"));
threadPoolExecutor.shutdown();
ThreadPoolExecutor threadPoolExecutor = new ExtendedThreadPoolExecutor();threadPoolExecutor.submit(() -> sleep(1));for (int i = 0; i < 10; i++) {
sleep(1);
logTasksCount(threadPoolExecutor); if (threadPoolExecutor.isTerminated()) {
logTasksCount(threadPoolExecutor);
break;
}
}
threadPoolExecutor.shutdown();
...
private static class ExtendedThreadPoolExecutor extends ThreadPoolExecutor {
private ExtendedThreadPoolExecutor() {
super(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
logger.info("before task execution: thread {}, task {}", t, r);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
logger.info("after task execution: task {}, exception {}", r, t);
}
@Override
protected void terminated() {
super.terminated();
logger.info("is terminated");
}
}

The ScheduledThreadPoolExecutor class

The ScheduledThreadPoolExecutor class is an implementation of the ScheduledExecutorService interface and a subclass of the ThreadPoolExecutor class.

  • handler — the handler to use when execution is blocked because the thread bounds and queue capacities are reached (if not specified, ThreadPoolExecutor.AbortPolicy is used)
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);Runnable runnable = () -> logger.info("finished");logger.info("started");
ScheduledFuture<?> scheduledFuture = scheduledThreadPoolExecutor.schedule(runnable, 3000, TimeUnit.MILLISECONDS);
TimeUnit.MILLISECONDS.sleep(1000);long remainingDelay = scheduledFuture.getDelay(TimeUnit.MILLISECONDS);
logger.info("remaining delay: {} millisecond(s)", remainingDelay);
logger.info("result: {}", scheduledFuture.get());shutdown(scheduledThreadPoolExecutor);

The ForkJoinPool class

The ForkJoinPool class is a thread pool to execute tasks that can be recursively broken down into smaller subtasks. The ForkJoinPool class differs from other kinds of thread pools by using the work-stealing algorithm: threads in the pool attempt to execute tasks submitted to the pool or spawned by other tasks. This enables efficient processing when many small tasks are submitted to the thread pool or most tasks spawn other subtasks.

  • tasks should access variables that are independent of those accessed by other tasks
  • tasks should avoid synchronized methods/blocks and should minimize other blocking synchronization apart from joining other tasks
  • tasks should not perform blocking I/O
  • RecursiveTask — for tasks returns results
  • CountedCompleter — for tasks in which completed actions trigger other actions
  • <T> T invoke(ForkJoinTask<T> task) — performs the given task, returning its result upon completion
  • <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) — executes the given tasks, returning a list of Futures holding their status and results when all complete
  • <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) — submits the given task for execution
  • int getPoolSize() — returns the number of threads that have started but not yet terminated
  • int getActiveThreadCoun() — returns an estimate of the number of threads that are currently stealing or executing tasks
  • int getRunningThreadCount() — returns an estimate of the number of threads that are not blocked waiting to join tasks or for other managed synchronization
  • boolean isQuiescent() — returns true if all threads are currently idle
  • long getQueuedTaskCount() — returns an estimate of the total number of tasks currently held in queues by threads (but not including tasks submitted to the thread pool that have not begun executing)
  • long getStealCount() — returns an estimate of the total number of tasks stolen from one thread’s work queue by another
  • boolean hasQueuedSubmissions() — returns true if there are any tasks submitted to this thread pool that have not yet begun executing
public class PrimeNumbersCountRecursiveTask extends RecursiveTask<Long> {   private final int start;
private final int end;
private final int threshold;
public PrimeNumbersCountRecursiveTask(int start, int end, int threshold) {
this.start = start;
this.end = end;
this.threshold = threshold;
}
@Override
protected Long compute() {
if (((end + 1) - start) > threshold) {
return ForkJoinTask.invokeAll(getSubTasks())
.stream()
.mapToLong(ForkJoinTask::join)
.sum();
} else {
return findPrimeNumbersCount();
}
}
private List<PrimeNumbersCountRecursiveTask> getSubTasks() {
List<PrimeNumbersCountRecursiveTask> tasks = new ArrayList<>();
for (int i = 1; i <= end / threshold; i++) {
int end = i * threshold;
int start = (end - threshold) + 1;
tasks.add(new PrimeNumbersCountRecursiveTask(start, end, threshold));
}
return tasks;
}
private long findPrimeNumbersCount() {
long numbers = 0;
for (int n = start; n <= end; n++) {
if (isPrimeNumber(n)) {
numbers++;
}
}
return numbers;
}
private boolean isPrimeNumber(int n) {
if (n == 2) {
return true;
}
if (n == 1 || n % 2 == 0) {
return false;
}
int divisors = 0;
for (int i = 1; i <= n; i++) {
if (n % i == 0) {
divisors++;
}
}
return divisors == 2;
}
}
ForkJoinTask<Long> task = new PrimeNumbersCountRecursiveTask(1, 100000, 10);
ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
forkJoinPool.execute(task);do {
sleep(100);
} while (!task.isDone());
System.out.println("count: " + task.getRawResult()); // 9592forkJoinPool.shutdown();
ForkJoinTask<Long> task = new PrimeNumbersCountRecursiveTask(1, 100000, 10);
ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
Long count = forkJoinPool.invoke(task);
System.out.println("count: " + count); // 9592
forkJoinPool.shutdown();
ForkJoinTask<Long> task = new PrimeNumbersCountRecursiveTask(1, 100000, 10);
ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
ForkJoinTask<Long> forkJoinTask = forkJoinPool.submit(task);
Long count = forkJoinTask.get();
System.out.println("count: " + count); // 9592
forkJoinPool.shutdown();
ForkJoinTask<Long> task = new PrimeNumbersCountRecursiveTask(1, 100000, 10);
ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
forkJoinPool.execute(task);do {
logger.info("getParallelism={}, getPoolSize={}, getActiveThreadCount={}, getRunningThreadCount={}, isQuiescent={}",
forkJoinPool.getParallelism(),
forkJoinPool.getPoolSize(),
forkJoinPool.getActiveThreadCount(),
forkJoinPool.getRunningThreadCount(),
forkJoinPool.isQuiescent()
);
Thread.sleep(100);
} while (!task.isDone());
System.out.println("count: " + task.getRawResult()); // 9592forkJoinPool.shutdown();
ForkJoinTask<Long> task = new PrimeNumbersCountRecursiveTask(1, 100000, 10);
ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
forkJoinPool.execute(task);do {
logger.info("getQueuedSubmissionCount={}, getQueuedTaskCount={}, getStealCount={}, hasQueuedSubmissions={}",
forkJoinPool.getQueuedSubmissionCount(),
forkJoinPool.getQueuedTaskCount(),
forkJoinPool.getStealCount(),
forkJoinPool.hasQueuedSubmissions()
);
Thread.sleep(100);
} while (!task.isDone());
System.out.println("count: " + task.getRawResult()); // 9592forkJoinPool.shutdown();

The Executors class

The Executors class has factory methods for the ExecutorService, ScheduledExecutorService, ThreadFactory, Callable classes with commonly useful configuration settings.

Factory methods for ExecutorService instances

Methods to create ExecutorService instances:

  • static ExecutorService newFixedThreadPool(int nThreads)
  • static ExecutorService newSingleThreadExecutor()
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

Factory methods for ScheduledExecutorService instances

Methods to create ScheduledExecutorService instances

  • static ScheduledExecutorService newSingleThreadScheduledExecutor()
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
}

Factory methods for non-reconfigurable executors

Methods to create non-reconfigurable ExecutorService, ScheduledExecutorService instances:

  • static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor)
ExecutorService executorService = Executors.unconfigurableExecutorService(
new ThreadPoolExecutor(1, 2,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>())
);
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService; // java.lang.ClassCastException
ScheduledExecutorService scheduledExecutorService = Executors.unconfigurableScheduledExecutorService(
new ScheduledThreadPoolExecutor(1)
);
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = (ScheduledThreadPoolExecutor) scheduledExecutorService; // java.lang.ClassCastException

Factory methods for work-stealing executors

Methods to create work-stealing ExecutorService instances:

  • static ExecutorService newWorkStealingPool(int parallelism)
ForkJoinTask<Long> task = new PrimeNumbersCountRecursiveTask(1, 100000, 10);
ForkJoinPool forkJoinPool = (ForkJoinPool) Executors.newWorkStealingPool();
Long count = forkJoinPool.invoke(task);
System.out.println("count: " + count); // 9592

Factory methods for ThreadFactory instances

Methods to create ThreadFactory instances:

public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
private static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}

Factory methods for Callable instances

Methods to create Callable instances:

  • static <T> Callable<T> callable(Runnable task, T result)
Runnable runnable = () -> System.out.println("running ...");
Callable<Object> callable = Executors.callable(runnable);
System.out.println("result: " + callable.call()); // null
Runnable runnable = () -> System.out.println("running ...");
Callable<Integer> callable = Executors.callable(runnable, 1);
System.out.println("result: " + callable.call()); // 1

Conclusion

The optimum size of a thread pool depends on the nature of the tasks and the number of available processors.

Senior Software Engineer

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store