Producer-consumer problem, also called bounded-buffer problem, is one of the famous real-world scenarios of synchronization, and also the prerequisite to the readers-writers problem.

Figures: Illustration and code of producer-consumer problem [1][2][3]

Outline

(1) Scenario 01: Buffer not full
1–1 Complete output
1–2 Code with explanation
(2) Scenario 02: Buffer full
1–1 Complete output
1–2 Code with explanation
(3) References

(1) Scenario 01: Buffer not full

Let’s see the output on my Ubuntu VPS first.

Figure: Partial output

1–1 Complete output

root@DemoYuChengKuo:~/3_multithreading_cpp/code_04_producer_consumer# ./16_producer_consumer_01 
Producing 1
Buffer size after producing: 1

Consuming 1
Buffer size after consuming: 0

Producing 2
Buffer size after producing: 1

Consuming 2
Buffer size after consuming: 0

Producing 3
Buffer size after producing: 1

Consuming 3
Buffer size after consuming: 0

Producing 4
Buffer size after producing: 1

Consuming 4
Buffer size after consuming: 0

Producing 5
Buffer size after producing: 1

Consuming 5
Buffer size after consuming: 0

Producing 6
Buffer size after producing: 1

Consuming 6
Buffer size after consuming: 0

Producing 7
Buffer size after producing: 1

Consuming 7
Buffer size after consuming: 0

Producing 8
Buffer size after producing: 1

Consuming 8
Buffer size after consuming: 0

Producing 9
Buffer size after producing: 1

Consuming 9
Buffer size after consuming: 0

Producing 10
Buffer size after producing: 1

Consuming 10
Buffer size after consuming: 0

Producing 11
Buffer size after producing: 1

Consuming 11
Buffer size after consuming: 0

Producing 12
Buffer size after producing: 1

Consuming 12
Buffer size after consuming: 0

Producing 13
Buffer size after producing: 1

Consuming 13
Buffer size after consuming: 0

Producing 14
Buffer size after producing: 1

Consuming 14
Buffer size after consuming: 0

Producing 15
Buffer size after producing: 1

Consuming 15
Buffer size after consuming: 0

Producing 16
Buffer size after producing: 1

Consuming 16
Buffer size after consuming: 0

Producing 17
Buffer size after producing: 1

Consuming 17
Buffer size after consuming: 0

Producing 18
Buffer size after producing: 1

Consuming 18
Buffer size after consuming: 0

Producing 19
Buffer size after producing: 1

Consuming 19
Buffer size after consuming: 0

Producing 20
Buffer size after producing: 1

Consuming 20
Buffer size after consuming: 0

1–2 Code with explanation

// 16_producer_consumer_01.cpp
#include <iostream> // Include for input/output operations
#include <queue> // Include for using the queue data structure
#include <thread> // Include for using thread functionality
#include <mutex> // Include for using mutex for synchronization
#include <condition_variable> // Include for using condition variables

std::mutex mtx; // Mutex for synchronization
std::condition_variable cond_var; // Condition variable for producer-consumer signaling
std::queue<int> buffer; // Queue to act as a buffer
const unsigned int MAX_BUFFER_SIZE = 10; // Maximum size of the buffer
void producer(int value) {
std::unique_lock<std::mutex> lock(mtx); // Lock the mutex
cond_var.wait(lock, [] { return buffer.size() < MAX_BUFFER_SIZE; }); // Wait until there's space in buffer
buffer.push(value); // Add value to the buffer
std::cout << "Producing " << value << std::endl; // Output the produced value
std::cout << "Buffer size after producing: " << buffer.size() << std::endl << std::endl; // Display buffer size after producing
lock.unlock(); // Unlock the mutex
cond_var.notify_one(); // Notify one waiting thread
}
void consumer() {
std::unique_lock<std::mutex> lock(mtx); // Lock the mutex
cond_var.wait(lock, [] { return buffer.size() > 0; }); // Wait until there's something in the buffer
int value = buffer.front(); // Get the front value from buffer
buffer.pop(); // Remove the value from buffer
std::cout << "Consuming " << value << std::endl; // Output the consumed value
std::cout << "Buffer size after consuming: " << buffer.size() << std::endl << std::endl; // Display buffer size after consuming
lock.unlock(); // Unlock the mutex
cond_var.notify_one(); // Notify one waiting thread
}
int main() {
std::thread producerThread([] {
for (int i = 1; i <= 20; ++i) {
producer(i); // Create a thread to produce values
}
});
std::thread consumerThread([] {
for (int i = 1; i <= 20; ++i) {
consumer(); // Create a thread to consume values
}
});
producerThread.join(); // Wait for producer thread to finish
consumerThread.join(); // Wait for consumer thread to finish
return 0;
}

[Remark]

  • [1] Note the uses of queue, mutex, and the condition variable (for some specific conditions to trigger something) to ensure the reliability of the transmission between the producer and the consumer.
  • [2] The mutex (mtx) and condition variable (cond_var) are used to synchronize access to a shared buffer (buffer) between producer and consumer threads, ensuring thread-safe operations.
  • [3] Producer Locking and Waiting: In producer(), a std::unique_lock locks the mutex mtx. The producer then waits using cond_var.wait() if the buffer is full (buffer.size() >= MAX_BUFFER_SIZE). This wait condition ensures the producer only adds items to the buffer when there's space.
  • [4] Producer Adding to Buffer: Once the buffer has space, the producer adds an item (buffer.push(value)) and then unlocks the mutex (lock.unlock()). It signals one waiting thread (possibly a consumer) that a change has occurred (cond_var.notify_one()).
  • [5] Consumer Locking and Waiting: In consumer(), similar to the producer, a std::unique_lock locks the mutex. The consumer waits (cond_var.wait()) until the buffer has at least one item (buffer.size() > 0).
  • [6] Consumer Removing from Buffer: Once there is an item in the buffer, the consumer removes it (buffer.pop()) and unlocks the mutex. It then signals one waiting thread (possibly a producer) that a change has occurred.
  • [7] Synchronization: The use of std::unique_lock with std::mutex and std::condition_variable allows threads to wait for certain conditions (buffer being not full for the producer and not empty for the consumer) before proceeding with their operations, thus ensuring no race conditions occur in accessing and modifying the shared buffer.

We see the buffer of size 10 bytes was never full. So, we try to fill the buffer in the next scenario.

(2) Scenario 02: Buffer full

Figure: Partial output

2–1 Complete output

16_producer_consumer_01  16_producer_consumer_01.cpp  17_producer_consumer_02  17_producer_consumer_02.cpp  Makefile
root@DemoYuChengKuo:~/3_multithreading_cpp/code_04_producer_consumer# ./17_producer_consumer_02
Producing 1
Buffer size after producing: 1

Producing 2
Buffer size after producing: 2

Producing 3
Buffer size after producing: 3

Producing 4
Buffer size after producing: 4

Producing 5
Buffer size after producing: 5

Producing 6
Buffer size after producing: 6

Producing 7
Buffer size after producing: 7

Producing 8
Buffer size after producing: 8

Producing 9
Buffer size after producing: 9

Producing 10
Buffer size after producing: 10

Consuming 1
Buffer size after consuming: 9

Producing 11
Buffer size after producing: 10

Consuming 2
Buffer size after consuming: 9

Producing 12
Buffer size after producing: 10

Consuming 3
Buffer size after consuming: 9

Producing 13
Buffer size after producing: 10

Consuming 4
Buffer size after consuming: 9

Producing 14
Buffer size after producing: 10

Consuming 5
Buffer size after consuming: 9

Producing 15
Buffer size after producing: 10

Consuming 6
Buffer size after consuming: 9

Producing 16
Buffer size after producing: 10

Consuming 7
Buffer size after consuming: 9

Producing 17
Buffer size after producing: 10

Consuming 8
Buffer size after consuming: 9

Producing 18
Buffer size after producing: 10

Consuming 9
Buffer size after consuming: 9

Producing 19
Buffer size after producing: 10

Consuming 10
Buffer size after consuming: 9

Producing 20
Buffer size after producing: 10

Consuming 11
Buffer size after consuming: 9

Consuming 12
Buffer size after consuming: 8

Consuming 13
Buffer size after consuming: 7

Consuming 14
Buffer size after consuming: 6

Consuming 15
Buffer size after consuming: 5

Consuming 16
Buffer size after consuming: 4

Consuming 17
Buffer size after consuming: 3

Consuming 18
Buffer size after consuming: 2

Consuming 19
Buffer size after consuming: 1

Consuming 20
Buffer size after consuming: 0

2–2 Code with explanation

// 17_producer_consumer_02.cpp
#include <iostream> // Includes the standard I/O library
#include <queue> // Includes the queue library for queue operations
#include <thread> // Includes the thread library for threading operations
#include <mutex> // Includes the mutex library for synchronization
#include <condition_variable> // Includes the condition_variable library for thread communication

std::mutex mtx; // Declares a mutex for critical section management
std::condition_variable cond_var; // Declares a condition variable for blocking and waking threads
std::queue<int> buffer; // Declares a queue to act as the buffer
const unsigned int MAX_BUFFER_SIZE = 10; // Sets the maximum size of the buffer
void producer(int value) { // Defines the producer function
std::unique_lock<std::mutex> lock(mtx); // Locks the mutex before accessing the buffer
cond_var.wait(lock, [] { return buffer.size() < MAX_BUFFER_SIZE; }); // Waits if the buffer is full
std::cout << "Producing " << value << std::endl; // Prints the produced value
buffer.push(value); // Pushes the value into the buffer
std::cout << "Buffer size after producing: " << buffer.size() << std::endl << std::endl; // Prints the buffer size after producing
lock.unlock(); // Unlocks the mutex
cond_var.notify_one(); // Notifies one waiting thread
}
void consumer() { // Defines the consumer function
std::unique_lock<std::mutex> lock(mtx); // Locks the mutex before accessing the buffer
cond_var.wait(lock, [] { return buffer.size() > 0; }); // Waits if the buffer is empty
int value = buffer.front(); // Gets the value from the front of the buffer
buffer.pop(); // Removes the value from the buffer
std::cout << "Consuming " << value << std::endl; // Prints the consumed value
std::cout << "Buffer size after consuming: " << buffer.size() << std::endl << std::endl; // Prints the buffer size after consuming
lock.unlock(); // Unlocks the mutex
cond_var.notify_one(); // Notifies one waiting thread
}
int main() { // The main function
std::thread producerThread([] { // Creates a producer thread
for (int i = 1; i <= 20; ++i) {
producer(i); // Produces 20 items
}
});
// Delay before starting consumer thread
std::this_thread::sleep_for(std::chrono::seconds(3));
std::thread consumerThread([] { // Creates a consumer thread
for (int i = 1; i <= 20; ++i) {
consumer(); // Consumes 20 items
}
});
producerThread.join(); // Waits for producer thread to finish
consumerThread.join(); // Waits for consumer thread to finish
return 0; // Ends the program
}

In this example, we introduced a delay with “std::this_thread::sleep_for(std::chrono::seconds(3));” to make sure the buffer would be full with data.

(3) References

[1] Analytics Vidhya | Famous Concurrency Problems in DBMS

[2] University of Illinois Chicago | Process Synchronization

[3] Consumer-Producer Problem In Java Multithreading

[4] ChatGPT-4

--

--

Yu-Cheng (Morton) Kuo
Nerd For Tech

CS/DS blog with C/C++/Embedded Systems/Python. Embedded Software Engineer. Email: yc.kuo.28@gmail.com