Routing with Channel and Goroutine (P2)

Khoa Pham
4 min readJul 30, 2019

--

For more simple scenarios, please see P1.

From part 1 of this post, we know we can simplify the code greatly with a generic message carrying different types (using type embedding). With that, message will be passed from one goroutine to another, predefined, which also means those goroutines are tightly coupled, making some message passing patterns (bouncing, parallel, aggregate, fan-out, etc.) more verbose, more code.

So can we go further and abstract away the routing part? Will the code be more manageable?

Let’s start with same message structure:

type message struct {
// embed your data types here
}

To make this work, a same set of goroutines (or actor, I use the term actor from now on, but take it slightly, very loose meaning from the original) will need to tell where the message should go:

type actor func(message) (dst uint64, messages []message, err error)                                                                                                                                                                                                                                                                             

If you wonder, dst is just a bitmask (easier to do routing) pointing to other actor(s), more on this later.

Each actor need a channel to communicate, one way to tight them together (each actor and its correspondent channel can be addressed with the same unsigned int “id”):

var actors = make(map[uint64]actor)
var chans = make(map[uint64]chan msg)

To register a new actor:

// locking are left out for brevity
func AddActor(id uint64, fn actor) {
if id == 0 || (id&(id-1) != 0) {
panic("id must be power of 2 value")
}
if len(actors) >= 63 {
panic("too many actors")
}
actors[id] = fn
}

Now we got 3 key components: actor, message and channel. It’s time to implement the routing logic:

n := runtime.NumCPU()
if n < 4 {
n = 4
}
// create the channels
for id := range actors {
chans[id] = make(chan msg, n)
}
for id, fn := range actors {
for w := 0; w < n; w++ {
go func(rcv <-chan msg, fn actor) {
for {
select {
default:
case m := <-rcv:
mask, messages, err := fn(m)
for flag := range actors {
if mask&flag != 0 {
for _, each := messages {
chans[flag] <- each
}
}
}
}
}
}(chans[id], fn)
}
}

Essentially for each actor registered, we spin up a couple of goroutines (min. 4). Each actor (actual logic) will be used to process incoming message, they will also let us know where to route the post-processed messages.

Let’s look at a typical use case: We want to parse a raw event into a message, process that message, log and notify the result. We can go with this model of 4 actors (a bit overkill but to make the point):

  • 1 actor as a producer
  • 1 actor as a processor
  • 1 actor as a logger
  • 1 actor as a notifier

A visual flow of the above:

G1-G16: 16 goroutines; M1: Raw event; M1': Parsed message
// identify them
const (
producer uint64 = 1 << iota
processor
logger
notifier
)
func produceActor(m message) (uint64, []messages, error) {
// logic here
return processor, []message{message{}}, nil
}
func processActor(m message) (uint64, []messages, error) {
// logic here
return logger|notifier, []message{message{}}, nil
}
func logActor(m message) (uint64, []messages, error) {
log.Println(m)
return logger, nil, nil
}
func notifyActor(m message) (uint64, []messages, error) {
// logic here
return notifier, nil, nil
}
// register
AddActor(producer, produceActor)
AddActor(processor, processActor)
AddActor(logger, logActor)
AddActor(notifier, notifyActor)
// trigger
chan[producer] <- message{}

Yes, that’s all it takes to achieve the depicted flow. As you can see with this approach, the actual logic are prevalent, all of the code for channel coordination are abstracted away. Actor will only care for its logic and who to delegate the work when done.

The key for routing is the bitmask of actors, some more examples:

Bounce from processor back to producer:

func processActor(m message) (uint64, []messages, error) {
if false {
return producer, []message{m}, fmt.Errorf("bounced!")
}
return logger|notifier, []message{message{}}, nil
}

Wire producer to logger (i.e. logging raw event):

func produceActor(m message) (uint64, []messages, error) {
return processor|logger, []message{message{}}, nil
}

As Go already have necessary primitives, with the right dose of abstraction, we can achieve very complicated message passing model with a manageable codebase.

--

--