Custom Implementation of LinkedBlockingQueue in Java
Complete Step-by-Step Guide
Introduction
In Java, LinkedBlockingQueue is part of the java.util.concurrent package and implements the BlockingQueue interface.
It is a thread-safe queue based on linked nodes that supports producer-consumer patterns.
Key Features:
- Thread-safe FIFO (First-In-First-Out) queue.
- Supports blocking operations (put() and take()).
- Optional capacity bounds to control size.
- Useful for inter-thread communication.
Step-by-Step Implementation
Step 1 — Understand Requirements
Our custom LinkedBlockingQueue must:
- Allow multiple producers and consumers to access it concurrently.
- Block producers if the queue is full.
- Block consumers if the queue is empty.
- Support capacity limits.
- Ensure thread safety.
Step 2 — Create the Node Structure
LinkedBlockingQueue is implemented using linked nodes internally.
class Node {
T item;
Node next;
Node(T item) {
this.item = item;
this.next = null;
}
}
Step 3 — Create the CustomLinkedBlockingQueue Class
We define:
- head and tail for the linked list.
- Capacity limit.
- Synchronization mechanisms for producers and consumers.
public class CustomLinkedBlockingQueue {
private Node head;
private Node tail;
private int size;
private final int capacity;
public CustomLinkedBlockingQueue(int capacity) {
this.capacity = capacity;
head = tail = new Node<>(null); // dummy node
}
}
Step 4 — Implement the put() Method (Producer)
Producers add elements to the queue.
If the queue is full, producers must wait.
public synchronized void put(T item) throws InterruptedException {
while (size == capacity) {
wait(); // wait if queue is full
}
Node newNode = new Node<>(item);
tail.next = newNode;
tail = newNode;
size++;
notifyAll(); // notify consumers
}
Step 5 — Implement the take() Method (Consumer)
Consumers remove elements from the queue.
If the queue is empty, consumers must wait.
public synchronized T take() throws InterruptedException {
while (size == 0) {
wait(); // wait if queue is empty
}
Node first = head.next;
head.next = first.next;
size--;
if (size == 0) {
tail = head; // reset tail when queue is empty
}
notifyAll(); // notify producers
return first.item;
}
Step 6 — Implement size() Method
public synchronized int size() {
return size;
}
Step 7 — Test the CustomLinkedBlockingQueue
We create a producer-consumer example.
public class CustomLinkedBlockingQueueTest {
public static void main(String[] args) {
CustomLinkedBlockingQueue queue = new CustomLinkedBlockingQueue<>(3);
// Producer Thread
Thread producer = new Thread(() -> {
int i = 1;
while (true) {
try {
System.out.println("Producing: " + i);
queue.put(i++);
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
// Consumer Thread
Thread consumer = new Thread(() -> {
while (true) {
try {
Integer value = queue.take();
System.out.println("Consuming: " + value);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
consumer.start();
}
}
Expected Output
Producing: 1
Producing: 2
Producing: 3
Consuming: 1
Producing: 4
Consuming: 2
Producing: 5
Consuming: 3
...Complete code
class Node<T> {
T item;
Node<T> next;
Node(T item) {
this.item = item;
this.next = null;
}
}
public class CustomLinkedBlockingQueue<T> {
private Node<T> head;
private Node<T> tail;
private int size;
private final int capacity;
public CustomLinkedBlockingQueue(int capacity) {
this.capacity = capacity;
head = tail = new Node<>(null); // dummy node
}
// Producer adds element
public synchronized void put(T item) throws InterruptedException {
while (size == capacity) {
wait(); // block producer if full
}
Node<T> newNode = new Node<>(item);
tail.next = newNode;
tail = newNode;
size++;
notifyAll(); // notify consumers
}
// Consumer removes element
public synchronized T take() throws InterruptedException {
while (size == 0) {
wait(); // block consumer if empty
}
Node<T> first = head.next;
head.next = first.next;
size--;
if (size == 0) {
tail = head; // reset tail if queue becomes empty
}
notifyAll(); // notify producers
return first.item;
}
// Returns current queue size
public synchronized int size() {
return size;
}
}
// Test Class
class CustomLinkedBlockingQueueTest {
public static void main(String[] args) {
CustomLinkedBlockingQueue<Integer> queue = new CustomLinkedBlockingQueue<>(3);
// Producer Thread
Thread producer = new Thread(() -> {
int i = 1;
while (true) {
try {
System.out.println("Producing: " + i);
queue.put(i++);
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
// Consumer Thread
Thread consumer = new Thread(() -> {
while (true) {
try {
Integer value = queue.take();
System.out.println("Consuming: " + value);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
consumer.start();
}
}
Real-World Usage
CustomLinkedBlockingQueue is useful for:
- Producer-consumer problems.
- Thread-safe queues in concurrent programming.
- Task scheduling systems.
- Message passing between threads.
Optional Enhancements
After basic implementation, we can enhance:
- Fairness in thread access (FIFO for waiting threads).
- Separate locks for put and take operations to reduce contention.
- Timeout versions of put() and take().
- Size monitoring and statistics.
Production Recommendation
While building a custom blocking queue is great for understanding concurrency, Java’s built-in LinkedBlockingQueue is optimized and safer for production because it:
- Handles advanced concurrency internally.
- Provides fairness options.
- Supports capacity limits and timeout operations.
Next Blog- Producer-Consumer Problem Using Custom LinkedBlockingQueue in Java
