How to Build a Data Warehouse: Our custom pipeline implementation with Luigi

CAB · Published in Glossier · Oct 26, 2018

Here is my long overdue second instalment in our How to Build a Data Warehouse series, focusing on the technical aspects of how we built a measurable, easy to maintain, and effective data pipeline at Glossier. There are many considerations to take into account when building a successful data warehouse — one of the most crucial aspects is the pipe that allows data to flow between different services.

I’ll focus largely on our tailored implementation of Luigi — an open-source project that Spotify created and released 4 years ago to power music recommendations, top lists, and A/B test analysis, amongst other things. I hope you’ll find useful ideas in this piece that can be transferred to your own integration problems. If you have suggestions, or you want more information about a specific topic, don’t hesitate to comment down below!

In this piece, what we define as a pipeline is a means of fetching data from a source, then transforming it (i.e. cleaning, deduplicating, monitoring, etc.), and saving it in a location for it to be used by analysts or other computer programs. This process is known as ETL (Extract, Transform, Load).

Initial requirements

The initial goal of our pipeline was to fetch our transaction data, transform this into a clean and queryable state, and save it into our data warehouse. The process wasn’t meant to happen in real-time. With those restrictions defined, we concluded that having the pipeline run at a regular interval, at least once a day, was the best approach. We wanted to find an open-source pipelining tool that gave us the flexibility to customize and grow into.

Why Luigi?

Our first priority was to evaluate the different pipeline solutions on the market; the flexibility that open source provides was key. We first considered Airflow by Apache, but we felt it was too convoluted. Then we looked at Suro by Netflix, but it wasn’t actively maintained. Finally, there was Glue by Amazon, but it had just been released and we weren’t ready to invest time in a platform that was in its infancy. Then we stumbled upon Luigi by Spotify and immediately fell in love. The tool was actively maintained and well documented, and our initial prototype proved extremely simple to implement using their documentation.

Building a task in Luigi involves creating a class and implementing three methods:

  • requires(): what other tasks this task depends on
  • run(): the business logic of the task
  • output(): where the task writes its output to

It’s that simple!
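For concreteness, here is a minimal, self-contained sketch of what such a task might look like. The task and file names below are made up for illustration; they are not from our pipeline.

```python
import luigi


class FetchRawOrders(luigi.ExternalTask):
    """Hypothetical upstream file that is expected to already exist on disk."""
    date = luigi.DateParameter()

    def output(self):
        return luigi.LocalTarget('/tmp/raw_orders_{}.csv'.format(self.date))


class CleanOrders(luigi.Task):
    """Hypothetical task that reads raw orders and writes a cleaned copy."""
    date = luigi.DateParameter()

    def requires(self):
        # requires(): what other tasks this task depends on
        return FetchRawOrders(date=self.date)

    def run(self):
        # run(): the business logic of the task
        with self.input().open('r') as raw, self.output().open('w') as clean:
            for line in raw:
                clean.write(line.strip().lower() + '\n')

    def output(self):
        # output(): where the task writes its output to
        return luigi.LocalTarget('/tmp/clean_orders_{}.csv'.format(self.date))
```

Running something like `luigi --module my_module CleanOrders --date 2018-10-26 --local-scheduler` has Luigi check that the raw file exists before it runs CleanOrders.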

Luigi also offers a collection of packages it calls contribs. Contribs are a way to interface Luigi tasks with many other services. For instance, there are contribs for Redshift, Amazon S3, Docker, Spark, and many more.

Evolving with Luigi

Our initial implementation of Luigi was pretty scrappy. Honestly, none of us really had much experience in building a data warehouse and even less with pipelines. Nonetheless, the whole process was a lot of fun.

As Glossier’s teams grew, we started getting more and more data requests from different departments, such as marketing and finance. We started experimenting with different ways to implement Luigi and began finding patterns in our work.

As part of this experimentation, we went through a major refactor of our pipeline in the hopes of reducing the number of lines needed to accomplish an ETL operation.

With this experience, we were able to build a scalable solution that would run on a schedule. We’re now in a place where we feel confident about our implementation and pipeline architecture and wanted to share some key features that made our life a little easier.

Custom feature: Luigi task abstraction

Historically, it was hard to keep up to date with the upstream Luigi codebase because of the many hacks we had implemented to ship certain behaviors on tight deadlines. At the time, some of these custom behaviors were not generic enough to be contributed to the open-source project. There was a lot of duplicated flow and a lot of copy-pasting, which could cause errors and unexpected results if not well understood.

In 2018, we’re happy to say that we’ve contributed a fair number of features back to the Luigi community. As we continue refining our implementation, more common abstractions will emerge that we’ll be able to polish and share.

First abstraction: Verbs

One of our first abstractions was something we called verbs. A verb acts like the Luigi contribs but extended with our own business logic and common behaviors.

The verb terminology comes from old point-and-click video games, where verbs were the actions that let the player navigate the adventure. We thought it made sense since, in our case, a verb is a way to act on the data.

[Image: the verb menu of a classic point-and-click adventure game. Source: Michael Stum]

Translated into our Luigi world, here is an example of a verb named RedshiftUnloadToS3:
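The original gist isn’t reproduced here, so below is a rough, simplified sketch of what such a verb could look like. The mixins named in the next paragraph provide the S3 destination and the connection details in the real pipeline; in this sketch they are inlined as plain attributes, and the IAM role, connection string, and query escaping are illustrative assumptions rather than our actual implementation.

```python
import luigi
import psycopg2
from luigi.contrib.s3 import S3Target


class RedshiftUnloadToS3(luigi.Task):
    """Sketch of a verb: run a SQL query on Redshift and UNLOAD the result to S3.

    In the real pipeline, the S3 destination comes from MultipleS3FileInformationMixin
    and the connection details from DatabaseConfigurableMixin (described just below);
    they are inlined here so the sketch stays self-contained.
    """

    # Assumed placeholders; real values would come from mixins/configuration.
    iam_role = 'arn:aws:iam::123456789012:role/example-unload-role'
    connection_string = 'postgresql://user:password@redshift.example.internal:5439/warehouse'

    @property
    def query(self):
        # By convention, resolved from the .sql file matching the task's name
        # and folder (see the folder structure section further down).
        raise NotImplementedError

    @property
    def s3_prefix(self):
        # Where the unloaded files should land on S3.
        raise NotImplementedError

    def run(self):
        unload = (
            "UNLOAD ('{query}') TO '{prefix}' "
            "IAM_ROLE '{role}' DELIMITER ',' GZIP ALLOWOVERWRITE MANIFEST"
        ).format(
            query=self.query.replace("'", "''"),  # escape quotes inside the UNLOAD literal
            prefix=self.s3_prefix,
            role=self.iam_role,
        )
        with psycopg2.connect(self.connection_string) as connection, connection.cursor() as cursor:
            cursor.execute(unload)

    def output(self):
        # UNLOAD ... MANIFEST writes a manifest file at <prefix>manifest.
        return S3Target(self.s3_prefix + 'manifest')
```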

It contains our default business logic, which is included in every task of this type. In this particular example, it defines where to fetch the SQL query to run (more on that later) and where to unload the data onto S3. The MultipleS3FileInformationMixin provides the context for where to output the data onto S3, and the DatabaseConfigurableMixin tells the task how to connect to a database.

This verb allows us to write a task as simple as this:
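Continuing the sketch above; the task name and values here are hypothetical, not from our codebase.

```python
class UnloadAnalyticsOrders(RedshiftUnloadToS3):
    """All of the default verb behaviour is inherited; the task only says
    what to run and where to put the result."""

    @property
    def query(self):
        return "SELECT order_id, user_id, total FROM analytics.orders"

    @property
    def s3_prefix(self):
        return "s3://example-bucket/unloads/analytics_orders_"
```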

Third abstraction: Sentences

Wait, where’s the second abstraction, you might be wondering? Let’s explain how sentences work first.

A sentence is a way to aggregate multiple sets of tasks (verbs). This abstraction allowed us to cut about 4,000 lines of code from our ETL, making it more robust and testable, and effectively increasing developer happiness.

Most of our pipeline was following this behavior:

  1. Pulling data from a database, using SQL, and saving that data into S3
  2. Retrieving this data from S3 and pushing it back into a specific table in our data warehouse.

Each step required us to implement a task, with the latter dependent on the former:

Here is an example of what the implementation would look like without the sentence abstraction
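Roughly, and again using the hypothetical verb sketched earlier rather than our real code, the two-task version might look like this:

```python
import luigi
import psycopg2


class UnloadOrdersQueryToS3(RedshiftUnloadToS3):
    """Step 1: run the SQL and park the result on S3."""

    @property
    def query(self):
        return "SELECT order_id, user_id, total FROM glossier_production.spree_orders"

    @property
    def s3_prefix(self):
        return "s3://example-bucket/unloads/orders_"


class CopyOrdersFromS3ToRedshift(luigi.Task):
    """Step 2: load the unloaded files into the warehouse table."""

    table = "analytics.orders"

    def requires(self):
        # The load depends on the unload having finished.
        return UnloadOrdersQueryToS3()

    def run(self):
        prefix = self.input().path.replace("manifest", "")
        copy = (
            "COPY {table} FROM '{prefix}manifest' "
            "IAM_ROLE '{role}' MANIFEST GZIP DELIMITER ','"
        ).format(table=self.table, prefix=prefix, role=UnloadOrdersQueryToS3.iam_role)
        with psycopg2.connect(UnloadOrdersQueryToS3.connection_string) as connection, connection.cursor() as cursor:
            cursor.execute(copy)
        with self.output().open('w') as marker:
            marker.write('loaded\n')

    def output(self):
        # A simple local marker; a real pipeline would track completion more robustly.
        return luigi.LocalTarget('/tmp/copy_orders_done')
```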

This flow was quite tedious. With sentences, we now only have to create one task that spawns both of the tasks above:

Here is an example of what a sentence implementation of UpdateRedshiftTableFromQuery could look like
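The single task a contributor writes might look roughly like this; the names are illustrative, and a sketch of the UpdateRedshiftTableFromQuery sentence itself follows in the conjugated verbs section below.

```python
class UpdateAnalyticsOrdersTable(UpdateRedshiftTableFromQuery):
    """The only class we write: the sentence spawns the unload-to-S3 task
    and the copy-to-Redshift task for us."""
    table = "analytics.orders"
    s3_prefix = "s3://example-bucket/unloads/analytics_orders_"
```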

Here is a little graph of what the sentence task achieves:

Inside the blue lines is the task that you write; outside of them are the tasks that get generated automatically by the sentence logic.

The implementation isn’t perfect, but it saves us a lot of time. Effectively, in order to make the sentence abstraction work, we had to come up with the “second” abstraction.

Second abstraction: Conjugated verbs

Verbs need to be conjugated before being put in a sentence, right? At least that’s what I remember my high school teacher telling me.

With regular verbs (or contribs), you need to implement the class and its properties, whereas with conjugated verbs, you only need to pass parameters to a class and it deals with the rest. It looks a bit like this:

What a conjugated verb looks like VS what a regular verb looks like
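In terms of the earlier sketch, the difference might look something like this; again, this is an illustration rather than our actual classes.

```python
import luigi


# Regular verb: you subclass it and implement its properties yourself
# (like UnloadAnalyticsOrders earlier in this post).

# Conjugated verb: the same behaviour, configured purely through parameters,
# so another task (a sentence) can instantiate it at runtime.
class RedshiftUnloadToS3Conjugated(RedshiftUnloadToS3):
    query = luigi.Parameter()
    s3_prefix = luigi.Parameter()


# No subclassing needed; the verb is "conjugated" on the spot:
unload = RedshiftUnloadToS3Conjugated(
    query="SELECT order_id, user_id, total FROM analytics.orders",
    s3_prefix="s3://example-bucket/unloads/analytics_orders_",
)
```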

We can now use conjugated verbs in our sentence task implementations, conjugating (configuring) them the way we want by passing parameters to the sentence.

Here is an example of what a sentence abstraction could look like using a CopyMultipleFilesFromS3ToRedshiftTable conjugated verb.
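Since the original gist isn’t shown here, below is a heavily simplified sketch. It assumes a conjugated CopyMultipleFilesFromS3ToRedshiftTable verb exists alongside the unload verb above; the internals are guesses based on the description in this post, not our actual code.

```python
import luigi


class UpdateRedshiftTableFromQuery(luigi.Task):
    """Sketch of a sentence: one class standing in for the unload/copy pair."""

    # Overridden by the concrete task, e.g. UpdateAnalyticsOrdersTable above.
    table = None
    s3_prefix = None

    def run(self):
        # Dynamic dependencies: Luigi completes each yielded task before the
        # generator moves on, which gives us the unload -> copy ordering.
        yield RedshiftUnloadToS3Conjugated(
            query="SELECT ...",  # in practice, resolved from the task's .sql file
            s3_prefix=self.s3_prefix,
        )
        yield CopyMultipleFilesFromS3ToRedshiftTable(  # assumed conjugated verb wrapping COPY
            s3_prefix=self.s3_prefix,
            table=self.table,
        )
```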

Custom feature: Workspaces

We also wanted to let data contributors create tasks and run them without affecting data that had already been computed and was being used by the business. One of our limitations is that we’re using Looker as the presentation layer, and it only allows explores and reports to use one database connection at a time. This means that if we wanted to output development or staging data, we would have to rebuild the reports on each of these different connections. This is why we opted to build the concept of workspaces.

With workspaces, you simply set the value of an environment variable named WORKSPACE, and the pipeline outputs the data into tables namespaced with that value.

For instance, let’s assume that we want to run the pipeline that generates data for the table analytics.orders without overwriting that table. We would then change the env variable WORKSPACE to peach (our staging environment)!

Fun fact: most of our data tools are named after characters from the Super Mario Bros fantasy world.

Then the pipeline generates tables which have the WORKSPACE value appended to them. For example:


analytics.line_items_peach
analytics.inventory_units_peach
analytics.orders_peach

Another way to use this is to take a ticket number from your preferred project management platform as the workspace, encapsulating the pipeline output for a given feature and facilitating QA.

Note: We’re still discussing whether we should namespace this output into a specific schema, or keep appending the workspace to the end of the table name.

Again, since we’re using Looker, we’ve added a quick hack on top of our views containing the following:

sql_table_name: analytics.addresses{% if _user_attributes['workspace'] != 'production' %}_{{ _user_attributes['workspace'] }}{% endif %} ;;

This allows a person working in Looker to change the workspace value in the user attributes on their profile. Instead of having to update every single view with the new table name, the workspace gets appended automatically, letting Looker consumers easily query the content of a given workspace.

Our Pipeline folder structure

Something often left in the shadows in these types of posts is file structure. You may be wondering: how do we manage tasks and compartmentalize them? At Glossier, most of us come from a Ruby on Rails background where “convention over configuration” is often on our minds. We’ve tried to bring some of these concepts into our pipeline.

We have very strict rules about how we write tasks, how we name them, and how we document them, facilitating default and known behaviors. Obviously, these rules can be overridden and changed as the project progresses and specific cases emerge.

Due to the nature of our pipeline, most of our transformation processes are done via SQL. Luigi’s default convention suggests that we define the query to run directly in the task itself. This can get messy very quickly.

Something we’ve done to counteract this is to create files following the same folder structure for both tasks and queries. For example, let’s say a task named UpdateAnalyticsOrdersTable resides at /src/tasks/analytics/orders.py. By default, it will run the query at /src/queries/analytics/orders/update_analytics_orders_table.sql.

We have other folders like:

  • /src/tasks/behavioral/…
  • /src/tasks/monitoring/…
  • /src/tasks/provisioning/…
  • /src/tasks/third_parties/…

As you can see, the name of the task in CamelCase refers to the query of the same name and same file path. This makes it a no-brainer to figure out where the query would reside, and how it gets executed. This also allows us to have mixins that automatically fetch the query of a task based on the name of the task and its folder structure. We made many decisions like these to facilitate pipeline contributions. Some may call this feature magic, but we call it peace of mind.
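Here is a sketch of how such a mixin could resolve the query path from the task’s name and module, under the conventions described above. This is a guess at the mechanism, not our actual mixin.

```python
import inspect
import os
import re


class QueryFromConventionMixin:
    """Hypothetical mixin: derives the .sql file from the task's class name
    and module location, following the convention described above."""

    TASKS_ROOT = "/src/tasks"
    QUERIES_ROOT = "/src/queries"

    @property
    def query(self):
        # UpdateAnalyticsOrdersTable -> update_analytics_orders_table
        snake_case = re.sub(r"(?<!^)(?=[A-Z])", "_", type(self).__name__).lower()

        # /src/tasks/analytics/orders.py -> analytics/orders
        module_path = os.path.splitext(inspect.getfile(type(self)))[0]
        relative_dir = os.path.relpath(module_path, start=self.TASKS_ROOT)

        sql_path = os.path.join(self.QUERIES_ROOT, relative_dir, snake_case + ".sql")
        with open(sql_path) as sql_file:
            return sql_file.read()
```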

Custom feature: Variables in queries

Another custom feature that we needed to build was focused on variables in queries. Most of the pipeline transforms data directly from SQL, meaning that we:

  1. Fetch data from a source
  2. Transform it in SQL (clean it up)
  3. Save it into a destination

Simply put, a variable is surrounded by a set of curly braces, like {my_variable}, directly in our SQL query. If a variable isn’t defined when the task runs, the pipeline halts and throws an error. We support quite a few different variables; I’ve selected a few noteworthy ones that you may find interesting.

Custom variables: configurable schemas

In a scenario where a developer wants to run the pipeline against the staging dataset of Glossier.com instead of the regular production dataset, we don’t want to have to manually change the query and possibly make mistakes. So we’ve created a list of configurable schemas whose values depend on the SOURCE_ENVIRONMENT environment variable.

Having {solidus_backend} in the query would yield glossier_staging if SOURCE_ENVIRONMENT is set to staging, or glossier_production if it is set to production.

Here is an example of how the configurable schemas variables work based on our current test case
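Conceptually, the substitution boils down to something like this. The mapping and query are illustrative, and in the real pipeline this happens inside the task machinery rather than in a standalone snippet.

```python
import os

# Assumed mapping from SOURCE_ENVIRONMENT to schema names.
CONFIGURABLE_SCHEMAS = {
    "staging": {"solidus_backend": "glossier_staging"},
    "production": {"solidus_backend": "glossier_production"},
}

query = "SELECT number, total FROM {solidus_backend}.spree_orders"

source_environment = os.environ.get("SOURCE_ENVIRONMENT", "production")
print(query.format(**CONFIGURABLE_SCHEMAS[source_environment]))
# SOURCE_ENVIRONMENT=staging    -> SELECT number, total FROM glossier_staging.spree_orders
# SOURCE_ENVIRONMENT=production -> SELECT number, total FROM glossier_production.spree_orders
```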

Custom variables: {workspace}

As explained above, we support different workspaces. In order to manage task dependencies on tables from the same workspace, we added support for the {workspace} flag in queries.

Here is an example of how the workspaces variables work based on our current test case
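Again as a rough illustration of the idea, not our actual substitution code:

```python
import os

query = "SELECT * FROM analytics.line_items{workspace} WHERE quantity > 0"

workspace = os.environ.get("WORKSPACE", "production")
suffix = "" if workspace == "production" else "_" + workspace
print(query.format(workspace=suffix))
# WORKSPACE=peach      -> SELECT * FROM analytics.line_items_peach WHERE quantity > 0
# WORKSPACE=production -> SELECT * FROM analytics.line_items WHERE quantity > 0
```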

Custom variables: Ad-hoc variables

It may happen that you still need to pass variables tailored to a specific use case. Let’s say your task receives a date which you need to pass to the query in order to filter the data by that date. Rest assured, there’s a solution for that.

To do this, you define the property query_mapping_data, which returns a dictionary like this:

Here is how you would define the query_mapping_data dictionary which creates value for these variables in a given query
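For example, a task could define something along these lines; the task name, parameter, and values are hypothetical.

```python
import luigi


class UpdateDailyOrdersTable(UpdateRedshiftTableFromQuery):
    report_date = luigi.DateParameter()

    @property
    def query_mapping_data(self):
        # Ad-hoc variables exposed to the query as {var1} and {var2}.
        return {
            "var1": self.report_date.isoformat(),
            "var2": "complete",
        }
```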

This allows the variables {var1} and {var2} to be used, with their respective values, in the query.

Here is an example of how the query_mapping_data variables work based on our current test case
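Continuing the hypothetical example above, the query file might use the variables like this:

```python
import datetime

query = """
    SELECT *
    FROM analytics.orders
    WHERE completed_at::date = '{var1}'
      AND state = '{var2}'
"""

task = UpdateDailyOrdersTable(report_date=datetime.date(2018, 10, 26))
print(query.format(**task.query_mapping_data))
# -> ... WHERE completed_at::date = '2018-10-26' AND state = 'complete'
```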

It’s now easy to pass custom variables to a query that is specific to a use-case.

Monitoring & Alerting

During the early days of the pipeline, we wanted to be alerted when a task failed. Initially, we used Luigi’s built-in feature which sends an email for every task failure. That was great in theory, but in practice it didn’t let us easily include custom error messages in those emails.

We wanted to think bigger. As the company grew, we knew we needed a more scalable approach. We decided to go with Datadog, which lets us send metrics and events to its platform so that we can monitor the health of our applications and raise alerts when necessary. Datadog supports AWS metrics out of the box and integrates easily with Slack, PagerDuty, and others.

We then decided to make a custom integration within Luigi to send our key metrics to Datadog, such as:

  • How much time does a task take to run?
  • How many tasks have failed / were started / were disabled / were completed?
  • How many rows were added to a table?
  • How many rows couldn’t be added to a table and why?
  • How many rows are duplicated for a table?

This allows us to raise alerts in Datadog that notify the involved party when things are considered unhealthy. The dashboard also gives us a quick overview of our infrastructure health.

One of our many dashboards to monitor the health of our pipeline

In addition to our custom Datadog integration within Luigi, we’ve added a concept of common monitoring behavior that our verbs inherit, allowing us to run specific monitors that validate the health of our data.

Here is an example of how we implemented our monitoring system throughout our Luigi tasks. In this example the RedshiftCopySingleFileToTableFromS3 will run SendCopyErrorsToDatadog and SendDuplicatedKeysToDatadog.
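The original gist isn’t reproduced here; a rough sketch of the shape of that setup might look like the following. The monitors, the helpers that read Redshift system tables, and the wiring are all illustrative assumptions; in particular, in our pipeline the monitoring behavior is inherited by the verb itself, whereas here a wrapper task stands in for that wiring.

```python
import luigi
from datadog import statsd  # sends metrics to the local Datadog agent over UDP


class SendCopyErrorsToDatadog(luigi.Task):
    """Hypothetical monitor: report rows rejected by the last COPY."""
    table = luigi.Parameter()

    def requires(self):
        # Only run once the COPY into the table has completed.
        # RedshiftCopySingleFileToTableFromS3 is the copy verb, assumed to exist.
        return RedshiftCopySingleFileToTableFromS3(table=self.table)

    def run(self):
        rejected = count_stl_load_errors(self.table)  # assumed helper querying stl_load_errors
        statsd.gauge("pipeline.copy.errors", rejected, tags=["table:" + self.table])


class SendDuplicatedKeysToDatadog(luigi.Task):
    """Hypothetical monitor: report rows sharing the same primary key."""
    table = luigi.Parameter()

    def requires(self):
        return RedshiftCopySingleFileToTableFromS3(table=self.table)

    def run(self):
        duplicated = count_duplicated_primary_keys(self.table)  # assumed helper
        statsd.gauge("pipeline.duplicated_keys", duplicated, tags=["table:" + self.table])


class MonitoredCopyToRedshift(luigi.WrapperTask):
    """Requiring both monitors guarantees the COPY and its health checks run."""
    table = luigi.Parameter()

    def requires(self):
        return [
            SendCopyErrorsToDatadog(table=self.table),
            SendDuplicatedKeysToDatadog(table=self.table),
        ]
```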

In the above example, for every task that adds data to Redshift via the COPY command, we start another series of tasks that check for duplicated data and for rows that couldn’t be added due to COPY errors.

In Redshift, primary key constraints are not enforced, but we set them anyway: they allow us to run internal checks that raise alerts when data is duplicated on those keys, and they help the query planner produce more efficient queries.

Having these monitors and the Datadog integration makes us much more confident in the data computed by our ETL. For us, being aware of a problem before key stakeholders consume the data is very important, and this kind of monitoring moves us one step closer to that goal.

In closing

Building a data pipeline is very challenging! Although there are no perfect solutions, and it’s very dependent on business needs, there are a few common factors that make a pipeline successful: maintainability, monitoring and simplicity.

The different concepts and abstractions that I highlighted in this post helped us solve some of our challenges at Glossier. Your set of problems may be totally different, but we hope that sharing some key pieces of our pipeline is useful to you. As always, feel free to comment below, or reach out to me if you would like more information on anything discussed above.

CAB is a data engineer at Chord.co; previously at Yaguara, Glossier, and Dynamo.