Scheduling Streaming Insert ke BigQuery Menggunakan Serverless Option

Thursysatriani
Google Cloud Indonesia
8 min readJan 11, 2023

Apache Airflow biasanya menjadi pilihan untuk melakukan eksekusi script secara berkala (scheduled). Tapi untuk bisa menggunakannya kita harus hosting Airflow ini di sebuah dedicated on-premise machine atau Cloud-based machine. Biaya yang dikeluarkan untuk ini akan bergantung dengan berapa lama machine tersebut digunakan, yang dalam kasus ini sepanjang hari dan juga jika auto scaling dinyalakan maka berapa banyak instances yang digunakan untuk mengeksekusi DAG tersebut.

Lantas apakah kita bisa membuat arsitektur yang ekuivalen untuk melakukan hal yang sama menggunakan serverless services yang ada di Google Cloud Platform. Serverless service berarti anda tidak perlu mengurusi infrastruktur dan runtime untuk service tersebut karena sudah di-handle oleh Cloud platform. Selain itu kelebihan dari serverless service sendiri adalah auto-scale dan cost friendly. Auto-scale memiliki pengertian, service dapat menaikkan dan menurunkan resources secara mandiri tergantung banyaknya penggunaan. Serverless service juga cost-friendly atau ramah di kantong, karena bisa scale down hingga 0 instances, yang berarti kita hanya perlu membayar saat function digunakan.

Arsitektur Scheduling Streaming insert ke BigQuery menggunakan service serverless di GCP

Secara singkat kita akan membuat Cloud Scheduler job yang secara berkala mengaktifkan Pub/Sub trigger. Hal ini akan memicu Cloud Function yang menjadi Subscriber dari Pub/Sub topic tersebut ter-trigger. Function yang dieksekusi tersebut berisikan script untuk mengkonsumsi API dan menyimpan datanya ke sebuah BigQuery Table.Dalam kasus ini, kita akan mencoba mengambil data dari API Bloomberg Market and Financial News dan sehari sekali menyimpan data tersebut ke sebuah tabel di BigQuery.

Bloomberg Market and Financial News in Rapid API

Service yang dibutuhkan adalah

  • Cloud Scheduler
  • Pub/Sub
  • Cloud Function
  • BigQuery
  • Cloud IAM (untuk membuat Service Account dan API key)

Berikut adalah tahapan pengerjaan untuk membangun arsitektur diatas

  1. Enable API
  2. Menyiapkan service account
  3. Membuat Pub/Sub topic
  4. Setup job di Cloud Scheduler
  5. Membuat BigQuery table
  6. Men-deploy Function sederhana di Cloud Function
  7. Trigger Job di Cloud Scheduler

Enable API

Sebelum menggunakan service terkait, langkah pertama yang harus dilakukan adalah enable API. Hal ini bisa dilakukan secara otomatis saat service terkait diakses jika anda memiliki permission untuk enable API.

Menyiapkan Service account

Langkah selanjutnya adalah menyiapkan service account. Apabila anda ingin mengakses GCP project, biasanya authorization dilakukan menggunakan user account. Untuk aplikasi atau script, dapat mengakses resources tersebut dengan menggunakan service account. Service account berguna apabila anda melakukan development di-local computer anda. Service account yang sama juga dapat digunakan oleh function yang akan di-deploy di Cloud Function nanti.

Opsi Service account di Navigation Menu

Untuk membuat service account kunjungi IAM & Admin dan klik Service Accounts. Pada bagian atas klik CREATE SERVICE ACCOUNT. Isi Service account detail dengan nama yang anda inginkan, lalu click CREATE AND CONTINUE.

Mengisi detail dari Service Account

Berikan specific role untuk service account, jika ingin melakukan hal lebih selain menyimpan data di BigQuery, role dapat ditambahkan atau memilih role yang lebih coarse seperti EDITOR. Namun pemberian role haruslah sespesifik mungkin, kali ini kita menambahkan basic roles seperti BigQuery Data Editor dan BigQuery Job User. lalu klik DONE.

Memberikan Role untuk Service Account

Setelah service account selesai dibuat, tambahkan API key dengan klik Actions dan pilih Manage keys.

Memilih Manage Keys untuk menambahkan Key

Pilih ADD KEY dan klik Create new key. Download private key untuk service account tersebut dalam bentuk JSON jika ingin menggunakan untuk development di-local computer dan click CREATE.

Pilih Create new key pada menu ADD KEY
Pilih JSON sebagai tipe Key

Membuat Pub/Sub Topic

Setelah selesai membuat service account, kita bisa membuat Pub/Sub topic dengan cara pilih Pub/Sub di navigation menu dan klik Topics. Cloud Pub/Sub merupakan messaging service yang asynchronous dan scalable yang bermanfaat untuk decoupling antara service yang memproduksi message dan yang menerimanya.

Pilihan menu Pub/Sub pada Navigation Menu

Seperti namanya Pub/Sub memiliki konsep publisher dan subscriber dari sebuah topic. Kali ini kita akan buat sebuah topik dan sekaligus menambahkan subscription dari topic tersebut yang nantinya akan digunakan oleh Cloud Function. Klik CREATE TOPIC, pop-up window akan muncul dan namai topic sesuai keinginan, jangan lupa tandai Add a default subscription.

Membuat topic baru

Setup Job di Cloud Scheduler

Langkah selanjutnya setelah membuat topic adalah mengatur schedule. Untuk itu kita dapat menggunakan Cloud Scheduler. Cloud Scheduler adalah cron job scheduler yang berstandar enterprise. Infrastrukturnya Cloud Scheduler di-manage secara menyeluruh oleh Google. Kita hanya perlu membuat job untuk membuat cron job schedule.

Untuk membuat job pada Cloud Scheduler, klik CREATE JOB. Beri nama job sesuai keinginan, pilih region dan isi frequency menggunakan format unix-cron string, dan jangan lupa pilih timezone dari Frequency yang telah ditentukan. Klik CONTINUE.

Menjadwalkan cron schedule menggunakan unix-cron format

Pilih Pub/Sub sebagai Target type, dan pilih Pub/Sub topic yang kita buat sebelumnya. Messaging body bisa diisi dengan kata yang akan nanti diterima oleh Subscriber dari Pub/Sub Topic ini. Untuk kali ini saya memilih kata “Invoke”. Setelah selesai klik CREATE.

Memilih Pub/Sub topic, dan message yang dikirimkan

Create BigQuery Table

Tahapan selanjutnya adalah membuat BigQuery table yang akan menjadi drain dari arsitektur yang telah didefinisikan tadi. Untuk membuat tabel, kita diharuskan membuat dataset terlebih dahulu. Click icon di samping project dan pilih Create dataset. Isi Dataset ID dan Data location sesuai dengan kebutuhan, klik CREATE DATASET.

Menu Create Dataset di BigQuery
Mengisi informasi untuk membuat dataset baru

Klik icon disamping dataset dan pilih opsi Create table. Pada bagian Source, pilih Empty table. Untuk Destination isi nama tabel yang diinginkan pada bagian Table.

Menu Create table di BigQuery
Mengisi informasi untuk membuat table baru di sebuah dataset

Pada bagian Schema, klik toggle Edit as text, dan isi sesuai schema yang dibutuhkan. Pada kasus ini schema pada untuk tabel yang dibutuhkan adalah:

stock:STRING,   
Current_P_E_Ratio__ttm_:FLOAT,
Estimated_P_E:FLOAT,
Relative_P_E:FLOAT,
Earnings_Per_Share__ttm_:FLOAT,
Est__EPS:FLOAT,
Est__PEG_Ratio:FLOAT,
Market_Cap__M_:FLOAT,
Shares_Outstanding__M_:FLOAT,
Price_Book__mrq_:FLOAT,
Price_Sales__ttm_:FLOAT,
Dividend_Indicated_Gross_Yield:FLOAT,
Next_Earnings_Announcement:DATE,
Last_Dividend_Reported:FLOAT,
_5Y_Net_Dividend_Growth:FLOAT,
Average_Volume__30_day_:INTEGER,
update_at:TIMESTAMP

Schema berisikan nama-nama column dan tipe data dari setiap column tersebut. Klik CREATE TABLE.

Men-deploy Function sederhana di Cloud Function

Dalam artikel ini kita akan menggunakan Cloud Function 1st gen. Cloud Function memungkinkan kita mendeploy code yang kita miliki tanpa harus me-manage server atau container, dengan kata lain Cloud Function merupakan function as a service (FaaS).

Klik CREATE FUNCTION untuk membuat function baru. Pada bagian Basics pilih 1st gen sebagai Environment. Isi nama function dan juga pilih Region yang diinginkan. Pada bagian Trigger, pilih Cloud Pub/Sub sebagai Trigger type dan pilih topic yang telah kita buat sebelumnya. Klik SAVE.

Mengisi Informasi untuk Function yang akan di-deploy berikut dengan Pub/Sub Topic yang dijadikan trigger untuk mulai mengeksekusi function.

Pada bagian Runtime, build, connections and security settings pilih memory yang ingin dialokasikan untuk function ini pada Memory allocated yang memiliki pilihan 128 MB hingga 8 GB. Set Minimum dan Maximum number of instances sesuai kebutuhan. Pilih service account yang kita buat tadi sebagai Runtime service account. Apabila function membutuhkan environment variables, dapat ditambahkan pada Runtime environment variables. Disini kita menambahkan api_key sebagai environment variables. Klik NEXT.

Set runtime, autoscaling, dan Service Account
Menambahkan API key

Pada bagian Code, pilih Python 3.9 sebagai Runtime. Upload code atau pilih Inline Editor pada bagian Source code. File main.py dan requirements.txt otomatis akan ditambahakan apabila memilih Inline Editor. Tambahkan library yang dibutuhkan ke requirement.txt dan code utama ke main.py.

Menambahakan package yang dibutuhkan pada requirement.txt

Berikut adalah snapshot code yang digunakan untuk streaming insert ke BigQuery. Untuk lebih lengkapnya silahkan kunjungi Github berikut. Setelah selesai klik DEPLOY.

Code snapshot untuk streaming insert ke BigQuery table

Apabila anda ingin menggunakan Cloud Function 2nd gen , bisa menambahkan Eventarc trigger selain HTTP trigger.

Mengisi Eventarc Trigger apabila ingin menggunakan Cloud Function 2nd gen bukan Cloud Function 1st gen.

Trigger Job di Cloud Scheduler

Untuk memulai mengeksekusi script, kembali ke Cloud Scheduler dan pilih Force a job run atas cron schedule yang kita buat sebelumnya.

Menu Force a job run untuk melakukan invocation pertama kali

Pastikan Function bekerja dengan benar dan selesai dengan status “ok”. Hal ini bisa dilakukan dengan membuka Logging dengan View logs, atau klik function dan buka Tab LOGS.

Menu Logs untuk melihat Logs dari Function yang di-deploy
Logs function dekesekusi dengan status ok

Data yang tersimpan dapat dilihat pada BigQuery table yang sebelumnya telah dibuat.

Hasil streaming insert di BigQuery Table

Hal yang harus diperhatikan: Bisa jadi ini bukanlah arsitektur yang tepat untuk anda!

  1. Waktu yang dibutuhkan untuk eksekusi script dari invocation hingga selesai maksimum 540 sec (9 menit) jika eksekusi yang dibutuhkan lebih dari itu, function tersebut akan berhenti dengan status timeout.
  2. Streaming insert max: 10.000 rows per insert.
  3. Buffer 90 menit untuk streaming insert data, dengan kata lain DELETE dan UPDATE baru bisa dilakukan 90 menit setelah data di insert ke BigQuery Table.
  4. Rate limit dari API perlu diperhatikan karena apabila violate rate limit akan menyebabkan function menjadi error.
  5. Resources dari Cloud Function seperti memory allocated, batas atas dan bawah autoscalling, dapat dinaikkan atau diturunkan berdasarkan informasi yang tersedia di tab METRICS pada Function yang di deploy.
Melihat penggunaan resources untuk mengoptimalkan runtime setting pada Cloud Function.

Kesimpulan

Arsitektur yang digunakan pada kasus ini tentu saja memiliki keterbatasan seperti timeout yang dimiliki function maksimum 9 menit, namun arsitektur seperti ini juga membuktikan bahwa serverless services di Google Cloud Platform dapat digunakan untuk scheduling script execution seperti layaknya Apache Airflow. Ini dapat dijadikan salah satu opsi untuk membangun arsitektur yang auto-scale, cost friendly, dan low maintenance.

--

--