Golang Concurrency Patterns: For-Select-Done, Errgroup and Worker Pool

Nina Pakshina
7 min read · May 16

Hello! My name is Nina and I am a Go programmer.

In this article, I want to talk about three concurrency patterns in Go that can be really handy:

  • for-select-done
  • errgroup
  • worker pool

Working with concurrency patterns is more enjoyable with a cat. © 2022, Phuket, Nina Pakshina.

Pattern 1: For-Select-Done

The main idea of the for-select-done pattern is to use an infinite for loop to handle events from various channels using the select statement.

The select statement allows selecting the first operation ready to be executed from multiple channels. It can be a signal to perform some useful task or to exit the infinite loop.

In this pattern, the infinite for loop usually runs in a separate goroutine so that it does not block the main goroutine.

Code Example

Suppose we need to call the function someTask() once per second. At a certain point, we should also be able to exit the infinite loop based on the context and terminate the goroutine.

Let’s see the code example:

package main

import (
	"context"
	"fmt"
	"math/rand"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// someTask is the function that we call periodically.
func someTask() {
	fmt.Println(rand.Int() * rand.Int())
}

// PeriodicTask runs someTask every second.
// When the context is canceled, the goroutine stops.
func PeriodicTask(ctx context.Context) {
	// Create a new ticker with a period of 1 second.
	ticker := time.NewTicker(time.Second)
	// Release the ticker's resources when the function returns.
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			someTask()
		case <-ctx.Done():
			fmt.Println("stopping PeriodicTask")
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	go PeriodicTask(ctx)

	// Create a channel to receive signals from the operating system.
	sigCh := make(chan os.Signal, 1)
	// SIGINT is what Ctrl+C sends; SIGTERM is a regular termination request.
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

	// The code blocks until a signal is received (e.g. Ctrl+C).
	<-sigCh
}

To call someTask() periodically, we create a new ticker with a period of 1 second. Every second, a value is sent on the ticker.C channel; the corresponding case receives it and calls someTask().

When the context is canceled, the channel returned by ctx.Done() is closed. The corresponding case then fires and we return from the function, which ends both the for loop and the goroutine.

In the main function, we create a context ctx with a timeout of 5 seconds. This means that if the operation associated with this context is not completed within the specified time, the context is canceled, and every operation that watches this context is signaled to stop.

In the infinite loop of the PeriodicTask goroutine, the ticker fires several times, and someTask() is executed each time. After 5 seconds, the context timeout expires, the case <-ctx.Done() is selected, and the infinite loop terminates. The program itself keeps running until it receives a signal.

Result of code execution:

-777992493516638130
-3179832721048378793
-4070697154687973288
2884823370254822744
stopping PeriodicTask

When to Use This Pattern

This pattern is useful when you need to perform a task in an infinite loop based on some event or timer and then stop its execution based on a certain condition.

For example, it can be used to run deferred calculations using data that was saved to a database, or to asynchronously enrich records in the database with data from other services. At the same time, we always have the ability to safely terminate the goroutine when the context is canceled or some other external event occurs.

Pattern 2: Errgroup

The main idea of the errgroup pattern is to start a group of goroutines, wait for them to finish their work, and handle any errors that may occur during execution.

Code Example

Here is an example that uses the golang.org/x/sync/errgroup package, which implements the errgroup pattern.


package main

import (
	"errors"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// errFailure is a custom error.
var errFailure = errors.New("some error")

func main() {
	// Create errgroup.
	group := errgroup.Group{}

	// Run first task.
	group.Go(func() error {
		time.Sleep(5 * time.Second)
		fmt.Println("doing some work 1")
		return nil
	})

	// Run second task.
	group.Go(func() error {
		fmt.Println("doing some work 2")
		return nil
	})

	// Run third task.
	group.Go(func() error {
		fmt.Println("doing some work 3")
		return errFailure
	})

	// Wait for all goroutines to complete.
	if err := group.Wait(); err != nil {
		fmt.Printf("errgroup tasks ended up with an error: %v\n", err)
	} else {
		fmt.Println("all works done successfully")
	}
}

In this example, we create an errgroup.Group{} task group and start each task in its own goroutine with the group.Go() method.

We use group.Wait() to wait for all tasks in the group to complete. If any of the tasks finish with an error, group.Wait() still waits for the remaining tasks and then returns the first non-nil error it received. If all tasks complete successfully, group.Wait() returns nil.

In this example, the third task completes with an error, so group.Wait() returns an error, which we handle.

Result of code execution (the order of the first two lines may vary between runs):

doing some work 3
doing some work 2
doing some work 1
errgroup tasks ended up with an error: some error

Let’s consider another use case for this pattern.

The errgroup.WithContext() function returns a new errgroup.Group and a context.Context derived from the one we pass in. The derived context is canceled the first time a task returns a non-nil error or when Wait() returns, so tasks that watch this context can stop early.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// errFailure is a custom error.
var errFailure = errors.New("some error")

func main() {
	// Create errgroup with context.
	group, qctx := errgroup.WithContext(context.Background())

	// Run first periodic task.
	group.Go(func() error {
		firstTask(qctx)
		return nil
	})

	// Run second task.
	group.Go(func() error {
		return secondTask()
	})

	// Wait for all tasks to complete or the error to appear.
	if err := group.Wait(); err != nil {
		fmt.Printf("errgroup tasks ended up with an error: %v\n", err)
	}
}

func firstTask(ctx context.Context) {
	var counter int
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(500 * time.Millisecond):
			fmt.Println("some task")
			counter++
			// Stop after the action has been performed 10 times.
			if counter >= 10 {
				return
			}
		}
	}
}

func secondTask() error {
	time.Sleep(3 * time.Second)
	return errFailure
}

Here, the firstTask() function represents a periodic task that needs to perform some action 10 times. The secondTask() function simulates work that takes 3 seconds and then returns an error.

In this case, our periodic task firstTask() manages to run several times before secondTask() finishes with an error. This error causes the qctx context to be canceled for the group of goroutines; firstTask() sees the cancellation through ctx.Done() and returns.

Result of code execution:

some task
some task
some task
some task
some task
errgroup tasks ended up with an error: some error

When to Use This Pattern

I use this pattern where it is important for all goroutines in the group to complete successfully without errors.

For example, suppose I need to perform a calculation using combined data. I cannot perform the calculation if data from a particular table is missing (e.g., it has not yet been saved). In this case, I return a custom error about the missing data, and the execution of all other database queries is interrupted until the next task group run.

Pattern 3: Worker Pool

The worker pool pattern lets us run tasks in parallel while limiting the number of goroutines that execute at the same time.

In this pattern, we create a fixed number of workers that wait for tasks from a queue. When a task appears, it is added to the queue. If a worker is free, it takes a task from the queue and executes it. The result of task execution can be returned to the main goroutine, where it can be processed.
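
The description above (a fixed set of workers, a queue, and results returned to the caller) could be sketched as follows. This is only one possible shape; processAll and square are hypothetical names, and squaring a number stands in for real work.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// square is a hypothetical stand-in for real work.
func square(n int) int { return n * n }

// processAll starts numWorkers goroutines that read from a shared jobs
// channel and write to a results channel, then collects every result.
func processAll(inputs []int, numWorkers int) []int {
	jobs := make(chan int)
	results := make(chan int)

	var wg sync.WaitGroup
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each worker pulls jobs until the channel is closed.
			for n := range jobs {
				results <- square(n)
			}
		}()
	}

	// Feed the queue, then close it so the workers can exit.
	go func() {
		for _, n := range inputs {
			jobs <- n
		}
		close(jobs)
	}()

	// Close results once every worker has finished.
	go func() {
		wg.Wait()
		close(results)
	}()

	var out []int
	for r := range results {
		out = append(out, r)
	}
	sort.Ints(out) // completion order is not deterministic, so sort for display
	return out
}

func main() {
	fmt.Println(processAll([]int{1, 2, 3, 4, 5}, 3)) // prints [1 4 9 16 25]
}
```

Closing the jobs channel is what lets each worker's range loop end, and closing results after wg.Wait() is what lets the collecting loop end.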

Code Example

Let’s consider one of the variations of the worker pool. Here a buffered channel limits how many goroutines run at once, and we do not return the result of the task execution to the main goroutine.

package main

import (
	"fmt"
	"sync"
	"time"
)

// Data to be processed.
var taskCount = []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}

const (
	// Number of concurrent workers.
	numberOfWorkers = 3
)

func main() {
	// Create a buffered channel that limits the number of running workers.
	jobs := make(chan struct{}, numberOfWorkers)
	wg := sync.WaitGroup{}

	// Start a worker for each item (iterate over values, not indexes).
	for _, id := range taskCount {
		wg.Add(1)
		// Reserve a slot; this blocks while numberOfWorkers workers are busy.
		jobs <- struct{}{}

		go func(id int) {
			defer wg.Done()
			worker(id)
			// Free the slot for the next worker.
			<-jobs
		}(id)
	}

	// Wait for all workers to complete.
	wg.Wait()
}

func worker(id int) {
	fmt.Println(id)
	time.Sleep(2 * time.Second)
}

The slice taskCount contains the data to be processed (10 elements in total). The constant numberOfWorkers sets the number of concurrent workers, which is also the size of our queue.

Next, we create a buffered channel jobs of type struct{} with a buffer size of numberOfWorkers.

To wait for all tasks to complete, we create a WaitGroup. In a for loop, we iterate over the data in the taskCount slice. For each item, we increment the WaitGroup counter by 1 and send an empty struct to the jobs channel, which reserves a slot in the queue.

Then we start the worker handler in a goroutine. It processes the data, frees its slot by receiving from the jobs channel, and calls wg.Done() to decrement the WaitGroup counter by 1.

With wg.Wait(), we block until the group counter reaches zero, that is, until all goroutines have finished. Since numberOfWorkers is 3, the first three goroutines run simultaneously. On the fourth task, the jobs <- struct{}{} line blocks until one of the workers finishes its work and frees a slot with <-jobs.
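
One way to convince yourself that the buffered channel really caps concurrency is to record the peak number of jobs running at once. The sketch below does that under my own assumptions; peakConcurrency is a hypothetical helper, and the short sleep stands in for real work.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// peakConcurrency is a hypothetical helper: it runs `tasks` short jobs behind
// a buffered channel of size `limit` and returns the highest number of jobs
// that were ever running at the same time.
func peakConcurrency(tasks, limit int) int64 {
	sem := make(chan struct{}, limit)
	var (
		wg      sync.WaitGroup
		running int64
		peak    int64
	)
	for i := 0; i < tasks; i++ {
		wg.Add(1)
		sem <- struct{}{} // blocks while `limit` jobs are in flight
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // free the slot when the job ends

			now := atomic.AddInt64(&running, 1)
			// Record a new peak if this job pushed the count higher.
			for {
				old := atomic.LoadInt64(&peak)
				if now <= old || atomic.CompareAndSwapInt64(&peak, old, now) {
					break
				}
			}
			time.Sleep(5 * time.Millisecond) // simulated work
			atomic.AddInt64(&running, -1)
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	fmt.Println("peak concurrent jobs:", peakConcurrency(10, 3))
}
```

Because a slot is reserved before each goroutine starts and released only after its job is done, the reported peak can never exceed the limit.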

When to Use This Pattern

The worker pool pattern is useful when we need to process a large number of tasks but want to limit the number of concurrently executing goroutines. The limit keeps resource usage bounded and avoids overloading the system. The pattern also makes throughput easy to tune: we can scale up simply by increasing the number of allowed concurrent workers.

A worker pool can be used for processing client requests on the server side or for performing background tasks, such as report generation or data processing.

I often use this pattern when I need to process and save a large number of rows in a database. Additionally, this pattern is very convenient for processing events from a distributed queue, such as Kafka.

Conclusion

There are many concurrency patterns, but here I have presented three advanced ones that you can use in your work. If you are interested in learning about other concurrency patterns, I recommend reading about the fan-in, fan-out, pipeline, and bridge channel patterns.

And I hope that my article was helpful to you!
