Go’da Graceful Shutdown

demironW47
GoTurkiye
Published in
7 min readJan 7, 2024

Herkese merhabalar, bu yazıda “Graceful Shutdown” konsepti, çözdüğü sorunlar ve Go dilinde uygulamasını ele alacağız.

Çalışmakta olan bir yazılımı kapatmaya çalıştığımızda arka planda yaptığı işlemlere ne olur?
Projenize yeni bir sürüm çıkacağınız zaman çalışan sürüm nasıl kapatılır? Eğer bu kapatma işlemi Graceful Shutdown ise var olan işlemlerin bitmesi beklenir daha sonra yazılım kapatılır. Peki sizin projelerinizde durum nedir?

DALL-E 3

[ İçindekiler ]

  • Graceful Shutdown Nedir ve Ne İçin Kullanılır?
  • Go dilinde nasıl implement edilir?
  • Goroutine’leri nasıl kapatabiliriz?

Graceful Shutdown Nedir ve Ne İçin Kullanılır?

Çalışmakta olan bir yazılımı kapatmak istediğimizde arka planda yürütülen işlemlerin kaderini belirlememiz gerekir, eğer var olan işlemlerin tamamlanmasını ve daha sonra programın sonlandırılmasını istiyorsak buna graceful shutdown (Temiz Kapatma) denir. Yürütülmekte olan işlemlerin bitmesini beklemek bizim veri kaybının önüne geçmemize yardımcı oluyor. Örnek olarak bir e-ticaret uygulaması düşünelim bir ürünü satın almak için gereken son isteği gönderiyoruz. Bu isteği alan uygulama sunucusu

  1. orders tablosuna kayıt
  2. ürün stok durumunu güncelleme
  3. satıcıya sipariş ile ilgili bildirim
  4. sipariş alındığına dair müşteriye bilgilendirme (sms / mail)

işlemlerini yapıyor olsun. Eğer request uygulama sunucusuna ulaştıktan sonra program graceful shutdown olmayan bir şekilde kapatılmak istenirse; program hangi aşamada ise orada kırılacaktır, örneğin birinci aşamayı tamamlamış olsun bu durumda orders tablosunda bir kayıt olacak ama stok güncellenmeyecek, satıcı sipariş ile alakalı olarak bilgilendirilmeyecek ve müşteri siparişin alındığına dair bir bilgilendirme almayacak. Ancak bu kapatma işlemi graceful shutdown olarak yapılırsa; o andan itibaren uygulama sunucusu yeni gelen istekleri kabul etmez ve elindeki işlemin tamamlanmasını bekler ve daha sonra programı kapatır. Böylelikle ürünün satın alma süreci tamamlanmış olur.

Go’da Nasıl İpmelement Edilir?

Yukarıda vermiş olduğum örneği parça parça kodlayarak bunu açıklamaya çalışacağım. Öncelikle yukarıdaki işlemleri gerçekleştirecek bir endpoint yazmamız gerekiyor.

package main

import (
"log"
"net/http"
"time"
)

func main() {

http.HandleFunc("/buyTheProduct", buyTheProduct)

srv := http.Server{
Addr: ":8080",
Handler: nil,
}

err := srv.ListenAndServe()
if err != nil {
log.Fatal(err)
}

}

func buyTheProduct(w http.ResponseWriter, r *http.Request) {

// insert orders
saveOrder()

// update unit stock
updateStock()

// notify to seller

// notify to customer

log.Println("the payoff successfully.")
}

func saveOrder() {
time.Sleep(2 * time.Second)
log.Println("Order saved.")
}

func updateStock() {
time.Sleep(2 * time.Second)
log.Println("Stock updated.")
}

kodumuz bu haliyle herhangi bir graceful shutdown işlemi uygulamıyor. Bu kodu çalıştırdığımızda ve ilgili endpointe istekte bulunduktan sonra projemizi kapattığımızda aşağıdaki gibi bir sonuçla karşılaşırız.

without graceful shutdown

Order save işlemi başarılı olmuş ancak stok update edilememiş, bu istenen bir durum değil bunu düzeltmek için;

package main

import (
"context"
"errors"
"log"
"net/http"
"os/signal"
"syscall"
"time"
)

func main() {

http.HandleFunc("/buyTheProduct", buyTheProduct)

srv := http.Server{
Addr: ":8080",
Handler: nil,
}

ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)

defer stop()

go func() {
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Fatal(err)
}
}()

<-ctx.Done()
log.Println("got interruption signal.")
if err := srv.Shutdown(context.TODO()); err != nil {
log.Printf("server shutdown returned an err: %v\n", err)
}

}

Kodumuzda göze çarpan ilk farklılık NotifyContext() isimli bir metodun çağırılması. Bu metot işletim sistemi tarafından programımıza iletilen sinyalleri dinlememize yardımcı oluyor, SIGINT ve SIGTERM sinyallerini dinliyoruz. Bu sinyaller aslında terminal üzerinden çalışan bir programı sonlandırmak için kullandığımız komutların sinyalizasyon karşılıklarıdır. SIGINT sinyali Ctrl + C ifadesine karşılık gelirken SIGTERM sinyali kill <id> ifadesine karşılık gelir. Şu anda işletim sistemi bize programımız kapatılmak istendiğinde bir sinyal gönderecek ve biz bu durumu handle edebiliyoruz. Bu metot iki tane parametre return ediyor, ilk parametremiz bir context diğeri ise işletim sisteminden gelen sinyalleri dinleme işlemini sonlandırmak için kullanabileceğimiz bir metot. Hemen alt satırda defer keywordu ile beraber bu stop metodunu çağırdık.

Kodun devamında http sunucusunu başlatmak için kullandığımız ListenAndServe() metodunu bir goroutine içerisinde çağırdık. Daha sonra context’in sonlanması halinde yani SIGTERM yada SIGINT sinyali gelirse Shutdown() isimli metodu çağırıyoruz. Standard Library içerisindeki net/http paketini kullanmıştık ve bu paket içerisinde http sunucumuzu shutdown etmek için gereken metot bulunuyor, iyi ki varsın standard lib. Bu metot çağırıldıktan sonra yeni gelen istekleri kabul etmiyor ve var olan işlemlerin tamamlanması bekliyor, tüm işlemler bitince sunucuyu kapatıyor. Bu senaryoda sunucuyu başlatıp ilgili endpoint’e istek atar ve işlem bitmeden sunucuyu kapatmaya çalışırsak aşağıdaki gibi bir sonuç elde ederiz.

with graceful shutdown

Order save işlemi başarılı oldu daha sonra sunucuyu kapatmak için SIGINT sinyali gönderildi (terminalde Ctrl + C yaptım) ancak sunucu çalışmaya devam etti, stoku güncelledi ve ödeme başarılı bir şekilde tamamlandı daha sonra sunucuyu kapattı. Böylelikle veri kaybı yaşanmamış oldu. Benim senaryomda odaklandığım kısım http sunucusuydu şu aşamada veri kaybının önüne geçtik bunu daha iyi hale getirmek için projede kullanılan Database bağlantıları gibi dış kaynaklarla olan bağlantıların da serbest bırakılması çok daha iyi bir kod örneği olacaktır. Go’da bunu yapabilmek için kullandığınız sql paketinin Close metodunu http sunucusu kapattıktan sonra çağırabilirsiniz. Standard kütüphanedeki database/sql paketi içerisinde Close() metodu bulunmakta. Eğer Gorm’u kullanıyorsanız, gormdan elde ettiğiniz *gorm.DB’yi *sql.DB’ye çevirebilirsiniz ve bunun üzerinden Close() metodunu çağırabilirsiniz.

Goroutine’leri Nasıl Kapatabiliriz?

İlk olarak bir endpoint oluşturup bu endpoint üzerinden Kafka’daki mesajları çekip bunları işleyip commit edeceğiz, bu işlemin concurrent çalışmasını istediğim için go komutunu kullanarak start edeceğim.

import (
"context"
"errors"
"log"
"net/http"
"os/signal"
"syscall"
"time"

"github.com/segmentio/kafka-go"
)

func main() {

http.HandleFunc("/buyTheProduct", buyTheProduct)
http.HandleFunc("/consume", startConsume)

srv := http.Server{
Addr: ":8080",
Handler: nil,
}

ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)

defer stop()

go func() {
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Fatal(err)
}
}()

<-ctx.Done()
log.Println("got interruption signal.")
if err := srv.Shutdown(context.TODO()); err != nil {
log.Printf("server shutdown returned an err: %v\n", err)
}

}

func startConsume(w http.ResponseWriter, r *http.Request) {

log.Println("consumer is started.")
go Consume()

}

func Consume() {

reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "gracefulShutdown",
GroupID: "group-1",
StartOffset: kafka.FirstOffset,
})

defer reader.Close()

for {
m, err := reader.FetchMessage(context.Background())
if err != nil {
log.Println("read error: ", err)
}

log.Printf("message: %v ", string(m.Value))
time.Sleep(2 * time.Second)

err = reader.CommitMessages(context.Background(), m)
if err != nil {
log.Println("commit error: ", err)
} else {
log.Println("commit successfully : ", string(m.Value))
}
}

}

Yukarıdaki koda baktığımızda /consume isimli bir endpoint’e istek geldiğinde Consume() metodunu concurrent bir şekilde çalıştırıyor. Bu metot ile for döngüsü kullanılarak Kafka’da bulunan mesajları alıp ekrana yazıyoruz ve daha sonra mesajı commit ediyoruz. Oluşturduğumuz http sunucusunu daha öncesinde de olduğu gibi graceful shutdown yöntemi ile kapatıyoruz, kodumuz çalıştırdığımızda aşağıdaki gibi bir çıktı veriyor.

Kodumuz çalıştı ve ardından /consume endpointine istek geldi, sırasıyla 17, 20 ve 23 numaralı mesajları okuyup commit etti ancak 10 numaralı mesajı okuduktan sonra bir kapatma isteği geldi ama bu mesajın commit edilmediğini görüyoruz. Biz her ne kadar http sunucusunu graceful shutdown etsek de bu kod bloğu concurrent çalıştığı için http sunucusunun Shutdown() metodu bunu yakalayamıyor bunun için kendimiz bir çözüm üretmemiz gerekecek.

package main

import (
"context"
"errors"
"log"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"

"github.com/segmentio/kafka-go"
)

var shutdownCh chan struct{}
var wg sync.WaitGroup

func main() {

shutdownCh = make(chan struct{})

http.HandleFunc("/buyTheProduct", buyTheProduct)
http.HandleFunc("/consume", startConsume)

srv := http.Server{
Addr: ":8080",
Handler: nil,
}

signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM)

go func() {
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Fatal(err)
}
}()

select {
case sig := <-signalCh:
log.Printf("Received signal: %v. Shutting down...", sig)
close(shutdownCh)
wg.Wait()
if err := srv.Shutdown(context.Background()); err != nil {
log.Printf("Server shutdown returned an error: %v\n", err)
}
}

}

func startConsume(w http.ResponseWriter, r *http.Request) {

log.Println("Consumer is started.")
wg.Add(1)
go Consume()

}

func Consume() {

defer wg.Done()

reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "gracefulShutdown",
GroupID: "group-1",
StartOffset: kafka.FirstOffset,
})

for {
select {
case <-shutdownCh:
log.Println("kafka consumer closing.")
reader.Close()
return

default:
m, err := reader.FetchMessage(context.Background())
if err != nil {
log.Println("read error: ", err)
}

log.Printf("message: %v ", string(m.Value))
time.Sleep(2 * time.Second)

err = reader.CommitMessages(context.Background(), m)
if err != nil {
log.Println("commit error: ", err)
} else {
log.Println("commit successfully : ", string(m.Value))
}
}

}

}

ShutdownCh isminde bir channel ve WaitGroup oluşturduk. İşletim sisteminden gelen SIGINT ve SIGTERM sinyallerini signalCh’ye gönderdik şayet bir kapatma isteği gelirse, ShutdownCh kanalını kapatır. WaitGroup’ların bitmesini bekler ve ardından http sunucusunu kapatır.

Bizim Kafka consumer’ın kapanmasını yöneten kodumuz Consume() metodu içerisindeki shutdownCh kanalına gelecek olan kapatma sinyalidir. Kapatma sinyali geldiğinde reader.Close() metodu çağırılır ve elimizdeki mesajları işleyip commit edilinceye kadar bekler, tabi bu esnada yeni bir mesaj alınmaz ve ardından Kafka consumer’ı kapatır. Consume metodunun ilk satırındaki defer wg.Done() ifadesi ise Consume() metodunun en sonunda çağırılır. Böyelikle çalışan bir goroutine kalmadığı bilgisi main metodu içerisindeki case ifadesinde bulunan wg.Wait() ‘e iletilir ve kod devam eder. Önce Kafka daha sonra http sunucusu kapatılır. Kodumuzu test ettiğimizde aşağıdaki gibi bir çıktı alıyoruz.

Http sunucumuz çalışmaya başlayınca /consume endpointine istek geliyor 13. mesajı işleyip commit ediyor, 16. mesajı aldığında bir kapatma sinyali gönderiliyor ancak elindeki 16. mesaj ile işimiz henüz bitmediği için sunucuyu kapatmıyor bunun yerine işlemin bitmesini bekliyor ve işlem tamamlanınca önce Kafka daha sonra da http sunucusu kapatılıyor.

Eğer kodları kendi local ortamınızda çalıştırmak isterseniz aşağıdaki repodan erişebilirsiniz.

May the code be with you…

--

--