Data is Silver, Fresh Data is Gold

How we leveraged AWS S3 notifications to build more reactive data acquisition pipelines

Bertrand Chardon
Inside Doctrine
9 min read · Jun 13, 2022

--

Photo by Pavel Czerwinski on Unsplash

by Alexandre Pasmantier and Bertrand Chardon

Data is at the center of everything in the 21st century: it helped build the empires of the internet era, it is driving technological transformation at blazing speed, and it is fostering an analytics- and ML-driven revolution.

At Doctrine, that data takes various forms, such as legislation, legal commentary and court orders, which are first gathered from different sources, then enriched and finally connected together to provide precious legal insights to our customers.

As such, the notion of “freshness” is paramount for the lawyers and legal professionals using our platform, as it can be a differentiating factor in their legal strategies, which makes it a particular focus for our engineering team.

In this article, we explore technical opportunities that could allow us to push closer to that end goal and we provide insights on potential improvements to our current data acquisition pipelines.

How we achieve this in the current state of affairs

Our current data acquisition pipelines are, for the most part, built around the general concept of batch processing in the form of “ETL” pipelines.

In practice, this takes the form of Airflow DAGs composed of interconnected tasks: an extraction phase, some transformation of the extracted data, an optional normalisation phase and a final phase where the clean data is stored (in the DB, in a cache, on S3, etc.).
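
For illustration, a minimal sketch of such a DAG might look like the following; the DAG id, task names, callables and schedule are hypothetical stand-ins for the real pipeline logic:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


# Hypothetical callables standing in for the real extraction / transformation / load steps
def extract():
    ...


def transform():
    ...


def load():
    ...


with DAG(
    dag_id="acquisition_pipeline",   # hypothetical name
    start_date=datetime(2022, 1, 1),
    schedule_interval="@daily",      # hypothetical schedule
    catchup=False,
) as dag:
    extract_task = PythonOperator(task_id="extract", python_callable=extract)
    transform_task = PythonOperator(task_id="transform", python_callable=transform)
    load_task = PythonOperator(task_id="load", python_callable=load)

    # “task B should be executed when task A is done”
    extract_task >> transform_task >> load_task
```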

The pains

This kind of workflow works for most intents and purposes and has served us well over the past five years. It offers decent decoupling, lets us schedule tasks independently and relies upon familiar tools: Airflow and our database.

In practice though, it does impose certain limitations on developers in the long run, which we’ll do our best to spell out here:

Issues with batch sizes

As mentioned above, our tasks are organised in Airflow as DAGs: graphs that let us enforce dependencies between tasks in a straightforward way, i.e. by saying “task B should be executed when task A is done”. This is exactly what most of our pipelines do in practice: when the extraction phase is finished, start the transformation; when that is finished, start the load; and so on.

This is a direct, straightforward and sequential approach, but it has an issue: if the processed batch size is too big (as happens from time to time at Doctrine, since we sometimes process large batches of legal documents as they become available), the downstream tasks might have to wait for a while, because the whole upstream task has to complete before they can kick in.

Issues with scaling / distribution

Because the state needed to synchronise the tasks with one another has to be preserved somewhere, this system relies heavily on a combination of the database and S3, requiring a lot of extra glue code for bookkeeping, computing deltas and ensuring that everything produced by the previous steps of the pipeline gets processed.

It also puts the database at the center of the whole operation, making it a potential bottleneck were we to suddenly need a burst of processing throughput in the pipeline.

Finally, it makes operations harder to distribute, forcing us to handle distribution ourselves on the Python side of things.

Issues with task frequencies

Because the tasks are not actually synchronised, a lot of effort goes into ensuring that the different steps execute smoothly: we are left wondering whether a given step ran “too early”, or whether its schedule is appropriate given the schedules of the other tasks in the DAG. This is all due to the batch approach, which doesn’t provide the processing granularity we need.

An interesting opportunity…

As we’ve just seen, most of our pipelines make heavy use of two off-the-shelf building blocks from AWS: Aurora for our DB and S3 for storage.

It turns out that S3, one of the main components of the cloud platform and one of the most essential building blocks of the modern data pipeline, provides a mechanism to signal changes and updates to stored objects and buckets: S3 Event Notifications.

In the rest of this article, we’ll delve into this mechanism and how it helps steer us in a direction where it’s easier for us to circumvent the issues mentioned above.

AWS S3 Event Notifications

AWS Simple Storage Service, or S3 for short, is one of the oldest services on the AWS Cloud Platform and has seen massive usage by all kinds of applications and services from day one.

At its core, it’s a service dedicated to object storage, and it has gained multiple upgrades and additional features since its inception.

One of them, AWS S3 Event Notifications, makes it possible to send notifications to EventBridge, SQS, SNS or Lambda every time something happens on S3 (be it creation, deletion, modification, etc.).

Given that many of our acquisition pipelines already use AWS S3 as their backend storage service, this seemed like something we could put to good use to edge a bit closer to our goal of more reactive systems.

With such a system in place, the core of the acquisition pipeline might become something like this:

  1. upload an object to S3, typically following the extraction phase from the data source (see this article for more about how we achieve this efficiently at Doctrine)
  2. upon object creation, S3 sends a notification to one of its supported destination services, in our case an AWS SQS queue
  3. a consumer (in our case a Python script) would get those notifications off the queue
  4. using the information from the notification, the same script would fetch the complete object from S3
  5. that object would then be processed / transformed
  6. upon completion, the processed object could then be stored in a different location on S3

It should be noted that this pattern is composable, since step 6 can be conveniently woven back into step 2 in a subsequent stage of the pipeline (we’ll come back to this later in the article).

Configuring S3 Event Notifications

Since we use Terraform for all of our infrastructure needs, setting up such a pipeline would require us to create different kinds of resources on AWS: a bucket, several queues and the notifications themselves.

The configuration would end up looking something like this:
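
The sketch below uses hypothetical resource names and a placeholder prefix; the dead letter queue, as well as the SQS queue policy that allows S3 to publish to the queue, are assumed to be defined elsewhere:

```hcl
# SQS queue receiving the S3 notifications
resource "aws_sqs_queue" "object_created_queue" {
  name                      = "my-pipeline-object-created"  # placeholder name
  message_retention_seconds = 604800                        # keep messages for 7 days

  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.object_created_dlq.arn  # DLQ defined elsewhere (out of scope)
    maxReceiveCount     = 4
  })
}

# The bucket whose object creations we want to be notified about
resource "aws_s3_bucket" "my_bucket" {
  bucket = "my-bucket"  # placeholder; real bucket names cannot contain underscores
}

# The bucket notification itself, restricted to .zip objects under a given prefix
resource "aws_s3_bucket_notification" "my_bucket_notification" {
  bucket = aws_s3_bucket.my_bucket.id

  queue {
    queue_arn     = aws_sqs_queue.object_created_queue.arn
    events        = ["s3:ObjectCreated:*"]
    filter_prefix = "path/to/the/"  # placeholder prefix
    filter_suffix = ".zip"
  }
}

# Note: an aws_sqs_queue_policy granting s3.amazonaws.com the right to send
# messages to the queue is also required; it is omitted here for brevity.
```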

This defines three resources:

  1. an SQS queue to which the notifications will be sent. That particular queue is configured to keep messages for 7 days and would typically have a redrive policy that sends any message received more than maxReceiveCount times (in our case, 4) to a dead letter queue. The definition of the dead letter queue is not provided here since it falls outside the scope of this article.
  2. an S3 bucket called my_bucket
  3. the associated bucket notification (note that we’re only interested in object creations under a particular prefix in that bucket, and only if the associated files have the .zip extension; this lets us control the number of notifications we’ll end up receiving and keep only the most relevant object events)

For more information about the configuration of S3 Event Notifications through Terraform, see the documentation for the associated resource.

Consuming the notifications

The notifications received from the queue would end up looking something like this (some information has been omitted on purpose for readability):
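
The example below is an abridged, illustrative notification; the field values are placeholders:

```json
{
  "Records": [
    {
      "eventVersion": "2.1",
      "eventSource": "aws:s3",
      "awsRegion": "eu-west-3",
      "eventTime": "2022-06-13T09:00:00.000Z",
      "eventName": "ObjectCreated:Put",
      "s3": {
        "bucket": {
          "name": "my_bucket",
          "arn": "arn:aws:s3:::my_bucket"
        },
        "object": {
          "key": "path/to/the/object.zip",
          "size": 1024
        }
      }
    }
  ]
}
```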

The Records key is what we’re interested in here: it contains information about the bucket that emitted the notification as well as the complete key of the associated object (here, path/to/the/object.zip in the bucket my_bucket).

Those notifications pile up in the SQS queue, and we need to pull them out before processing.

To do exactly that, we interact with the queue using the Python library boto3, which provides an easy-to-use interface to most AWS services.

It lets you consume queue elements as follows:
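
A minimal polling loop might look like the sketch below; the queue URL is a placeholder, and the actual handling of the message body is shown right after:

```python
import boto3

# Placeholder queue URL
QUEUE_URL = "https://sqs.eu-west-3.amazonaws.com/123456789012/my-pipeline-object-created"

sqs = boto3.client("sqs")

while True:
    # Long-poll the queue, retrieving up to 10 notifications at a time
    response = sqs.receive_message(
        QueueUrl=QUEUE_URL,
        MaxNumberOfMessages=10,
        WaitTimeSeconds=20,
    )

    for message in response.get("Messages", []):
        body = message["Body"]
        # ... parse the notification and process the referenced object here ...

        # Delete the message once it has been handled successfully,
        # otherwise it becomes visible again and is redelivered
        sqs.delete_message(
            QueueUrl=QUEUE_URL,
            ReceiptHandle=message["ReceiptHandle"],
        )
```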

Upon receiving a message, we then process it further to get the reference to the created object via the Records[].s3.object.key property of the message body, in order to finally fetch the newly created object itself from S3 (again using boto3, which we won’t cover in detail in this article).
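
For illustration, a hypothetical handler for the message body could look roughly like this (the transformation step itself is left out):

```python
import json
from urllib.parse import unquote_plus

import boto3

s3 = boto3.client("s3")


def handle_notification(body: str) -> None:
    """Hypothetical handler: fetch every object referenced by an S3 notification."""
    notification = json.loads(body)
    for record in notification.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # Object keys are URL-encoded in notifications (e.g. spaces become "+")
        key = unquote_plus(record["s3"]["object"]["key"])

        # Fetch the newly created object from S3
        obj = s3.get_object(Bucket=bucket, Key=key)
        content = obj["Body"].read()
        # ... transform `content` and store the result (e.g. back to S3) ...
```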

The result of the object processing can then be sent to any storage service.

In our case, and as mentioned above, we might want to send that result back to S3, which can kick off the very same event cycle we have described so far for further processing of the acquired data (for instance, a pipeline that requires multiple independent transformation phases).
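
Writing that result back to S3 is then a single boto3 call; the destination bucket, key and payload below are placeholders:

```python
import boto3

s3 = boto3.client("s3")

# Stand-in for the real output of the processing step
transformed_payload = b'{"example": "processed content"}'

# Placeholder destination: a prefix watched by the next stage's bucket notification
s3.put_object(
    Bucket="my-bucket",
    Key="transformed/path/to/the/object.json",
    Body=transformed_payload,
)
```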

In the end, the fully processed data can be stored somewhere for consumption by other downstream processes.

Are we there yet?

As we’ve seen, S3 Event Notifications are a good fit for event-driven architectures: they let us be informed in real time, and at scale, about the relevant lifecycle events of our data collection pipeline, leaving the nitty-gritty to S3 itself and providing first-class integration with other AWS services such as SQS, SNS, Lambda or EventBridge.

An additional bonus is the composability of the architecture: S3 itself serves as a connector between stages of the pipeline, so that the output of one stage can directly serve as the input of the next with no need for any kind of glue code.

This has allowed us to develop several data acquisition pipelines with ease, without having to rely on complicated state in the database, and it gives us confidence that our scripts could scale to hundreds of consumers if needed.

As with most things in engineering though, you must always be aware of the trade-offs that come with your weapon of choice.

In our case, the downsides we’ve identified are as follows:

  1. vendor lock-in:
  • we are dependent on Amazon’s message format, though this can be overcome by designing our code with the proper abstractions (not presented in this article)
  • the messages transiting through the queues are notifications and, as such, only contain references to the created objects rather than the objects themselves, which means we still need extra code to fetch those objects from S3.
  2. only one consumer: this is inherent to queues but can easily be overcome by using other technologies (such as SNS) in combination with what is presented here.
  3. debuggability/observability leaves a bit to be desired due to the very nature of queues:
  • when debugging, we frequently need to replay a given set of messages more than once, which can quickly become tedious with queues (a message is typically consumed once and deleted in the process). This can usually be solved by providing utilities that generate messages on demand (see the sketch below).
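
As an illustration, a small, entirely hypothetical utility along these lines could craft a synthetic S3-style notification and push it onto the queue so that a given object gets reprocessed on demand:

```python
import json

import boto3

sqs = boto3.client("sqs")


def replay_object(queue_url: str, bucket: str, key: str) -> None:
    """Send a hand-crafted S3-style notification so consumers reprocess `key`."""
    fake_notification = {
        "Records": [
            {
                "eventSource": "aws:s3",
                "eventName": "ObjectCreated:Put",
                "s3": {
                    "bucket": {"name": bucket},
                    "object": {"key": key},
                },
            }
        ]
    }
    sqs.send_message(QueueUrl=queue_url, MessageBody=json.dumps(fake_notification))
```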

Converging to the horizon, a new hope

In the current state of affairs, we still rely quite heavily on task scheduling via Apache Airflow for data extraction and for bridging the gaps between queues, which means we cannot yet fully benefit from a full-fledged real-time event-driven architecture, leaving us short of our initial data freshness expectations.

Fortunately, as we mentioned in this article from last January, a more long-term, all-encompassing solution built on this very paradigm is currently being experimented with at Doctrine by the data task force. It will rely on battle-tested technologies such as Apache Kafka, which provides a feature-rich, storage-capable synchronisation layer that should let us move past the limitations listed above and nudge us further towards the once elusive goal of truly real-time data acquisition and processing, bringing us ever closer to our mission: ensuring that lawyers and legal professionals have the latest and greatest data at their disposal.
