ETL with standalone Spark containers for ingesting small files

Joe Corey
May 4, 2018 · 4 min read

At Mic, we have high volumes of data streaming into our ingestion pipeline from various sources. Much of our data is generated from user interactions on the website (article views, video plays, etc.), but some comes from the analytics APIs of platforms like Facebook, Apple News and YouTube. We use these APIs to give our newsroom up-to-date analytics on how their stories are performing.

With first-party data, our website sends events to the AWS API Gateway, which then tosses the JSON onto Kinesis (a data-streaming service), where the events are batched into JSON files covering 1/5/15-minute intervals for storage on S3 (our storage layer). With third-party data, a number of Lambda functions poll the APIs and save that data to S3.

Either way, our data ends up on S3 and then needs to be uploaded to Redshift. We explored two routes for this upload before settling on our current solution. The rest of this post walks through that exploration and describes our current setup.

We first looked at AWS Lambda. Lambdas are flexible serverless functions that can be triggered by the creation of S3 files. Their reactive nature is a benefit, but we encountered two limitations with this approach:

  • Lambdas can only run for up to five minutes (and with very little computing power), which makes them a poor choice for large files.
  • Using Lambdas to call the Redshift COPY function requires you to provide the schema to map from your files to Redshift columns. Mic operates under continuous development and often creates new event types for monitoring our site. Having to frequently reconfigure schema files is not the best use of anyone’s time.
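To make the second limitation concrete, here is a minimal Python sketch of what a schema-mapped COPY involves. It assumes the mapping is supplied as a Redshift JSONPaths file; the table name, S3 locations, and IAM role in the usage example are hypothetical placeholders, and the helper functions are ours for illustration rather than part of any library.

```python
import json


def build_jsonpaths(columns):
    """Return a JSONPaths document with one path per table column, in column order."""
    return json.dumps({"jsonpaths": [f"$.{name}" for name in columns]})


def build_copy_statement(table, s3_file, jsonpaths_uri, iam_role):
    """Build a COPY statement that loads one JSON file using a JSONPaths mapping."""
    return (
        f"COPY {table} FROM '{s3_file}' "
        f"IAM_ROLE '{iam_role}' "
        f"FORMAT AS JSON '{jsonpaths_uri}';"
    )


if __name__ == "__main__":
    # Hypothetical event fields; every new field means regenerating this file.
    print(build_jsonpaths(["user_id", "event_type", "created_at"]))
    print(build_copy_statement(
        "events",
        "s3://example-bucket/events/file.json",
        "s3://example-bucket/config/events_jsonpaths.json",
        "arn:aws:iam::123456789012:role/example-redshift-role",
    ))
```

Because the path list has to match the table's columns one for one, every new event field means updating both the table and the mapping file, which is the maintenance burden described above.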

So then we looked at AWS Glue, which sounded like a promising option. It’s essentially a service that manages on-demand Spark clusters for ETL jobs. Glue is serverless in the sense that you can edit a single function in either Python or Scala through its UI. And though it has a UI, it offers great flexibility because you still have access to the underlying Spark context (though Glue does wrap the DataFrame in its own DynamicFrame class to handle things like different field types for the same column). There are, however, three major drawbacks to this approach:

  • Spinning up a Hadoop cluster (which has a minimum billing period of 10 minutes) takes a while and is very expensive. It is probably overkill for the many small files we typically deal with.
  • When dealing with so many files, we found that Glue’s bookmarking system (designed to keep track of which data has already been uploaded) tends to break down and display misleading error messages. The support team was unable to provide constructive solutions, which was surprising since they’re usually quite helpful.
  • Lastly, with this approach, it makes more sense to run the ingestion less frequently (rather than on every file), which means the data isn’t visible as quickly.

The solution we eventually landed on involves running dockerized single-node Spark instances. Lambdas are great in their flexibility and triggering capabilities, so we use them to call Fargate tasks whenever a file lands on S3. Fargate is Amazon’s new container service: you can run tasks that use Docker images without having to manage the underlying servers. Our main ETL program is written in Scala and packaged up in Docker. We use CircleCI for continuous build and deployment. After configuration, all we have to do is push a change to our GitHub repository, and it triggers an image rebuild and a push to the Amazon container registry (the CircleCI configuration file can be found in our repository).
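As a rough illustration of the Lambda-to-Fargate handoff, the Python sketch below turns an S3 event into one ECS `run_task` call per file. It is not our production handler: the cluster, task definition, container name, and subnet are hypothetical placeholders, and passing the file URI as the container command is an assumption about how the ETL program receives its input.

```python
from urllib.parse import unquote_plus

# Hypothetical names; replace with your own cluster, task, and network settings.
CLUSTER = "etl-cluster"
TASK_DEFINITION = "spark-etl"  # no revision given, so ECS uses the latest ACTIVE one
CONTAINER_NAME = "spark-etl"
SUBNETS = ["subnet-00000000"]


def s3_uris_from_event(event):
    """Extract s3:// URIs from an S3 notification event (object keys arrive URL-encoded)."""
    uris = []
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        key = unquote_plus(record["s3"]["object"]["key"])
        uris.append(f"s3://{bucket}/{key}")
    return uris


def run_task_kwargs(s3_uri):
    """Build the arguments for ECS run_task, passing the file as the container command."""
    return {
        "cluster": CLUSTER,
        "taskDefinition": TASK_DEFINITION,
        "launchType": "FARGATE",
        "count": 1,
        "networkConfiguration": {
            "awsvpcConfiguration": {"subnets": SUBNETS, "assignPublicIp": "ENABLED"}
        },
        "overrides": {
            "containerOverrides": [{"name": CONTAINER_NAME, "command": [s3_uri]}]
        },
    }


def handler(event, context, ecs_client=None):
    """Lambda entry point: start one Fargate task per newly created file."""
    if ecs_client is None:
        import boto3  # available in the Lambda Python runtime
        ecs_client = boto3.client("ecs")
    uris = s3_uris_from_event(event)
    for uri in uris:
        ecs_client.run_task(**run_task_kwargs(uri))
    return {"started": len(uris)}
```

The optional `ecs_client` argument exists only so the handler can be exercised locally with a stub; Lambda itself calls `handler(event, context)`.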

Since we set the Lambda to always use the latest container version, this makes pipeline modifications quick and easy. Inside the Scala program, we are running a local Spark node and we use Spark-Redshift to upload to Redshift (unfortunately, development on the spark-redshift library has stopped since Databricks was acquired).
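For readers who want a feel for the shape of such a job, here is a hedged sketch in PySpark rather than Scala (our actual program is Scala, so treat this as a translation of the idea, not our code). It assumes `pyspark` is installed and the spark-redshift package and a Redshift JDBC driver are on the classpath; the JDBC URL, table, temp directory, and IAM role are hypothetical values supplied by the caller.

```python
def redshift_write_options(jdbc_url, table, temp_dir, iam_role):
    """Options passed to the spark-redshift data source for a write."""
    return {
        "url": jdbc_url,           # JDBC URL of the Redshift cluster
        "dbtable": table,          # destination table
        "tempdir": temp_dir,       # S3 staging location used for the load
        "aws_iam_role": iam_role,  # role Redshift assumes to read the staged data
    }


def load_file(path, options):
    """Read one JSON file on a local single-node Spark session and append it to Redshift."""
    # Imported lazily so the helper above can be used without Spark installed.
    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .master("local[*]")  # single node, all local cores
        .appName("small-file-etl")
        .getOrCreate()
    )
    try:
        df = spark.read.json(path)  # schema is inferred from the JSON
        (
            df.write
            .format("com.databricks.spark.redshift")
            .options(**options)
            .mode("append")
            .save()
        )
    finally:
        spark.stop()
```

Running with `local[*]` keeps everything inside one container, which is the point of the setup: no cluster to start, just a process that reads a file and exits.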

One of the biggest benefits of using Spark is automatic schema inference for your JSON or CSV files, along with the easy syntax for transformations on your data. We’ve also implemented a couple of helpful features such as schema/column comparison. By querying Redshift and comparing its table schema with the schema Spark inferred from the file, we can run an `ALTER TABLE ADD COLUMN …` and add any missing columns just prior to upload. This is very helpful, since our fast-paced development cycle causes new fields to be added to JSON files frequently.
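The column comparison can be sketched in a few lines of Python. This is an illustration under assumptions, not our Scala implementation: the file schema is taken to be a plain mapping of field name to Spark type name, the type mapping is a guess at reasonable defaults, and unknown types fall back to a wide VARCHAR.

```python
# Assumed mapping from Spark type names to Redshift column types.
SPARK_TO_REDSHIFT = {
    "string": "VARCHAR(65535)",
    "long": "BIGINT",
    "integer": "INTEGER",
    "double": "DOUBLE PRECISION",
    "boolean": "BOOLEAN",
    "timestamp": "TIMESTAMP",
}


def missing_column_statements(table, file_schema, table_columns):
    """Return one ALTER TABLE statement per field in the file that the table lacks.

    file_schema: dict of field name -> Spark type name, as inferred from the file.
    table_columns: column names currently present in the Redshift table.
    """
    existing = {name.lower() for name in table_columns}
    statements = []
    for name, spark_type in file_schema.items():
        column = name.lower()  # compare case-insensitively
        if column in existing:
            continue
        redshift_type = SPARK_TO_REDSHIFT.get(spark_type, "VARCHAR(65535)")
        statements.append(f'ALTER TABLE {table} ADD COLUMN "{column}" {redshift_type};')
    return statements


if __name__ == "__main__":
    inferred = {"user_id": "string", "duration": "long", "ts": "timestamp"}
    for statement in missing_column_statements("events", inferred, ["user_id", "ts"]):
        print(statement)
```

Redshift adds one column per `ALTER TABLE` statement, which is why the function returns a list rather than a single combined statement.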

Occasionally, there can be an error in the upload process. In this case, we take the file name and put it on a queue in SQS. Every 24 hours (using CloudWatch events), or when triggered manually, a special Lambda function calls the Fargate task with arguments indicating it should reprocess the files from the error queue. If the errors persist, the file names get tossed back on the queue. Typically, though, we will identify the problem by monitoring our logging/dashboard infrastructure (set up with Elasticsearch and Kibana) and make the necessary changes. Fixing errors in the pipeline now only requires the relevant changes in code, followed by a push to GitHub, and the data in question will be reprocessed automatically.
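The retry loop can be sketched as follows. This is a simplified Python illustration, assuming a boto3-style SQS client is passed in and that `process_file` is a hypothetical callable that raises on failure; the queue URL is supplied by the caller.

```python
def enqueue_failed_file(sqs_client, queue_url, file_name):
    """Record a failed file by putting its name on the error queue."""
    sqs_client.send_message(QueueUrl=queue_url, MessageBody=file_name)


def drain_error_queue(sqs_client, queue_url, process_file):
    """Reprocess every queued file; files that fail again go back on the queue.

    Returns the list of file names that are still failing.
    """
    # Collect everything first so re-queued failures are not retried in the same run.
    pending = []
    while True:
        response = sqs_client.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10)
        messages = response.get("Messages", [])
        if not messages:
            break
        for message in messages:
            pending.append(message["Body"])
            sqs_client.delete_message(
                QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"]
            )

    still_failing = []
    for file_name in pending:
        try:
            process_file(file_name)
        except Exception:
            enqueue_failed_file(sqs_client, queue_url, file_name)
            still_failing.append(file_name)
    return still_failing
```

Deleting messages before processing keeps the sketch short, but a crash mid-run would lose the pending names; a more careful version would delete each message only after its file succeeds. An empty response from SQS also does not strictly guarantee the queue is empty, so a daily schedule like ours helps pick up stragglers.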

You can see an open-sourced version of the code on our GitHub.

Mic Product Blog

Updates and ideas from the product team at Mic


Written by

Joe Corey

Lead Data Engineer at Mic
