We often use the tools Java provides out of the box, but rarely dig deeper into how they work. Many would argue that you can get by just fine without knowing, and that some level of ignorance is healthy, but we as software engineers have an innate urge to understand how things work.
In this post, we will build a really basic thread pool and pinpoint some drawbacks that the approach may present.
What is a thread pool?
Before getting into the code, let’s understand what a thread pool is at a high level.
Imagine that you are the manager of a fast food restaurant and you have 10 employees working for you. Usually 1 employee works from the start until the end of the working day. If there is a spike in customers, you start calling the other employees in to work. They arrive 30 minutes later and start working. In the meantime, some unhappy customers have left the restaurant because things were going really slowly with only one cashier open. Once the incoming workers start helping, things go back to normal.
After a while, you as the manager notice that these spikes follow a pattern, so you decide to call everyone in ahead of time on certain days. That way people are warmed up before the spike hits and all customers are served in a timely manner.
Now, for the computer world example… I think you get it, but in short: if you have only one thread working and you create new ones ad hoc, time and resources are wasted on creating and destroying each thread. If the spikes in load are unplanned and happen frequently, the JVM spends a lot of time on thread management and you have fewer resources left for the actual work the program was created for!
If, on the other hand, you expect these spikes to happen often, or you know that your load is unpredictable, you can create a few threads beforehand. These threads are your thread pool!
Implementing a thread pool
Let’s see how we can implement a thread pool in Java.
First, let’s look at how we are going to use the thread pool, so we start at the high level, and then we will go into the implementation of each component.
    public class Main {
        public static void main(String[] args) throws InterruptedException {
            ITask summator1 = new Summator((byte) 90);
            ITask summator2 = new Summator((byte) 9);
            ITask summator3 = new Summator((byte) 30);
            ITask summator4 = new Summator((byte) 65);
            ITask summator5 = new Summator((byte) 100);

            IThreadPool myThreadPool = new ThreadPool(4, 4);

            myThreadPool.schedule(summator1);
            myThreadPool.schedule(summator2);
            myThreadPool.schedule(summator3);
            myThreadPool.schedule(summator4);
            myThreadPool.schedule(summator5);

            Thread.sleep(1000);
            myThreadPool.stop();
        }
    }
First we create a few Summator tasks that we feed into the thread pool. We set up the pool with 4 threads ready to do the job and a buffer that holds at most 4 waiting tasks.
Let’s take a look at the ThreadPool class.
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class ThreadPool implements IThreadPool {
        private final List<PoolThread> threads;
        private final BlockingQueue<ITask> tasks;
        private volatile boolean isStopped = false;

        public ThreadPool(int numberOfThreads, int maxNumberOfTasks) {
            threads = new ArrayList<>();
            tasks = new ArrayBlockingQueue<>(maxNumberOfTasks);
            this.fillThreads(numberOfThreads);
            this.start();
        }

        @Override
        public synchronized void schedule(ITask task) {
            if (this.isStopped) {
                // The pool is in the wrong state, so IllegalStateException fits better
                throw new IllegalStateException("Cannot accept any more tasks!");
            }
            try {
                // Blocks the caller while the queue is full
                this.tasks.put(task);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of silently swallowing it
                Thread.currentThread().interrupt();
            }
        }

        @Override
        public synchronized void stop() {
            this.isStopped = true;
            for (PoolThread thread : this.threads) {
                thread.interrupt();
            }
        }

        private void start() {
            for (Thread thread : this.threads) {
                thread.start();
            }
        }

        private void fillThreads(int numberOfThreads) {
            for (int i = 0; i < numberOfThreads; i++) {
                this.threads.add(new PoolThread(this.tasks));
            }
        }
    }
Pay no attention to the IThreadPool interface, which I used just to design the API of the class.
In the constructor we initialise a BlockingQueue with a parameter that sets the maximum number of tasks the queue can hold before a producer has to wait to add another item.
We also initialise a list of PoolThread objects. PoolThread simply extends Thread and waits for tasks from the queue.
The schedule method adds a task to the queue.
The stop method interrupts all threads.
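To make the queue capacity more concrete, here is a small sketch that is not part of the original pool. The class name BoundedQueueDemo and its method are made up for illustration. It uses offer, which returns false when the queue is full, instead of put, which would block, so the demo cannot hang.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the article's thread pool.
class BoundedQueueDemo {

    /** Tries to add `attempts` items to a queue of the given capacity and returns how many fit. */
    static int countAccepted(int capacity, int attempts) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            // offer() returns false immediately when the queue is full;
            // put() would block here until a consumer made room.
            if (queue.offer("task-" + i)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Nobody consumes from the queue, so only 4 of the 5 items fit.
        System.out.println(countAccepted(4, 5)); // prints 4
    }
}
```

In the pool above, worker threads keep draining the queue, so a call to put only has to wait until one of them takes a task.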
    import java.util.concurrent.BlockingQueue;

    public class PoolThread extends Thread {
        private final BlockingQueue<ITask> tasks;
        private boolean isStopped = false;

        public PoolThread(BlockingQueue<ITask> tasks) {
            this.tasks = tasks;
        }

        @Override
        public void run() {
            while (!this.isStopped) {
                try {
                    // Blocks until a task is available in the queue
                    ITask task = this.tasks.take();
                    task.workOnTask();
                } catch (InterruptedException e) {
                    // ThreadPool.stop() interrupts the thread, which ends the loop
                    this.isStopped = true;
                }
            }
        }
    }
PoolThread is the worker, the individual employee that can perform a task. In our case the worker just calls a method on the task, effectively starting it. In the restaurant example that would mean switching on the oven.
The run method waits for tasks on the queue. The take call blocks until a task is added to the queue. BlockingQueue is interesting in its own right, so I would encourage you to read more about it if you want to know more.
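The interplay between take and interrupt can be tried in isolation. The following is a minimal sketch with a made-up class name, BlockingTakeDemo. It uses Runnable instead of ITask to stay self-contained. A worker blocks on an empty queue, and the same interrupt call that stop makes wakes it up with an InterruptedException.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the article's thread pool.
class BlockingTakeDemo {

    /** Starts a worker that waits on an empty queue, interrupts it, and reports how it ended. */
    static String interruptBlockedWorker() {
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1);
        String[] outcome = {"still running"};

        Thread worker = new Thread(() -> {
            try {
                queue.take().run();              // blocks: the queue stays empty
                outcome[0] = "ran a task";
            } catch (InterruptedException e) {
                outcome[0] = "interrupted while waiting";
            }
        });
        worker.start();
        worker.interrupt();                      // the same call ThreadPool.stop() makes
        try {
            worker.join();                       // after join() the worker's write is visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return outcome[0];
    }

    public static void main(String[] args) {
        System.out.println(interruptBlockedWorker()); // prints "interrupted while waiting"
    }
}
```

It does not matter whether the interrupt arrives before or during the call to take: in both cases take throws InterruptedException and the worker leaves its loop.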
The final piece is the Summator class, which I use as the unit of work. I used a custom interface, but you can use Runnable too.
    public class Summator implements ITask {
        private final byte endRange;
        private long result;

        public Summator(byte endRange) {
            this.endRange = endRange;
            this.result = 0;
        }

        @Override
        public void workOnTask() {
            // Adds up the integers from 1 up to, but not including, endRange
            for (int i = 1; i < this.endRange; i++) {
                this.result += i;
            }
            String msg = "Done with the task! The result is: " +
                    this.result +
                    "; The job was done from thread: " +
                    Thread.currentThread();
            System.out.println(msg);
        }
    }
You can implement the ITask interface to create other tasks that our ThreadPool can work with.
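As an illustration, here is a sketch of what another task could look like. The ITask interface is not listed in this post, so the version below is an assumption inferred from how Summator uses it. The Factorial class and its getResult method are made up for the example.

```java
// Assumed shape of the task interface, inferred from Summator (hypothetical).
interface ITask {
    void workOnTask();
}

// Hypothetical second task: computes n! instead of a sum.
class Factorial implements ITask {
    private final int n;
    private long result = 1;

    Factorial(int n) {
        this.n = n;
    }

    @Override
    public void workOnTask() {
        long product = 1;
        for (int i = 2; i <= n; i++) {
            product *= i;
        }
        this.result = product;
        System.out.println("Done with the task! " + n + "! = " + result
                + "; The job was done from thread: " + Thread.currentThread());
    }

    long getResult() {
        return result;
    }
}
```

An instance could then be handed to the pool in the same way as the Summator tasks, for example myThreadPool.schedule(new Factorial(5)).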
Risks
- Deadlock – this can happen when one async task waits on another. If “Task 1” waits on “Task 2” and vice versa, neither can finish and the program cannot continue.
- Thread leakage – when a thread is taken from the pool but is never returned to it. For example, if a task throws an uncaught exception, the thread running it dies, which reduces the size of the pool. Eventually this can lead to an empty thread pool.
- Thread starvation – when there are too many threads, some of them will not get tasks. Having many threads also increases the context switching between them.
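One possible way to guard against the thread leakage described above is to catch exceptions inside the worker loop, so that a failing task does not kill the thread. The sketch below is an assumption about how that could look, not part of the pool in this post. The class name ResilientWorkerDemo is made up, and it uses Runnable and poll (instead of ITask and take) so that the demo stops once the queue is empty.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the article's thread pool.
class ResilientWorkerDemo {

    /** Runs all tasks on a single worker thread and returns how many finished without an exception. */
    static int runAll(Runnable... tasks) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        for (Runnable task : tasks) {
            queue.add(task);
        }
        int[] completed = {0};

        Thread worker = new Thread(() -> {
            Runnable task;
            // poll() returns null on an empty queue, which ends the demo loop
            while ((task = queue.poll()) != null) {
                try {
                    task.run();
                    completed[0]++;
                } catch (RuntimeException e) {
                    // Report the failure and keep the worker alive for the next task
                    System.err.println("Task failed: " + e.getMessage());
                }
            }
        });
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed[0];
    }

    public static void main(String[] args) {
        int done = runAll(
                () -> { },
                () -> { throw new IllegalStateException("boom"); },
                () -> { });
        System.out.println(done); // prints 2: the task after the failing one still ran
    }
}
```

Without the try/catch, the exception from the second task would end the worker thread and the third task would never run.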
Considerations
It is important to size the pool properly. A general rule of thumb is to have as many threads as there are cores in your processor, plus 1. But we have to keep in mind that threads can spend time waiting on I/O, in which case a larger pool may keep the cores busier.
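As a rough sketch of this sizing advice, the helper below computes both variants. The class and method names are made up. The second formula, cores * (1 + wait time / compute time), is a commonly cited heuristic for tasks that block and is only a starting point for measurement, not a guarantee.

```java
// Hypothetical helper class for illustration only.
class PoolSizing {

    /** Rule of thumb for CPU-bound work: one thread per core, plus one. */
    static int cpuBoundPoolSize(int cores) {
        return cores + 1;
    }

    /**
     * Heuristic for tasks that also wait (for example on I/O):
     * cores * (1 + waitTime / computeTime), rounded up.
     * Both times must use the same unit.
     */
    static int ioBoundPoolSize(int cores, double waitTime, double computeTime) {
        return (int) Math.ceil(cores * (1 + waitTime / computeTime));
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("Cores: " + cores);
        System.out.println("CPU-bound pool size: " + cpuBoundPoolSize(cores));
        // Assumed workload: each task waits 90 ms on I/O for every 10 ms of computing
        System.out.println("I/O-heavy pool size: " + ioBoundPoolSize(cores, 90, 10));
    }
}
```

The wait and compute times in main are invented numbers. In a real application they would have to come from profiling.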
Summary
I hope that after this article you have a better understanding of how thread pools work, and that it makes you more confident about using, configuring and tuning your thread pool!
Java has wonderful classes that abstract the thread pool for you, so I would advise you to use them! Have some fun with Java Executor!
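For comparison, here is a minimal sketch of how the Main example could look with the built-in classes. It assumes a ThreadPoolExecutor with 4 threads and a queue of 4, mirroring new ThreadPool(4, 4). The class ExecutorDemo and its methods are made up, and sumBelow repeats the Summator arithmetic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class for illustration only.
class ExecutorDemo {

    /** Same arithmetic as Summator: adds the integers from 1 up to, but not including, endRange. */
    static long sumBelow(int endRange) {
        long result = 0;
        for (int i = 1; i < endRange; i++) {
            result += i;
        }
        return result;
    }

    /** Submits one summing task per argument to a 4-thread pool and collects the results in order. */
    static List<Long> sumAll(int... endRanges) {
        ExecutorService pool = new ThreadPoolExecutor(
                4, 4, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(4));
        List<Long> results = new ArrayList<>();
        try {
            List<Future<Long>> futures = new ArrayList<>();
            for (int endRange : endRanges) {
                Callable<Long> job = () -> sumBelow(endRange);
                futures.add(pool.submit(job));
            }
            for (Future<Long> future : futures) {
                results.add(future.get());       // blocks until that task has finished
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw new IllegalStateException("A task failed", e.getCause());
        } finally {
            pool.shutdown();                     // lets running tasks finish, accepts no new ones
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(sumAll(90, 9, 30, 65, 100)); // prints [4005, 36, 435, 2080, 4950]
    }
}
```

One difference from our hand-written pool: when all threads are busy and the queue is full, ThreadPoolExecutor by default rejects the new task with a RejectedExecutionException instead of making the caller wait.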
Happy coding!