Golang Concurrency Unleashed: Channels & Semaphores Demystified

Vajahat Kareem
5 min read · Dec 31, 2023


Go channels serve as an excellent means to enhance both concurrency and synchronization when utilized effectively. They offer a seamless approach to implementing worker pools, semaphores, queues and many other things. Here, we’ll explore the implementation of semaphores and a worker pool using Go channels.

Semaphores

A semaphore is a synchronization primitive that guards a critical section of a program so that, at most, a specified number of threads (goroutines, in Go) can execute it concurrently.

Let’s define a very basic semaphore in sema.go:

package main

/*
 * The `sema` struct defines a semaphore. A semaphore is a synchronization
 * primitive that limits the number of concurrent operations or access to a
 * shared resource. In this case, the `sema` struct has two fields: `ch`, which
 * is a buffered channel of `struct{}`, and `size`, which is the maximum number
 * of slots that can be held at once. The `Enter()` method acquires a slot by
 * adding a value to the `ch` channel, and the `Exit()` method releases a slot
 * by removing a value from the `ch` channel.
 */
type sema struct {
	ch   chan struct{}
	size int
}

const (
	/*
	 * `DefaultSemaCounter` is a constant that represents the default size of
	 * the semaphore, used if the size of the semaphore is not specified.
	 */
	DefaultSemaCounter = 10
)

// The NewSema function creates a new semaphore with the specified size.
// A size of zero or less falls back to DefaultSemaCounter, because
// make(chan struct{}, size) panics on a negative size.
func NewSema(size int) *sema {
	if size <= 0 {
		size = DefaultSemaCounter
	}
	return &sema{
		ch:   make(chan struct{}, size),
		size: size,
	}
}

Now let’s define the two main methods the semaphore needs to work, which are Enter() and Exit().

/*
 * The `Enter()` method of the `sema` struct is used to acquire a semaphore.
 * It adds a struct{} value to the `ch` channel, blocking if the channel is
 * already full (i.e., if the number of acquired slots equals the size of
 * the channel). Once a slot is acquired, it can be used to control access
 * to a shared resource or limit the number of concurrent operations.
 */
func (s *sema) Enter() {
	s.ch <- struct{}{}
}

/*
 * The `Exit()` method of the `sema` struct is used to release a semaphore.
 * It removes a struct{} value from the `ch` channel, allowing another
 * goroutine to acquire the semaphore. This is typically used when a
 * goroutine is finished using a shared resource or has completed its
 * concurrent operation.
 */
func (s *sema) Exit() {
	<-s.ch
}
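
Before building on it, a quick way to convince yourself that the limit holds is to count how many goroutines are inside the critical section at once. The sketch below is only an illustration: it repeats a trimmed-down `sema` so that it runs on its own, and `newSema`, `peakConcurrency` and the worker counts are assumed names and values, not part of the original code.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// sema is a trimmed-down copy of the semaphore above, repeated here so the
// example compiles on its own.
type sema struct{ ch chan struct{} }

func newSema(size int) *sema { return &sema{ch: make(chan struct{}, size)} }

func (s *sema) Enter() { s.ch <- struct{}{} }

func (s *sema) Exit() { <-s.ch }

// peakConcurrency (hypothetical helper) starts `workers` goroutines that all
// pass through a semaphore of the given size. It returns the highest number
// of goroutines observed inside the critical section at the same time.
func peakConcurrency(size, workers int) int64 {
	s := newSema(size)
	var inFlight, peak int64
	var wg sync.WaitGroup

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.Enter()
			defer s.Exit() // release the slot even if the work panics

			n := atomic.AddInt64(&inFlight, 1)
			// Record a new peak with a compare-and-swap loop.
			for {
				p := atomic.LoadInt64(&peak)
				if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
					break
				}
			}
			time.Sleep(10 * time.Millisecond) // simulated work
			atomic.AddInt64(&inFlight, -1)
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	// The reported peak never exceeds the semaphore size (3 here).
	fmt.Println("peak concurrency:", peakConcurrency(3, 20))
}
```

Pairing `Enter()` with a deferred `Exit()` is a common choice because the slot is returned on every exit path of the function.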

This basic implementation serves as a foundation; additional features can be layered on top of it as needed. Let’s put the semaphore to use in an example: a service layer called the iolayer, which uploads a list of strings to a remote destination.

Demonstration

Imagine that the upload process is resource-intensive, for example because the system’s input/output bandwidth is constrained. Without a limit on concurrent uploads, a burst of requests may exhaust the application’s memory and could lead the OS to kill the process. In such cases, a semaphore with a restricted size can regulate the upload operations. Here is service.go:

package main

import (
	"bytes"
	"fmt"
	"io"
	"math/rand"
	"time"
)

type logger interface {
	Error(...string)
	Info(...string)
}

// The iolayer type represents an input/output layer with a semaphore and a logger.
//   - sema is a pointer to a `sema` struct. It limits how many uploads
//     run concurrently in the I/O layer.
//   - logger is used to log messages or events in the code.
//   - rang is a seeded random source (note that a *rand.Rand is not safe
//     for concurrent use).
type iolayer struct {
	sema   *sema
	logger logger
	rang   *rand.Rand
}

func NewIO(size int, logger logger) *iolayer {
	return &iolayer{
		rang:   rand.New(rand.NewSource(time.Now().UnixMilli())),
		logger: logger,
		sema:   NewSema(size),
	}
}

// The `Upload` function is a method of the `iolayer` struct. It takes a variadic parameter
// `data` of type `string`, joins the values with newlines, and uploads the result while
// holding a semaphore slot.
func (s *iolayer) Upload(data ...string) {
	if len(data) == 0 {
		return // nothing to upload; this also keeps data[0] below from panicking
	}
	var buff bytes.Buffer
	for _, d := range data {
		buff.WriteString(d)
		buff.WriteString("\n")
	}

	s.sema.Enter()
	// Log only after a slot has been acquired, so that "begin uploading"
	// marks entry into the critical section.
	s.logger.Info("begin uploading:", data[0])
	//>>>>>>>>>>>>>>>>begin critical section<<<<<<<<<<<<<<
	err := s.upload(&buff)
	//>>>>>>>>>>>>>>>>end critical section<<<<<<<<<<<<<<<<
	s.sema.Exit()

	// update the database about failure/success
	s.logger.Info("end uploading:", data[0])
	if err != nil {
		s.logger.Error("exception while uploading", err.Error())
	} else {
		s.logger.Info("file uploaded successfully")
	}
}

// The `upload` function is a method of the `iolayer` struct. It takes an `io.Reader` as an argument,
// which represents a source of data. This function is responsible for uploading the data from the
// `io.Reader` to some destination.
func (s *iolayer) upload(reader io.Reader) error {
	// Writing the reader data to some destination is simulated here by a
	// random sleep of up to 5 seconds. The package-level rand.Intn is used
	// because it is safe for concurrent use, whereas a shared *rand.Rand
	// such as s.rang is not.
	ms := time.Millisecond * time.Duration(rand.Intn(5000))
	time.Sleep(ms)
	s.logger.Info(fmt.Sprintf("it took %s to upload", ms))
	return nil
}

In the example above, uploads open only a limited number of concurrent connections to the remote destination, because access to the critical section is controlled by the sema object. If many upload requests arrive at once, only as many as the semaphore size upload concurrently. The remaining requests block in Enter() until a slot is released, and only then proceed with their upload.

➜  medium git:(master) ✗ go run sema/cmd/main.go
begin uploading: file-9
begin uploading: file-2
begin uploading: file-3
begin uploading: file-10
begin uploading: file-1
it took 21ms to upload the file: file-3
end uploading: file-3
begin uploading: file-4
it took 499ms to upload the file: file-2
end uploading: file-2
begin uploading: file-5
it took 981ms to upload the file: file-1
end uploading: file-1
begin uploading: file-6
it took 1.342s to upload the file: file-10
end uploading: file-10
begin uploading: file-7
it took 964ms to upload the file: file-6
end uploading: file-6
begin uploading: file-8
it took 1.925s to upload the file: file-4
end uploading: file-4
it took 2.365s to upload the file: file-9
end uploading: file-9
it took 2.59s to upload the file: file-7
end uploading: file-7
it took 4.213s to upload the file: file-5
end uploading: file-5
it took 3.895s to upload the file: file-8
end uploading: file-8

As the output shows, file-9, file-2, file-3, file-10 and file-1 entered the critical section first and uploaded concurrently. file-4 entered the critical section only after file-3 had completed its upload, and the same happened with file-5 after file-2. At no point are more than five uploads in progress, which is consistent with a semaphore of size 5 and shows the semaphore working as expected.
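
The same observation can be checked mechanically by replaying the begin/end lines of the run and tracking how many uploads are open at each step. This is only a small sketch: the `event` type and `peakInFlight` are hypothetical helpers, the events are transcribed by hand from the output above, and it assumes that each "begin uploading" line marks entry into the critical section.

```go
package main

import "fmt"

// event is one "begin uploading" (begin == true) or "end uploading" line.
type event struct {
	begin bool
	file  string
}

// runEvents lists the begin/end lines of the run above, in order.
var runEvents = []event{
	{true, "file-9"}, {true, "file-2"}, {true, "file-3"},
	{true, "file-10"}, {true, "file-1"},
	{false, "file-3"}, {true, "file-4"},
	{false, "file-2"}, {true, "file-5"},
	{false, "file-1"}, {true, "file-6"},
	{false, "file-10"}, {true, "file-7"},
	{false, "file-6"}, {true, "file-8"},
	{false, "file-4"}, {false, "file-9"}, {false, "file-7"},
	{false, "file-5"}, {false, "file-8"},
}

// peakInFlight replays the events and returns the largest number of
// uploads that were open at the same time.
func peakInFlight(events []event) int {
	inFlight, peak := 0, 0
	for _, e := range events {
		if e.begin {
			inFlight++
			if inFlight > peak {
				peak = inFlight
			}
		} else {
			inFlight--
		}
	}
	return peak
}

func main() {
	fmt.Println(peakInFlight(runEvents)) // prints 5
}
```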

Conclusion

The full source is available at https://github.com/vkstack/techblogs/blob/master/sema/cmd/main.go

Post your suggestions in the comments. Let’s talk!
