Image for post
Image for post

Pipeline Patterns in Go

Pipelines with Error-handling and Cancellation

Claudio Fahey
Sep 13, 2017 · 10 min read
Image for post
Image for post

Introduction

Pipeline Error Handling and Cancellation

An Example Pipeline

Source Pipeline Stage

func lineListSource(ctx context.Context, lines ...string) (
<-chan string, <-chan error, error) {
if len(lines) == 0 {
// Handle an error that occurs before the goroutine begins.
return nil, nil, errors.Errorf("no lines provided")
}
out := make(chan string)
errc := make(chan error, 1)
go func() {
defer close(out)
defer close(errc)
for lineIndex, line := range lines {
if line == "" {
// Handle an error that occurs during the goroutine.
errc <- errors.Errorf("line %v is empty", lineIndex+1)
return
}
// Send the data to the output channel but return early
// if the context has been cancelled.
select {
case out <- line:
case <-ctx.Done():
return
}
}
}()
return out, errc, nil
}

Transformer Pipeline Stage

func lineParser(ctx context.Context, base int, in <-chan string) (
<-chan int64, <-chan error, error) {
if base < 2 {
// Handle an error that occurs before the goroutine begins.
return nil, nil, errors.Errorf("invalid base %v", base)
}
out := make(chan int64)
errc := make(chan error, 1)
go func() {
defer close(out)
defer close(errc)
for line := range in {
n, err := strconv.ParseInt(line, base, 64)
if err != nil {
// Handle an error that occurs during the goroutine.
errc <- err
return
}
// Send the data to the output channel but return early
// if the context has been cancelled.
select {
case out <- n:
case <-ctx.Done():
return
}
}
}()
return out, errc, nil
}

Sink Pipeline Stage

func sink(ctx context.Context, in <-chan int64) (
<-chan error, error) {
errc := make(chan error, 1)
go func() {
defer close(errc)
for n := range in {
if n >= 100 {
// Handle an error that occurs during the goroutine.
errc <- errors.Errorf("number %v is too large", n)
return
}
fmt.Printf("sink: %v\n", n)
}
}()
return errc, nil
}

Building and Executing the Pipeline

func runSimplePipeline(base int, lines []string) error {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
var errcList []<-chan error // Source pipeline stage.
linec, errc, err := lineListSource(ctx, lines...)
if err != nil {
return err
}
errcList = append(errcList, errc)
// Transformer pipeline stage.
numberc, errc, err := lineParser(ctx, base, linec)
if err != nil {
return err
}
errcList = append(errcList, errc)
// Sink pipeline stage.
errc, err = sink(ctx, numberc)
if err != nil {
return err
}
errcList = append(errcList, errc)
fmt.Println("Pipeline started. Waiting for pipeline to complete.") return WaitForPipeline(errcList...)
}
// WaitForPipeline waits for results from all error channels.
// It returns early on the first error.
func WaitForPipeline(errs ...<-chan error) error {
errc := MergeErrors(errs...)
for err := range errc {
if err != nil {
return err
}
}
return nil
}
// MergeErrors merges multiple channels of errors.
// Based on https://blog.golang.org/pipelines.
func MergeErrors(cs ...<-chan error) <-chan error {
var wg sync.WaitGroup
// We must ensure that the output channel has the capacity to
// hold as many errors
// as there are error channels.
// This will ensure that it never blocks, even
// if WaitForPipeline returns early.
out := make(chan error, len(cs))
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed, then calls
// wg.Done.
output := func(c <-chan error) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
// Start a goroutine to close out once all the output goroutines
// are done. This must start after the wg.Add call.
go func() {
wg.Wait()
close(out)
}()
return out
}

How Errors are Handled

How Cancellation is Handled

Complex Pipelines

func splitter(ctx context.Context, in <-chan int64) (
<-chan int64, <-chan int64, <-chan error, error) {
out1 := make(chan int64)
out2 := make(chan int64)
errc := make(chan error, 1)
go func() {
defer close(out1)
defer close(out2)
defer close(errc)
for n := range in {
// Send the data to the output channel 1 but return early
// if the context has been cancelled.
select {
case out1 <- n:
case <-ctx.Done():
return
}
// Send the data to the output channel 2 but return early
// if the context has been cancelled.
select {
case out2 <- n:
case <-ctx.Done():
return
}
}
}()
return out1, out2, errc, nil
}
                                  / squarer -> sink
lines -> lineParser -> splitter -|
\ sink
func runComplexPipeline(base int, lines []string) error {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
var errcList []<-chan error // Source pipeline stage.
linec, errc, err := lineListSource(ctx, lines...)
if err != nil {
return err
}
errcList = append(errcList, errc)
// Transformer pipeline stage 1.
numberc, errc, err := lineParser(ctx, base, linec)
if err != nil {
return err
}
errcList = append(errcList, errc)
// Transformer pipeline stage 2.
numberc1, numberc2, errc, err := splitter(ctx, numberc)
if err != nil {
return err
}
errcList = append(errcList, errc)
// Transformer pipeline stage 3.
numberc3, errc, err := squarer(ctx, numberc1)
if err != nil {
return err
}
errcList = append(errcList, errc)
// Sink pipeline stage 1.
errc, err = sink(ctx, numberc3)
if err != nil {
return err
}
errcList = append(errcList, errc)
// Sink pipeline stage 2.
errc, err = sink(ctx, numberc2)
if err != nil {
return err
}
errcList = append(errcList, errc)
fmt.Println("Pipeline started. Waiting for pipeline to complete.") return WaitForPipeline(errcList...)
}

Download and Run the Code

$ go run -race pipeline_demo.go 
runSimplePipeline: base=10, lines=[3 2 1]
Pipeline started. Waiting for pipeline to complete.
sink: 3
sink: 2
sink: 1
runSimplePipeline: base=1, lines=[3 2 1]
invalid base 1
runSimplePipeline: base=2, lines=[1010 1100 1000]
Pipeline started. Waiting for pipeline to complete.
sink: 10
sink: 12
sink: 8
runSimplePipeline: base=2, lines=[1010 1100 2000 1111]
Pipeline started. Waiting for pipeline to complete.
sink: 10
sink: 12
strconv.ParseInt: parsing "2000": invalid syntax
runSimplePipeline: base=10, lines=[1 10 100 1000]
Pipeline started. Waiting for pipeline to complete.
sink: 1
sink: 10
number 100 is too large
runComplexPipeline: base=10, lines=[5 4 3]
Pipeline started. Waiting for pipeline to complete.
sink: 25
sink: 5
sink: 4
sink: 3
sink: 16
sink: 9
runPipelineWithTimeout
Pipeline started. Waiting for pipeline to complete.
sink: 86
sink: 86
sink: 92
sink: 40
sink: 4
sink: 54
sink: 30
sink: 64
sink: 11
sink: 76
Cancelling context.
Image for post
Image for post

Further Reading

Statuscode

Keeping developers informed.

Medium is an open platform where 170 million readers come to find insightful and dynamic thinking. Here, expert and undiscovered voices alike dive into the heart of any topic and bring new ideas to the surface. Learn more

Follow the writers, publications, and topics that matter to you, and you’ll see them on your homepage and in your inbox. Explore

If you have a story to tell, knowledge to share, or a perspective to offer — welcome home. It’s easy and free to post your thinking on any topic. Write on Medium

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store