P1 TLDR: keep the state in the message! and if you don’t know what that means yet, keep reading.
P2: Complex routing with channel and goroutine.
Goroutine and channel are the embraced model to do concurrency in Go. Channel syntax abstracts the explicit use of locks and help developer avoid incorrect data flow.
This post will focus on how to organize them into composable and reusable code.
Simplest: Fire-and-forget (only apply for async ops with no result collecting). Very easy but don’t forget to set your timer (especially on i/o call)
client := &http.Client{
Timeout: 3 * time.Second,
}
go func() {
res, err := client.Post("http://example.com", "application/json", bytes.NewReader([]byte("data")))
...
}()
Great… Wait, we don’t need that closure do we:
// we ignore closing response body
go client.Post("http://example.com", "application/json", bytes.NewReader([]byte("data")))
Much brevity but DON’T do the above in a loop with different arguments, you will need function scope, this is best for writing some scripts to get the job done.
Intermediate: Long running goroutines listen to a channel, they all do the same logic - process data and send back result (you may call them workers pool).
type (
work struct {
query string
}
result struct {
answer string
}
)
var (
workc = make(chan work, 4)
resultc = make(chan result, 4)
errc = make(chan error, 4)
)
// Run result collector in background
go func() {
for {
select {
case r := <-resultc:
processResult(r)
case err := <-errc:
log.Println(err)
default:
}
}
}()
// Run 4 workers in background
for i := 0; i < 4; i++ {
go func() {
for {
select {
case w := <- workc:
r, err := processWork(w)
if err != nil {
errc <- err
continue
}
resultc <- r
default:
}
}()
}
Great… Wait do we really need 3 channels? error in Go is just value, we can combine error and result together and reduce it to 2 channels, in and out!
// Combine result and error
type result struct {
answer string
err error
}
// Updated result collector
go func() {
for {
select {
case r := <-resultc:
if r.err != nil {
log.Println(err)
continue
}
processResult(r)
default:
}
}
}()
// Updated workers
for i := 0; i < 4; i++ {
go func() {
for {
select {
case w := <- workc:
resultc <- processWork(w)
default:
}
}()
}
Another intermediate: Long running goroutines listen on multiple channels. Given scenario that a set of workers listen to 2 or more channels to do different kind of work. This is particular common in a lot of codebase, so I think it’s beneficial to point it out.
type (
work1 struct {
query string
}
work2 struct {
command int
}
result1 struct {
answer string
err error
}
result2 struct {
output int
err error
}
)
var (
workc1 = make(chan work1, 4)
workc2 = make(chan work2, 4)
resultc1 = make(chan result1, 4)
resultc2 = make(chan result2, 4)
)
// Result collector listens to 2 result channels
go func() {
for {
select {
case r := <-resultc1:
if r.err != nil {
log.Println(err)
continue
}
processResultType1(r)
case r := <-resultc2
if r.err != nil {
log.Println(err)
continue
}
processResultType2(r)
default:
}
}
}()
// 4 workers listen to 2 working channels
for i := 0; i < 4; i++ {
go func() {
for {
select {
case w := <- workc1
resultc1 <- processWorkType1(w)
case w := <- workc2
resultc2 <- processWorkType2(w)
default:
}
}()
}
Great… Wait that’s 4 channels already, only for 2 types of work! and quite a pattern of repetitive code, imagine we have to do this with 3, 4 or even 10 types of work (we will need 2x number of channels!)
Channel is just a medium for communication between goroutines. A good medium is a stateless one, it shouldn’t care what it carries. To reduce the number of channels (down to a fixed 2 - in and out!) disregarding how many types of work, we can do this:
type work struct {
typ int
query string
command int
err error
}
type result struct {
typ int
answer string
output int
err error
}
or even better, consider in and out data types are just data!
type (
work1 struct {
query string
}
work2 struct {
command int
}
result1 struct {
answer string
}
result2 struct {
output int
}
message struct {
work1
work2
result1
result2
typ int
err error
}
)
The only notice is that we should increase the buffer if the message carry more types, for above example, we can bump the buffer up to guarantee the same throughput (in ideal scenario):
var (
workc = make(chan message, 8)
resultc = make(chan message, 8)
)
// Updated collector logic
go func() {
for {
select {
case msg := <-resultc:
if msg.err != nil {
log.Println(err)
continue
}
switch msg.typ {
case 1:
processResultType1(msg.result1)
case 2:
processResultType2(msg.result2)
}
default:
}
}
}()
// Updated workers logic
for i:= 0; i < 4; i++ {
go func() {
for {
select {
case msg := <- workc:
switch msg.typ {
case 1:
resultc <- processWorkType1(msg.work1)
case 2:
resultc <- processWorkType2(msg.work2)
}
default:
}
}()
}
For complex message routing, please see P2 here.