How we implemented ETL using Kubernetes CronJobs

Businesses have relied on the ETL process for many years to get a consolidated view of the data that drives better business decisions. Today, this method of integrating data from multiple systems and sources is a core component of an organisation's data integration toolbox. In this article, we are going to take a look at what the Singer specification for ETL is and how to run ETL jobs using Kubernetes CronJobs.

ETL refers to three steps:

  • Extract
  • Transform
  • Load

It is used to pull data from multiple sources (e.g. Facebook Insights or Google Search Console), most often to build a data warehouse. During this process, data is extracted from a source system, transformed into a format that can be analysed, and loaded into a destination.
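The three steps can be sketched as a toy shell pipeline. The data, function names and output path here are purely illustrative, not part of any real pipeline:

```shell
#!/usr/bin/env bash
# Toy ETL pipeline: extract rows (a printf stands in for an API call),
# transform them (normalise case), and load them into a file.
set -euo pipefail

extract()   { printf 'alice,10\nBOB,20\n'; }   # E: pull raw rows from a source
transform() { tr '[:upper:]' '[:lower:]'; }    # T: normalise to a common format
load()      { cat > /tmp/warehouse.csv; }      # L: write to the "warehouse"

extract | transform | load
```

Each stage reads the previous stage's stdout, which is exactly the shape the Singer specification below formalises.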

At Nine, we rely on analytics to guide us in making better business decisions. We need an extensible and robust platform to aggregate analytics data from our third-party service providers (such as Apple News, Facebook, Google, and Omny).

We came up with an ETL implementation conforming to the Singer specification and based on the use of Kubernetes CronJobs.

Singer Specification

Before jumping into the details of the ETL implementation, let's take a look at the Singer specification:

Singer describes how data extraction scripts — called “taps” — and data loading scripts — called “targets” — should communicate, allowing them to be used in any combination to move data from any source to any destination. Send data between databases, web APIs, files, queues, and just about anything else you can think of.

Taps

A tap extracts data from a source and writes it to stdout in a JSON-based format. It is a small program, which can be written in any language, that outputs data according to the Singer spec.

Targets

A target consumes data from a tap and does something with it, such as loading it into a file, an API or a database. It is a program that reads the tap's output from stdin and processes it in some way.
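A minimal, illustrative stand-in for a target can be written in a few lines of bash: it reads Singer messages line by line and keeps only the RECORD messages, which is the part of the stream a target would load somewhere. A real target parses the JSON properly and also honours SCHEMA and STATE messages; this sketch just filters by message type:

```shell
#!/usr/bin/env bash
set -euo pipefail
OUT=/tmp/records.jsonl

# Pipe the "hello, world" tap output into a loop that keeps only RECORD
# messages -- the essence of what a target does before loading data.
printf '{"type":"SCHEMA","stream":"hello","key_properties":[],"schema":{"type":"object","properties":{"value":{"type":"string"}}}}\n{"type":"RECORD","stream":"hello","record":{"value":"world"}}\n' \
| while IFS= read -r line; do
    case "$line" in
      *'"type":"RECORD"'*) printf '%s\n' "$line" ;;   # keep records
      *) ;;                                           # drop SCHEMA/STATE here
    esac
  done > "$OUT"
```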

Writing a Tap

How to write a tap following the Singer specification is very well covered in the Singer documentation.

Following is a simple “Hello, world” tap written in bash, taken from the documentation.

printf '{"type":"SCHEMA","stream":"hello","key_properties":[],"schema":{"type":"object","properties":{"value":{"type":"string"}}}}\n{"type":"RECORD","stream":"hello","schema":"hello","record":{"value":"world"}}\n'

This can be piped to any target.

printf '{"type":"SCHEMA","stream":"hello","key_properties":[],"schema":{"type":"object","properties":{"value":{"type":"string"}}}}\n{"type":"RECORD","stream":"hello","schema":"hello","record":{"value":"world"}}\n' | target-bigquery -c config.json

The above command pushes the records from the tap to Google BigQuery, using the configuration details from config.json. If you are using Python to build your Singer tap, there is a helper library to make things easier. Many ready-made taps and targets are available as well.

Project Setup

We created a skeleton project which is used as a basis to create new taps. The skeleton contains all the common code related to the Singer specification, so when a new tap needs to be implemented, only the code specific to that tap has to be written. The skeleton's Dockerfile installs the BigQuery target by default.

We use Amazon S3 to store the state information after each CronJob execution. S3 was chosen because we did not need a complete file system; it is also cheaper than EBS and easier to maintain.

The configuration files containing secrets (such as API keys) are mounted as Kubernetes Secrets. These secrets are dropped into the Kubernetes namespace from our secret management service during the deployment of the CronJob.

When the container starts, it first executes entrypoint.sh. This script runs the tap and pipes its output to the target, then uploads the generated state file to S3 after validating that it is well-formed JSON.
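The original script is not reproduced here, but a minimal sketch of that flow might look like the following. The paths and the tap/target pipeline are placeholders (a printf stands in for the real `tap | target` pipe so the sketch is runnable), and the S3 steps, which the real script performs with the AWS CLI, are shown as comments:

```shell
#!/usr/bin/env bash
# Illustrative sketch of entrypoint.sh; names/paths are placeholders.
set -euo pipefail

STATE_FILE="${STATE_FILE:-/tmp/state.json}"

# 1. Restore the previous state.
#    production: aws s3 cp "s3://<bucket>/state.json" "$STATE_FILE" || true
[ -f "$STATE_FILE" ] || echo '{}' > "$STATE_FILE"

# 2. Run `tap | target`; a real target emits its final STATE message on
#    stdout, which becomes the new state. A printf stands in for that here.
printf '{"last_run":"2020-01-01"}\n' > /tmp/new_state.json

# 3. Persist the new state only if it is well-formed JSON.
if python3 -m json.tool < /tmp/new_state.json > /dev/null 2>&1; then
  cp /tmp/new_state.json "$STATE_FILE"
  # production: aws s3 cp "$STATE_FILE" "s3://<bucket>/state.json"
else
  echo "new state is not valid JSON; keeping the previous state" >&2
  exit 1
fi
```

Validating before uploading means a crashed or misbehaving run never clobbers the last known-good state in S3.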

Deploying Using Kubernetes CronJobs

We use Kubernetes CronJobs to run our taps periodically. Some reasons for this choice were:

  • We were already using Kubernetes CronJobs and found them to be reliable for the purpose.
  • This allowed us to containerise each tap separately, making it easy to create and deploy new taps.

A Kubernetes CronJob creates a Job object approximately once per scheduled execution. This means that, in rare circumstances, no Job or more than one Job may be created for a single cycle. The jobs should therefore be idempotent (running more than once has no additional effect).
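One simple way to get that idempotency is to write each run's output to a key derived from the run date, so a duplicate run for the same cycle overwrites the same data instead of appending to it. The paths, file name and payload below are illustrative placeholders:

```shell
#!/usr/bin/env bash
# Idempotency sketch: '>' truncates before writing, so repeating the run
# for the same date rewrites the same key and changes nothing.
set -euo pipefail

RUN_DATE="2020-01-01"   # a real job would derive this, e.g. "$(date +%F)"
OUT="/tmp/warehouse/insights_${RUN_DATE}.json"
mkdir -p /tmp/warehouse

printf '{"impressions": 123}\n' > "$OUT"   # first run writes the data
printf '{"impressions": 123}\n' > "$OUT"   # a duplicate run is harmless
```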

The following is an example YAML manifest for a tap's CronJob.

apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: hello
spec:
  schedule: "*/1 * * * *"
  concurrencyPolicy: Forbid
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: hello
              image: <image url>
              imagePullPolicy: IfNotPresent
          restartPolicy: OnFailure
  • schedule — Cron expression defining when the job runs.
  • concurrencyPolicy — Whether jobs are allowed to run concurrently (possible values: Allow, Forbid and Replace).
  • restartPolicy — Restart policy for the job's pods. Pods generally allow Always, OnFailure and Never, but Jobs only permit OnFailure and Never.
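A hypothetical deploy-and-inspect flow with standard kubectl commands might look like this. Writing the manifest to a file is runnable here; the kubectl steps assume cluster access and are shown as comments. The image URL is a placeholder:

```shell
#!/usr/bin/env bash
set -euo pipefail

cat > /tmp/hello-cronjob.yaml <<'EOF'
apiVersion: batch/v1beta1   # batch/v1 on Kubernetes 1.21 and later
kind: CronJob
metadata:
  name: hello
spec:
  schedule: "*/1 * * * *"
  concurrencyPolicy: Forbid
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: hello
              image: registry.example.com/hello:latest   # placeholder image URL
              imagePullPolicy: IfNotPresent
          restartPolicy: OnFailure
EOF

# kubectl apply -f /tmp/hello-cronjob.yaml   # create/update the CronJob
# kubectl get cronjob hello                  # check schedule and last run
# kubectl get jobs                           # see the Jobs each cycle spawns
```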

Summary

In this article, we have discussed how we use the Singer ETL specification and how we run ETL jobs using Kubernetes CronJobs. We were able to develop a solution that runs ETL jobs reliably, along with a framework that makes it easier to create, maintain and deploy them.