Java executors and thread pools
--
Introduction
In small applications, a new thread (Thread object) is typically created to execute each task (Runnable object), and the thread terminates when the task completes. In large applications, however, the overhead of creating and terminating threads can be significant. This overhead can be reduced by reusing the same threads to execute many tasks, which is the purpose of executors and thread pools. An executor is a design pattern that provides an API for task execution and hides its implementation. A thread pool is one kind of executor implementation that uses a pool of threads to execute tasks.
This article provides practical examples of using executors and thread pools from the java.util.concurrent package. It describes the following classes and interfaces:
- Executor
- ExecutorService
- ScheduledExecutorService
- ThreadPoolExecutor
- ScheduledThreadPoolExecutor
- ForkJoinPool
- Executors
The code examples are compiled with Java 9 and expected to be run on a computer with at least a 4-core processor.
The Thread class
In general, the workflow of executing a task with a thread consists of the following steps:
- task creation
- task submission
- thread creation
- thread start
- task execution
- thread termination
There are two ways to manage threads during this workflow: explicit and implicit.
With explicit management, a thread is created together with its task, executes that task, and terminates once the task completes. The lifecycles of the task (Runnable object) and the thread (Thread object) are tightly coupled.
Runnable runnable = new Runnable() { // task creation
    @Override
    public void run() {
        logger.info("run..."); // task execution
    }
};
Thread thread = new Thread(runnable); // task submission during thread creation
thread.start(); // thread start
thread.join(); // thread termination
With implicit management, threads are created, execute tasks, and are terminated independently of task creation, submission, and execution. The lifecycles of the task (Runnable object) and the thread (Thread object) are loosely coupled.
ExecutorService executor = createExecutor(); // thread creation and thread start
Runnable task = createTask(); // task creation
Future<?> future = executor.submit(task); // task submission
future.get(); // waiting until task is executed
executor.shutdown(); // thread termination
Executors
Executors are interfaces that provide an API for thread creation, utilization, and termination to the rest of the application.
In the java.util.concurrent package there are three executor interfaces:
- Executor, which has a method to execute tasks
- ExecutorService, which has methods to execute tasks with the ability to control their completion, and methods to manage executor termination
- ScheduledExecutorService, which has methods to execute tasks once after a given delay, and methods to execute tasks periodically
The Executor interface
The Executor interface is intended to decouple task submission from the mechanics of how each task is run. Tasks can be executed asynchronously (in another thread) as well as synchronously (in the caller’s thread).
The Executor interface has a single method:
- void execute(Runnable command)
The return type of the method is void, so there is no information about the task completion and no ability to cancel the task.
Examples
Example of a synchronous Executor:
Runnable runnable = () -> logger.info("run...");
Executor executor = new Executor() {
    @Override
    public void execute(Runnable command) {
        command.run();
    }
};
executor.execute(runnable);
Example of an asynchronous Executor that executes each task in a new Thread:
Runnable runnable = () -> logger.info("run...");
Executor executor = new Executor() {
    @Override
    public void execute(Runnable command) {
        new Thread(command).start();
    }
};
executor.execute(runnable);
Example of a predefined asynchronous Executor that executes each task in a reused Thread:
Runnable runnable = () -> logger.info("run...");
Executor executor = Executors.newSingleThreadExecutor();
executor.execute(runnable);
The ExecutorService interface
The ExecutorService interface extends the Executor interface and additionally has methods to execute tasks with the ability to control their completion, and methods to manage executor termination.
The executor methods to submit single Runnable or Callable tasks:
- Future<?> submit(Runnable task)
- <T> Future<T> submit(Runnable task, T result)
- <T> Future<T> submit(Callable<T> task)
Unlike the execute method, the submit methods return a Future<V>, which has methods to control task completion:
- V get() — waits for the task completion
- V get(long timeout, TimeUnit unit) — waits for the task completion for the given timeout
- boolean cancel(boolean mayInterruptIfRunning) — attempts to cancel the task
- boolean isDone() — informs whether the task is completed or not
- boolean isCancelled() — informs whether the task was completed normally or canceled
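The Future methods listed above can be combined as in the following minimal sketch. The class and method names (FutureControlDemo, timeoutThenCancel) are hypothetical and chosen only for illustration: the caller waits for a slow task for a limited time, then cancels it and inspects the resulting state.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureControlDemo {

    // Submits a long-running task, gives up waiting after a short timeout,
    // cancels the task and reports the resulting state of the Future.
    static String timeoutThenCancel() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = executorService.submit(() -> {
                TimeUnit.SECONDS.sleep(10); // simulates slow work
                return "Alpha";
            });
            try {
                future.get(100, TimeUnit.MILLISECONDS);
                return "completed";
            } catch (TimeoutException e) {
                // true asks the executor to interrupt the thread running the task
                boolean cancelled = future.cancel(true);
                return "cancel=" + cancelled
                        + ", isDone=" + future.isDone()
                        + ", isCancelled=" + future.isCancelled();
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }
        } finally {
            executorService.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(timeoutThenCancel());
    }
}
```

Note that isDone returns true for a cancelled task as well, so it cannot be used on its own to tell normal completion from cancellation.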
The executor methods to submit multiple Callable tasks:
- <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
- <T> T invokeAny(Collection<? extends Callable<T>> tasks)
The invokeAll and invokeAny methods have overloaded versions that take a maximum timeout.
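As a minimal sketch of the timed invokeAll overload, the following code runs a fast task and a slow task with a one-second limit. The class InvokeAllTimeoutDemo and the helper sleepAndGet are hypothetical names defined here for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class InvokeAllTimeoutDemo {

    // Hypothetical helper: a task that sleeps and then returns the given value.
    static Callable<String> sleepAndGet(long millis, String value) {
        return () -> {
            TimeUnit.MILLISECONDS.sleep(millis);
            return value;
        };
    }

    // Runs both tasks with a 1-second limit and reports, per task,
    // whether its Future ended up cancelled.
    static List<Boolean> cancelledFlags() {
        ExecutorService executorService = Executors.newCachedThreadPool();
        try {
            List<Callable<String>> callables = Arrays.asList(
                    sleepAndGet(0, "Alpha"),
                    sleepAndGet(30_000, "Bravo"));
            List<Future<String>> futures =
                    executorService.invokeAll(callables, 1, TimeUnit.SECONDS);
            List<Boolean> flags = new ArrayList<>();
            for (Future<String> future : futures) {
                flags.add(future.isCancelled());
            }
            return flags;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executorService.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // The slow task is expected to be reported as cancelled.
        System.out.println(cancelledFlags());
    }
}
```

The returned list keeps the order of the submitted tasks, so each flag can be matched to its task by position.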
The executor methods to control executor termination:
- void shutdown()
- List<Runnable> shutdownNow()
- boolean awaitTermination(long timeout, TimeUnit unit)
- boolean isShutdown()
- boolean isTerminated()
It’s important that an ExecutorService instance isn’t terminated automatically if there are no tasks to execute. It continues to wait for new tasks until it is shut down manually.
Examples
Example of the submit method with Runnable parameter:
ExecutorService executorService = Executors.newSingleThreadExecutor();
Runnable runnable = () -> System.out.println("run...");
Future<?> future = executorService.submit(runnable);
System.out.println("result: " + future.get()); // null
executorService.shutdown();
Example of the submit method with Runnable parameter and return value:
ExecutorService executorService = Executors.newSingleThreadExecutor();
Runnable runnable = () -> System.out.println("run...");
Future<String> future = executorService.submit(runnable, "Alpha");
System.out.println("result: " + future.get());
executorService.shutdown();
Example of the submit method with Callable parameter:
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> callable = () -> "Bravo";
Future<String> future = executorService.submit(callable);
System.out.println("result: " + future.get());
executorService.shutdown();
Example of the invokeAll method:
(if the overloaded method with a timeout is used and the timeout expires before all tasks have completed, the tasks that have not completed are cancelled)
ExecutorService executorService = Executors.newCachedThreadPool();
List<Callable<String>> callables = Arrays.asList(
    () -> sleepAndGet(2, "Bravo"),
    () -> sleepAndGet(1, "Alpha"),
    () -> sleepAndGet(3, "Charlie")
);
List<String> results = executorService.invokeAll(callables)
    .stream()
    .peek(future -> logger.info("is done: {}, is cancelled: {}",
        future.isDone(),
        future.isCancelled()))
    .map(future -> {
        try {
            return future.get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    })
    .collect(Collectors.toList());
logger.info("results: {}", results);
executorService.shutdown();
Example of the invokeAny method:
(if the overloaded method with timeout is used, and the timeout expires before any task is successfully completed, a java.util.concurrent.TimeoutException is thrown)
ExecutorService executorService = Executors.newCachedThreadPool();
List<Callable<String>> callables = Arrays.asList(
    () -> sleepAndGet(2, "Bravo"),
    () -> sleepAndGet(1, "Alpha"),
    () -> sleepAndGet(3, "Charlie")
);
String result = executorService.invokeAny(callables);
logger.info("result: {}", result);
executorService.shutdown();
Example of the shutdown, isShutdown methods:
(the shutdown method starts an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted; the method does not wait for previously submitted tasks to complete execution)
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(() -> sleepAndGet(1, "Alpha"));
executorService.submit(() -> sleepAndGet(1, "Bravo"));
executorService.submit(() -> sleepAndGet(1, "Charlie"));
logger.info("is shutdown: {}", executorService.isShutdown());
logDuration(
    "shutdown",
    () -> executorService.shutdown()
);
logger.info("is shutdown: {}", executorService.isShutdown());
Example of task rejection after calling the shutdown method:
ExecutorService executorService = Executors.newSingleThreadExecutor();
logger.info("is shutdown: {}", executorService.isShutdown());
executorService.shutdown();
logger.info("is shutdown: {}", executorService.isShutdown());
executorService.submit(() -> "Alpha"); // java.util.concurrent.RejectedExecutionException
Example of the shutdownNow, isTerminated methods:
(the shutdownNow method tries to stop all actively executing tasks, halts the waiting tasks, and returns a list of the tasks that were awaiting execution; the method does not wait for actively executing tasks to terminate)
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(() -> sleepAndGet(1, "Alpha"));
executorService.submit(() -> sleepAndGet(1, "Bravo"));
executorService.submit(() -> sleepAndGet(1, "Charlie"));
logger.info("is terminated: {}", executorService.isTerminated());
logDuration(
    "shutdownNow",
    () -> {
        List<Runnable> skippedTasks = executorService.shutdownNow();
        logger.info("count of tasks never commenced execution: {}", skippedTasks.size());
    }
);
logger.info("is terminated: {}", executorService.isTerminated());
Example of the awaitTermination method:
(the awaitTermination method waits until one of the events occurs:
- all tasks have completed
- the timeout expires
- the current thread is interrupted)
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(() -> sleepAndGet(1, "Alpha"));
executorService.submit(() -> sleepAndGet(1, "Bravo"));
executorService.submit(() -> sleepAndGet(1, "Charlie"));
executorService.shutdown();
logDuration(
    "awaitTermination",
    () -> executorService.awaitTermination(60, TimeUnit.SECONDS)
);
Example of the recommended way of shutting down an ExecutorService instance:
logger.debug("executor service: shutdown started");
executorService.shutdown();
try {
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
List<Runnable> skippedTasks = executorService.shutdownNow();
logger.error("count of tasks never commenced execution: {}", skippedTasks.size());
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
logger.error("executor service didn't terminate");
}
}
} catch (InterruptedException e) {
logger.error("executor service is interrupted", e);
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
logger.debug("executor service: shutdown finished");
The ScheduledExecutorService interface
The ScheduledExecutorService interface extends the ExecutorService interface and additionally has methods to execute tasks once after a delay, and methods to execute tasks periodically until canceled.
Methods to schedule tasks to execute once after a delay:
- ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
- <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
The methods return a ScheduledFuture<V> (which extends Future<V>), which adds a method to inspect the remaining delay of a delayed or periodic task:
- long getDelay(TimeUnit unit) — returns the remaining delay associated with the task
Methods to schedule tasks to execute periodically:
- ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
- ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
It’s important that the ScheduledFuture returned by the scheduleAtFixedRate and scheduleWithFixedDelay methods never completes normally while the task is being executed periodically. The sequence of task executions continues indefinitely until one of the following events occurs:
- the executor is terminated
- the task is canceled by the returned ScheduledFuture
- execution of the task throws an exception
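The last case in the list above is easy to miss, so here is a minimal sketch of it. The names PeriodicTaskFailureDemo and runsUntilFailure are hypothetical. The periodic task throws on its third run, after which no further runs are expected and the returned ScheduledFuture reports the failure through get.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PeriodicTaskFailureDemo {

    // Schedules a periodic task that throws on its third run and
    // returns how many times the task was actually started.
    static int runsUntilFailure() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        AtomicInteger runs = new AtomicInteger();
        try {
            ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
                if (runs.incrementAndGet() == 3) {
                    throw new IllegalStateException("third run failed");
                }
            }, 0, 50, TimeUnit.MILLISECONDS);
            try {
                future.get(); // blocks until the periodic task is cancelled or fails
            } catch (ExecutionException e) {
                // e.getCause() is the IllegalStateException thrown by the task
            }
            // Leave time for further runs; none are expected after the failure.
            TimeUnit.MILLISECONDS.sleep(300);
            return runs.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("runs: " + runsUntilFailure());
    }
}
```

If a periodic task must survive failures, one option is to catch exceptions inside the task itself so that they never reach the executor.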
Examples
Example of the schedule method with Runnable parameter:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
Runnable runnable = () -> logger.info("finished");
logger.info("started");
ScheduledFuture<?> scheduledFuture = scheduledExecutorService.schedule(runnable, 3000, TimeUnit.MILLISECONDS);
TimeUnit.MILLISECONDS.sleep(1000);
long remainingDelay = scheduledFuture.getDelay(TimeUnit.MILLISECONDS);
logger.info("remaining delay: {} millisecond(s)", remainingDelay);
logger.info("result: {}", scheduledFuture.get());
shutdown(scheduledExecutorService);
Example of the schedule method with Callable parameter:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
Callable<String> callable = () -> {
    logger.info("finished");
    return "Alpha";
};
logger.info("started");
ScheduledFuture<String> scheduledFuture = scheduledExecutorService.schedule(callable, 3000, TimeUnit.MILLISECONDS);
TimeUnit.MILLISECONDS.sleep(1000);
long remainingDelay = scheduledFuture.getDelay(TimeUnit.MILLISECONDS);
logger.info("remaining delay: {} millisecond(s)", remainingDelay);
logger.info("result: {}", scheduledFuture.get());
shutdown(scheduledExecutorService);
Example of the scheduleAtFixedRate method:
(If any execution of the task takes longer than its period, then subsequent executions may start late, but will not concurrently execute)
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
LocalTime start = LocalTime.now();
Runnable runnable = () -> logger.info("duration from start: {} second(s)", Duration.between(start, LocalTime.now()).getSeconds());
int initialDelay = 3;
int period = 1;
ScheduledFuture<?> scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(runnable, initialDelay, period, TimeUnit.SECONDS);
Runnable canceller = () -> {
    scheduledFuture.cancel(true);
    scheduledExecutorService.shutdown();
};
scheduledExecutorService.schedule(canceller, 10, TimeUnit.SECONDS);
Example of the scheduleWithFixedDelay method:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
LocalTime start = LocalTime.now();
Runnable runnable = () -> {
    sleep(2);
    logger.info("duration from start: {} second(s)", Duration.between(start, LocalTime.now()).getSeconds());
};
int initialDelay = 3;
int delay = 1;
ScheduledFuture<?> scheduledFuture = scheduledExecutorService.scheduleWithFixedDelay(runnable, initialDelay, delay, TimeUnit.SECONDS);
Runnable canceller = () -> {
    scheduledFuture.cancel(true);
    scheduledExecutorService.shutdown();
};
scheduledExecutorService.schedule(canceller, 10, TimeUnit.SECONDS);
Thread pools
Thread pools are implementations that hide the details of thread creation, utilization, and termination from the rest of the application.
In the java.util.concurrent package there are three thread pool implementations:
- ThreadPoolExecutor — an implementation of the ExecutorService interface
- ScheduledThreadPoolExecutor — an implementation of the ScheduledExecutorService interface
- ForkJoinPool — a thread pool to execute tasks that can be recursively broken down into smaller subtasks
The ThreadPoolExecutor class
The ThreadPoolExecutor class is an implementation of the ExecutorService interface which consists of the following parts:
- a thread pool
- a queue to transfer submitted tasks to threads
- a thread factory to create new threads
The class has overloaded constructors with mandatory and optional parameters.
Mandatory constructor parameters:
- corePoolSize — the number of threads to keep in the pool, even if they are idle (unless allowCoreThreadTimeOut is true)
- maximumPoolSize — the maximum number of threads to allow in the pool
- keepAliveTime — when the number of threads is greater than corePoolSize, this is the maximum time that excess idle threads will wait for new tasks before terminating
- unit — the time unit for the keepAliveTime parameter
- workQueue — the queue to use for holding tasks before they are executed
Optional constructor parameters:
- threadFactory — the factory to use when the executor creates a new thread (if not specified, Executors.defaultThreadFactory is used)
- handler — the handler to use when execution is blocked because the thread bounds and queue capacities are reached (if not specified, ThreadPoolExecutor.AbortPolicy is used)
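The parameters above map onto the longest ThreadPoolExecutor constructor as in this minimal sketch. The class ThreadPoolConstructionDemo and the concrete values (2, 4, 30 seconds, a queue of 10) are arbitrary assumptions made for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ThreadPoolConstructionDemo {

    // Creates a pool with every constructor parameter spelled out.
    static ThreadPoolExecutor createPool() {
        return new ThreadPoolExecutor(
                2,                                     // corePoolSize
                4,                                     // maximumPoolSize
                30L, TimeUnit.SECONDS,                 // keepAliveTime and its unit
                new ArrayBlockingQueue<>(10),          // workQueue
                Executors.defaultThreadFactory(),      // threadFactory (the default)
                new ThreadPoolExecutor.AbortPolicy()); // handler (the default)
    }

    // Reads the configuration back through the getters of the pool.
    static String describeNewPool() {
        ThreadPoolExecutor pool = createPool();
        try {
            return "core=" + pool.getCorePoolSize()
                    + ", max=" + pool.getMaximumPoolSize()
                    + ", keepAlive=" + pool.getKeepAliveTime(TimeUnit.SECONDS) + "s"
                    + ", queueCapacity=" + pool.getQueue().remainingCapacity();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describeNewPool());
    }
}
```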
Threads creation
Threads can be created beforehand during thread pool creation and on-demand during task submission.
Algorithm of threads creation during thread pool creation:
- the thread pool is created without idle threads
- if the prestartCoreThread method is called, one core thread is started to wait for tasks
- if the prestartAllCoreThreads method is called, all core threads are started to wait for tasks
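The effect of the two prestart methods on the pool size can be observed with a minimal sketch like the following. The class name PrestartDemo and the pool sizes are assumptions made for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PrestartDemo {

    // Records the pool size right after creation, after starting one
    // core thread, and after starting all remaining core threads.
    static String poolSizes() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 5,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
        try {
            int afterCreation = pool.getPoolSize();
            pool.prestartCoreThread();     // starts a single idle core thread
            int afterOne = pool.getPoolSize();
            pool.prestartAllCoreThreads(); // starts the remaining core threads
            int afterAll = pool.getPoolSize();
            return afterCreation + " -> " + afterOne + " -> " + afterAll;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("pool size: " + poolSizes());
    }
}
```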
Algorithm of threads creation during tasks submission:
I) if the number of threads is less than corePoolSize, then a new thread is created
II) else, if the number of threads is equal to or greater than corePoolSize:
- if the queue is not full, then the task is added to the queue
- else, if the queue is full and the number of threads is less than maximumPoolSize, then a new thread is created
- else, the task is rejected
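The submission algorithm above can be traced step by step with a minimal sketch. The class ThreadCreationStepsDemo is a hypothetical name. The pool has corePoolSize 1, maximumPoolSize 2 and a queue of capacity 1, and every task blocks on a latch so that no thread becomes free while the tasks are being submitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ThreadCreationStepsDemo {

    // Submits four blocking tasks and logs the pool state after each submission.
    static List<String> submitFourBlockingTasks() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blockingTask = () -> {
            try {
                release.await(); // keeps the worker thread busy
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> log = new ArrayList<>();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blockingTask);
                    log.add("task " + i + ": threads=" + pool.getPoolSize()
                            + ", queued=" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    log.add("task " + i + ": rejected");
                }
            }
            return log;
        } finally {
            release.countDown(); // lets the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        submitFourBlockingTasks().forEach(System.out::println);
    }
}
```

The first task starts the core thread, the second is queued, the third finds the queue full and starts a second thread, and the fourth is rejected by the default AbortPolicy.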
There are corner cases of threads creation:
- if corePoolSize is equal to maximumPoolSize, then it’s the fixed-size thread pool
- if maximumPoolSize is equal to Integer.MAX_VALUE, then the thread pool is essentially unbounded
Threads termination
If a thread has been idle for longer than keepAliveTime, then the thread can be terminated:
- if allowCoreThreadTimeOut is false (the default), the thread is terminated only while the number of threads is greater than corePoolSize
- if allowCoreThreadTimeOut is true, the thread is terminated regardless of the number of threads
There are corner cases of threads termination:
- if keepAliveTime is equal to 0, then excess threads (those above corePoolSize) are terminated immediately after executing their tasks
- if keepAliveTime is equal to Long.MAX_VALUE with TimeUnit.NANOSECONDS, then idle threads are effectively never terminated before the thread pool is shut down
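The following minimal sketch illustrates the allowCoreThreadTimeOut setting. The class CoreThreadTimeoutDemo and the timings (a 200 ms keepAliveTime and a 2 second wait) are assumptions; the result depends on timing, so the wait is deliberately much longer than the keepAliveTime.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CoreThreadTimeoutDemo {

    // Starts two idle core threads that are allowed to time out and
    // compares the pool size before and after a long idle period.
    static String poolSizeBeforeAndAfterIdle() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2,
                200L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
        try {
            // Requires a keepAliveTime greater than zero.
            pool.allowCoreThreadTimeOut(true);
            pool.prestartAllCoreThreads();
            int before = pool.getPoolSize();
            TimeUnit.SECONDS.sleep(2); // much longer than keepAliveTime
            int after = pool.getPoolSize();
            return before + " -> " + after;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("pool size: " + poolSizeBeforeAndAfterIdle());
    }
}
```

Without the allowCoreThreadTimeOut(true) call, both core threads would be expected to stay alive for the whole idle period.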
Tasks queueing
Tasks are added to the queue according to the used BlockingQueue implementation:
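I) if a direct handoff queue (e.g. SynchronousQueue) is used, which holds no tasks and hands each one directly to a thread:
- if an idle thread is waiting for a task, then the task is handed to it
- else, if the number of threads is less than maximumPoolSize, then a new thread is created
- else, the task is rejected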
II) if an unbounded queue (e.g. LinkedBlockingQueue without a predefined capacity) is used, which is never full:
- the task is always added to the queue
III) if a bounded queue (e.g. ArrayBlockingQueue with a predefined capacity) is used:
- if the queue is not full, then the task is added to the queue
- else, if the queue is full and the number of threads is less than maximumPoolSize, then a new thread is created
- else, the task is rejected
It’s important that once corePoolSize threads are running, a ThreadPoolExecutor instance never creates a new thread if the task can be added to the queue.
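The three queueing strategies can be compared side by side with a minimal sketch. The class QueueStrategyDemo is a hypothetical name. Each pool has corePoolSize 1 and maximumPoolSize 3 and receives three tasks that block on a latch, so the only difference between the runs is the queue.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class QueueStrategyDemo {

    // Submits three blocking tasks to a pool that uses the given queue
    // and reports how many threads were created and how many tasks wait.
    static String poolAndQueueSize(BlockingQueue<Runnable> queue) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 3,
                0L, TimeUnit.MILLISECONDS, queue);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < 3; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keeps the worker thread busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return "threads=" + pool.getPoolSize()
                    + ", queued=" + pool.getQueue().size();
        } finally {
            release.countDown(); // lets the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("direct handoff: " + poolAndQueueSize(new SynchronousQueue<>()));
        System.out.println("unbounded:      " + poolAndQueueSize(new LinkedBlockingQueue<>()));
        System.out.println("bounded (1):    " + poolAndQueueSize(new ArrayBlockingQueue<>(1)));
    }
}
```

With the unbounded queue the pool stays at a single thread even though maximumPoolSize is 3, which is why maximumPoolSize has no effect in that configuration.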
Tasks rejection
A task is rejected in one of the following cases:
- the thread pool has been shut down
- the number of threads equals maximumPoolSize and the queue is full (remainingCapacity is 0)
The rejected task can be handled with one of the predefined handler policies:
- if the ThreadPoolExecutor.AbortPolicy is used, the task is rejected and RejectedExecutionException is thrown (by default)
- if the ThreadPoolExecutor.CallerRunsPolicy is used, the task is run in the caller’s thread instead of one of the thread pool threads
- if the ThreadPoolExecutor.DiscardPolicy is used, the task is silently dropped
- if the ThreadPoolExecutor.DiscardOldestPolicy is used, the oldest task (the task at the head of the queue) is dropped and the submission of the new task is retried
It’s possible to use a custom handler that implements the RejectedExecutionHandler interface.
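A custom handler could look like the following minimal sketch, which only counts rejected tasks instead of throwing. The class CountingRejectionHandlerDemo is a hypothetical name, and a real handler would more likely log the rejection or apply back-pressure.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CountingRejectionHandlerDemo {

    // Submits the given number of blocking tasks to a pool with one thread
    // and a queue of capacity 1, and returns how many tasks were rejected.
    static int countRejections(int submissions) {
        AtomicInteger rejected = new AtomicInteger();
        // RejectedExecutionHandler has a single method, so a lambda can implement it.
        RejectedExecutionHandler handler = (task, executor) -> rejected.incrementAndGet();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                handler);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < submissions; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keeps the single worker thread busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return rejected.get();
        } finally {
            release.countDown(); // lets the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + countRejections(5));
    }
}
```

Of five submissions, one task runs, one waits in the queue, and the remaining three reach the handler.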
The ThreadFactory interface
The ThreadFactory interface has a single method:
- Thread newThread(Runnable r)
A thread factory is used to create threads for a thread pool with the required priority, name, daemon status, Thread.UncaughtExceptionHandler, etc.
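A thread factory of this kind might be sketched as follows. The names NamedThreadFactoryDemo and NamedDaemonThreadFactory and the "worker" prefix are hypothetical. The factory gives each thread a readable name, marks it as a daemon, and installs an uncaught exception handler.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class NamedThreadFactoryDemo {

    // Creates daemon threads named "<prefix>-<sequence number>".
    static class NamedDaemonThreadFactory implements ThreadFactory {
        private final String prefix;
        private final AtomicInteger counter = new AtomicInteger(1);

        NamedDaemonThreadFactory(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r, prefix + "-" + counter.getAndIncrement());
            thread.setDaemon(true);
            thread.setUncaughtExceptionHandler((t, e) ->
                    System.err.println(t.getName() + " failed: " + e));
            return thread;
        }
    }

    // Runs a task in a pool built with the factory and reports
    // the name and daemon status of the thread that executed it.
    static String describeWorkerThread() {
        ExecutorService executorService =
                Executors.newFixedThreadPool(2, new NamedDaemonThreadFactory("worker"));
        try {
            Callable<String> task = () -> Thread.currentThread().getName()
                    + ", daemon=" + Thread.currentThread().isDaemon();
            return executorService.submit(task).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executorService.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describeWorkerThread());
    }
}
```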
Examples
Examples of the getCorePoolSize, getMaximumPoolSize methods:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>());
logger.info("core pool size: {}", threadPoolExecutor.getCorePoolSize()); // 2
logger.info("maximum pool size: {}", threadPoolExecutor.getMaximumPoolSize()); // 4
Examples of the getActiveCount, getPoolSize, getLargestPoolSize methods:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 10,
    1L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<Runnable>(2));
for (int i = 0; i < 10; i++) {
    threadPoolExecutor.submit(() -> sleep(1));
}
for (int i = 0; i < 10; i++) {
    sleep(1);
    logThreadPoolSize(threadPoolExecutor);
    if (threadPoolExecutor.isTerminated()) {
        logThreadPoolSize(threadPoolExecutor);
        break;
    }
}
threadPoolExecutor.shutdown();
...
private static void logThreadPoolSize(ThreadPoolExecutor threadPoolExecutor) {
    logger.info("thread pool size (active/current/maximum): {}/{}/{}",
        threadPoolExecutor.getActiveCount(),
        threadPoolExecutor.getPoolSize(),
        threadPoolExecutor.getLargestPoolSize()
    );
}
Examples of the getTaskCount, getCompletedTaskCount methods:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>());
for (int i = 0; i < 10; i++) {
    threadPoolExecutor.submit(() -> sleep(1));
}
for (int i = 0; i < 10; i++) {
    sleep(1);
    logTasksCount(threadPoolExecutor);
    if (threadPoolExecutor.isTerminated()) {
        logTasksCount(threadPoolExecutor);
        break;
    }
}
threadPoolExecutor.shutdown();
...
private static void logTasksCount(ThreadPoolExecutor threadPoolExecutor) {
    logger.info("tasks count (all/completed): {}/{}",
        threadPoolExecutor.getTaskCount(),
        threadPoolExecutor.getCompletedTaskCount());
}
Example of the keepAliveTime field (0 seconds):
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 10,
    0L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<Runnable>(1));
for (int i = 0; i < 10; i++) {
    threadPoolExecutor.submit(() -> sleep(1));
}
threadPoolExecutor.shutdown();
for (int i = 0; i < 10; i++) {
    sleep(1);
    logThreadPoolSize(threadPoolExecutor);
    if (threadPoolExecutor.isTerminated()) {
        logThreadPoolSize(threadPoolExecutor);
        break;
    }
}
Example of the keepAliveTime field (5 seconds):
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 10,
    5L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<Runnable>(1));
for (int i = 0; i < 10; i++) {
    threadPoolExecutor.submit(() -> sleep(1));
}
threadPoolExecutor.shutdown();
for (int i = 0; i < 10; i++) {
    sleep(1);
    logThreadPoolSize(threadPoolExecutor);
    if (threadPoolExecutor.isTerminated()) {
        logThreadPoolSize(threadPoolExecutor);
        break;
    }
}
Example of a direct handoff queue (SynchronousQueue):
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
    0L, TimeUnit.MILLISECONDS,
    new SynchronousQueue<Runnable>());
System.out.println(threadPoolExecutor.getQueue().remainingCapacity());
threadPoolExecutor.submit(() -> sleepAndGet(3, "Alpha"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Bravo"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Charlie"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Delta"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Echo"));
threadPoolExecutor.shutdown();
Example of an unbounded queue (LinkedBlockingQueue):
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>());
System.out.println(threadPoolExecutor.getQueue().remainingCapacity());
threadPoolExecutor.submit(() -> sleepAndGet(3, "Alpha"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Bravo"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Charlie"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Delta"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Echo"));
threadPoolExecutor.shutdown();
Example of a bounded queue (ArrayBlockingQueue):
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
    0L, TimeUnit.MILLISECONDS,
    new ArrayBlockingQueue<Runnable>(3));
System.out.println(threadPoolExecutor.getQueue().remainingCapacity());
threadPoolExecutor.submit(() -> sleepAndGet(3, "Alpha"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Bravo"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Charlie"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Delta"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Echo"));
threadPoolExecutor.shutdown();
Example of the ThreadPoolExecutor.AbortPolicy rejecting policy:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>(1),
    new ThreadPoolExecutor.AbortPolicy());
try {
    threadPoolExecutor.submit(() -> sleepAndGet(3, "Alpha"));
    threadPoolExecutor.submit(() -> sleepAndGet(3, "Bravo"));
    threadPoolExecutor.submit(() -> sleepAndGet(3, "Charlie")); // java.util.concurrent.RejectedExecutionException
    threadPoolExecutor.submit(() -> sleepAndGet(3, "Delta"));
    threadPoolExecutor.submit(() -> sleepAndGet(3, "Echo"));
} finally {
    threadPoolExecutor.shutdown();
}
Example of the ThreadPoolExecutor.CallerRunsPolicy rejecting policy:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>(1),
    new ThreadPoolExecutor.CallerRunsPolicy());
threadPoolExecutor.submit(() -> sleepAndGet(3, "Alpha"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Bravo"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Charlie"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Delta"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Echo"));
threadPoolExecutor.shutdown();
Example of the ThreadPoolExecutor.DiscardPolicy rejecting policy:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>(1),
    new ThreadPoolExecutor.DiscardPolicy());
threadPoolExecutor.submit(() -> sleepAndGet(3, "Alpha"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Bravo"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Charlie"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Delta"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Echo"));
threadPoolExecutor.shutdown();
Example of the ThreadPoolExecutor.DiscardOldestPolicy rejecting policy:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>(1),
    new ThreadPoolExecutor.DiscardOldestPolicy());
threadPoolExecutor.submit(() -> sleepAndGet(3, "Alpha"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Bravo"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Charlie"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Delta"));
threadPoolExecutor.submit(() -> sleepAndGet(3, "Echo"));
threadPoolExecutor.shutdown();
Example of the overridden beforeExecute, afterExecute, terminated methods:
ThreadPoolExecutor threadPoolExecutor = new ExtendedThreadPoolExecutor();
threadPoolExecutor.submit(() -> sleep(1));
for (int i = 0; i < 10; i++) {
    sleep(1);
    logTasksCount(threadPoolExecutor);
    if (threadPoolExecutor.isTerminated()) {
        logTasksCount(threadPoolExecutor);
        break;
    }
}
threadPoolExecutor.shutdown();
...
private static class ExtendedThreadPoolExecutor extends ThreadPoolExecutor {

    private ExtendedThreadPoolExecutor() {
        super(1, 1,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        logger.info("before task execution: thread {}, task {}", t, r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        logger.info("after task execution: task {}, exception {}", r, t);
    }

    @Override
    protected void terminated() {
        super.terminated();
        logger.info("is terminated");
    }
}
The ScheduledThreadPoolExecutor class
The ScheduledThreadPoolExecutor class is an implementation of the ScheduledExecutorService interface and a subclass of the ThreadPoolExecutor class.
The class has overloaded constructors with mandatory and optional parameters.
Mandatory constructor parameters:
- corePoolSize — the number of threads to keep in the pool, even if they are idle (unless allowCoreThreadTimeOut is true)
Optional constructor parameters:
- threadFactory — the factory to use when the executor creates a new thread (if not specified, Executors.defaultThreadFactory is used)
- handler — the handler to use when execution is blocked because the thread bounds and queue capacities are reached (if not specified, ThreadPoolExecutor.AbortPolicy is used)
Examples
Example of the schedule method with Runnable parameter:
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
Runnable runnable = () -> logger.info("finished");
logger.info("started");
ScheduledFuture<?> scheduledFuture = scheduledThreadPoolExecutor.schedule(runnable, 3000, TimeUnit.MILLISECONDS);
TimeUnit.MILLISECONDS.sleep(1000);
long remainingDelay = scheduledFuture.getDelay(TimeUnit.MILLISECONDS);
logger.info("remaining delay: {} millisecond(s)", remainingDelay);
logger.info("result: {}", scheduledFuture.get());
shutdown(scheduledThreadPoolExecutor);
The ScheduledThreadPoolExecutor class has methods of its superinterface ScheduledExecutorService and its superclass ThreadPoolExecutor that are already described previously.
The ForkJoinPool class
The ForkJoinPool class is a thread pool to execute tasks that can be recursively broken down into smaller subtasks. The ForkJoinPool class differs from other kinds of thread pools by using the work-stealing algorithm: threads in the pool attempt to execute tasks submitted to the pool or spawned by other tasks. This enables efficient processing when many small tasks are submitted to the thread pool or most tasks spawn other subtasks.
The ForkJoinTask class is an abstract class for tasks that run within a ForkJoinPool. A ForkJoinTask is an object that is much lighter weight than a normal thread. Large numbers of tasks may be executed by a small number of threads in a ForkJoinPool, at the price of some restrictions:
- tasks should be used for calculating pure functions
- tasks should access variables that are independent of those accessed by other tasks
- tasks should avoid synchronized methods/blocks and should minimize other blocking synchronization apart from joining other tasks
- tasks should not perform blocking I/O
There are abstract subclasses of the abstract ForkJoinTask class that support a particular style of fork/join tasks:
- RecursiveAction — for tasks that do not return results
- RecursiveTask — for tasks that return results
- CountedCompleter — for tasks in which completed actions trigger other actions
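As a minimal sketch of the RecursiveAction style, the following task doubles every element of an array in place. The class DoubleElementsAction, its threshold of 1000 elements, and the helper doubledSum are hypothetical choices made for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

class DoubleElementsAction extends RecursiveAction {

    private static final int THRESHOLD = 1_000;

    private final long[] values;
    private final int from; // inclusive
    private final int to;   // exclusive

    DoubleElementsAction(long[] values, int from, int to) {
        this.values = values;
        this.from = from;
        this.to = to;
    }

    @Override
    protected void compute() {
        if (to - from <= THRESHOLD) {
            // Small enough: process the range directly.
            for (int i = from; i < to; i++) {
                values[i] *= 2;
            }
        } else {
            // Too large: split into two halves that touch disjoint elements.
            int middle = from + (to - from) / 2;
            invokeAll(new DoubleElementsAction(values, from, middle),
                    new DoubleElementsAction(values, middle, to));
        }
    }

    // Fills an array with 0..size-1, doubles it in a pool and returns the sum.
    static long doubledSum(int size) {
        long[] values = new long[size];
        for (int i = 0; i < size; i++) {
            values[i] = i;
        }
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.invoke(new DoubleElementsAction(values, 0, size));
        } finally {
            pool.shutdown();
        }
        long sum = 0;
        for (long value : values) {
            sum += value;
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println("sum after doubling: " + doubledSum(10_000));
    }
}
```

Each subtask works on its own slice of the array, which follows the restriction that tasks should access variables independent of those accessed by other tasks.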
The thread pool methods to submit ForkJoinTask:
- void execute(ForkJoinTask<?> task) — arranges for asynchronous execution of the given task
- <T> T invoke(ForkJoinTask<T> task) — performs the given task, returning its result upon completion
- <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) — executes the given tasks, returning a list of Futures holding their status and results when all complete
- <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) — submits the given task for execution
The thread pool methods to monitor threads:
- int getParallelism() — returns the targeted parallelism level of this thread pool
- int getPoolSize() — returns the number of threads that have started but not yet terminated
- int getActiveThreadCount() — returns an estimate of the number of threads that are currently stealing or executing tasks
- int getRunningThreadCount() — returns an estimate of the number of threads that are not blocked waiting to join tasks or for other managed synchronization
- boolean isQuiescent() — returns true if all threads are currently idle
The thread pool methods to monitor tasks:
- int getQueuedSubmissionCount() — returns an estimate of the number of tasks submitted to this thread pool that have not yet begun executing
- long getQueuedTaskCount() — returns an estimate of the total number of tasks currently held in queues by threads (but not including tasks submitted to the thread pool that have not begun executing)
- long getStealCount() — returns an estimate of the total number of tasks stolen from one thread’s work queue by another
- boolean hasQueuedSubmissions() — returns true if there are any tasks submitted to this thread pool that have not yet begun executing
Examples
Example of the RecursiveTask class:
(the class is used to find the count of prime numbers from 1 to 100000 by the trial division algorithm — there are 9592 of them)
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class PrimeNumbersCountRecursiveTask extends RecursiveTask<Long> {

    private final int start;
    private final int end;
    private final int threshold;

    public PrimeNumbersCountRecursiveTask(int start, int end, int threshold) {
        this.start = start;
        this.end = end;
        this.threshold = threshold;
    }

    @Override
    protected Long compute() {
        // split the range if it is larger than the threshold, otherwise count directly
        if (((end + 1) - start) > threshold) {
            return ForkJoinTask.invokeAll(getSubTasks())
                    .stream()
                    .mapToLong(ForkJoinTask::join)
                    .sum();
        } else {
            return findPrimeNumbersCount();
        }
    }

    // splits the range [start, end] into consecutive sub-ranges of at most threshold numbers
    private List<PrimeNumbersCountRecursiveTask> getSubTasks() {
        List<PrimeNumbersCountRecursiveTask> tasks = new ArrayList<>();
        for (int from = start; from <= end; from += threshold) {
            int to = Math.min(from + threshold - 1, end);
            tasks.add(new PrimeNumbersCountRecursiveTask(from, to, threshold));
        }
        return tasks;
    }

    private long findPrimeNumbersCount() {
        long numbers = 0;
        for (int n = start; n <= end; n++) {
            if (isPrimeNumber(n)) {
                numbers++;
            }
        }
        return numbers;
    }

    private boolean isPrimeNumber(int n) {
        if (n == 2) {
            return true;
        }

        if (n == 1 || n % 2 == 0) {
            return false;
        }

        // a prime number has exactly two divisors: 1 and itself
        int divisors = 0;
        for (int i = 1; i <= n; i++) {
            if (n % i == 0) {
                divisors++;
            }
        }
        return divisors == 2;
    }
}
Example of the execute method:
ForkJoinTask<Long> task = new PrimeNumbersCountRecursiveTask(1, 100000, 10);
ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());

forkJoinPool.execute(task);

do {
    Thread.sleep(100);
} while (!task.isDone());

System.out.println("count: " + task.getRawResult()); // 9592

forkJoinPool.shutdown();
Example of the invoke method:
ForkJoinTask<Long> task = new PrimeNumbersCountRecursiveTask(1, 100000, 10);
ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());

Long count = forkJoinPool.invoke(task);
System.out.println("count: " + count); // 9592

forkJoinPool.shutdown();
Example of the submit method:
ForkJoinTask<Long> task = new PrimeNumbersCountRecursiveTask(1, 100000, 10);
ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());

ForkJoinTask<Long> forkJoinTask = forkJoinPool.submit(task);
Long count = forkJoinTask.get();
System.out.println("count: " + count); // 9592

forkJoinPool.shutdown();
Example of the getParallelism, getPoolSize, getActiveThreadCount, getRunningThreadCount, isQuiescent methods:
ForkJoinTask<Long> task = new PrimeNumbersCountRecursiveTask(1, 100000, 10);
ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());

forkJoinPool.execute(task);

do {
    logger.info("getParallelism={}, getPoolSize={}, getActiveThreadCount={}, getRunningThreadCount={}, isQuiescent={}",
            forkJoinPool.getParallelism(),
            forkJoinPool.getPoolSize(),
            forkJoinPool.getActiveThreadCount(),
            forkJoinPool.getRunningThreadCount(),
            forkJoinPool.isQuiescent()
    );
    Thread.sleep(100);
} while (!task.isDone());

System.out.println("count: " + task.getRawResult()); // 9592

forkJoinPool.shutdown();
Example of the getQueuedSubmissionCount, getQueuedTaskCount, getStealCount, hasQueuedSubmissions methods:
ForkJoinTask<Long> task = new PrimeNumbersCountRecursiveTask(1, 100000, 10);
ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());

forkJoinPool.execute(task);

do {
    logger.info("getQueuedSubmissionCount={}, getQueuedTaskCount={}, getStealCount={}, hasQueuedSubmissions={}",
            forkJoinPool.getQueuedSubmissionCount(),
            forkJoinPool.getQueuedTaskCount(),
            forkJoinPool.getStealCount(),
            forkJoinPool.hasQueuedSubmissions()
    );
    Thread.sleep(100);
} while (!task.isDone());

System.out.println("count: " + task.getRawResult()); // 9592

forkJoinPool.shutdown();
The Executors class
The Executors class has factory methods that create ExecutorService, ScheduledExecutorService, ThreadFactory, and Callable instances with commonly useful configuration settings.
Factory methods for ExecutorService instances
Methods to create ExecutorService instances:
- static ExecutorService newCachedThreadPool()
- static ExecutorService newFixedThreadPool(int nThreads)
- static ExecutorService newSingleThreadExecutor()
There are overloaded versions of these methods that have an additional ThreadFactory parameter.
Sources of the newCachedThreadPool method:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
This executor is effectively unbounded (maximumPoolSize=Integer.MAX_VALUE). Because corePoolSize=0, every thread is subject to keepAliveTime (even though allowCoreThreadTimeOut=false) and is terminated after it has been idle for 60 seconds. The direct-handoff queue (SynchronousQueue) doesn’t hold submitted tasks: each submitted task is either handed to an idle thread or starts a new one. This executor can be useful for executing many short-lived tasks.
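The direct-handoff behavior can be observed with a small experiment: if all submitted tasks block at the same time, no idle thread is ever available, so the pool starts one thread per task. The sketch below is only an illustration. The class and method names are hypothetical, and the cast to ThreadPoolExecutor relies on the factory method sources shown above rather than on a documented contract.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical example class: shows that a cached pool grows with the number of busy tasks.
class CachedThreadPoolExample {

    // Submits tasks that all block simultaneously and returns the largest pool size reached.
    static int largestPoolSize(int tasks) {
        // The cast is an assumption based on the sources of newCachedThreadPool.
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> awaitQuietly(release));
            }
            return pool.getLargestPoolSize();
        } finally {
            release.countDown(); // unblock all tasks
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("largest pool size: " + largestPoolSize(3)); // 3
    }
}
```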
Sources of the newFixedThreadPool method:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
This executor can create no more than the fixed number of threads. Once created, threads aren’t terminated until the thread pool is shut down. The unbounded queue (LinkedBlockingQueue without capacity) is used, so if additional tasks are submitted when all threads are busy, the tasks wait in the queue until a thread becomes available. Because there are no threads beyond the core ones (corePoolSize=maximumPoolSize) and core threads are not terminated for inactivity (allowCoreThreadTimeOut=false), keepAliveTime doesn’t matter here.
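The queueing behavior of a fixed thread pool can be checked in a similar way. In the sketch below, 5 blocking tasks are submitted to a pool of 2 threads, so 2 tasks run and 3 wait in the queue. The class and method names are hypothetical, and the cast to ThreadPoolExecutor is again an assumption based on the factory method sources.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical example class: shows that extra tasks wait in the queue of a fixed pool.
class FixedThreadPoolExample {

    // Returns {pool size, queued tasks} observed while every thread is blocked.
    static int[] poolSizeAndQueued(int threads, int tasks) {
        // The cast is an assumption based on the sources of newFixedThreadPool.
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(threads);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> awaitQuietly(release));
            }
            return new int[] {pool.getPoolSize(), pool.getQueue().size()};
        } finally {
            release.countDown(); // unblock the running tasks so the queued ones can finish
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] state = poolSizeAndQueued(2, 5);
        System.out.println("threads: " + state[0] + ", queued: " + state[1]); // threads: 2, queued: 3
    }
}
```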
Sources of the newSingleThreadExecutor method:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
This thread pool can create no more than 1 thread. The thread pool and the queue behave the same way as in the fixed thread pool described above. Tasks are guaranteed to execute sequentially, and no more than one task will be active at any given time. The executor is decorated with a non-reconfigurable wrapper, so it cannot be reconfigured after creation.
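The sequential guarantee means that tasks run in the order in which they were submitted. A minimal sketch that records the execution order is shown below. The class name, the method name, and the 5-second timeout are assumptions made for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical example class: records the order in which a single-thread executor runs tasks.
class SingleThreadExecutorExample {

    static List<Integer> runInOrder(int tasks) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<Integer> order = new CopyOnWriteArrayList<>();
        for (int i = 0; i < tasks; i++) {
            final int id = i;
            executor.execute(() -> order.add(id));
        }
        executor.shutdown(); // previously submitted tasks are still executed
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { // assumed timeout
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println("order: " + runInOrder(5)); // order: [0, 1, 2, 3, 4]
    }
}
```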
Factory methods for ScheduledExecutorService instances
Methods to create ScheduledExecutorService instances:
- static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
- static ScheduledExecutorService newSingleThreadScheduledExecutor()
There are overloaded versions of these methods that have an additional ThreadFactory parameter.
Sources of the newScheduledThreadPool method:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
Sources of the newSingleThreadScheduledExecutor method:
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
}
The executor is decorated with a non-reconfigurable wrapper, so it cannot be reconfigured after creation.
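The difference between the two factory methods can be observed with an instanceof check: according to the sources above, newScheduledThreadPool returns a ScheduledThreadPoolExecutor directly, while newSingleThreadScheduledExecutor returns a wrapper. The following is a minimal sketch with hypothetical class and method names.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;

// Hypothetical example class: checks which scheduled executors can be cast and reconfigured.
class ScheduledFactoryExample {

    // Returns true if the executor exposes the ScheduledThreadPoolExecutor configuration methods.
    static boolean canBeCast(ScheduledExecutorService executor) {
        try {
            return executor instanceof ScheduledThreadPoolExecutor;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(canBeCast(Executors.newScheduledThreadPool(2)));         // true
        System.out.println(canBeCast(Executors.newSingleThreadScheduledExecutor())); // false
    }
}
```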
Factory methods for non-reconfigurable executors
Methods to create non-reconfigurable ExecutorService, ScheduledExecutorService instances:
- static ExecutorService unconfigurableExecutorService(ExecutorService executor)
- static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor)
Examples
Example of the unconfigurableExecutorService method:
(the method returns an object that delegates all defined ExecutorService methods to the given executor, but not any other methods that might otherwise be accessible using cast)
ExecutorService executorService = Executors.unconfigurableExecutorService(
        new ThreadPoolExecutor(1, 2,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>())
);

ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService; // java.lang.ClassCastException
Example of the unconfigurableScheduledExecutorService method:
(the method returns an object that delegates all defined ScheduledExecutorService methods to the given executor, but not any other methods that might otherwise be accessible using cast)
ScheduledExecutorService scheduledExecutorService = Executors.unconfigurableScheduledExecutorService(
        new ScheduledThreadPoolExecutor(1)
);

ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = (ScheduledThreadPoolExecutor) scheduledExecutorService; // java.lang.ClassCastException
Factory methods for work-stealing executors
Methods to create work-stealing ExecutorService instances:
- static ExecutorService newWorkStealingPool()
- static ExecutorService newWorkStealingPool(int parallelism)
Examples
Example of the newWorkStealingPool method:
ForkJoinTask<Long> task = new PrimeNumbersCountRecursiveTask(1, 100000, 10);
ForkJoinPool forkJoinPool = (ForkJoinPool) Executors.newWorkStealingPool();

Long count = forkJoinPool.invoke(task);
System.out.println("count: " + count); // 9592
Factory methods for ThreadFactory instances
Methods to create ThreadFactory instances:
- static ThreadFactory defaultThreadFactory()
Sources of the defaultThreadFactory method:
public static ThreadFactory defaultThreadFactory() {
    return new DefaultThreadFactory();
}

private static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}
This thread factory is used by default by the ThreadPoolExecutor and ScheduledThreadPoolExecutor constructors that do not take a ThreadFactory parameter.
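The properties that the default thread factory assigns to its threads (the "pool-N-thread-M" name, non-daemon status, normal priority) can be inspected without starting the thread. A minimal sketch with a hypothetical class and method name:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

// Hypothetical example class: inspects a thread created by the default thread factory.
class DefaultThreadFactoryExample {

    // Creates (but does not start) the first thread of a new default factory.
    static Thread newDefaultThread() {
        ThreadFactory factory = Executors.defaultThreadFactory();
        return factory.newThread(() -> { });
    }

    public static void main(String[] args) {
        Thread thread = newDefaultThread();
        // The pool number in the name depends on how many factories were created before.
        System.out.println("name: " + thread.getName());
        System.out.println("daemon: " + thread.isDaemon());
        System.out.println("priority: " + thread.getPriority());
    }
}
```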
Factory methods for Callable instances
Methods to create Callable instances:
- static Callable<Object> callable(Runnable task)
- static <T> Callable<T> callable(Runnable task, T result)
Examples
Example of the callable method with a Runnable parameter:
Runnable runnable = () -> System.out.println("running ...");
Callable<Object> callable = Executors.callable(runnable);
System.out.println("result: " + callable.call()); // null
Example of the callable method with a Runnable parameter and a result:
Runnable runnable = () -> System.out.println("running ...");
Callable<Integer> callable = Executors.callable(runnable, 1);
System.out.println("result: " + callable.call()); // 1
Conclusion
The optimum size of a thread pool depends on the nature of the tasks and the number of available processors.
For CPU-bound tasks on an N-processor system, it’s possible to achieve maximum CPU utilization with a thread pool of N or N+1 threads. However, if more threads are used, performance degrades because of the additional thread context switching overhead.
For I/O-bound tasks, it’s reasonable to increase the pool size beyond the number of available processors because not all threads will be working at all times. On an N-processor system, approximately N*(1+waiting_time/service_time) threads keep the processors fully utilized. If fewer threads are used, CPU cores sit idle while threads block on I/O, even though there are tasks to be done.
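The two sizing rules can be expressed as a small helper. This is only a sketch of the formulas above: the class and method names are hypothetical, rounding up is an assumption, and the waiting and service times have to be measured for the actual workload.

```java
// Hypothetical helper class: estimates thread pool sizes from the rules of thumb above.
class PoolSizing {

    // CPU-bound tasks: N+1 threads on an N-processor system.
    static int cpuBound(int processors) {
        return processors + 1;
    }

    // I/O-bound tasks: N*(1+waiting_time/service_time), rounded up (assumed rounding).
    static int ioBound(int processors, double waitingTime, double serviceTime) {
        return (int) Math.ceil(processors * (1 + waitingTime / serviceTime));
    }

    public static void main(String[] args) {
        int processors = Runtime.getRuntime().availableProcessors();
        System.out.println("CPU-bound pool size: " + cpuBound(processors));
        // Assumed measurements: a task waits 90 ms for I/O and computes for 10 ms.
        System.out.println("I/O-bound pool size: " + ioBound(processors, 90, 10));
    }
}
```

For example, on a 4-processor system with tasks that wait 90 ms and compute for 10 ms, the estimate is 4*(1+90/10) = 40 threads.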
Code examples are available in the GitHub repository.