Golang Concurrency Patterns: For-Select-Done, Errgroup and Worker Pool
--
Hello! My name is Nina and I am a Go programmer.
In this article, I want to talk about three concurrency patterns in Go that can be really handy:
- for-select-done
- errgroup
- worker pool
Pattern 1: For-Select-Done
The main idea of the for-select-done pattern is to use an infinite for loop to handle events from various channels using the select statement.
The select statement proceeds with whichever channel operation becomes ready first; if several are ready at the same time, it picks one of them at random. The event can be a signal to perform some useful task or to exit the infinite loop.
In this pattern, the infinite for loop is usually run in a separate goroutine to avoid blocking the main goroutine.
Code Example
We need to call the function someTask() once per second. At a certain point, we should also be able to exit the infinite loop based on the context and terminate the goroutine.
Let’s see the code example:
package main

import (
	"context"
	"fmt"
	"math/rand"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// someTask is the function that we call periodically.
func someTask() {
	fmt.Println(rand.Int() * rand.Int())
}

// PeriodicTask runs someTask every 1 second.
// It returns, ending the goroutine, when ctx is canceled.
func PeriodicTask(ctx context.Context) {
	// Create a new ticker with a period of 1 second.
	ticker := time.NewTicker(time.Second)
	// Release the ticker's resources on every return path.
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			someTask()
		case <-ctx.Done():
			fmt.Println("stopping PeriodicTask")
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	go PeriodicTask(ctx)
	// Create a channel to receive signals from the operating system.
	sigCh := make(chan os.Signal, 1)
	// SIGINT is what Ctrl+C sends; SIGTERM is the usual termination request.
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	// The code blocks until a signal is received (e.g. Ctrl+C).
	<-sigCh
}
To call someTask() periodically, we create a new ticker with a period of 1 second. Every second, a value is sent to the ticker.C channel; the corresponding case receives it and calls the someTask() function.
When the context is canceled, the channel returned by ctx.Done() is closed, so the corresponding case fires and returns from the function, which ends both the for loop and the goroutine.
In the main function, we create a context ctx with a timeout of 5 seconds. This means that the context is canceled automatically once 5 seconds have passed, and every operation that watches this context is expected to stop.
In the infinite loop of the PeriodicTask goroutine, the ticker fires several times, and the someTask() function is executed each time. After 5 seconds, the context timeout expires, the case <-ctx.Done() is selected in the select statement, and the infinite loop terminates.
Result of code execution:
-777992493516638130
-3179832721048378793
-4070697154687973288
2884823370254822744
stopping PeriodicTask
When to Use This Pattern
This pattern is useful when you need to perform a task in an infinite loop based on some event or timer and then stop its execution based on a certain condition.
For example, it can be used to run deferred calculations using data that was saved to a database, or to asynchronously enrich records in the database with data from other services. At the same time, we always have the ability to safely terminate the goroutine when the context is canceled or some other external event occurs.
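The example above is driven by a timer and a context. As a rough sketch of the event-driven case, the loop can just as well read from an events channel and stop on a plain done channel. The names consume, events and done are hypothetical and exist only for this illustration.

```go
package main

import "fmt"

// consume handles events until done is closed,
// then reports how many events it handled.
func consume(events <-chan string, done <-chan struct{}) int {
	handled := 0
	for {
		select {
		case ev := <-events:
			fmt.Println("handling", ev)
			handled++
		case <-done:
			return handled
		}
	}
}

func main() {
	events := make(chan string)
	done := make(chan struct{})
	result := make(chan int)

	go func() { result <- consume(events, done) }()

	// Sends on an unbuffered channel complete only after consume has received the value.
	events <- "created"
	events <- "updated"

	close(done) // signal the loop to exit
	fmt.Println("handled:", <-result)
}
```

Closing done, rather than sending a value on it, lets any number of goroutines observe the same stop signal, which is also how ctx.Done() behaves.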
Pattern 2: Errgroup
The main idea of the errgroup pattern is to start a group of goroutines, wait for them to finish their work, and handle any errors that may occur during execution.
Code Example
Here is an example using the golang.org/x/sync/errgroup package, which implements the errgroup pattern.
package main

import (
	"errors"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// errFailure is some custom error.
var errFailure = errors.New("some error")

func main() {
	// Create errgroup.
	group := errgroup.Group{}
	// Run first task.
	group.Go(func() error {
		time.Sleep(5 * time.Second)
		fmt.Println("doing some work 1")
		return nil
	})
	// Run second task.
	group.Go(func() error {
		fmt.Println("doing some work 2")
		return nil
	})
	// Run third task.
	group.Go(func() error {
		fmt.Println("doing some work 3")
		return errFailure
	})
	// Wait for all goroutines to complete.
	if err := group.Wait(); err != nil {
		fmt.Printf("errgroup tasks ended up with an error: %v\n", err)
	} else {
		fmt.Println("all works done successfully")
	}
}
In this example, we create an errgroup.Group{} task group and start each task in its own goroutine using the group.Go() method.
We use group.Wait() to wait for all tasks in the group to complete. If any of the tasks finish with an error, group.Wait() returns the first non-nil error it received. If all tasks complete successfully, group.Wait() returns nil.
In this example, the third task completes with an error, so group.Wait() returns an error, which we handle. Note that a plain Group does not stop the other tasks: the first task still runs for its full 5 seconds before Wait returns.
Result of code execution:
doing some work 3
doing some work 2
doing some work 1
errgroup tasks ended up with an error: some error
Let’s consider another use case for this pattern.
The errgroup.WithContext() function creates a new group of goroutines of the errgroup.Group type and a new context.Context derived from the one passed in. This context is canceled the first time a task returns a non-nil error or when Wait returns, whichever happens first, so it can be passed to the goroutines to cancel the rest of the task group.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// errFailure is some custom error.
var errFailure = errors.New("some error")

func main() {
	// Create errgroup with context.
	group, qctx := errgroup.WithContext(context.Background())
	// Run first periodic task.
	group.Go(func() error {
		firstTask(qctx)
		return nil
	})
	// Run second task.
	group.Go(func() error {
		if err := secondTask(); err != nil {
			return err
		}
		return nil
	})
	// Wait for all tasks to complete or the error to appear.
	if err := group.Wait(); err != nil {
		fmt.Printf("errgroup tasks ended up with an error: %v\n", err)
	}
}

// firstTask prints a message every 500 ms, at most 10 times,
// and stops early if ctx is canceled.
func firstTask(ctx context.Context) {
	var counter int
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(500 * time.Millisecond):
			fmt.Println("some task")
			counter++
			if counter >= 10 {
				return
			}
		}
	}
}

// secondTask simulates slow work that fails after 3 seconds.
func secondTask() error {
	time.Sleep(3 * time.Second)
	return errFailure
}
Here, the firstTask() function represents a periodic task that needs to perform some action up to 10 times. The secondTask() function performs some work for 3 seconds and then returns an error.
In this case, our periodic task firstTask() manages to run several times before secondTask() finishes with an error. This error causes the qctx context to be canceled for the group of goroutines, so firstTask() sees ctx.Done() and stops as well.
Result of code execution:
some task
some task
some task
some task
some task
errgroup tasks ended up with an error: some error
When to Use This Pattern
I use this pattern where it is important for all goroutines in the group to complete successfully without errors.
For example, suppose I need to perform a calculation using combined data. I cannot perform the calculation if data from a particular table is missing (e.g., it has not yet been saved). In this case, I return a custom error about the missing data, and the execution of all other database queries is interrupted until the next task group run.
Pattern 3: Worker Pool
The worker pool pattern allows tasks to be parallelized while limiting the number of simultaneously executing goroutines.
In this pattern, we create a fixed number of workers that wait for tasks from a queue. When a task appears, it is added to the queue. If a worker is free, it takes a task from the queue and executes it. The result of task execution can be returned to the main goroutine, where it can be processed.
Code Example
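Let’s consider one of the variations of the worker pool. Instead of long-lived workers, it starts one goroutine per task and uses a buffered channel as a semaphore to cap how many of them run at once. Here we will not return the result of the task execution to the main goroutine.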
package main

import (
	"fmt"
	"sync"
	"time"
)

// Data to be processed.
var taskCount = []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}

const (
	// Number of concurrent workers.
	numberOfWorkers = 3
)

func main() {
	// Create buffered channel.
	jobs := make(chan struct{}, numberOfWorkers)
	wg := sync.WaitGroup{}
	// Add workers. Range over the values, not the indexes.
	for _, id := range taskCount {
		wg.Add(1)
		// Blocks while numberOfWorkers workers are already running.
		jobs <- struct{}{}
		go func(id int) {
			// Register the decrement first so it runs on every return path.
			defer wg.Done()
			worker(id)
			// Free the slot for the next task.
			<-jobs
		}(id)
	}
	// Wait for all workers to complete.
	wg.Wait()
}

func worker(id int) {
	fmt.Println(id)
	time.Sleep(2 * time.Second)
}
The slice taskCount contains the data that needs to be processed (a total of 10 elements). The constant numberOfWorkers sets the number of concurrent workers, which is the size of our queue.
Next, we create a buffered channel jobs of type struct{} with a buffer size of numberOfWorkers.
To wait for all tasks to complete, we create a WaitGroup. Using a for loop, we iterate through all the data in the taskCount slice, incrementing the WaitGroup counter by 1 and adding a token to the jobs channel queue for each task.
Then our worker handler is called in a goroutine. It processes the data, removes its token from the queue with <-jobs, and calls wg.Done(), which decrements the WaitGroup counter by 1.
Using wg.Wait(), we wait until the group counter becomes zero, that is, until all goroutines have completed. numberOfWorkers is 3, so the first three goroutines run simultaneously. On the fourth task, the jobs <- struct{}{} line blocks until one of the workers finishes its work and frees a slot by receiving from the jobs channel.
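For comparison, the classic form described at the start of this section, with a fixed set of long-lived workers that read from a queue and send results back, could look roughly like the sketch below. The names runPool and square are hypothetical, and squaring a number stands in for real work.

```go
package main

import (
	"fmt"
	"sync"
)

// square is a hypothetical stand-in for real work.
func square(n int) int { return n * n }

// runPool processes inputs with a fixed number of workers
// and returns the sum of their results.
func runPool(inputs []int, workers int) int {
	jobs := make(chan int)
	results := make(chan int)

	// Start a fixed number of workers that read from the jobs queue.
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for n := range jobs {
				results <- square(n)
			}
		}()
	}

	// Feed the queue, then close it so the workers' range loops end.
	go func() {
		for _, n := range inputs {
			jobs <- n
		}
		close(jobs)
	}()

	// Close results once every worker has exited.
	go func() {
		wg.Wait()
		close(results)
	}()

	// Collect results in the calling goroutine.
	sum := 0
	for r := range results {
		sum += r
	}
	return sum
}

func main() {
	fmt.Println(runPool([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, 3)) // 385
}
```

Compared with the semaphore variation, this version creates only as many goroutines as there are workers, at the cost of a little more channel plumbing. Results arrive in whatever order the workers finish, so the sketch aggregates them instead of relying on order.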
When to Use This Pattern
The worker pool pattern is useful when a large number of tasks must be processed but the number of concurrently executing goroutines has to be limited. The limit keeps memory use and the load on shared resources, such as a database, bounded, which helps avoid overloading the system. The pattern also scales easily: to increase throughput, raise the number of allowed concurrent workers.
A worker pool can be used for processing client requests on the server side or for performing background tasks, such as report generation or data processing.
I often use this pattern when there is a need to process and save a large amount of data rows in a database. Additionally, this pattern is very convenient for processing events from a distributed queue, such as Kafka.
Conclusion
There are many concurrency patterns, and here I have presented three that you can use in your work. If you are interested in learning about other concurrency patterns, I recommend reading about the fan-in, fan-out, pipeline, and bridge channel patterns.
And I hope that my article was helpful to you!