Demystifying Java Threads: Producer and Consumer Problem

Farrukh Masroor
7 min read · Mar 13, 2024

In this lesson, we will cover a real-world problem related to thread synchronization and inter-thread communication: the Producer and Consumer Problem.

The Producer and Consumer Problem is a common yet critical issue in concurrent programming. It models a real-world situation where one thread (the producer) generates data that other threads (the consumers) consume. The challenge lies in ensuring that producers and consumers can operate concurrently without data corruption or deadlock. We’ll start with a naive solution, watch it fail, and then repair it step by step with synchronization and inter-thread communication.

Problem statement

The Producer and Consumer problem (also known as the bounded-buffer problem) involves two processes, the producer and the consumer, which share a common, fixed-size buffer used as a queue.

  • The producer’s job is to generate data, put it into the buffer, and start again.
  • At the same time, the consumer is consuming the data (i.e. removing it from the buffer), one piece at a time.

Our task is to make sure that:

  • the producer won’t try to add data to the buffer when it is full,
  • the consumer won’t try to remove data from an empty buffer, and
  • the producer and consumer never update the queue at the same time, since that could lead to data loss or inconsistencies.
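The first two rules can be sketched in isolation, before any threads are involved. The class below is a hypothetical helper (BoundedBufferRules, tryPut and tryTake are names made up for this illustration, not part of the lesson's code). It is deliberately not thread-safe, so it says nothing yet about the third rule.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical helper for illustration only: a fixed-capacity queue that
// refuses invalid operations instead of performing them. NOT thread-safe.
class BoundedBufferRules {
    private final Queue<Integer> items = new ArrayDeque<>();
    private final int capacity;

    BoundedBufferRules(int capacity) {
        this.capacity = capacity;
    }

    // Rule 1: never add to a full buffer.
    boolean tryPut(int value) {
        if (items.size() >= capacity) {
            return false;
        }
        items.add(value);
        return true;
    }

    // Rule 2: never remove from an empty buffer.
    Integer tryTake() {
        return items.poll(); // poll() returns null when the queue is empty
    }

    public static void main(String[] args) {
        BoundedBufferRules buffer = new BoundedBufferRules(2);
        System.out.println(buffer.tryPut(1));  // true
        System.out.println(buffer.tryPut(2));  // true
        System.out.println(buffer.tryPut(3));  // false, the buffer is full
        System.out.println(buffer.tryTake());  // 1
        System.out.println(buffer.tryTake());  // 2
        System.out.println(buffer.tryTake());  // null, the buffer is empty
    }
}
```

Returning false or null is only one possible policy. The rest of the lesson moves toward a different one, where a thread waits until the operation becomes possible.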

Let’s understand this better with an example. Suppose a company keeps its data in a shared resource, say a Queue, to which we can add data and from which we can remove data.

So the Company class holds the shared resource and two methods, one to produce data and one to consume it.

package com.threads;

import java.util.LinkedList;
import java.util.Queue;

public class Company {

    // Shared buffer and its maximum capacity
    Queue<Integer> queue;
    int size;

    // true when it is the producer's turn
    boolean producer = true;

    Company(int size) {
        this.queue = new LinkedList<>();
        this.size = size;
    }

    // No synchronization yet: the check and the update are separate steps
    public void produce(int num) {
        if (producer || this.queue.size() < this.size) {
            queue.add(num);
            producer = false;
            System.out.println("produced : " + num);
        }
    }

    // Returns -1 when there is nothing to consume
    public int consume() {
        if (!producer || !this.queue.isEmpty()) {
            System.out.println("Consumer :" + Thread.currentThread().getName() + " consuming");
            System.out.println("consumed : " + queue.peek());
            producer = true;
            return queue.remove();
        }
        return -1;
    }
}

We have added conditions to both methods. The producer adds data when the producer flag is true or the Queue still has room. The consumer removes data when the producer has just put data in the Queue (the flag is false) or the Queue is not empty; otherwise consume() returns -1.

Now let’s create a class that acts as a producer to produce data in the Company. The producer generates data every second in the Company and pushes it into the Queue.

package com.threads;

public class Producer extends Thread {
    Company company;

    Producer(Company company) {
        this.company = company;
    }

    @Override
    public void run() {
        int i = 1;
        while (true) {
            try {
                company.produce(i);
                System.out.println("Queue state" + company.queue);
                i++;
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Let’s also create a Consumer class that will consume the data produced by the Producer.

package com.threads;

public class Consumer extends Thread {
    Company company;

    Consumer(Company company) {
        this.company = company;
    }

    @Override
    public void run() {
        while (true) {
            try {
                company.consume();
                System.out.println("Queue state after consuming" + company.queue);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Now let’s create our Production class which will drive our company to produce and consume the data.

package com.threads;

public class Production {
    public static void main(String[] args) {
        Company company = new Company(5);
        Producer producer = new Producer(company);
        Consumer consumer = new Consumer(company);

        producer.setName("producer-thread");
        producer.start();
        consumer.setName("consumer-thread-1");
        consumer.start();
    }
}

When we run the above program with a single consumer thread, there seem to be no problems. However, when the consumer runs faster than the producer (for example, with a shorter sleep interval than the one shown in Consumer), it keeps trying to consume while the producer is sleeping and ends up getting no data.

The output of the above program will be:

produced : 1
Queue state[1]
Consumer :consumer-thread-1 consuming
consumed : 1
Queue state after consuming[]
Queue state after consuming[]
Queue state after consuming[]
Queue state after consuming[]
produced : 2
Queue state[2]
Consumer :consumer-thread-1 consuming
consumed : 2
Queue state after consuming[]
Queue state after consuming[]
Queue state after consuming[]
Queue state after consuming[]
produced : 3
Queue state[3]
Consumer :consumer-thread-1 consuming
consumed : 3
Queue state after consuming[]
Queue state after consuming[]
Queue state after consuming[]
Queue state after consuming[]
produced : 4
Queue state[4]
Consumer :consumer-thread-1 consuming
consumed : 4
Queue state after consuming[]
Queue state after consuming[]
Queue state after consuming[]
Queue state after consuming[]
produced : 5
Queue state[5]
Consumer :consumer-thread-1 consuming
consumed : 5
Queue state after consuming[]

The output clearly shows that the consumer repeatedly attempts to consume data even when none is present, violating our Producer and Consumer conditions. The situation becomes more problematic with multiple consumers. If we add another consumer to the code, the output changes for the worse.

public static void main(String[] args) {
    Company company = new Company(5);
    Producer producer = new Producer(company);
    Consumer consumer = new Consumer(company);
    Consumer consumer2 = new Consumer(company);

    producer.setName("producer-thread");
    producer.start();
    consumer.setName("consumer-thread-1");
    consumer2.setName("consumer-thread-2");
    consumer.start();
    consumer2.start();
}

produced : 1
Consumer :consumer-thread-2 consuming
Consumer :consumer-thread-1 consuming
consumed : 1
consumed : 1
Queue state[]
Queue state after consuming[]
Exception in thread "consumer-thread-1" java.util.NoSuchElementException
    at java.base/java.util.LinkedList.removeFirst(LinkedList.java:274)
    at java.base/java.util.LinkedList.remove(LinkedList.java:689)
    at com.threads.Company.consume(Company.java:64)
    at com.threads.Consumer.run(Consumer.java:12)
Queue state after consuming[]
Queue state after consuming[]
Queue state after consuming[]
produced : 2
Queue state[2]
Consumer :consumer-thread-2 consuming
consumed : 2
Queue state after consuming[]
Queue state after consuming[]
Queue state after consuming[]
Queue state after consuming[]
produced : 3
Queue state[3]
Consumer :consumer-thread-2 consuming
consumed : 3
Queue state after consuming[]
Queue state after consuming[]
Queue state after consuming[]

In the output above, both consumer-thread-1 and consumer-thread-2 attempt to consume data simultaneously. However, as consumer-thread-2 has already consumed the data, the queue becomes empty. Consequently, when consumer-thread-1 tries to execute queue.remove(), it encounters the java.util.NoSuchElementException exception.
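Because thread timing is hard to reproduce, the same interleaving can be replayed step by step in a single thread. This is a minimal sketch, assuming the failure follows the check-then-act order described above. The class name and the "consumer A" and "consumer B" labels are illustrative and do not appear in the lesson's code.

```java
import java.util.LinkedList;
import java.util.NoSuchElementException;
import java.util.Queue;

// A single-threaded replay of the interleaving described above.
// The class name and the consumer A/B labels are illustrative only.
class CheckThenActReplay {

    static boolean secondRemoveFails() {
        Queue<Integer> queue = new LinkedList<>();
        queue.add(1);

        // Step 1: both consumers run the emptiness check before either removes.
        boolean consumerASawData = !queue.isEmpty();
        boolean consumerBSawData = !queue.isEmpty();

        // Step 2: consumer A acts on its check and takes the only element.
        if (consumerASawData) {
            queue.remove();
        }

        // Step 3: consumer B acts on a check that is no longer true.
        try {
            if (consumerBSawData) {
                queue.remove();
            }
            return false;
        } catch (NoSuchElementException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("second remove failed: " + secondRemoveFails());
    }
}
```

The check and the removal are two separate steps, so anything that happens between them can invalidate the check.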

To avoid such conditions, we need to make sure that only a single thread can access the shared resource at a time; otherwise we will get errors and an inconsistent state.

As we discussed in our previous lesson on synchronization, the way to let only a single thread use a shared resource at a time is to use synchronization.
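As a quick reminder of what synchronization guarantees, here is a small sketch that is separate from the Company example. SynchronizedCounter and runTwoThreads are hypothetical names chosen for this illustration. Two threads increment one shared counter, and because both methods are synchronized, no increment is lost.

```java
// Illustrative only: SynchronizedCounter is a hypothetical class, not part
// of the lesson's Company example.
class SynchronizedCounter {
    private int count = 0;

    // Only one thread at a time can run a synchronized method on the same object.
    synchronized void increment() {
        count++;
    }

    synchronized int get() {
        return count;
    }

    // Starts two threads that each increment the same counter, then waits for both.
    static int runTwoThreads(int incrementsPerThread) {
        SynchronizedCounter counter = new SynchronizedCounter();
        Runnable work = () -> {
            for (int i = 0; i < incrementsPerThread; i++) {
                counter.increment();
            }
        };
        Thread first = new Thread(work);
        Thread second = new Thread(work);
        first.start();
        second.start();
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(runTwoThreads(10_000)); // 20000
    }
}
```

Without the synchronized keyword on increment(), the two threads could overwrite each other's updates and the final count could come out lower.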

Let’s implement the logic of our Company again with synchronization.

To do so, we add the synchronized keyword to each method. A synchronized method can be executed by only a single thread at a time on the same object.

package com.threads;

import java.util.LinkedList;
import java.util.Queue;

public class Company {

    Queue<Integer> queue;
    int size;

    // true when it is the producer's turn
    boolean producer = true;

    Company(int size) {
        this.queue = new LinkedList<>();
        this.size = size;
    }

    public synchronized void produce(int num) throws InterruptedException {
        // The condition is checked only once before waiting
        if (!producer || this.queue.size() >= this.size) {
            System.out.println("waiting " + Thread.currentThread().getName());
            wait();
        }

        queue.add(num);
        producer = false;
        System.out.println("produced : " + num);
        notifyAll(); // wake up every waiting thread
    }

    public synchronized int consume() throws InterruptedException {
        // The condition is checked only once before waiting
        if (producer || this.queue.isEmpty()) {
            System.out.println("waiting " + Thread.currentThread().getName());
            wait();
        }
        System.out.println("Consumer :" + Thread.currentThread().getName() + " consuming");
        System.out.println("consumed : " + queue.peek());
        producer = true;
        notifyAll(); // wake up every waiting thread
        return queue.remove();
    }
}

Our code is now synchronized. Does that end our problems?

Let’s see the output:

produced : 1
Consumer :consumer-thread-2 consuming
consumed : 1
waiting consumer-thread-1
produced : 2
Consumer :consumer-thread-1 consuming
consumed : 2
waiting consumer-thread-2
produced : 3
Consumer :consumer-thread-2 consuming
consumed : 3
waiting consumer-thread-1
waiting consumer-thread-2
produced : 4
Consumer :consumer-thread-1 consuming
consumed : 4
Consumer :consumer-thread-2 consuming
consumed : null
Exception in thread "consumer-thread-2" java.util.NoSuchElementException
    at java.base/java.util.LinkedList.removeFirst(LinkedList.java:274)
    at java.base/java.util.LinkedList.remove(LinkedList.java:689)
    at com.threads.Company.consume(Company.java:57)
    at com.threads.Consumer.run(Consumer.java:12)
produced : 5
Consumer :consumer-thread-1 consuming
consumed : 5
produced : 6
Consumer :consumer-thread-1 consuming
consumed : 6
produced : 7
Consumer :consumer-thread-1 consuming
consumed : 7
produced : 8
Consumer :consumer-thread-1 consuming
consumed : 8

Hmm 😩 why do we still get this error? We synchronized both methods so that only a single thread can run them at a time, yet something goes wrong right after the producer produces item 4.

At that point both consumers are waiting. When the producer adds the data, it calls notifyAll(), which wakes up every waiting thread. Each consumer resumes right after its wait() call, and because the condition sits in an if, it is not checked again. consumer-thread-1 gets the lock first and consumes the data. When consumer-thread-2 then gets the lock, the queue is already empty, so peek() returns null and remove() throws the exception.

To overcome this scenario, we need to make a woken thread check again whether the queue is empty before it proceeds.

package com.threads;

import java.util.LinkedList;
import java.util.Queue;

public class Company {

    Queue<Integer> queue;
    int size;

    boolean producer = true;

    Company(int size) {
        this.queue = new LinkedList<>();
        this.size = size;
    }

    public synchronized void produce(int num) throws InterruptedException {

        while (!producer && queue.size() >= size) {
            System.out.println(producer + " " + queue.size() + " " + size);
            System.out.println("waiting " + Thread.currentThread().getName());
            wait();
        }

        queue.add(num);
        producer = false;
        System.out.println("produced : " + num);
        notifyAll();
    }

    public synchronized int consume() throws InterruptedException {

        while (producer && queue.isEmpty()) {
            System.out.println("waiting " + Thread.currentThread().getName());
            wait();
        }
        System.out.println("Consumer :" + Thread.currentThread().getName() + " consuming");
        System.out.println("consumed : " + queue.peek());
        producer = true;
        notifyAll();
        return queue.remove();
    }
}

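We’ve made a crucial update by replacing the ‘if’ with a ‘while’ loop. Now, when both threads wake up, the second thread rechecks the condition. Since the queue is empty, the condition is still true, and the second thread goes back to waiting. The two checks are also now joined with && instead of ||, so the producer can keep adding data until the queue reaches its size instead of strictly taking turns with the consumers. This modification protects us from encountering the exception 😃.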
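To see the while-based wait finish cleanly with two consumers, here is a reduced, self-contained sketch. It is not the Company class: GuardedBuffer, put, take and runDemo are hypothetical names, the sketch drops the producer flag and waits on the queue state alone (a simplification made for this example), and it runs for a fixed number of items so that it terminates.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;

// A reduced sketch of the guarded-wait pattern. GuardedBuffer and its
// method names are hypothetical; it waits on the queue state alone.
class GuardedBuffer {
    private final Queue<Integer> queue = new ArrayDeque<>();
    private final int capacity;

    GuardedBuffer(int capacity) {
        this.capacity = capacity;
    }

    synchronized void put(int value) throws InterruptedException {
        while (queue.size() >= capacity) { // re-checked after every wake-up
            wait();
        }
        queue.add(value);
        notifyAll();
    }

    synchronized int take() throws InterruptedException {
        while (queue.isEmpty()) { // re-checked after every wake-up
            wait();
        }
        int value = queue.remove();
        notifyAll();
        return value;
    }

    // One producer puts 1..items; two consumers each take items / 2 values.
    // Assumes items is even so that every put is matched by a take.
    static long runDemo(int items) {
        GuardedBuffer buffer = new GuardedBuffer(5);
        AtomicLong sum = new AtomicLong();
        Runnable consume = () -> {
            try {
                for (int i = 0; i < items / 2; i++) {
                    sum.addAndGet(buffer.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= items; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread first = new Thread(consume);
        Thread second = new Thread(consume);
        producer.start();
        first.start();
        second.start();
        try {
            producer.join();
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return sum.get();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(100)); // 5050 = 1 + 2 + ... + 100
    }
}
```

Every value is consumed exactly once, so the sum is the same on every run even though the order in which the two consumers take values varies.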

Our journey doesn’t end here! In our upcoming lessons, we’ll continue to explore various threading challenges, diving deeper into the world of programming to discover more efficient and effective solutions. So, stay tuned and join us as we unravel the next chapter in our demystifying threading adventure!
