Concurrency Patterns in Go | Part 1

Miah Md Shahjahan
6 min readMay 21, 2023

Go provides several powerful concurrency patterns that can be used to design efficient and scalable concurrent programs. In the first part, we will delve into three fundamental concurrency patterns, accompanied by real-world use cases.

Pipeline Pattern

The pipeline pattern in Go is a concurrency pattern used for efficiently processing large amounts of data. It involves dividing a complex task into smaller stages, each executed concurrently by separate goroutines.The output of one stage is passed as input to the next stage through channels, forming a pipeline.

Use Case

Let’s say you have a system that needs to process a large dataset of customer orders. Each order goes through several stages of processing, such as validation, enrichment, and final calculation. Instead of sequentially processing each order, you can utilize the pipeline pattern to parallelize the processing and improve overall efficiency.

Here’s how the pipeline pattern can be used in this scenario:

  1. Stage 1 (Validation): Each order is received and validated for correctness and completeness. Any invalid orders are filtered out.
  2. Stage 2 (Enrichment): The valid orders are then enriched with additional information, such as customer details or product data, to enhance their content.
  3. Stage 3 (Calculation): The enriched orders are processed further to perform calculations, such as total order value or shipping costs.

Example

Assuming Order object resembles

type Order struct {
ID int
Amount float64
}

Stage 1 (Validation)

Each order is received and validated for correctness and completeness. Any invalid orders are filtered out.

func validateOrders(input <-chan Order, output chan<- Order){
defer close(output) // Close the output channel when the function finishes

for order := range input {
// Simulating validation
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)

// Validate order
if order.Amount > 0 {
output <- order
}
}
}

Stage 2 (Enrichment)

The valid orders are then enriched with additional information, such as customer details or product data, to enhance their content.

func enrichOrders(input <-chan Order, output chan<- Order) {
defer close(output) // Close the output channel when the function finishes

for order := range input {
// Simulating enrichment
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)

// Enrich order
order.Amount *= 1.1
output <- order
}
}

Stage 3 (Calculation)

The enriched orders are processed further to perform calculations, such as total order value or shipping costs.

func calculateOrderValues(input <-chan Order, output chan<- Order) {
defer close(output) // Close the output channel when the function finishes

for order := range input {
// Simulating calculation
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)

// Perform calculation
order.Amount += 5.0
output <- order
}
}

Each stage is implemented as a separate goroutine, and channels are used to connect the stages. The orders flow through the pipeline, with each stage concurrently processing the orders it receives. This allows for parallelism, as multiple orders can be processed simultaneously by different stages.

func main() {
rand.Seed(time.Now().UnixNano())

// Create channels to connect the stages
orders := make(chan Order)
validOrders := make(chan Order)
enrichedOrders := make(chan Order)
calculatedOrders := make(chan Order)

// Start the stages concurrently
go validateOrders(orders, validOrders)
go enrichOrders(validOrders, enrichedOrders)
go calculateOrderValues(enrichedOrders, calculatedOrders)

// Generate sample orders
go func() {
for i := 1; i <= 10; i++ {
order := Order{
ID: i,
Amount: float64(i * 10),
}
orders <- order
}
close(orders)
}()

// Receive the final output from the pipeline
for order := range calculatedOrders {
fmt.Printf("Processed order ID: %d, Final amount: %.2f\n", order.ID, order.Amount)
}
}

By using the pipeline pattern, you can efficiently process a large number of orders in a scalable and concurrent manner. This can significantly reduce the overall processing time and improve the performance of your system.

Fan-Out, Fan-In Pattern

So you’ve got a pipeline set up.Data is flowing through your system beautifully, transforming as it makes its way through the stages you’ve chained together. It’s like a beautiful orders processing; a beautiful, slow, and oh god why is this taking so long?

Sometimes, stages in your pipeline can be particularly computationally expensive. Imagine you have many attributes to validate and Stage 1 (Validation) is taking longer time then expected. In fact, it turns out it can, and you can solve it by another pattern has name: fan-out, fan-in.

Here’s how the fan-out, fan-in pattern works:

  1. Fan-Out: The input data is divided into smaller chunks, and each chunk is processed concurrently by a separate goroutine or stage. This allows for parallel processing of the workload.
  2. Fan-In: The results produced by the concurrent goroutines or stages are collected and merged into a single output channel or data structure. This consolidation combines the individual results into a final result.

Let’s use fan-out, fan-in pattern for validateOrders goroutine

func validateOrders(input <-chan Order, output chan<- Order) {
defer close(output) // Close the output channel when the function finishes

// Create a channel to receive validated orders
validatedOrders := make(chan Order)

// Specify the number of worker goroutines to use for fan-out
workerCount := 3

// Fan-out: Start multiple worker goroutines
var wg sync.WaitGroup
for i := 0; i < workerCount; i++ {
wg.Add(1)
go func(input <-chan Order) {
defer wg.Done()
for order := range input {
// Simulating validation
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)

// Validate order
if order.Amount > 0 {
validatedOrders <- order
}
}
}(input)
}

// Fan-in: Collect the validated orders from the workers
go func() {
wg.Wait()
close(validatedOrders)
}()

for order := range validatedOrders {
output <- order
}
}

The fan-out, fan-in pattern is particularly useful when you have a large amount of data or a computationally intensive task that can be divided into independent subtasks. By distributing the work across multiple goroutines or stages like validateOrders goroutine, you can take advantage of parallel processing capabilities and improve overall performance.

Worker Pool Pattern

Bark! You’re fully prepared and ready to enter production.

Wow! Your exceptional work pays off as your product becomes a sensation, going viral and generating a surge of orders that require rapid validation.

In the case of the validateOrders function, the worker pool pattern can be beneficial if as you have a potentially large number of orders to validate, and you want to control the number of goroutines executing the validation process.

The Worker Pool pattern is useful in scenarios where you have a large number of tasks to be processed concurrently, and you want to limit the number of active goroutines to avoid resource exhaustion.

By introducing a worker pool, you can limit the number of active goroutines to a predefined count (e.g., workerCount). The worker goroutines in the pool can process the orders concurrently, with each worker picking up an order from the task queue as soon as it becomes available. This ensures that the number of active goroutines is controlled and resources are utilized efficiently.

func validateOrders(input <-chan Order, output chan<- Order) {
defer close(output) // Close the output channel when the function finishes

// Specify the number of worker goroutines to use in the worker pool
workerCount := 3

// Create a wait group to track the completion of the worker goroutines
var wg sync.WaitGroup

// Create a buffered channel to limit the number of tasks that can be submitted to the worker pool
taskQueue := make(chan Order, workerCount)

// Create a channel to collect the validated orders from the worker goroutines
validatedOrders := make(chan Order)

// Start worker goroutines
for i := 0; i < workerCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for order := range taskQueue {
// Simulating validation
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)

// Validate order
if order.Amount > 0 {
validatedOrders <- order
}
}
}()
}

// Enqueue tasks to the task queue (Fan-out)
go func() {
for order := range input {
taskQueue <- order
}
close(taskQueue)
}()

// Collect validated orders from the worker goroutines (Fan-in)
go func() {
wg.Wait() // Wait for all worker goroutines to finish
close(validatedOrders) // Close the channel after all orders have been processed
}()

// Collect the validated orders from the channel and send them to the output channel
for order := range validatedOrders {
output <- order
}
}

Conclusion

In summary, the code snippet demonstrates the pipeline pattern by flowing the tasks from the input channel to the worker goroutines through the task queue. It incorporates the fan-out pattern by distributing tasks among multiple worker goroutines. The worker pool pattern is used by creating a fixed number of worker goroutines to process tasks concurrently.By combining these patterns, you can enables efficient and concurrent processing of orders, leveraging parallelism and task distribution among worker goroutines, resulting in improved throughput and scalability.

I’m constantly delighted to receive feedback. Whether you spot an error, have a suggestion for improvement, or just want to share your thoughts, please don’t hesitate to comment/reach out. I truly value connecting with readers!

--

--

Miah Md Shahjahan

Solution Architect with more than 11 years in software design and enterprise product development.