Exercising Concurrency in Go: 3 Common Pitfalls and Lessons Learned

Eduardo Ferreira
Dec 11, 2019 · 10 min read

Why Concurrency Is Needed

sometimes you need a helping hand (credit: Renee French)

When you are working with complex distributed systems, you will likely come across the need for concurrent processing. At Mode.net, we deal daily with real-time, fast, and resilient software. Building a global private network that dynamically routes packets at the millisecond scale wouldn't be possible without a highly concurrent system. This dynamic routing is based on the state of the network and, while there are many parameters to consider here, our focus is on link metrics. In our context, link metrics can be anything related to the status or current properties of a network link (e.g., link latency).

Concurrent Probing For Link Metrics

latency computation (credit: Mode.net)

Sequence Numbers and Resets: A Reordering Situation

UDP Handshake and Finite State Machine

To implement this properly, we have to define a finite state machine with clear states and transitions. This allows us to properly manage all corner cases of the handshake formation.


Session IDs are generated by the handshake initiator. A full exchange sequence is as follows:

  1. The sender sends out a SYN (ID) packet.
  2. The receiver stores the received ID, and sends a SYN-ACK (ID).
  3. The sender receives the SYN-ACK (ID) and sends out an ACK (ID). It also starts sending packets starting with sequence number 0.
  4. The receiver checks the last received ID and accepts the ACK (ID) if the ID matches. It also starts accepting packets with sequence number 0.
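The sender's side of the exchange above can be sketched as a tiny state machine. This is a hedged illustration only: the state and event names here are made up for the example and are not the identifiers used in the actual implementation.

```go
package main

import "fmt"

// SessionState models the sender's progress through the handshake.
type SessionState int

const (
	Idle        SessionState = iota // no session yet
	SynSent                         // SYN(ID) sent, waiting for SYN-ACK
	Established                     // ACK(ID) sent; probing can begin at sequence 0
)

// next returns the sender's state after an event, covering only the
// happy path of the exchange; a real FSM would also handle retransmits,
// ID mismatches, and timeouts.
func next(s SessionState, event string) SessionState {
	switch {
	case s == Idle && event == "send-SYN":
		return SynSent
	case s == SynSent && event == "recv-SYN-ACK":
		// The sender replies with ACK(ID) and starts sending at sequence 0.
		return Established
	}
	return s // unknown transitions leave the state unchanged
}

func main() {
	s := next(next(Idle, "send-SYN"), "recv-SYN-ACK")
	fmt.Println(s == Established) // true
}
```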

Handling State Timeouts

Three kinds of events can drive the session state machine:

  • Link events are either link up or link down updates. This can either initiate a link session or break an existing session.
  • Packet events are control packets (SYN/SYN-ACK/ACK) or just probe responses.
  • Timeout events are the ones triggered after a scheduled timeout expires for the current session state.

The main challenge here is how to handle concurrent timeout expiration and other events. And this is where one can easily fall into the traps of deadlocks and race conditions.
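One common Go pattern for this kind of challenge, shown here only as an illustrative sketch rather than the approach taken below, is to funnel link, packet, and timeout events into a single channel so that one goroutine serializes all state changes:

```go
package main

import "fmt"

// Event is a hypothetical wrapper over the three event kinds described
// above; a real implementation would carry payloads per kind.
type Event struct {
	Kind string // "link", "packet", or "timeout"
}

// handleEvents consumes events until the channel is closed and returns
// how many of each kind were processed. Because a single goroutine runs
// this loop, all session-state mutation inside it needs no locking.
func handleEvents(events <-chan Event) map[string]int {
	counts := make(map[string]int)
	for ev := range events {
		// All FSM transitions would happen here, one event at a time.
		counts[ev.Kind]++
	}
	return counts
}

func main() {
	events := make(chan Event, 3)
	events <- Event{Kind: "link"}
	events <- Event{Kind: "packet"}
	events <- Event{Kind: "timeout"}
	close(events)
	fmt.Println(handleEvents(events)["packet"]) // 1
}
```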

A First Approach

gophers hacking together (credit: Ashley McNamara)

You can start by designing structures to represent the Session and the TimeoutHandler.

type Session struct {
    State    SessionState
    Id       SessionId
    RemoteIp string
}

type TimeoutHandler struct {
    callback func(Session)
    session  Session
    duration int
    timer    *time.Timer
}

Session identifies the connection session, with the session ID, neighboring link IP, and the current session state.

TimeoutHandler holds the callback function, the session for which it should run, the duration, and a pointer to the scheduled timer.

There is a global map that will store, per neighboring link session, the scheduled timeout handler.

var sessionTimeout = make(map[Session]*TimeoutHandler)

Registering and cancelling a timeout is achieved by the following methods:

// Register schedules the timeout callback function.
func (timeout *TimeoutHandler) Register() {
    timeout.timer = time.AfterFunc(time.Duration(timeout.duration)*time.Second, func() {
        timeout.callback(timeout.session)
    })
}

// Cancel stops the scheduled timer, if any.
func (timeout *TimeoutHandler) Cancel() {
    if timeout.timer == nil {
        return
    }
    timeout.timer.Stop()
}

For the timeouts creation and storage, you can use a method like the following:

func CreateTimeoutHandler(callback func(Session), session Session, duration int) *TimeoutHandler {
    if sessionTimeout[session] == nil {
        sessionTimeout[session] = new(TimeoutHandler)
    }

    timeout := sessionTimeout[session]
    timeout.session = session
    timeout.callback = callback
    timeout.duration = duration
    return timeout
}

Once the timeout handler is created and registered, it runs the callback after duration seconds have elapsed. However, some events will require you to reschedule a timeout handler (as happens in the SYN state, where it fires every 3 seconds).

For that, you can have the callback rescheduling a new timeout:

func synCallback(session Session) {
    sendSynPacket(session)
    // Reschedules the same callback.
    newTimeout := CreateTimeoutHandler(synCallback, session, SYN_TIMEOUT_DURATION)
    newTimeout.Register()
    sessionTimeout[session] = newTimeout
}

This callback reschedules itself in a new timeout handler and updates the global sessionTimeout map.

Data Race and References

Surprisingly, a simple test found a bug in this solution. Cancelling timeouts via the Cancel method was simply not doing its job. The following order of events would cause a race condition:

  1. You have one scheduled timeout handler.
  2. Thread 1:
    a) You receive a control packet, and you want to cancel the registered timeout and move on to the next session state. (E.g. received a SYN-ACK after you sent a SYN).
    b) You call timeout.Cancel(), which calls a timer.Stop(). (Note that a Golang timer stop doesn’t prevent an already expired timer from running.)
  3. Thread 2:
    a) Right before that cancel call, the timer has expired, and the callback was about to execute.
    b) The callback is executed, it schedules a new timeout and updates the global map.
  4. Thread 1:
    a) Thread 1 transitions to a new session state and registers a new timeout, updating the global map.

Both threads were updating the timeout map concurrently. The end result is that you failed to cancel the registered timeout, and you also lost the reference to the timeout rescheduled by thread 2. This leaves a handler that keeps executing and rescheduling for a while, causing unwanted behavior.
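To make the failure window concrete, here is a small self-contained program (an illustration, not the article's code) showing that Stop() reports false once the timer has already fired, which is exactly the moment the race above exploits:

```go
package main

import (
	"fmt"
	"time"
)

// stopAfterFire arms a timer, waits until its callback has run, and then
// tries to stop it. Per the time package docs, Stop returns false if the
// timer has already expired, so the late Cancel cannot undo the callback.
func stopAfterFire() bool {
	fired := make(chan struct{})
	t := time.AfterFunc(time.Millisecond, func() { close(fired) })

	<-fired         // the callback has already run...
	return t.Stop() // ...so this Stop comes too late and reports false
}

func main() {
	fmt.Println(stopAfterFire()) // false
}
```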

When Locking Is Not Enough

An obvious first fix is to synchronize access to the shared map with a mutex, also grabbing the lock inside the timer callback:

func (timeout *TimeoutHandler) Register() {
    timeout.timer = time.AfterFunc(time.Duration(timeout.duration)*time.Second, func() {
        stateLock.Lock()
        defer stateLock.Unlock()
        timeout.callback(timeout.session)
    })
}

The difference now is that the updates to the global map are synchronized, but this doesn't prevent the callback from running after you call timeout.Cancel(). That happens if the scheduled timer has expired but hasn't grabbed the lock yet. Once again, you can lose the reference to one of the registered timeouts.

Using Cancellation Channels

This is a slightly different approach. Now you won't do recursive rescheduling through callbacks; instead, you register an infinite loop that waits for cancellation signals or timeout events.

The new Register() spawns a goroutine that runs your callback after a timeout and schedules the next timeout after the previous one has executed. A cancellation channel is returned to the caller to control when the loop should stop.

func (timeout *TimeoutHandler) Register() chan struct{} {
    cancelChan := make(chan struct{})

    go func() {
        for {
            select {
            case <-cancelChan:
                return
            case <-time.After(time.Duration(timeout.duration) * time.Second):
                func() {
                    stateLock.Lock()
                    defer stateLock.Unlock()
                    timeout.callback(timeout.session)
                }()
            }
        }
    }()
    return cancelChan
}

func (timeout *TimeoutHandler) Cancel() {
    if timeout.cancelChan == nil {
        return
    }
    timeout.cancelChan <- struct{}{}
}

This approach gives you a cancellation channel for each timeout you register. A cancel call sends an empty struct to the channel and triggers the cancellation. However, this doesn't resolve the previous issue: the timeout can expire right before you call cancel over the channel, and before the lock is grabbed by the timeout goroutine.

The solution here is to check the cancellation channel inside the timeout scope after you grab the lock.

case <-time.After(time.Duration(timeout.duration) * time.Second):
    func() {
        stateLock.Lock()
        defer stateLock.Unlock()

        select {
        case <-timeout.cancelChan:
            return
        default:
            timeout.callback(timeout.session)
        }
    }()
}

This finally guarantees that the callback is executed only after grabbing the lock, and only if no cancellation was triggered.

Beware of Deadlocks

Read the code above again and try to spot the problem yourself. Think about concurrent calls to any of the methods described.

The last problem here is with the cancellation channel itself. We made it an unbuffered channel, which means that sending on it blocks: once you call cancel on a timeout handler, you only proceed after that handler consumes the cancellation. The problem arises when multiple callers cancel the same timeout handler, since a cancel request is only consumed once. This can easily happen if concurrent events, such as a link down and a control packet, try to cancel the same timeout handler. The result is a deadlock, possibly bringing the application to a halt.

is anyone listening? (credit: Trevor Forrey)

The solution here is to at least make the channel buffered by one, so sends are not always blocking, and also explicitly make the send non-blocking in case of concurrent calls. This guarantees the cancellation is sent once and won’t block the subsequent cancel calls.

func (timeout *TimeoutHandler) Cancel() {
    if timeout.cancelChan == nil {
        return
    }

    select {
    case timeout.cancelChan <- struct{}{}:
    default:
        // Can't send on the channel: someone has already requested the cancellation.
    }
}
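A quick usage sketch of this non-blocking send. Here the select/default logic is pulled into a hypothetical standalone helper named cancel, purely for illustration:

```go
package main

import "fmt"

// cancel attempts a non-blocking send on the cancellation channel. With a
// buffer of one plus select/default, repeated cancellations neither block
// nor pile up: the first request is recorded, later ones are dropped.
func cancel(cancelChan chan struct{}) bool {
	select {
	case cancelChan <- struct{}{}:
		return true // cancellation recorded
	default:
		return false // already requested; drop instead of blocking
	}
}

func main() {
	cancelChan := make(chan struct{}, 1) // buffered by one

	fmt.Println(cancel(cancelChan)) // true: first request goes through
	fmt.Println(cancel(cancelChan)) // false: channel full, request dropped
}
```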

Conclusion

Updating Shared Data Without Synchronization

In the first approach, both the timer callback and the packet-handling thread updated the global sessionTimeout map without any synchronization, which caused the race described earlier: the reference to a registered timeout was silently lost. Data shared across goroutines always needs some form of synchronization, whether a lock or serialized access from a single goroutine.

don’t forget to synchronize gophers’ work (credit: Renee French)

Missing Condition Checks

In our example, the timeout handler got a ‘wake up’ call from a timer expiration, but it still needed to check if a cancel signal was sent to it before it could proceed with the callback execution.
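That discipline can be shown in isolation. Here shouldRunCallback is a hypothetical helper mirroring the inner select from the fix above:

```go
package main

import "fmt"

// shouldRunCallback applies the "re-check after waking up" pattern: even
// after the timer event arrives, the handler must look at the cancellation
// channel (under the lock, in the real code) before running the callback.
func shouldRunCallback(cancelChan chan struct{}) bool {
	select {
	case <-cancelChan:
		return false // a cancellation slipped in first: skip the callback
	default:
		return true // no cancellation pending: safe to run the callback
	}
}

func main() {
	cancelChan := make(chan struct{}, 1)
	cancelChan <- struct{}{} // cancellation requested before the check

	fmt.Println(shouldRunCallback(cancelChan)) // false
}
```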

condition checks might be needed if you wake up multiple gophers (credit: Renee French)

Deadlocks

In our case, this happened due to multiple send calls to an unbuffered, blocking channel. This meant that a send call would only return after a receive on the same channel. Our timeout goroutine loop was promptly receiving signals on the cancellation channel; however, after the first signal was received, it would break out of the loop and never read from that channel again, leaving the remaining callers stuck forever. To avoid this situation, carefully think through your code, handle blocking calls with care, and guarantee that thread starvation doesn't happen. The fix in our example was to make the cancellation calls non-blocking, since we didn't need them to block.

Author: Eduardo Ferreira, Senior Software Engineer @Mode.net

Illustrations taken from: Modenetworks
Thanks to Seth Kenlon

Eduardo Ferreira, Senior Software Engineer @ Mode.net: https://www.linkedin.com/in/edufgf/

Modenetworks: Mode transforms physically discrete service provider underlays into efficient, unified networks that combine cloud scale and flexibility with guaranteed QoS and SLAs. Our solution simplifies operations, drives profitability, and accelerates business innovation and growth.