Custom Implementation of ExecutorService in Java
Step 1 — Understand the Requirements
Before coding, we define the structure for our custom executor service:
- A fixed-size thread pool to manage worker threads.
- A task queue to hold incoming tasks.
- A method to submit tasks (execute()).
- Worker threads that fetch and run tasks from the queue.
- A mechanism for graceful shutdown of threads.
Step 2 — Create the Basic Thread Pool Structure
We create the main class CustomThreadPool that manages:
- An array of worker threads.
- A queue for holding tasks (Runnable objects).
Code:
import java.util.LinkedList;
import java.util.Queue;
public class CustomThreadPool {
private final int nThreads; // number of threads in pool
private final PoolWorker[] threads; // worker threads
private final Queue taskQueue; // queue to store tasks
public CustomThreadPool(int nThreads) {
this.nThreads = nThreads;
taskQueue = new LinkedList<>();
threads = new PoolWorker[nThreads];
for (int i = 0; i < nThreads; i++) {
threads[i] = new PoolWorker();
threads[i].start();
}
}
}
Step 3 — Implement Task Submission (execute method)
The execute() method adds tasks to the queue and notifies waiting threads.
Code:
public void execute(Runnable task) {
synchronized (taskQueue) {
taskQueue.add(task); // add task to queue
taskQueue.notify(); // notify waiting worker threads
}
}
Step 4 — Implement Worker Threads
Each worker thread should:
- Continuously look for tasks in the queue.
- If no tasks are available, wait.
- When a task is available, execute it.
Code:
private class PoolWorker extends Thread {
public void run() {
Runnable task;
while (true) {
synchronized (taskQueue) {
while (taskQueue.isEmpty()) {
try {
taskQueue.wait();
} catch (InterruptedException e) {
return; // exit if interrupted
}
}
task = taskQueue.poll();
}
try {
if (task != null) task.run();
} catch (RuntimeException e) {
System.err.println("Error executing task: " + e.getMessage());
}
}
}
}
Step 5 — Implement Graceful Shutdown
We add a shutdown() method to stop all threads in the pool gracefully.
Code:
public void shutdown() {
for (int i = 0; i < nThreads; i++) {
threads[i].interrupt();
}
}
Step 6 — Test the Custom Thread Pool
We create a test program to verify that the thread pool executes tasks correctly.
Code:
public class CustomThreadPoolTest {
public static void main(String[] args) {
CustomThreadPool pool = new CustomThreadPool(3); // pool of 3 threads
for (int i = 0; i < 6; i++) {
final int taskId = i;
pool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " is executing task " + taskId);
try {
Thread.sleep(2000); // simulate task execution
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " finished task " + taskId);
});
}
pool.shutdown();
}
}
Step 7 — Expected Output
Thread-0 is executing task 0
Thread-1 is executing task 1
Thread-2 is executing task 2
Thread-0 finished task 0
Thread-0 is executing task 3
Thread-1 finished task 1
Thread-1 is executing task 4
Thread-2 finished task 2
Thread-2 is executing task 5
Thread-0 finished task 3
Thread-1 finished task 4
Thread-2 finished task 5
Full Code — Custom Thread Pool Example
import java.util.LinkedList;
import java.util.Queue;
// Step 1 & 2 — Create the Basic Thread Pool Structure
class CustomThreadPool {
private final int nThreads; // Number of threads in the pool
private final PoolWorker[] threads; // Array of worker threads
private final Queue<Runnable> taskQueue; // Task queue
public CustomThreadPool(int nThreads) {
this.nThreads = nThreads;
taskQueue = new LinkedList<>();
threads = new PoolWorker[nThreads];
// Initialize and start worker threads
for (int i = 0; i < nThreads; i++) {
threads[i] = new PoolWorker();
threads[i].start();
}
}
// Step 3 — Task Submission
public void execute(Runnable task) {
synchronized (taskQueue) {
taskQueue.add(task); // Add task to the queue
taskQueue.notify(); // Notify waiting worker threads
}
}
// Step 4 — Worker Thread Definition
private class PoolWorker extends Thread {
public void run() {
Runnable task;
while (true) {
synchronized (taskQueue) {
// Wait for a task if queue is empty
while (taskQueue.isEmpty()) {
try {
taskQueue.wait();
} catch (InterruptedException e) {
return; // Exit thread if interrupted
}
}
// Retrieve next task from queue
task = taskQueue.poll();
}
// Execute the task
try {
if (task != null) {
task.run();
}
} catch (RuntimeException e) {
System.err.println("Error executing task: " + e.getMessage());
}
}
}
}
// Step 5 — Graceful Shutdown
public void shutdown() {
for (int i = 0; i < nThreads; i++) {
threads[i].interrupt(); // Interrupt all worker threads
}
}
}
// Step 6 — Testing the Custom Thread Pool
public class CustomThreadPoolTest {
public static void main(String[] args) {
// Create a thread pool with 3 threads
CustomThreadPool pool = new CustomThreadPool(3);
// Submit 6 tasks
for (int i = 0; i < 6; i++) {
final int taskId = i;
pool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " is executing task " + taskId);
try {
Thread.sleep(2000); // Simulate task execution
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " finished task " + taskId);
});
}
// Shutdown the pool after submitting tasks
pool.shutdown();
}
}
Expected Output (Order may vary due to concurrency)
Thread-0 is executing task 0
Thread-1 is executing task 1
Thread-2 is executing task 2
Thread-0 finished task 0
Thread-0 is executing task 3
Thread-1 finished task 1
Thread-1 is executing task 4
Thread-2 finished task 2
Thread-2 is executing task 5
Thread-0 finished task 3
Thread-1 finished task 4
Thread-2 finished task 5
How to Run
- Save this file as CustomThreadPoolTest.java.
Compile:
javac CustomThreadPoolTest.javaRun:
java CustomThreadPoolTest
Add Optional Improvements
After basic implementation, you can enhance the custom thread pool by adding:
- Dynamic thread pool resizing.
- Task rejection policies when queue is full.
- Support for Future tasks to return results.
- Timeout handling for waiting threads.
Real-World Usage
This custom implementation is useful for:
- Learning how ExecutorService works internally.
- Building lightweight thread pools for small projects.
- Situations where you need complete control over task execution.
Production Recommendation
For production systems, always prefer Java’s built-in ExecutorService (Executors.newFixedThreadPool(), etc.) because:
- It’s optimized for performance.
- It handles advanced features like task scheduling, shutdown hooks, and thread safety.
- It avoids common pitfalls of manual thread pool implementation.
Next Blog- Custom Implementation of CompletableFuture in Java
