Limiting Goroutines with Semaphore
Go comes with a concurrency model that lets us run many things at almost the same time. Depending on the application, it can spawn a lot of goroutines at once, making things run faster, as long as your machine has the resources to keep up.
But what happens when things grow big: the business, the application, the amount of data you have to process? The goroutines start to overload and break the system, and even the other services it depends on, because the resources cannot handle them all. Should we just throw it away?
Firstly, no. Secondly, you should not let that stop you from practicing your craft, because you can absolutely prevent it.
Then how, you might ask, can we prevent this from happening?
My answer is: limit the number of goroutines that run at once.
What is semaphore?
In the Go context, a semaphore is a way of signalling to goroutines whether or not they may run a certain part of the code. Much like a mutex, right? Not quite; there is a slight difference.
Notice that "goroutines" is plural. A mutex allows only one goroutine to run a certain part of the code at a time, while a semaphore allows N. Both are useful for limiting access to shared resources. Still, I would not say their usage is interchangeable.
There are at least two ways of using a semaphore in Go that I currently know of: using a buffered channel, or using the golang.org/x/sync/semaphore package's semaphore.Weighted.
Using a buffered channel
I will not explain what a 'channel' in Go is, but I will quickly explain buffered channels. A buffered channel is a channel with a capacity. You can instantiate one with
ch := make(chan struct{}, 10) // 10 as the capacity
If a buffered channel's capacity is full, any send ( ch <- data ) to it will block until a slot is freed by a receive ( <-ch ).
Here comes the snippet.
package main

import "runtime"

func doThing(d interface{}) {
	// do your thing
}

func main() {
	data := make([]interface{}, 10) // len == 10
	poolSize := runtime.NumCPU()    // e.g. a 4-thread CPU
	sem := make(chan struct{}, poolSize)
	for _, d := range data {
		sem <- struct{}{} // blocks while poolSize goroutines are running
		go func(d interface{}) {
			doThing(d)
			<-sem // free a slot
		}(d)
	}
	// fill the channel to wait for the remaining goroutines
	for i := 0; i < poolSize; i++ {
		sem <- struct{}{}
	}
}
Let's say you have a four-thread CPU, there are ten pieces of data to process, and processing each one takes a couple of seconds. The line sem := make(chan struct{}, poolSize) instantiates a channel with a capacity equal to the number of CPU threads. The code iterates over all the data. On the 5th iteration, the line sem <- struct{}{} will block, and there will be 4 goroutines running at the same time until any running goroutine executes the <-sem command. Once one does, that goroutine finishes, a slot in the channel is freed, the iteration continues, and another goroutine is spawned. This goes on until the iteration is over.
Using semaphore.Weighted
Using a buffered channel is pretty much the 'hacky' way, while the semaphore
package is what is actually intended for the problem. It works pretty much the same as the previous approach.
Use the source, Luke.
Now, here’s a snippet.
package main

import (
	"context"
	"log"
	"runtime"

	"golang.org/x/sync/semaphore"
)

func doThing(d interface{}) {
	// do your thing
}

func main() {
	ctx := context.Background()
	data := make([]interface{}, 10)     // len == 10
	poolSize := int64(runtime.NumCPU()) // e.g. a 4-thread CPU
	sem := semaphore.NewWeighted(poolSize)
	for _, d := range data {
		if err := sem.Acquire(ctx, 1); err != nil {
			log.Fatal(err)
		}
		go func(d interface{}) {
			doThing(d)
			sem.Release(1)
		}(d)
	}
	// acquire the full weight to wait for the remaining goroutines
	if err := sem.Acquire(ctx, poolSize); err != nil {
		log.Fatal(err)
	}
}
The line sem := semaphore.NewWeighted(poolSize) instantiates the semaphore. sem.Acquire acts as the lock and blocks; sem.Release unlocks so the code can continue, flowing the same way as in the previous approach.
sem.Acquire's second parameter is the weight, which allows more flexibility in controlling the flow of your application (a heavy task can acquire more than one slot at a time). Its first parameter makes it context.Context-aware, which most Go applications support.
Conclusion
The point I want to make is that there are a couple of ways to limit the number of goroutines running at the same time using a semaphore, as I have just shown you. There is also a way to limit goroutines with a plain worker pool: a fixed number of goroutines ranging over a shared channel with a for-range loop. Now, which one would you choose?