Synchronizing Data Pipelines

Javier Buquet
dataxu.technology
Jun 13, 2018

Are you finding it hard to synchronize processes that read and write data on a data lake?

Are your solutions overly complex?

If you answered yes to these questions, there is a simple pattern we use at dataxu that can help. It gets the work done with minimal overhead.

The Data Science Engineering team at dataxu spends most of its day designing, developing, and maintaining a wide range of data pipelines that support our AI-based bidding system.

A pipeline is a set of stages (or sub-processes) that process input data to produce an output, in which the output of each stage serves as the input for the subsequent stage, thus creating a chain of stages that transforms a given input into a desired output. In particular, a data pipeline can be defined as the process, or pipeline of processes, that moves, joins, formats, and processes data in a system. A good data pipeline should do so in an automated and reliable way, with minimum — or, ideally, no — human intervention.

In the case of systems with the scale and complexity of dataxu’s platform, TouchPoint, it’s common to have many different data pipelines — each having a particular goal or role in the whole system — that need to interact. Let’s say we have a dataset with all the ads we’ve shown to different users and another dataset with all the clicks the users have made. We want to have two pipelines that “clean” both datasets independently, and then use the output of those pipelines to study which ads were effective.

Given all these interactions, it’s very important that we have a robust mechanism, or protocol, for synchronizing the data between the different pipelines: that is, making sure we provide the correct input to each pipeline based on the output of its predecessors.

Defining a synchronization protocol

An acceptable first approach to this problem could be to run all of a given pipeline’s predecessors on every run, keeping their results in the program’s memory so that all the necessary data is available right away. Of course, this method is immediately hindered by a number of drawbacks, such as:

  • Not being able to reuse an intermediate result in case we need to re-run a pipeline (which usually makes things a lot more difficult to debug too)
  • Doing duplicate work when different pipelines have shared predecessors
  • Not being able to fully de-couple a given pipeline from its predecessors (for example, implementing them using different languages or technologies).

Another approach could be to implement every pipeline independently of its predecessors and agree on a location in which each pipeline will drop its final results (so that they can be picked up by any other pipeline).

For example, let’s say we agree that Pipeline A will drop its results in Location A, so that Pipeline B knows that it should look for its input in such location. Each time Pipeline A runs, it must make sure it cleans its drop location (remove any previous results) and then store the new results in the same place.

This is, in many ways, better than our previous approach, but still not ideal: we don’t keep a history of the various pipelines’ results (which might be useful later, or to some other pipeline), nor does a pipeline have a way to differentiate complete results from partial results. Imagine the case in which Pipeline B starts running before Pipeline A has finished storing its results.

By the way, this is a simple example of what we call “the blackboard” architectural pattern. The main idea is that we define “a blackboard” on which each “producer” (in this case a predecessor pipeline) writes down its results, so that any “consumer” can pick them up whenever it needs to. In our case, we usually use Amazon S3 as “the blackboard”, and each pipeline defines a specific location for its results within the service, which is then shared with the pipelines that need its output as input.

So, how can we define a synchronization protocol in which we address or mitigate each of the drawbacks of the two approaches mentioned above?

The File Feed

We developed our own synchronization protocol for data pipelines, and named it the “File Feed”. This protocol is no more than a set of conventions that make it possible for a data pipeline to store its results in a way that any other pipeline can synchronize with it — i.e. use its output (and the correct output at any given moment in time) as input.

In order to comply with this protocol, a data pipeline must:

  • Define a location for the feed
  • Upload results following the “new update convention”
  • Add an “update manifest” when the uploading of results is complete

Feed Location

To comply with the File Feed protocol, a data pipeline first declares a location in which it will store its results. An example of this location when using Amazon S3 as the blackboard could be s3://my-bucket/my-prefix/. A data pipeline also normally declares a current version, which, if available, will be used as the suffix for the location, resulting in something like s3://my-bucket/my-prefix/v2.1/ as the full path to a feed. This is where the data pipeline should store its results, so that another pipeline can retrieve them and use them as input.
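
For illustration, the convention can be captured in a few lines of Python. This is just a sketch with made-up names (the bucket, prefix, and version values are hypothetical), not dataxu’s actual implementation:

    from typing import Optional

    # A minimal sketch of the feed location convention described above.
    # Bucket, prefix, and version values are made up for illustration.
    def feed_location(bucket: str, prefix: str, version: Optional[str] = None) -> str:
        """Build the base S3 path under which a pipeline stores its results."""
        base = f"s3://{bucket}/{prefix.strip('/')}/"
        return f"{base}{version}/" if version else base

    # feed_location("my-bucket", "my-prefix", "v2.1")
    # -> "s3://my-bucket/my-prefix/v2.1/"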

New Update Convention

Each time a data pipeline creates a new update in the feed — i.e. uploads a new set of results — the timestamp of creation of the update is recorded and used as the prefix for the path in which the result files are stored.

For example, if we were to create a new update on May 16th 2018 at midday in a feed with location s3://my-bucket/my-prefix/v2.1/, then the results should be uploaded to s3://my-bucket/my-prefix/v2.1/20180516.120000/. This way we can keep a history of updates (or data pipeline run outputs) and still have a way to distinguish which is the latest, and to what specific moment in time those results belong.
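
A sketch of that naming convention in Python; the timestamp format mirrors the example path above, and the function name is ours, not part of the protocol:

    from datetime import datetime

    # Sketch: derive the prefix for a new update from its creation time.
    # The YYYYMMDD.HHMMSS format matches the example above.
    def update_prefix(feed_path: str, created_at: datetime) -> str:
        return f"{feed_path}{created_at.strftime('%Y%m%d.%H%M%S')}/"

    # update_prefix("s3://my-bucket/my-prefix/v2.1/", datetime(2018, 5, 16, 12, 0))
    # -> "s3://my-bucket/my-prefix/v2.1/20180516.120000/"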

Update Manifest

Lastly, a data pipeline complying with the File Feed protocol should mark each created update as complete once the upload of results is done. This is done by creating a file named _SUCCESS whose content is the number of files uploaded in the update — e.g. if we create an update and upload three result files to it, then the _SUCCESS file’s content should be just “3”. This helps distinguish updates that are complete from those that are still being uploaded.
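
As a rough example, and assuming the blackboard is S3 accessed with boto3, writing the manifest might look like this (the function name and the bucket-relative key prefix are our own simplifications):

    import boto3

    # Sketch: mark an update as complete by writing a _SUCCESS manifest
    # whose body is the number of result files uploaded in the update.
    def write_manifest(bucket: str, update_key_prefix: str, n_files: int) -> None:
        # update_key_prefix is relative to the bucket,
        # e.g. "my-prefix/v2.1/20180516.120000/"
        boto3.client("s3").put_object(
            Bucket=bucket,
            Key=f"{update_key_prefix}_SUCCESS",
            Body=str(n_files).encode("utf-8"),
        )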

Note:

If our “blackboard” is an “eventually consistent” data store — such as Amazon S3 — it is important that we include the number of files in the update as there is a chance that a consumer will only see a subset of the uploaded results even if the update is marked as complete by the producer.

If you don’t know what eventual consistency means, I would suggest you also take a look at the CAP theorem to see why a distributed data store would make this tradeoff.

This mechanism also comes in handy when there is the need to “invalidate” an update. One can either remove the _SUCCESS file or add an empty file to the update, and the update will no longer be considered valid when it is consumed.
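
For instance, with boto3 the invalidation-by-removal could be as simple as deleting the manifest object (the bucket and key below are the made-up examples used earlier):

    import boto3

    # Sketch: invalidate an update by deleting its _SUCCESS manifest, so that
    # consumers skip it and fall back to the previous valid update.
    boto3.client("s3").delete_object(
        Bucket="my-bucket",
        Key="my-prefix/v2.1/20180516.120000/_SUCCESS",
    )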

How To Use It

Let’s say we want to use the File Feed to synchronize the three pipelines from the example described at the beginning of this post: the pipeline that cleans the ads shown to users, the one that cleans the ads clicked by users, and the one that uses the results from both to calculate the effectiveness of the ads shown.

Defining A Pipeline

First, we need to define each pipeline and a location for each pipeline’s results.
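
For the three example pipelines, the declared feed locations might look something like the following; the first one matches the paths used below, while the other two names are hypothetical:

    # Hypothetical feed locations for the three example pipelines.
    # In the examples below, a date partition (e.g. 2018-05-20/) is appended per data date.
    ADS_SHOWN_FEED = "s3://my-bucket/ads-shown-to-users/v1.0/"
    ADS_CLICKED_FEED = "s3://my-bucket/ads-clicked-by-users/v1.0/"
    AD_EFFECTIVENESS_FEED = "s3://my-bucket/ad-effectiveness/v1.0/"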

Storing New Results

Each time a pipeline needs to store new results, it should:

  1. Create a new prefix inside the pipeline’s defined location with the current timestamp as its name — i.e. in the case of the “Clean Ads Shown To Users” pipeline, if we were to store results for the ads shown to users on date 2018-05-20 and we were creating this update at 2018-05-21 16:00:00, we should create a new prefix s3://my-bucket/ads-shown-to-users/v1.0/2018-05-20/20180521.160000/ in which we will store our results.
  2. Upload the results to the created prefix — i.e. if we had our results split into 3 partial csv files, we would upload them to s3://my-bucket/ads-shown-to-users/v1.0/2018-05-20/20180521.160000/part-1.csv, s3://my-bucket/ads-shown-to-users/v1.0/2018-05-20/20180521.160000/part-2.csv, and s3://my-bucket/ads-shown-to-users/v1.0/2018-05-20/20180521.160000/part-3.csv
  3. Upload the update manifest to the created prefix — i.e. in this case we should upload a new file with name _SUCCESS and content “3” to s3://my-bucket/ads-shown-to-users/v1.0/2018-05-20/20180521.160000/_SUCCESS
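
Putting those three steps together, a producer-side sketch in Python with boto3 could look like this. The helper name and paths are illustrative only, not dataxu’s actual code:

    import os
    from datetime import datetime, timezone
    import boto3

    s3 = boto3.client("s3")

    def store_update(bucket: str, feed_key_prefix: str, local_files: list) -> str:
        """Upload a new update to a feed, following the File Feed conventions."""
        # 1. New prefix named after the creation timestamp (YYYYMMDD.HHMMSS).
        stamp = datetime.now(timezone.utc).strftime("%Y%m%d.%H%M%S")
        key_prefix = f"{feed_key_prefix.strip('/')}/{stamp}/"

        # 2. Upload every result file under the new prefix.
        for path in local_files:
            s3.upload_file(path, bucket, key_prefix + os.path.basename(path))

        # 3. Upload the update manifest; its content is the number of files.
        s3.put_object(
            Bucket=bucket,
            Key=key_prefix + "_SUCCESS",
            Body=str(len(local_files)).encode("utf-8"),
        )
        return key_prefix

    # e.g. store_update("my-bucket", "ads-shown-to-users/v1.0/2018-05-20",
    #                   ["part-1.csv", "part-2.csv", "part-3.csv"])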

Retrieving Results

When a pipeline A needs to retrieve results from some other pipeline B, it should:

  1. Get all the existing prefixes in pipeline B’s location — i.e. we should list all the prefixes in s3://my-bucket/ads-shown-to-users/v1.0/2018-05-20/
  2. Sort these prefixes by name, which is the timestamp at which they were created, in descending order
  3. Check whether the latest update can be considered valid by making sure the content of the _SUCCESS file matches the actual number of files in the update
  4. Download all files in the latest valid update found
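
And a consumer-side sketch of those four steps, again assuming boto3 and illustrative names (pagination and error handling omitted for brevity):

    import os
    from typing import Optional
    import boto3

    s3 = boto3.client("s3")

    def latest_valid_update(bucket: str, feed_key_prefix: str) -> Optional[str]:
        """Return the key prefix of the newest complete update, if any."""
        # 1. List all update prefixes under the feed location.
        resp = s3.list_objects_v2(
            Bucket=bucket, Prefix=feed_key_prefix.strip("/") + "/", Delimiter="/"
        )
        prefixes = [p["Prefix"] for p in resp.get("CommonPrefixes", [])]

        # 2. Sort by name (the creation timestamp), newest first.
        for prefix in sorted(prefixes, reverse=True):
            listing = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
            keys = [obj["Key"] for obj in listing.get("Contents", [])]
            if prefix + "_SUCCESS" not in keys:
                continue  # still being uploaded, or invalidated
            # 3. Valid only if the manifest count matches the visible result files.
            manifest = s3.get_object(Bucket=bucket, Key=prefix + "_SUCCESS")
            expected = int(manifest["Body"].read() or 0)
            if len(keys) - 1 == expected:  # minus the manifest itself
                return prefix
        return None

    # 4. Download all files in the latest valid update found.
    def download_update(bucket: str, update_key_prefix: str, dest_dir: str) -> None:
        listing = s3.list_objects_v2(Bucket=bucket, Prefix=update_key_prefix)
        for obj in listing.get("Contents", []):
            name = os.path.basename(obj["Key"])
            if name and name != "_SUCCESS":
                s3.download_file(bucket, obj["Key"], os.path.join(dest_dir, name))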

Simple data pipeline synchronization

Implementing this protocol makes it very easy to synchronize data pipelines, and it has proven very effective for our use case.

The fact that no additional components are required for tracking the outputs, or state, of different pipelines — such as a database containing the path to the latest results produced by a pipeline, or a Hive metastore — makes this solution a lot simpler than other approaches. It also makes tests and manual exploration of the data much easier.

Versioning the feeds, even though it is not enforced by the protocol (it is just part of the feed location definition), has also proven very effective in our case: it protects dependent pipelines from changes in the data definition contract of their predecessors until each pipeline is appropriately adapted.

Let us know if you found this post interesting and helpful, and if you use a different approach to deal with this same problem!
