Producer-consumer pattern implementation with Golang

Seungkoo
hdac_rizon
Published in
4 min readApr 4, 2020

What bothers server developer the most? Personally, locking and unlocking the critical section without creating deadlocks were the most difficult part in developing servers with languages such as C/C++ or Java. Many developers face deadlocks while developing a project because they do not keep the order of locks. In extreme cases, a bug creating deadlocks occurs once in several years. In this case, it is extremely difficult to find the the bug and solve it.

Golang is known to provide features needed for server program development. Golang allows effective and easy synchronization by providing goroutines and channels. It detects and notifies deadlocks, which provides an easy and quick way to develop server programs.

This posting aims to create a structure of basic server programs by using the characteristics of Golang. It applies a single-producer and multi-consumer (SPMC) pattern and writes sample codes that can be utilized for multiple purposes using context and signal notification features.

Simple producer — consumer pattern

The producer — consumer pattern is a pattern in which one or more consumers carry out a job created by a producer. The producer and consumers exchange jobs by sharing one or more channels. The codes below implemented this pattern.

var done = make(chan bool)
var tasks = make(chan int)
func produce() {
for i := 1; i <= 1000; i++ {
tasks <- i
}
done <- true
}
func consume() {
for {
msg := <-tasks
fmt.Println(msg)
}
}
func main() {
go produce()
for i := 0; i < 10; i++ {
go consume()
}
<-done
}

The producer creates 1000 tasks defined as an integer and inserts them into a channel called tasks. Ten consumers bring tasks from the channel to process them. A done channel is used to synchronize with the main thread.

The producer is blocked when inserting tasks into channels if they are not empty. Similarly, the consumers are blocked if channels are empty when bringing tasks from them. Thus, they synchronize without being closed while waiting for operation of each other.

The ability to synchronize various goroutines by using only channels, not other synchronization features, is an advantage of Golang. Developing with queues in other languages have to make more complicated codes as developers should consider cases where queues are full or empty.

These codes need improvements before an actual use. More commonly, producers create jobs infinitely rather than creating and executing a certain amount of jobs. Moreover, programs should not be closed until all jobs of consumers are completed. Thus, a mechanism is needed to define conditions to close a program and ensure that a program is closed after all consumers finish their jobs. Sample codes are written based on these requirements.

Producer and Consumer

type Consumer struct {
in *chan int
jobs chan int
}
func (c Consumer) Work(wg *sync.WaitGroup) {
defer wg.Done()
for job := range c.jobs {
time.Sleep(50 * time.Millisecond) // Do something
fmt.Printf(“%dth job finished\n”, job)
}
}
func (c Consumer) Consume(ctx context.Context) {
for {
select {
case job := <-*c.in:
c.jobs <- job
case <-ctx.Done():
close(c.jobs)
return
}
}
}
type Producer struct {
in *chan int
}
func (p Producer) Produce() {
task := 1
for {
*p.in <- task
task++
}
}

Main function

func main() {
const nConsumers = 10
runtime.GOMAXPROCS(runtime.NumCPU())
in := make(chan int, 1)
p := Producer{&in}
c := Consumer{&in, make(chan int, nConsumers)}
go p.Produce()
ctx, cancelFunc := context.WithCancel(context.Background())
go c.Consume(ctx)
wg := &sync.WaitGroup{}
wg.Add(nConsumers)
for i := 0; i < nConsumers; i++ {
go c.Work(wg)
}
termChan := make(chan os.Signal)
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
<-termChan
cancelFunc()
wg.Wait()
}

The main function creates three goroutines: Produce, Consume and Work. Ten goroutines are created to independently process tasks created by producers. The main thread finishes tasks when they receive SIGINT and SIGTERM signals. It calls a wg.Wait() function and waits for a goroutine’s job to be completed.

MAXPROC setting

Call below code to enable All CPU cores to carry out goroutine.

runtime.GOMAXPROCS(runtime.NumCPU())

Wait group

Wait groups are used to make the main thread wait until all tasks are completed. wg.Add() function is revoked to set the number of goroutines to wait for and a goroutine announces that it is complete by calling a wg.Done() function.

wg := &sync.WaitGroup{} // wait group creation
wg.Add(nConsumers) // set # of goroutines to wait for
...
wg.Wait() // wait util all goroutines finish
// work function
func (c Consumer) Work(wg *sync.WaitGroup) {
defer wg.Done() // decrease # of goroutines to wait for by 1

}

Signal processing

A termChan channel is created so that the main thread waits for SIGINT and SIGTERM without terminating. It waits for signals on the <-termChan code.

termChan := make(chan os.Signal)
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
<-termChan

Context

One thing to remember when using goroutines is making sure that all executed goroutines are closed. Users can close goroutines past timeout by using context or by calling cancelFunc. Sample codes are implemented to close the jobs channel to terminate goroutines.

ctx, cancelFunc := context.WithCancel(context.Background())
...
<-termChan
cancelFunc() // call ctx.Done()
wg.Wait()
func (c Consumer) Consume(ctx context.Context) {
for {
select {
...
case <-ctx.Done():
close(c.jobs) // close channel to stop Work goroutine
return
}
...

We looked at an example of writing daemon programs using goroutines in Golang. I hope the sample codes above would help you understand implementation of daemon programs. But they are just an example to help your understanding and need modifications before an actual use.

References

https://golang.org/pkg/context/

https://gobyexample.com/waitgroups

https://tpaschalis.github.io/golang-producer-consumer/

https://www.golangprograms.com/illustration-of-producer-consumer-problem-in-golang.html

https://callistaenterprise.se/blogg/teknik/2019/10/05/go-worker-cancellation

--

--