Producer-Consumer Problem Using Custom LinkedBlockingQueue in Java
Complete Step-by-Step Guide
Introduction
The Producer-Consumer problem is a classic example of a multithreading problem where one or more producer threads generate data and place it into a shared resource (a queue), and one or more consumer threads take data from the shared resource for processing.
The main challenges in this problem are:
- Synchronization — to avoid race conditions.
- Blocking — producers should wait when the queue is full, and consumers should wait when it’s empty.
- Thread safety — to ensure consistent data access.
Java provides ready-made tools such as BlockingQueue for this, but in this post we’ll solve it with the CustomLinkedBlockingQueue from the previous post and build a complete producer-consumer system around it.
Step-by-Step Implementation
Step 1 — Understand Requirements
We need:
- A thread-safe queue (CustomLinkedBlockingQueue) with capacity.
- Producer threads to add tasks to the queue.
- Consumer threads to take tasks from the queue.
- Proper synchronization and blocking behavior.
Step 2 — Define CustomLinkedBlockingQueue
We will use the CustomLinkedBlockingQueue<T> class from the previous post as our shared resource.
If you missed it, the full class is reproduced in the Complete code section below.
This queue already supports:
- put() — blocks when the queue is full.
- take() — blocks when the queue is empty.
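To make the blocking contract concrete, here is a minimal sketch that checks that put() on a full queue really parks the calling thread until a take() frees a slot. TinyBlockingQueue and BlockingDemo are hypothetical names, and the queue is a condensed, ArrayDeque-backed stand-in for CustomLinkedBlockingQueue (same synchronized/wait()/notifyAll() approach), not the class from the previous post.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for CustomLinkedBlockingQueue: same put()/take() contract,
// backed by an ArrayDeque to keep the sketch short.
class TinyBlockingQueue<T> {
    private final Deque<T> items = new ArrayDeque<>();
    private final int capacity;

    TinyBlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    synchronized void put(T item) throws InterruptedException {
        while (items.size() == capacity) {
            wait(); // full: release the lock and sleep until a take() notifies
        }
        items.addLast(item);
        notifyAll();
    }

    synchronized T take() throws InterruptedException {
        while (items.isEmpty()) {
            wait(); // empty: release the lock and sleep until a put() notifies
        }
        T item = items.removeFirst();
        notifyAll();
        return item;
    }
}

class BlockingDemo {
    // Returns true if a put() on a full queue blocked until a take() freed a slot.
    static boolean putBlocksUntilTake() {
        TinyBlockingQueue<String> queue = new TinyBlockingQueue<>(1);
        try {
            queue.put("first"); // the queue is now full
            Thread producer = new Thread(() -> {
                try {
                    queue.put("second"); // must wait for a free slot
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Producer-1");
            producer.start();

            // Poll until the producer is parked inside wait() (give up after about 2 s).
            long deadline = System.currentTimeMillis() + 2000;
            while (producer.getState() != Thread.State.WAITING
                    && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
            }
            boolean wasBlocked = producer.getState() == Thread.State.WAITING;

            String first = queue.take(); // frees the slot and wakes the producer
            producer.join(2000);
            String second = queue.take();
            return wasBlocked && !producer.isAlive()
                    && first.equals("first") && second.equals("second");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("put() blocked until take(): " + putBlocksUntilTake());
    }
}
```

The while loop around wait() matters here: a woken thread rechecks the condition before proceeding, so a notifyAll() meant for another thread cannot cause an insert into a full queue.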
Step 3 — Create Producer Class
The producer will generate integers and put them into the queue.
class Producer implements Runnable {
    // Typed as <Integer> so put() and take() are checked at compile time.
    private final CustomLinkedBlockingQueue<Integer> queue;

    public Producer(CustomLinkedBlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        int value = 1;
        while (true) {
            try {
                System.out.println(Thread.currentThread().getName() + " producing: " + value);
                queue.put(value++);
                Thread.sleep(500); // simulate time taken to produce
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt status
                break;
            }
        }
    }
}
Step 4 — Create Consumer Class
The consumer will take integers from the queue and process them.
class Consumer implements Runnable {
    // Typed as <Integer>; with a raw type, take() would return Object and not compile here.
    private final CustomLinkedBlockingQueue<Integer> queue;

    public Consumer(CustomLinkedBlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Integer value = queue.take();
                System.out.println(Thread.currentThread().getName() + " consuming: " + value);
                Thread.sleep(1000); // simulate time taken to consume
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt status
                break;
            }
        }
    }
}
Step 5 — Create Main Class to Test Producer-Consumer
We create multiple producer and consumer threads.
public class ProducerConsumerTest {
    public static void main(String[] args) {
        // Shared bounded queue with room for 5 items
        CustomLinkedBlockingQueue<Integer> queue = new CustomLinkedBlockingQueue<>(5);

        Thread producer1 = new Thread(new Producer(queue), "Producer-1");
        Thread producer2 = new Thread(new Producer(queue), "Producer-2");
        Thread consumer1 = new Thread(new Consumer(queue), "Consumer-1");
        Thread consumer2 = new Thread(new Consumer(queue), "Consumer-2");

        producer1.start();
        producer2.start();
        consumer1.start();
        consumer2.start();
    }
}
Step 6 — Expected Output
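The exact interleaving varies from run to run, and each producer counts from 1 independently, so a typical run starts like this:
Producer-1 producing: 1
Producer-2 producing: 1
Consumer-1 consuming: 1
Consumer-2 consuming: 1
Producer-1 producing: 2
Producer-2 producing: 2
...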
Explanation:
- Producers and consumers work concurrently.
- Producers wait if the queue is full.
- Consumers wait if the queue is empty.
- Thread-safe access ensures no data corruption.
Step 7 — Add Graceful Shutdown (Optional)
In real-world systems, we need to stop threads gracefully.
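Add a shutdown flag to the Producer and Consumer classes:
    private volatile boolean running = true;

    public void shutdown() {
        running = false;
    }
Then, in the run() method, loop on the flag:
    while (running) {
        ...
    }
In main, keep a reference to each Runnable, because shutdown() is defined on Producer and Consumer, not on Thread. A thread that is blocked inside put() or take() never rechecks the flag, so interrupt it as well:
    Producer task = new Producer(queue);
    Thread producer1 = new Thread(task, "Producer-1");
    producer1.start();
    task.shutdown();        // stop looping at the next iteration
    producer1.interrupt();  // wake the thread if it is blocked in put()
Do the same for each consumer.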
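The shutdown idea above can be sketched as one self-contained worker that combines the volatile flag with an interrupt. StoppableConsumer, ShutdownDemo, and their method names are hypothetical, and java.util.concurrent.LinkedBlockingQueue stands in for the custom queue only so that the block compiles on its own; treat it as one possible shape, not the definitive one.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical worker that owns its thread, so shutdown() can both clear the
// flag and interrupt a thread that is blocked inside take().
class StoppableConsumer implements Runnable {
    private final BlockingQueue<Integer> queue;
    private final Thread thread;
    private final AtomicInteger consumed = new AtomicInteger();
    private volatile boolean running = true;

    StoppableConsumer(BlockingQueue<Integer> queue, String name) {
        this.queue = queue;
        this.thread = new Thread(this, name);
    }

    void start() {
        thread.start();
    }

    void shutdown() {
        running = false;    // stops the loop at the next iteration
        thread.interrupt(); // wakes the thread if it is blocked in take()
    }

    // Waits up to the given time and reports whether the thread has finished.
    boolean awaitStop(long millis) throws InterruptedException {
        thread.join(millis);
        return !thread.isAlive();
    }

    int consumedCount() {
        return consumed.get();
    }

    @Override
    public void run() {
        while (running) {
            try {
                queue.take();
                consumed.incrementAndGet();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                break;
            }
        }
    }
}

class ShutdownDemo {
    // Returns true if the consumer handled both items and then stopped
    // even though it was blocked on an empty queue.
    static boolean stopsWhileBlocked() {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);
        StoppableConsumer consumer = new StoppableConsumer(queue, "Consumer-1");
        consumer.start();
        try {
            queue.put(1);
            queue.put(2);
            // Wait (up to about 2 s) until both items have been consumed.
            long deadline = System.currentTimeMillis() + 2000;
            while (consumer.consumedCount() < 2 && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
            }
            consumer.shutdown(); // the consumer is now waiting in take()
            return consumer.awaitStop(2000) && consumer.consumedCount() == 2;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("stopped cleanly: " + stopsWhileBlocked());
    }
}
```

Without the interrupt() call, the consumer would stay parked in take() indefinitely once the queue is empty, because the flag is only checked between iterations.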
Complete code
// Node class
class Node<T> {
    T item;
    Node<T> next;

    Node(T item) {
        this.item = item;
        this.next = null;
    }
}

// Custom LinkedBlockingQueue implementation
class CustomLinkedBlockingQueue<T> {
    private Node<T> head;
    private Node<T> tail;
    private int size;
    private final int capacity;

    public CustomLinkedBlockingQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
        head = tail = new Node<>(null); // dummy node
    }

    // Add element (Producer)
    public synchronized void put(T item) throws InterruptedException {
        while (size == capacity) {
            wait(); // wait if queue is full
        }
        Node<T> newNode = new Node<>(item);
        tail.next = newNode;
        tail = newNode;
        size++;
        notifyAll(); // notify waiting consumers
    }

    // Remove element (Consumer)
    public synchronized T take() throws InterruptedException {
        while (size == 0) {
            wait(); // wait if queue is empty
        }
        Node<T> first = head.next;
        head.next = first.next;
        size--;
        if (size == 0) {
            tail = head; // reset tail when empty
        }
        notifyAll(); // notify waiting producers
        return first.item;
    }

    public synchronized int size() {
        return size;
    }
}
// Producer class
class Producer implements Runnable {
    private final CustomLinkedBlockingQueue<Integer> queue;

    public Producer(CustomLinkedBlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        int value = 1;
        while (true) {
            try {
                System.out.println(Thread.currentThread().getName() + " producing: " + value);
                queue.put(value++);
                Thread.sleep(500); // simulate production delay
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
}

// Consumer class
class Consumer implements Runnable {
    private final CustomLinkedBlockingQueue<Integer> queue;

    public Consumer(CustomLinkedBlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Integer value = queue.take();
                System.out.println(Thread.currentThread().getName() + " consuming: " + value);
                Thread.sleep(1000); // simulate consumption delay
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
}
// Main Test Class
public class ProducerConsumerTest {
    public static void main(String[] args) {
        CustomLinkedBlockingQueue<Integer> queue = new CustomLinkedBlockingQueue<>(5);

        Thread producer1 = new Thread(new Producer(queue), "Producer-1");
        Thread producer2 = new Thread(new Producer(queue), "Producer-2");
        Thread consumer1 = new Thread(new Consumer(queue), "Consumer-1");
        Thread consumer2 = new Thread(new Consumer(queue), "Consumer-2");

        producer1.start();
        producer2.start();
        consumer1.start();
        consumer2.start();
    }
}
Explanation:
- Two producers generate integers and insert them into the custom blocking queue.
- Two consumers retrieve and process those integers.
- Thread safety is achieved using synchronized, wait(), and notifyAll().
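- The bounded queue applies back-pressure: put() blocks while the queue is full and take() blocks while it is empty, so the producers can never get more than 5 items (the configured capacity) ahead of the consumers.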
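Because the program above runs forever, it is hard to tell from the console whether any item was lost or consumed twice. A bounded run makes that checkable. The sketch below is an assumption-laden variant, not the code above: CompactBlockingQueue is a hypothetical condensed stand-in that uses the same synchronized/wait()/notifyAll() scheme, and BoundedRun is a hypothetical harness in which two producers each emit 1..n and two consumers each take n items.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical condensed stand-in for CustomLinkedBlockingQueue.
class CompactBlockingQueue<T> {
    private final Deque<T> items = new ArrayDeque<>();
    private final int capacity;

    CompactBlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    synchronized void put(T item) throws InterruptedException {
        while (items.size() == capacity) {
            wait(); // queue is full
        }
        items.addLast(item);
        notifyAll();
    }

    synchronized T take() throws InterruptedException {
        while (items.isEmpty()) {
            wait(); // queue is empty
        }
        T item = items.removeFirst();
        notifyAll();
        return item;
    }
}

class BoundedRun {
    // Two producers each put 1..perThread; two consumers each take perThread items.
    // Returns the sum of everything the consumers received.
    static long run(int perThread) {
        CompactBlockingQueue<Integer> queue = new CompactBlockingQueue<>(5);
        AtomicLong total = new AtomicLong();

        Runnable producer = () -> {
            try {
                for (int value = 1; value <= perThread; value++) {
                    queue.put(value);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Runnable consumer = () -> {
            try {
                for (int i = 0; i < perThread; i++) {
                    total.addAndGet(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        Thread[] threads = {
            new Thread(producer, "Producer-1"), new Thread(producer, "Producer-2"),
            new Thread(consumer, "Consumer-1"), new Thread(consumer, "Consumer-2")
        };
        for (Thread t : threads) {
            t.start();
        }
        try {
            for (Thread t : threads) {
                t.join(); // wait for all four threads to finish
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return total.get();
    }

    public static void main(String[] args) {
        // Each producer contributes 1 + 2 + ... + 100 = 5050, so the total is 10100.
        System.out.println("sum of consumed values: " + run(100));
    }
}
```

If an item were dropped or delivered twice, the sum would differ from n * (n + 1), which gives a quick sanity check on the synchronization.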
Real-World Applications
The producer-consumer pattern with a blocking queue is useful for:
- Web server request handling.
- Data processing pipelines.
- Logging systems.
- Task scheduling systems.
Enhancements
After implementing the basic producer-consumer problem, you can enhance:
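- Fair ordering among waiting producers and consumers, for example by replacing synchronized with a fair ReentrantLock (new ReentrantLock(true)) and separate Condition objects.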
- Priority-based consumption using priority queues.
- Timeout operations for put/take methods.
- Monitoring and metrics for production-level systems.
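As an illustration of one enhancement from the list, timeouts, here is a rough sketch of timed variants of put/take. TimedBlockingQueue is a hypothetical class, and the offer/poll names and signatures are borrowed from java.util.concurrent.BlockingQueue as an assumption about how such methods might look on the custom queue. It uses wait(millis) with a deadline, so a thread gives up instead of waiting forever.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;

// Hypothetical bounded queue with timed insert and remove operations.
class TimedBlockingQueue<T> {
    private final Deque<T> items = new ArrayDeque<>();
    private final int capacity;

    TimedBlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    // Returns false if no slot became free within the timeout.
    synchronized boolean offer(T item, long timeout, TimeUnit unit) throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (items.size() == capacity) {
            long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
            if (remainingMillis <= 0) {
                return false; // timed out (also avoids wait(0), which would wait forever)
            }
            wait(remainingMillis);
        }
        items.addLast(item);
        notifyAll();
        return true;
    }

    // Returns null if no item arrived within the timeout.
    synchronized T poll(long timeout, TimeUnit unit) throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (items.isEmpty()) {
            long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
            if (remainingMillis <= 0) {
                return null; // timed out
            }
            wait(remainingMillis);
        }
        T item = items.removeFirst();
        notifyAll();
        return item;
    }
}

class TimedDemo {
    static String run() {
        TimedBlockingQueue<Integer> queue = new TimedBlockingQueue<>(1);
        try {
            boolean first = queue.offer(1, 50, TimeUnit.MILLISECONDS);  // slot available
            boolean second = queue.offer(2, 50, TimeUnit.MILLISECONDS); // queue full, times out
            Integer taken = queue.poll(50, TimeUnit.MILLISECONDS);      // item available
            Integer none = queue.poll(50, TimeUnit.MILLISECONDS);       // queue empty, times out
            return first + "," + second + "," + taken + "," + none;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(TimedDemo.run()); // prints "true,false,1,null"
    }
}
```

Recomputing the remaining time on every loop iteration keeps the total wait bounded even when the thread is woken several times without the condition becoming true.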
Best Practices
While custom implementations help in learning, in real projects use Java’s built-in:
- LinkedBlockingQueue
- ArrayBlockingQueue
- PriorityBlockingQueue
These are highly optimized, thread-safe, and avoid common concurrency pitfalls.
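For comparison, here is a small sketch of the same producer-consumer flow on top of the built-in java.util.concurrent.LinkedBlockingQueue, run on an ExecutorService. BuiltInQueueDemo and the POISON_PILL end-of-stream marker are hypothetical choices for this example; the queue, executor, and their methods are standard JDK APIs.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BuiltInQueueDemo {
    // Hypothetical end-of-stream marker; it must never be a legitimate value.
    private static final int POISON_PILL = -1;

    // Sends 1..count through a bounded built-in queue and returns the consumed sum.
    static int sumThroughBuiltInQueue(int count) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);
        AtomicInteger sum = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // Producer: put() blocks while the queue already holds 5 items.
        pool.execute(() -> {
            try {
                for (int value = 1; value <= count; value++) {
                    queue.put(value);
                }
                queue.put(POISON_PILL); // tell the consumer there is nothing more
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Consumer: take() blocks while the queue is empty.
        pool.execute(() -> {
            try {
                while (true) {
                    int value = queue.take();
                    if (value == POISON_PILL) {
                        break;
                    }
                    sum.addAndGet(value);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        pool.shutdown(); // accept no new tasks; the two running tasks finish normally
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum.get();
    }

    public static void main(String[] args) {
        System.out.println("sum: " + sumThroughBuiltInQueue(10)); // 1 + 2 + ... + 10 = 55
    }
}
```

With a single consumer, one marker is enough; with several consumers, one common approach is to enqueue one marker per consumer.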
Summary
The producer-consumer problem is a fundamental concurrency problem that helps developers understand synchronization and inter-thread communication.
By implementing it with a custom LinkedBlockingQueue, we learned how to:
- Ensure thread safety using synchronized methods.
- Use wait() and notifyAll() to block threads without busy-waiting.
- Maintain FIFO ordering in shared resources.
- Use a bounded capacity so producers cannot outrun consumers.
This hands-on approach deepens your understanding of multithreading, blocking queues, and concurrency design patterns. Implementing your own producer-consumer system builds a strong foundation for working with advanced Java concurrency tools such as ExecutorService, CompletableFuture, and reactive frameworks.
Next blog: Implementing a Custom Thread Pool Using LinkedBlockingQueue in Java
