Go 언어의 producer/consumer 패턴

Seungkoo
RIZON Korea
Published in
9 min readApr 6, 2020

서버 프로그램 개발자를 가장 힘들게 하는 것은 무엇일까? 개인적으로 C/C++이나 Java와 같은 언어로 서버를 개발할 경우 가장 어려웠던 것은 deadlock을 발생 시키지 않고 critical section을 lock/unlock해야 한다는 것이었다. 많은 개발자들이 한 프로젝트를 개발할 경우 lock의 순서를 지키지 않아 deadlock이 발생하는 경우가 많은데, 극단적인 경우 몇 년에 한번 씩 deadlock이 발생하는 버그가 생기기도 한다. 이러한 버그의 원인을 파악하여 해결하는 것은 지극히 어려운 작업이다.

Go 언어는 서버 프로그램 개발에 최적화 된 언어로 알려져있다. Go 언어는 goroutine과 channel을 제공하여 효율적이고 쉽게 synchronization이 가능하다. 또한 deadlock 발생을 감지하여 알려 준다. 이러한 Go 언어의 특징은 개발자가 서버 프로그램을 쉽고 빠르게 개발하는 것을 가능하게 한다.

이 포스팅은 Go 언어의 특성을 활용하여 기본적인 서버 프로그램의 구조를 만드는 것을 목표로 한다. One Producer — Multiple Consumer 패턴을 적용하고 context, signal notification 기능을 사용하여 여러 용도에 활용할 수 있는 샘플 코드를 작성한다.

Producer — Consumer 패턴

Producer — Consumer 패턴은 producer가 작업을 생산하고 하나 이상의 consumer가 생산된 작업을 수행하는 패턴을 말한다. Producer와 consumer는 하나 이상의 channel을 공유하여 작업을 주고받게 된다. 아래는 이 패턴을 간단히 구현한 코드이다

var done = make(chan bool)
var tasks = make(chan int)
func produce() {
for i := 1; i <= 1000; i++ {
tasks <- i
}
done <- true
}
func consume() {
for {
msg := <-tasks
fmt.Println(msg)
}
}
func main() {
go produce()
for i := 0; i < 10; i++ {
go consume()
}
<-done
}

위 코드에서 producer는 정수형으로 정의된 task를 1000개 생산하여 tasks라는 채널에 넣는다. 10개의 consumer는 그 채널에서 task를 가져와서 처리한다. done 채널은 main thread와 동기화 하기 위해 사용된다.

Producer는 채널이 비어있지 않으면 채널에 task를 넣을 때 블록된다. 비슷하게 consumer는 채널에서 task를 가져올 때 채널이 비어있으면 블록된다. 따라서 producer, consumer는 종료되지 않고 서로의 동작을 기다리며 동기화 하게 된다.

이처럼 다른 동기화 기능을 사용하지 않고 channel만을 사용하여 여러 goroutine을 동기화할 수 있다는 것이 Go 언어의 장점이라 할 수 있다. 다른 언어에서 queue 등을 사용하여 개발할 경우 queue가 가득 차거나 비어있을 경우를 고려해야 하므로 로직이 복잡해지게 된다.

이 코드는 실제 사용하기에는 보완해야 할 점이 있다. Producer가 일정량의 작업을 생성하고 종료하기 보다는 무한히 작업을 생성하는 경우가 보다 일반적이다. 또한 consumer의 작업이 끝나기 전에 프로그램이 종료할 수 있다. 따라서 프로그램이 종료할 조건을 정의하고 모든 consumer들의 작업이 끝난 후 프로그램이 종료하도록 하는 메카니즘이 필요하다. 이런 요구 사항을 고려하여 샘플 코드를 작성하였다.

producer, consumer 코드

type Consumer struct {
in *chan int
jobs chan int
}
func (c Consumer) Work(wg *sync.WaitGroup) {
defer wg.Done()
for job := range c.jobs {
time.Sleep(50 * time.Millisecond) // Do something
fmt.Printf(“%dth job finished\n”, job)
}
}
func (c Consumer) Consume(ctx context.Context) {
for {
select {
case job := <-*c.in:
c.jobs <- job
case <-ctx.Done():
close(c.jobs)
return
}
}
}
type Producer struct {
in *chan int
}
func (p Producer) Produce() {
task := 1
for {
*p.in <- task
task++
}
}

main 함수

func main() {
const nConsumers = 10
runtime.GOMAXPROCS(runtime.NumCPU())
in := make(chan int, 1)
p := Producer{&in}
c := Consumer{&in, make(chan int, nConsumers)}
go p.Produce()
ctx, cancelFunc := context.WithCancel(context.Background())
go c.Consume(ctx)
wg := &sync.WaitGroup{}
wg.Add(nConsumers)
for i := 0; i < nConsumers; i++ {
go c.Work(wg)
}
termChan := make(chan os.Signal)
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
<-termChan
cancelFunc()
wg.Wait()
}

main 함수는 Produce, Consume, Work 3개의 goroutine을 생성한다. Work goroutine은 10개 생성되어 Producer가 생성한 task를 서로 독립적으로 처리한다. main thread는 SIGINT, SIGTERM 시그널을 맞으면 종료한다. 이 때 wg.Wait() 함수를 호출하여 goroutine의 작업이 종료되기를 기다린다.

MAXPROC 설정

모든 CPU 코어에서 goroutine을 실행하기 위해 아래의 코드를 호출한다.

runtime.GOMAXPROCS(runtime.NumCPU())

Wait Group

main thread가 모든 작업이 끝날 때까지 대기하기 하기 위하여 wait group을 사용하였다. wg.Add() 함수로 대기할 goroutine의 갯수를 설정하고 goroutine에서는 wg.Done() 함수를 호출하여 goroutine이 종료되었음을 알린다.

wg := &sync.WaitGroup{} // wait group creation
wg.Add(nConsumers) // set # of goroutines to wait for
...
wg.Wait() // wait util all goroutines finish
// work function
func (c Consumer) Work(wg *sync.WaitGroup) {
defer wg.Done() // decrease # of goroutines to wait for by 1

}

Signal 처리

main thread가 종료하지 않고 SIGINT, SIGTERM을 기다리도록 termChan 채널을 생성한다. 해당 시그널이 오지 않을 동안 <-termChan 코드에서 대기하게 된다.

termChan := make(chan os.Signal)
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
<-termChan

Context

goroutine을 사용할 때 주의해야 할 점은 내가 실행한 goroutine은 반드시 종료될 것이란 것을 보장해야 한다는 것이다. context를 사용하면 timeout이 지난 goroutine을 종료하거나 cancelFunc 함수를 호출하여 goroutine을 강제 종료할 수 있다. 샘플에서는 consumer의 jobs 채널을 close하여 goroutine을 종료하도록 구현하였다. 블락 수집기는 producer/consumer 패턴을 구현한 구조로 구현되었다

// main 함수
ctx, cancelFunc := context.WithCancel(context.Background())
...
cancelFunc() // call ctx.Done()
wg.Wait()
// Consumer 모듈
func (c Consumer) Consume(ctx context.Context) {
for {
select {
...
case <-ctx.Done():
close(c.jobs) // close channel to stop Work goroutine
...

Go 언어에서 goroutine을 사용하여 daemon 프로그램을 작성하는 한 예를 설명하였다. 이 샘플코드를 통해 daemon 프로그램의 구현을 이해했으면 하는 바램이다. 샘플 코드는 간단한 설명을 위한 예시일뿐 실제 사용을 위해서는 좀 더 수정이 필요할 것이다.

Reference

https://golang.org/pkg/context/

https://gobyexample.com/waitgroups

https://tpaschalis.github.io/golang-producer-consumer/

https://www.golangprograms.com/illustration-of-producer-consumer-problem-in-golang.html

https://callistaenterprise.se/blogg/teknik/2019/10/05/go-worker-cancellation

--

--