PostgreSQL to Google BigQuery with Python — Delta Extraction

Onur Taşhan
Trendyol Tech
Published in
4 min readNov 15, 2019
PostgreSQL to Google Bigquery

Selamlar,

Bu yazımda PostgreSQL’da bulunan bir tabloyu delta extraction yolu ile Google Bigquery’de bir tabloya load ederken izlediğimiz yolu paylaşacağım.

Öncelikle PostgreSQL DB’de price_history diye bir tablomuz olsun. Bu tablodan created_date alanına göre delta alabiliyor olalım.

create table price_history
(
id bigint not null primary key,
original_price numeric(19,2) not null,
selling_price numeric(19,2) not null,
buying_price numeric(19,2) not null
created_date timestamp not null,
);

create index "prices_history_createdDate_index"
on price_history (created_date);

Bu delta extraction için kurduğum model kısaca şöyle olacak;

  • Verileri Bigquery’e taşırken, öncelikle delta verileri PostgreSQL’dan JSON olarak bir dosyaya çıkalım.
  • Oluşan dosyayı Google Storage’da bucket’a yükleyelim.
  • Bucket’dan da Bigquery’e load edelim.
  • Tüm işlemleri de Python kullanarak yapalım.
Image from: https://medium.com/google-cloud/import-json-into-bigquery-with-google-cloud-functions-31facea134bf

O zaman PostgreSQL Table to Json yapabileceğimiz bir python script’i yazalım.

Bu script’de yaptıklarımızı açıklamak gerekirse;

  • 1 olarak işaretlediğim bölümde; psycopg2 kütüphanesini kullanarak PostgreSQL DB’ye bağlanıyoruz.
  • 2 olarak işaretlediğim bölümde; created_date değişkenine önemsiz bir başlangıç tarih değerini atıyoruz.
  • 3 olarak işaretlediğim bölümde; kodun çalışma zamanını alıyoruz, bu zamanı daha sonra oluşturduğumuz dosyanın isminde kullanacağız.
  • 4 olarak işaretlediğim bölümde; kodun oluşturacağı dosyaları(temp ve normal) ve dosyaların dizini belirtiyoruz.
  • 5 olarak işaretlediğim bölümde; last_created_date.txt olarak yaratacağımız ve ilk çalışmasında initial bir tarih içerecek ancak sonradan her delta extraction sonrası aldığımız son created_date değerini içerecek dosyadan created_date değerini alıyoruz.
  • 6 olarak işaretlediğim bölümde; PostgreSQL’un row_to_json ve COPY fonksiyonlarını kullanarak; tablodaki veriyi json olarak okuyacak SQL’i yazıyoruz.
  • 7 olarak işaretlediğim bölümde; yazdığımız SQL’i PorstgreSQL’da çalıştırıp dönen sonucu temp_file’a yazıyoruz. Eğer oluşan dosyada kayıt yok ise oluşan dosyayı silip, script’in çalışmasını durduruyoruz.
  • 8 olarak işaretlediğim bölümde; oluşan temp dosyasındaki son satıra gidip buradaki created_date değerini alıyoruz ve last_created_date.txt dosyasına yazıyoruz.
  • 9 olarak işaretlediğim bölümde; temp_file’ı (xxx.json.temp) normal file(xxx.json) olarak rename ediyoruz.

“postgresql_api.py” python script’imiz düzenli olarak çalıştırdığımız created_date alanına göre delta gelen kayıtları “price_history_YYYYMMDDHHMMSS.json” olarak belirttiğimiz dizinde oluşturmaya başlayacak.

Sıra geldi bu dosyaları Google Cloud Storage’a yükleyip, Bigquery tablosuna yükleme işlemine.

Bu işlem için de bir python script’i yazalım.

Bu script’de yaptıklarımızı açıklamak gerekirse;

  • 1 olarak işaretlediğim bölümde; script’in çalıştığı ortamdaki google-cloud-sdk path’ini, google_cloud_credential.json’ının path’ini set ediyoruz. Ardından da os.system kütüphanesi ile aşağıdaki komutu çalıştırıyoruz.

gcloud auth activate-service-account google_service_account_user@google_cloud_project_name.iam.gserviceaccount.com — key-file=google_credential.json — project=google_cloud_project_name

google_service_account_user = “Sizin google cloud’da oluşturduğunuz service account user’ınız.
google_cloud_project_name = “Sizin google cloud projeniz”
google_credential.json = “Sizin google cloud credential json dosyanız”

  • 2 olarak işaretlediğim bölümde; bir önceki python script’inde oluşturduğumuz dosyaların dizini gösteriyoruz.
  • 3 olarak işaretlediğim bölümde; glob kütüphanesini kullanarak o dizindeki dosyaların path’lerini bir diziye atıyoruz.
  • 4 olarak işaretlediğim bölümde; eğer dizi boş ise script’in çalışmasını durduruyoruz.
  • 5 olarak işaretlediğim bölümde; google-cloud kütüphanesindeki storage modülünü kullanarak hangi bucket’ı kullanacağımızı seçiyoruz. Burada google_storage_bucket olarak yazan yere kendi bucket’ınızı yazmanız gerekiyor.
  • 6 olarak işaretlediğim bölümde; bucket altında prc-public-price_history-TO-extr_prc-price_history adlı klasör altına oluşturduğumuz dosyaları for döngüsünü kullanarak bucket’a yüklüyoruz.
  • 7 olarak işaretlediğim bölümde; bigquery’deki dataset’imizi (tablonun şemasını) belirtiyoruz.
    Dosya formatını olarak json seçiyoruz.
    autodetect=TRUE ile json’daki kolon yapısını baz alarak Bigquery’de tabloyu aynı şekilde oluşturmasını belirtiyoruz.
    Bu şekilde json’da gördüğü kolon yapısına göre bigquery’deki tabloyu eğer mevcut değil ise create edecek, mevcut ise veriyi direk tabloya append edecek.
  • 8 olarak işaretlediğim bölümde; bigquery tablosuna load etmeden önceki tablodaki kayıt sayısını alıyoruz ve bir değişkene yazıyoruz.
  • 9 olarak işaretlediğim bölümde; for loop kullanarak bucket’a attığımız dosyalarımızı bigquery’e load ediyoruz.
    Burada uri değişkeninde bulunan google_storage_bucket_name yazan yere yukarıdaki gibi kendi bucket’ınızı yazmanız gerekiyor.
  • 10 olarak işaretlediğim bölümde; load işlemi sonrası tablodaki kayıt sayısını bir değişkene yazıyoruz.
  • 11 olarak işaretlediğim bölümde; load işlemi sonrası tablodaki kayıt sayısındaki değişimi ekrana yazdırıyoruz.

“gcs_bq_api.py” python script’imiz çalıştığında belirtilen dizindeki .json dosyaları belirtilen bucket’a ve ardından da bigquery tablosuna yükleyecek.

Sonuç olarak iki python script’i ile PostgreSQL’deki tablomuzun delta extraction işlemini tamamlamış olduk.

Bonus delta extraction’da veri kaçırıp kaçırmadığımız kontrol etmek için bir kontrol script’i yazmıştım ondan da bahsedeyim.

Bu script’de yaptıklarımızı açıklamak gerekirse;

  • 1 olarak işaretlediğim bölümde; psycopg2 kütüphanesini kullanarak PostgreSQL DB’ye bağlanıyoruz.
  • 2 olarak işaretlediğim bölümde; script’in çalıştığı ortamdaki google-cloud-sdk path’ini, google_cloud_credential.json’ının path’ini set ediyoruz. Ardından da os.system kütüphanesi ile aşağıdaki komutu çalıştırıyoruz.

gcloud auth activate-service-account google_service_account_user@google_cloud_project_name.iam.gserviceaccount.com — key-file=google_credential.json — project=google_cloud_project_name

google_service_account_user = “Sizin google cloud’da oluşturduğunuz service account user’ınız.
google_cloud_project_name = “Sizin google cloud projeniz”
google_credential.json = “Sizin google cloud credential json dosyanız”

  • 3 olarak işaretlediğim bölümde; last_created_date.txt dosyasından created_date değerini alıyoruz.
  • 4 olarak işaretlediğim bölümde; PostgreSQL’daki tablodaki kayıt sayısına bakıyoruz, bakarken created_date filtresi koyup, 3. adımdaki aldığımız değeri kullanıyoruz.
  • 5 olarak işaretlediğim bölümde; Bigquery’deki kayıt sayısına bakıyoruz, bakarken created_date filtresi koyup, 3. adımdaki aldığımız değeri kullanıyoruz.
  • 6 olarak işaretlediğim bölümde; iki kayıt sayısının eşit olup olmamasına göre kodun hata almasını veya başarılı şekilde sonuçlanmasını sağlıyoruz.

Üç python kodu ile PostgreSQL’dan Bigquery’e delta extraction akışını anlatmış oldum.

Sorularınız olursa sormaktan çekinmeyiniz. İyi kodlamalar :)

Special thanks to Kürşat Topçuoğlu & Murat Özcan. 🙌

--

--