How we simplified our data pipeline

Ian Fuller
Published in Freetrade Blog · 6 min read · Nov 13, 2018

Cover photo: A person, composing some clouds (Photo by Joseph Pearson on Unsplash)

When you’re building a stockbroker from the ground up, the first thing you notice is data. Lots of data.

We’ve recently started using Google Cloud Composer to handle our data processing and we can confirm that it’s awesome. So in this post, we’ll take a quick dive into Apache Airflow using Google Cloud Composer. Need to (re-)build a data pipeline? Read on.

So what is Airflow and why do I need it anyway?

Airflow allows you to build workflows and data pipelines. It uses Python scripts to define both tasks and job configuration. It provides a number of hooks, operators, and sensors out of the box, and simplifies monitoring, retries, and the like. If you’re currently writing business logic to orchestrate and schedule tasks, it will save you a lot of pain. That said, running Airflow securely at scale is non-trivial.

Cloud Composer

Google Cloud Composer solves the aforementioned infrastructure challenge by providing a fully managed Airflow environment. It builds on existing Google Cloud services like GCS and GKE to host and serve your setup, letting you go from config scripts to secure, scalable data processing in a few easy steps. All of the benefits of Airflow without the infra challenge.

Getting Started

We’re going to use the gcloud CLI as it’s simpler than clicking through the UI. Simply save the following as a script:
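The original script isn’t embedded here, but a minimal sketch might look like the following. The environment name and region match the ones used later in the post; the API-enable step and node count are assumptions.

```bash
#!/usr/bin/env bash
# setup.sh - a minimal sketch, not the original script.
set -euo pipefail

PROJECT_ID="$1"                       # e.g. my-project-name
ENVIRONMENT="${PROJECT_ID}-composer"  # matches the environment name used below
LOCATION="europe-west1"

# Enable the Cloud Composer API for the project
gcloud services enable composer.googleapis.com --project="${PROJECT_ID}"

# Create the managed Airflow environment (this takes a while)
gcloud composer environments create "${ENVIRONMENT}" \
  --project="${PROJECT_ID}" \
  --location="${LOCATION}" \
  --node-count=3
```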

And then run: ./setup.sh my-project-name

You may need to accept a number of permissions, but within ten minutes or so you’ll be able to visit https://console.cloud.google.com/composer/environments/detail/europe-west1/my-project-name-composer/configuration?project=my-project-name and be greeted with the landing page for your first Cloud Composer environment!

Your first DAG

Now that you’re all set up, it’s time to start writing some code! As I mentioned earlier, Airflow defines everything in code, from scheduling your workflows to business logic.

Airflow uses DAGs — directed acyclic graphs — which is a quick way of saying a-graph-that-goes-one-way-and-has-no-loops. An Airflow DAG has a schedule, some config for retries, and represents the parent for a set of tasks.

A task can be anything from a built-in operation that moves data from one place to another to some arbitrary Python code.

Let’s put it in action with a simple DAG that prints ‘Hello Cloud Composer!’:
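The gist isn’t reproduced here, so here’s a minimal sketch of such a DAG. The file name example.py and the task id say_hello match what’s referenced later in the post; the DAG id and the daily schedule are assumptions.

```python
# example.py - a minimal sketch of the hello DAG, not the original code.
import datetime
import logging

from airflow import models
from airflow.operators.python_operator import PythonOperator

# Start at the end of yesterday, i.e. as early as possible today
start_date = datetime.datetime.combine(datetime.date.today(), datetime.time.min)

default_args = {
    'start_date': start_date,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
}


def greet():
    logging.info('Hello Cloud Composer!')


with models.DAG(
        'hello_cloud_composer',                        # assumed DAG id
        schedule_interval=datetime.timedelta(days=1),  # assumed schedule
        catchup=False,                                 # disable catch-up
        default_args=default_args) as dag:

    say_hello = PythonOperator(
        task_id='say_hello',
        python_callable=greet)
```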

Our new DAG:

  • Creates a DAG instance
  • Sets a start date as the end of yesterday
  • Creates a Python operator that logs our message

DAG scheduling is a little complex because ETL (extract, transform, load) jobs are expected to run on specific intervals (e.g. every hour for the last day). This requires a specific start time and catch-up behaviour. In this example, we’ll start as early as possible today and disable the catch-up.

Publishing the code to your Cloud Composer environment is as simple as:

gcloud composer environments storage dags import --source=example.py --project=my-project-name --environment=my-project-name-composer --location=europe-west1

Once things are up and running you can navigate to your environment at https://console.cloud.google.com/composer/environments/detail/europe-west1/my-project-name-composer/configuration?project=my-project-name and click on the Airflow web UI link to see your DAG in action!

Clicking on the graph view (the sun-like icon) in the links section will show you your entire DAG. If you click on your one (and only) task, ‘say_hello’, you’ll be treated to the task menu.

This menu allows you to manage the task, re-running it if necessary or accessing its logs. If you click on the logs you should see the hello world text. Nice!

Hello Cloud Composer!

Doing work

Generating log output is always fun, but it’s not going to help us in production. For the next phase we’ll pull data from a real API and save it to GCS before sending it to Slack. This will demonstrate some of Airflow’s hooks, and how tasks work together.

For our next example let’s pull some pricing data for US stocks. This is a Freetrade article after all :)

Our data source

For this example we’ll use the IEX APIs, a set of free APIs for the US Investors Exchange. So, to get Apple’s pricing data we can use https://api.iextrading.com/1.0/stock/aapl/quote

See https://iextrading.com/developer/docs/#getting-started for more detail.

Building our DAG

Because workflows are defined in code, we can do some pretty neat things when creating tasks. In this example we’ll use a list of our favourite stock symbols to dynamically create subtasks, each of which fetches pricing data from IEX. Once all of the pricing data has been fetched, we’ll join it all together and send it to Slack so we’re always up to date!

First let’s add the tasks to fetch IEX data:
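The gist isn’t embedded here either, so the following is a sketch under a few assumptions: the symbol list, DAG id, connection id (iex_http), and the 90-minute schedule (chosen to match the wait mentioned at the end of the post) are illustrative, not the original code.

```python
# iex_prices.py - a sketch of the fetch tasks, not the original code.
import datetime
import json

from airflow import models
from airflow.hooks.http_hook import HttpHook
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

SYMBOLS = ['aapl', 'googl', 'tsla']   # our favourite stock symbols (assumed)

start_date = datetime.datetime.combine(datetime.date.today(), datetime.time.min)
default_args = {'start_date': start_date}


def fetch_quote(symbol):
    # 'iex_http' is an Airflow connection whose host points at
    # https://api.iextrading.com (configured in the next step)
    hook = HttpHook(method='GET', http_conn_id='iex_http')
    response = hook.run('1.0/stock/{}/quote'.format(symbol))
    # Cloud Composer mounts the environment's GCS bucket at /home/airflow/gcs;
    # files written under data/ are synced to gs://<bucket>/data
    with open('/home/airflow/gcs/data/{}.json'.format(symbol), 'w') as f:
        json.dump(response.json(), f)


with models.DAG(
        'iex_prices',
        schedule_interval=datetime.timedelta(minutes=90),
        catchup=False,
        default_args=default_args) as dag:

    start = DummyOperator(task_id='start')
    join = DummyOperator(task_id='join')

    # Dynamically create one fetch task per symbol
    for symbol in SYMBOLS:
        fetch = PythonOperator(
            task_id='fetch_{}'.format(symbol),
            python_callable=fetch_quote,
            op_kwargs={'symbol': symbol})
        # >> sets the edge direction between tasks - the D in DAG
        start >> fetch >> join
```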

A few new things have been introduced here, but let’s go through the most important ones:

  • The tasks created have their execution order set using the bitwise right-shift operator >>. This is the D in DAG
  • The use of a for loop to dynamically create tasks
  • The use of an HttpHook to fetch from IEX. Hooks allow you to access external data sources.
  • The use of Cloud Composer’s mounted GCS bucket at /home/airflow/gcs/data

The conn_id passed to the HttpHook provides the host name, auth headers and so on. If you head to the Airflow web UI you can configure it there:
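(The configuration screenshot isn’t reproduced here.) If you’d rather script it, the connection can also be created through the Airflow CLI wrapped by gcloud. A sketch, assuming the connection id iex_http from the example above and a recent Airflow 1.10 image (the per-field flags may differ on older versions):

```bash
gcloud composer environments run my-project-name-composer \
  --project=my-project-name \
  --location=europe-west1 \
  connections -- --add \
  --conn_id=iex_http \
  --conn_type=http \
  --conn_host=https://api.iextrading.com
```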

Once you’ve made the changes you can run the earlier dags import command. You should end up with a swanky new DAG like this:

Such DAG

Now that we’ve got pricing data we can send our Slack message. To demonstrate another Airflow feature — Xcom — we’ll add two more tasks. Xcom is a simple key-value store that Airflow provides; we can use it to pass a short string to our Slack task. Here’s the updated DAG with the final two tasks:
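Again the gist isn’t embedded, so here’s a sketch of the two extra tasks, appended to the iex_prices.py sketch above (so SYMBOLS, json, join and dag are already in scope). The post refers to a SlackOperator; in Airflow 1.x the concrete class is SlackAPIPostOperator, whose templated field is text, so that’s what this sketch pulls from Xcom. The task ids, channel, and the slack_token variable name are assumptions.

```python
# Final two tasks, appended to the DAG above - a sketch, not the original code.
from airflow import models
from airflow.operators.python_operator import PythonOperator
from airflow.operators.slack_operator import SlackAPIPostOperator


def summarise_prices():
    # Join the files written by the fetch tasks into one short string.
    # The return value of a PythonOperator is automatically pushed to Xcom
    # under the key 'return_value'.
    lines = []
    for symbol in SYMBOLS:
        with open('/home/airflow/gcs/data/{}.json'.format(symbol)) as f:
            quote = json.load(f)
        lines.append('{}: {}'.format(symbol.upper(), quote['latestPrice']))
    return '\n'.join(lines)


summarise = PythonOperator(
    task_id='summarise_prices',
    python_callable=summarise_prices,
    dag=dag)

send_to_slack = SlackAPIPostOperator(
    task_id='send_to_slack',
    # The secret lives in Airflow Variables (configured in the next step)
    token=models.Variable.get('slack_token'),
    channel='#prices',
    username='airflow',
    # Templated field: pull the summary back out of Xcom
    text="{{ task_instance.xcom_pull(task_ids='summarise_prices') }}",
    dag=dag)

join >> summarise >> send_to_slack
```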

In this final step we’ve:

  • Pushed some data to the Xcom key value store so we can reuse it
  • Used another operator type, SlackOperator
  • Used variables to retrieve a secret
  • Used a template so the html_content field can be pulled from Xcom

To populate the models.Variable.get requests, navigate to the variables admin section in the Airflow web UI:
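(The variables screenshot isn’t shown here.) Variables can also be set from the command line; the variable name slack_token matches the assumption in the sketch above:

```bash
gcloud composer environments run my-project-name-composer \
  --project=my-project-name \
  --location=europe-west1 \
  variables -- --set slack_token my-secret-slack-token
```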

Once everything is in place you should get a shiny new DAG. That is, you would, if it wasn’t for a missing dependency we introduced by importing Slack. To fix the dependency you can configure your Composer environment to pull in the missing slackclient library. Click into your environment and add version 1.3.0 as follows:

This can also be achieved using a requirements.txt. If you’d prefer that approach (and who wouldn’t?) check out the official docs: https://cloud.google.com/composer/docs/how-to/using/installing-python-dependencies
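For reference, a sketch of the requirements.txt route; the file name and contents are simply the missing dependency mentioned above:

```bash
echo "slackclient==1.3.0" > requirements.txt

gcloud composer environments update my-project-name-composer \
  --project=my-project-name \
  --location=europe-west1 \
  --update-pypi-packages-from-file=requirements.txt
```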

Now that we’re up and running again we can check out our final DAG!

A real, bona fide DAG

You can either wait 90 minutes or click the top-level DAGs button and hit the trigger button (it looks like a play icon). A few moments later you’ll get your first Slack pricing update — nice!
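If you prefer the command line, the same manual trigger can be issued through the Airflow CLI (the DAG id iex_prices is the assumed one from the sketch above):

```bash
gcloud composer environments run my-project-name-composer \
  --project=my-project-name \
  --location=europe-west1 \
  trigger_dag -- iex_prices
```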

Conclusion

As you can see, it’s pretty easy to get something up and running with Google Cloud Composer. If you’ve got scheduled jobs to run, check it out. Or even better, come work with us — we’re hiring, don’t you know! https://freetrade.io/careers/
