Reusable barriers in Golang

How to implement them using buffered channels

Michał Łowicki
Published in golangspec, Feb 27, 2018

In this story we'll examine a basic synchronization problem to see how Go's concurrency primitives, such as buffered channels, can be used to implement concise solutions.

Problem

Let's suppose we have a number of workers. To take advantage of multiple CPU cores, every worker runs in a separate goroutine:

for i := 0; i < workers; i++ {
	go worker()
}

Each worker does a series of jobs:

func worker() {
	for i := 0; i < 3; i++ {
		job()
	}
}

Each job is preceded by a bootstrap step that needs to be synchronized across all workers: a worker has to wait for all other workers to finish their bootstrap before starting its own job:

func worker() {
	for i := 0; i < 3; i++ {
		bootstrap()
		// wait for other workers to bootstrap
		job()
	}
}

There is one more thing. The bootstrap for loop n cannot start while at least one worker is still working on its job from the previous loop. The bootstrap's result is used by the job that follows it, so we cannot run another bootstrap while a previous job is still in progress:

func worker() {
	for i := 0; i < 3; i++ {
		// wait for all workers to finish previous loop
		bootstrap()
		// wait for other workers to bootstrap
		job()
	}
}

Our bootstrap will increment a shared counter, and our job will sleep for a random time and then print the value of the shared counter:

type counter struct {
	c int
	sync.Mutex
}

func (c *counter) Incr() {
	c.Lock()
	c.c++
	c.Unlock()
}

func (c *counter) Get() (res int) {
	c.Lock()
	res = c.c
	c.Unlock()
	return
}

func worker(c *counter) {
	for i := 0; i < 3; i++ {
		// wait for all workers to finish previous loop
		c.Incr()
		// wait for other workers to do bootstrap
		time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
		fmt.Println(c.Get())
	}
}

The goal is to write a program that prints numbers according to the following rules, where n is the number of workers:

  • only the numbers n, 2*n and 3*n are allowed (since each worker executes 3 loops)
  • a number is never less than the number printed on the line above
  • each number is printed exactly n times

Expected output with 3 workers:

3
3
3
6
6
6
9
9
9

Expected output with 2 workers:

2
2
4
4
6
6

Sample invalid output with 2 workers:

2
4
2
4
6
6
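
Taken together, the three rules leave exactly one valid printout for a given number of workers. As a minimal sketch, the check below encodes that; the function name validOutput and its signature are made up for illustration and are not part of the original program:

```go
package main

import "fmt"

// validOutput reports whether nums is an acceptable printout for the given
// number of workers and loops: workers copies of 1*workers, then workers
// copies of 2*workers, and so on. (Hypothetical helper, for illustration.)
func validOutput(nums []int, workers, loops int) bool {
	if len(nums) != workers*loops {
		return false
	}
	for i, v := range nums {
		// Line i belongs to round i/workers (0-based), and every line
		// of that round must show (round+1)*workers.
		if v != (i/workers+1)*workers {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(validOutput([]int{2, 2, 4, 4, 6, 6}, 2, 3)) // true
	fmt.Println(validOutput([]int{2, 4, 2, 4, 6, 6}, 2, 3)) // false
}
```

The second call uses the invalid sample above: its second line already shows 4 while the first round is still being printed.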

Think about possible solutions. The next few lines are intentionally left blank so the answer isn't visible right away.

.

.

.

.

.

.

.

.

.

.

.

.

.

.

Workers will be synchronized by a data structure called a reusable barrier. The barrier consists of two gates. The 1st gate is placed before incrementing the counter and is closed at the very beginning. A closed gate means that a worker blocks when it reaches it. Once every worker gets to gate 1:

  • gate 2 (which is placed after the increment) closes
  • gate 1 opens

All workers then increment the shared counter and successively reach gate 2, which is closed. Once all workers get to the 2nd gate:

  • gate 1 closes
  • gate 2 opens

Workers can now work on their jobs and then reach the 1st gate again during the next loop, where the whole cycle starts again. Let's visualize the whole process:


      1st gate    2nd gate
          v           v
-w1-->    |           |
   --w2-->|
  --w3--> |
   --w4-->|
-w5-->    |           |

   --w1-->|           |
   --w2-->|
   --w3-->|
   --w4-->|
   --w5-->|           |

          |    --w1-->|
               --w2-->|
              --w3--> |
              --w4--> |
          |    --w5-->|

          |    --w1-->|
               --w2-->|
               --w3-->|
               --w4-->|
          |    --w5-->|

--w1-->   |           |
          |            --w2-->
               --w3-->|
              --w4--> |
          |           |  --w5-->

Below are two solutions based on this idea, with slightly different implementations. Here is the code used to test both of them:

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"

	// Set this import spec to point
	// to the copy of one of proposed
	// solutions.
	"path/to/package/barrier"
)

func init() {
	rand.Seed(time.Now().Unix())
}

type counter struct {
	c int
	sync.Mutex
}

func (c *counter) Incr() {
	c.Lock()
	c.c++
	c.Unlock()
}

func (c *counter) Get() (res int) {
	c.Lock()
	res = c.c
	c.Unlock()
	return
}

func worker(c *counter, br *barrier.Barrier, wg *sync.WaitGroup) {
	for i := 0; i < 3; i++ {
		br.Before()
		c.Incr()
		br.After()
		time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
		fmt.Println(c.Get())
	}
	wg.Done()
}

func main() {
	var wg sync.WaitGroup
	workers := 3
	br := barrier.New(workers)
	c := counter{}
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go worker(&c, br, &wg)
	}
	wg.Wait()
}

Barrier must implement the Before and After methods, which are the 1st and 2nd gates respectively.

Solution #1

Each gate needs a buffered channel with capacity 1:

ch := make(chan int, 1)

A gate is implemented as a receive from the channel followed by a send to it:

<-ch
ch <- 1

If the channel holds one unread element, the gate is open. It lets exactly one worker through, and that worker then enqueues a new element to let the next worker through, which in turn enqueues another element, and so on.

If there are no queued elements, the gate is closed and a worker blocks on receiving from the channel.
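
To see a single gate in isolation, here is a small sketch. The helper passGate is hypothetical and exists only for this demonstration: it starts a few goroutines in front of a closed gate, opens the gate once, and reports how many goroutines passed and how many elements remain in the channel afterwards.

```go
package main

import (
	"fmt"
	"sync"
)

// passGate opens a capacity-1 channel gate once and lets the given number
// of goroutines through it. It returns how many goroutines passed and how
// many tokens are left in the channel afterwards.
// (Hypothetical helper, for illustration.)
func passGate(workers int) (passed, tokensLeft int) {
	gate := make(chan int, 1) // empty channel: the gate starts closed
	var wg sync.WaitGroup
	var mu sync.Mutex

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-gate    // blocks while the gate is closed
			gate <- 1 // put the token back so the next worker can pass
			mu.Lock()
			passed++
			mu.Unlock()
		}()
	}

	gate <- 1 // open the gate
	wg.Wait()
	return passed, len(gate)
}

func main() {
	passed, left := passGate(5)
	fmt.Println(passed, left) // 5 1
}
```

One token is enough to let every worker through, and it is still in the channel at the end, so the gate stays open until someone explicitly closes it by receiving the token.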

// github.com/mlowicki/barrier
package barrier

import "sync"

type Barrier struct {
	c      int        // workers that called Before but not yet After
	n      int        // total number of workers
	m      sync.Mutex // guards c
	before chan int   // 1st gate
	after  chan int   // 2nd gate
}

func New(n int) *Barrier {
	b := Barrier{
		n:      n,
		before: make(chan int, 1),
		after:  make(chan int, 1),
	}
	// the 1st gate starts closed (b.before is empty);
	// open 2nd gate
	b.after <- 1
	return &b
}

func (b *Barrier) Before() {
	b.m.Lock()
	b.c++
	if b.c == b.n {
		// close 2nd gate
		<-b.after
		// open 1st gate
		b.before <- 1
	}
	b.m.Unlock()
	<-b.before
	b.before <- 1
}

func (b *Barrier) After() {
	b.m.Lock()
	b.c--
	if b.c == 0 {
		// close 1st gate
		<-b.before
		// open 2nd gate
		b.after <- 1
	}
	b.m.Unlock()
	<-b.after
	b.after <- 1
}

Solution #2

This one uses buffered channels with capacity equal to the number of workers, n. Instead of passing workers through each gate one after another, we now put exactly n elements into the channel, enough to let all workers through within a single loop:

// github.com/mlowicki/barrier2
package barrier

import "sync"

type Barrier struct {
	c      int        // workers that called Before but not yet After
	n      int        // total number of workers
	m      sync.Mutex // guards c
	before chan int   // 1st gate
	after  chan int   // 2nd gate
}

func New(n int) *Barrier {
	b := Barrier{
		n:      n,
		before: make(chan int, n),
		after:  make(chan int, n),
	}
	return &b
}

func (b *Barrier) Before() {
	b.m.Lock()
	b.c++
	if b.c == b.n {
		// open 1st gate: one element per worker
		for i := 0; i < b.n; i++ {
			b.before <- 1
		}
	}
	b.m.Unlock()
	<-b.before
}

func (b *Barrier) After() {
	b.m.Lock()
	b.c--
	if b.c == 0 {
		// open 2nd gate: one element per worker
		for i := 0; i < b.n; i++ {
			b.after <- 1
		}
	}
	b.m.Unlock()
	<-b.after
}
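
To check a barrier without reading printed output by eye, the values can be recorded in a slice instead. The sketch below is self-contained, so it carries a compact copy of Solution #2 under the made-up name chanBarrier; the barrier interface and the run helper are also introduced only for this example. Because every worker records its value before calling Before again, the recorded order is deterministic.

```go
package main

import (
	"fmt"
	"sync"
)

// barrier is the method set both solutions provide.
type barrier interface {
	Before()
	After()
}

// chanBarrier is a compact copy of Solution #2, repeated here only so the
// example runs on its own.
type chanBarrier struct {
	c, n          int
	m             sync.Mutex
	before, after chan int
}

func newChanBarrier(n int) *chanBarrier {
	return &chanBarrier{n: n, before: make(chan int, n), after: make(chan int, n)}
}

func (b *chanBarrier) Before() {
	b.m.Lock()
	b.c++
	if b.c == b.n {
		for i := 0; i < b.n; i++ {
			b.before <- 1
		}
	}
	b.m.Unlock()
	<-b.before
}

func (b *chanBarrier) After() {
	b.m.Lock()
	b.c--
	if b.c == 0 {
		for i := 0; i < b.n; i++ {
			b.after <- 1
		}
	}
	b.m.Unlock()
	<-b.after
}

// run starts the workers and returns the counter values they observed,
// in the order they were recorded.
func run(br barrier, workers, loops int) []int {
	var (
		mu      sync.Mutex
		counter int
		seen    []int
		wg      sync.WaitGroup
	)
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < loops; i++ {
				br.Before()
				mu.Lock()
				counter++ // bootstrap
				mu.Unlock()
				br.After()
				mu.Lock()
				seen = append(seen, counter) // job: record instead of printing
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return seen
}

func main() {
	fmt.Println(run(newChanBarrier(3), 3, 3)) // [3 3 3 6 6 6 9 9 9]
}
```

Any type with Before and After methods can be passed to run, so the same check works for a copy of Solution #1 as well.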

Resources

  • “The Little Book of Semaphores” by Allen B. Downey

If you've found an alternative solution, please share it in the comments.
