Automating daily runs for rt.live’s COVID-19 data using Airflow & ECS
Since early March, we’ve published daily updates of Rt, a measure of how COVID-19 is spreading for each state in the United States, at rt.live. We wanted to share our process for automating our daily updates, in the hope that it’s helpful for anyone setting up a similar pipeline.
We needed a set-up that:
- Ran daily on a schedule
- Would re-try steps automatically if transient failures occurred
- Could run many US states in parallel, since each state’s ML model run doesn’t depend on other states
- Could be manually re-run for a subset of states without rerunning the whole thing, if we needed to make a data correction for just one or two states
The core part of our pipeline is a PyMC3-based model that finds the most likely evolution of Rt given how cases (and testing) have evolved in each state. To automate the pipeline and run it daily, we chose Apache Airflow. We also considered Dagster and Prefect, both of which improve on Airflow in different ways. We ended up sticking with Airflow because of its extensive documentation and the many integrations in its ecosystem. For those unfamiliar with Airflow, it’s an open-source project originally created by Airbnb that allows you to programmatically create workflows of tasks.
The Airflow cluster itself is based on a design by Laura Helleckes and Michael Osthege, who tailored it to run PyMC3 within Airflow using the Celery executor. At the core, Airflow and PyMC3 are installed into the same Docker image, which allows us to create tasks using the PythonOperator. The container’s entrypoint can spin up an Airflow worker and assign it to different queues. In our case, we have two queues: a “light” queue that can run lightweight tasks like downloading data CSVs, and a “heavy” queue that runs the machine learning model.
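To make the queue setup concrete, here is a minimal sketch of how a DAG with a daily schedule, automatic retries, and per-state tasks routed to the two queues could be declared. It is not our actual DAG file. The DAG id covid19 and the sample_us_ task prefix match the CLI commands shown later in this post; the state subset, the download task id, the retry settings, and the start date are hypothetical. The Airflow import is deferred so the helpers can be read and tested without an Airflow installation, and the arguments follow Airflow 1.10-style names.

```python
from datetime import datetime, timedelta

STATES = ["CA", "NY", "WA"]  # hypothetical subset; the real pipeline covers every state

# Retry settings are illustrative assumptions, not the values used in production.
DEFAULT_ARGS = {
    "owner": "airflow",
    "retries": 2,                         # re-try transient failures automatically
    "retry_delay": timedelta(minutes=5),
}


def sample_task_id(state):
    """Task id for one state's model run, e.g. sample_us_CA."""
    return f"sample_us_{state}"


def build_dag(download, run_model):
    """Build the DAG. `download` and `run_model` are plain Python callables."""
    from airflow import DAG
    try:
        from airflow.operators.python import PythonOperator  # Airflow 2.x path
    except ImportError:
        from airflow.operators.python_operator import PythonOperator  # Airflow 1.10 path

    dag = DAG(
        dag_id="covid19",
        default_args=DEFAULT_ARGS,
        start_date=datetime(2020, 3, 1),  # hypothetical
        schedule_interval="@daily",
        catchup=False,
    )
    download_task = PythonOperator(
        task_id="download_ct_data_us",    # hypothetical task id
        python_callable=download,
        queue="light",                    # picked up by lightweight workers
        dag=dag,
    )
    for state in STATES:
        model_task = PythonOperator(
            task_id=sample_task_id(state),
            python_callable=run_model,
            op_kwargs={"state": state},
            queue="heavy",                # picked up by the ECS model workers
            dag=dag,
        )
        download_task >> model_task       # states are independent of each other
    return dag
```

Because each state gets its own task, Airflow can hand the states out to however many heavy-queue workers are available.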
In order to run many states in parallel, we opted for Amazon’s Elastic Container Service (ECS), specifically in Fargate mode. In this mode, we can create a Docker container that knows how to run as an Airflow worker, and quickly spin up N copies of that container without having to worry about the details of EC2 instances. Since we only need to run these tasks for about 30 minutes a day, this lets us control costs.
Airflow has a handy way of visualizing the pipeline as a directed acyclic graph (DAG). Ours looks like this (zoomed in on a subset of states):
Let’s walk through each step in the process.
1. Spin up ECS tasks
ECS has a few building block concepts:
• Task Definitions are the basic building blocks; they’re how you tell ECS what Docker container image to use, what volumes to mount onto that container, and how much CPU and RAM you’d like to allocate.
• Services reference a task definition and specify how many copies of that task you’d like to keep running.
If you update a service (which you can do via an API) to set its desiredCount to N, ECS will take care of starting or stopping copies of that Docker image until there are N healthy copies running.
For the first step of our Airflow pipeline, we issue an ECS API call that sets desiredCount to 25, so that within a couple of minutes, there are 25 running copies of our image ready to work on our machine learning pipeline.
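The scale-up call described above can be sketched roughly as follows. The function takes the ECS client as an argument so it can be exercised without AWS credentials; in practice you would pass in boto3.client("ecs"). The cluster and service names in the usage comment are hypothetical.

```python
def scale_workers(ecs_client, cluster, service, desired_count):
    """Ask ECS to run `desired_count` copies of the service's task.

    `ecs_client` is expected to behave like boto3.client("ecs").
    Returns the desired count that ECS reports back.
    """
    response = ecs_client.update_service(
        cluster=cluster,
        service=service,
        desiredCount=desired_count,
    )
    return response["service"]["desiredCount"]


# Example usage (hypothetical cluster and service names):
#   import boto3
#   scale_workers(boto3.client("ecs"), "airflow-cluster", "heavy-workers", 25)
```

The call returns as soon as ECS accepts the new count; the containers themselves take a little longer to start.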
We keep our latest model code stored on an Elastic File System shared drive, so all the workers share the same model code.
2. Download raw testing and case counts
We use COVIDTracking as our raw-count data source, as they aggregate data from each state and provide it in one central location. We take their data and store it (with today’s date as the key) in S3, so that if we want to re-run the pipeline, we know we’ll be re-running it with the same data snapshot.
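As an illustration of date-keyed snapshots, the sketch below builds an S3 key from the run date and uploads the raw data under it. The key layout, the file name, and the bucket name are hypothetical. The S3 client is passed in (for example boto3.client("s3")) so that the key logic can be tested on its own.

```python
from datetime import date


def snapshot_key(run_date, prefix="raw", filename="states_daily.csv"):
    """Build a date-keyed S3 object key, e.g. raw/2020-07-01/states_daily.csv."""
    return f"{prefix}/{run_date.isoformat()}/{filename}"


def store_snapshot(s3_client, bucket, run_date, body):
    """Upload the raw data under the run date's key and return that key.

    `s3_client` is expected to behave like boto3.client("s3").
    """
    key = snapshot_key(run_date)
    s3_client.put_object(Bucket=bucket, Key=key, Body=body)
    return key


# Example usage (hypothetical bucket name):
#   import boto3
#   store_snapshot(boto3.client("s3"), "my-rt-data", date.today(), csv_bytes)
```

Re-running the pipeline for a given date then reads back the same object rather than fetching fresh data.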
3. Pre-process the case & testing counts
Each state varies (sometimes significantly) in how it reports data, and a state will sometimes go several days without reporting and then report a large number of cases at once. Over time, we’ve added several data corrections to this step to account for one-off data issues. Again, we store the processed data in S3, keyed by date.
4. Run the ML model
At this point, we queue up one Airflow task per state. As soon as the ECS tasks are up and running, each worker picks up one of the states and runs the model. We’ve open-sourced the model code in case you’re interested.
When the model finishes its sampling run, we also check whether the model properly converged on a consistent view of the data. At the end of this step, we store the inference data that comes out of the model into S3.
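To give a feel for what a convergence check can look like, here is a small pure-Python sketch of the classic Gelman-Rubin statistic (R-hat), which compares the variance between sampling chains to the variance within them. This is an assumption about one common diagnostic, not a description of the exact check in our model code; libraries such as ArviZ provide a more refined rank-normalized version. The 1.05 threshold is a hypothetical choice.

```python
from math import sqrt
from statistics import mean, variance


def gelman_rubin(chains):
    """Classic R-hat for one scalar parameter.

    `chains` is a list of equally long lists of posterior draws, one per chain.
    Values close to 1 suggest the chains agree with each other.
    """
    m = len(chains)
    n = len(chains[0])
    if m < 2 or n < 2:
        raise ValueError("need at least two chains with two draws each")
    if any(len(chain) != n for chain in chains):
        raise ValueError("all chains must have the same length")

    within = mean([variance(chain) for chain in chains])       # W
    if within == 0:
        raise ValueError("chains have zero within-chain variance")
    between = n * variance([mean(chain) for chain in chains])  # B
    pooled = (n - 1) / n * within + between / n
    return sqrt(pooled / within)


def has_converged(chains, threshold=1.05):
    """Hypothetical pass/fail rule built on top of R-hat."""
    return gelman_rubin(chains) < threshold
```

A task could raise an exception when this kind of check fails, so that Airflow marks the state's run as failed instead of publishing a poorly sampled result.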
5. Summarize each state’s data
As our next step, we pull the inference data from S3 per-state, and render out a summary of the model output. This outputs a CSV file (into S3) that contains the mean, median, and credible interval for each day’s Rt value.
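The summary step can be sketched as follows, assuming the posterior draws of Rt are available as a plain list of numbers per date. The column names, the 90% interval width, and the use of an equal-tailed percentile interval are illustrative assumptions rather than a description of our production code.

```python
import csv
import io
from statistics import mean, median

FIELDS = ["date", "mean", "median", "lower", "upper"]  # hypothetical column names


def percentile(sorted_values, q):
    """Linearly interpolated percentile of pre-sorted values, with q in [0, 1]."""
    if not sorted_values:
        raise ValueError("no values to summarize")
    position = q * (len(sorted_values) - 1)
    lower = int(position)
    upper = min(lower + 1, len(sorted_values) - 1)
    fraction = position - lower
    return sorted_values[lower] * (1 - fraction) + sorted_values[upper] * fraction


def summarize(samples_by_date, interval=0.9):
    """Return one summary row per date from {date: [Rt draws]}."""
    tail = (1 - interval) / 2
    rows = []
    for day in sorted(samples_by_date):
        draws = sorted(samples_by_date[day])
        rows.append({
            "date": day,
            "mean": mean(draws),
            "median": median(draws),
            "lower": percentile(draws, tail),      # lower bound of the interval
            "upper": percentile(draws, 1 - tail),  # upper bound of the interval
        })
    return rows


def to_csv(rows):
    """Serialize summary rows to CSV text, ready to upload to S3."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=FIELDS)
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()
```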
6. Aggregate all the state data
This is a simple step where we take all the per-state CSVs and create one country-wide CSV with each state’s data inside.
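A minimal sketch of this aggregation, assuming every per-state CSV shares the same header and has already been read from S3 into a string. The added region column name is hypothetical.

```python
import csv
import io


def aggregate(per_state_csv):
    """Combine {state code: CSV text} into one CSV with a leading region column."""
    output = io.StringIO()
    writer = None
    for state in sorted(per_state_csv):
        reader = csv.DictReader(io.StringIO(per_state_csv[state]))
        for row in reader:
            if writer is None:
                # Take the header from the first file that has data rows.
                writer = csv.DictWriter(output, fieldnames=["region"] + reader.fieldnames)
                writer.writeheader()
            writer.writerow({"region": state, **row})
    return output.getvalue()
```

For example, passing {"NY": "date,mean\n2020-07-01,0.95\n", "CA": "date,mean\n2020-07-01,1.10\n"} yields a three-line CSV with the header region,date,mean followed by the CA row and then the NY row.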
7. Publish to staging
Our web frontend expects a MessagePack file that’s been pre-processed to make our visualizations quick to run. This step takes the country-wide CSV, pre-processes it, encodes it as a msgpack file, and stages it onto S3. It also runs some sanity checks to see if any state’s new Rt value is very different from yesterday’s; if it is, it issues a warning (published to our Slack), and one of us will manually check if that change was due to a real increase in cases, or due to a reporting issue.
If all looks good, we can promote that staged msgpack file to the production view that everyone sees on rt.live. We also publish the country-wide CSV from the previous step, so that other sites can use our data if they’d like.
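The day-over-day sanity check from the staging step could look something like the sketch below. The 0.2 threshold and the message format are hypothetical; the returned messages are what would then be posted to Slack for a manual look.

```python
def find_large_changes(yesterday, today, threshold=0.2):
    """Compare {state: Rt} for two consecutive days.

    Returns one warning message per state whose Rt moved by more than
    `threshold` in either direction. States missing from `yesterday`
    are skipped because there is nothing to compare against.
    """
    warnings = []
    for state in sorted(today):
        if state not in yesterday:
            continue
        delta = today[state] - yesterday[state]
        if abs(delta) > threshold:
            warnings.append(
                f"{state}: Rt moved {delta:+.2f} "
                f"(from {yesterday[state]:.2f} to {today[state]:.2f})"
            )
    return warnings
```

Keeping the check separate from the promotion to production means a suspicious jump only delays publication until someone has looked at it.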
Re-running parts of the pipeline
If there’s a data issue that we need to fix, we can add a fix to the preprocessing step, and then re-run just:
- The preprocess step, without invalidating its downstream tasks
- The ML model step just for the states that need re-running
- The aggregation and staging steps
This means that rather than re-running all 51 regions, we can quickly re-run just what’s needed.
The Airflow UI allows for Clearing task state (which makes the task eligible for retry). This can be used to retry parts of the pipeline, though we found it was easier to put together a simple bash script that takes one or more state codes and uses the Airflow CLI to clear the task state automatically:
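RUNDATE=$(date -d "yesterday 10:00" '+%Y-%m-%d')
# Clear only the preprocessing task, leaving its downstream tasks untouched
airflow clear -s "$RUNDATE" --no_confirm -t process_ct_data_us covid19
# For each state code passed as an argument, clear its model task and everything downstream
for var in "$@"; do
  airflow clear -s "$RUNDATE" --no_confirm --downstream -t "sample_us_$var" covid19
done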
8. Wind down the ECS tasks
Once all the heavy queue tasks have completed or failed, we set the desiredCount of ECS tasks back to 0. This step is set up to run regardless of whether the steps before it succeed or fail, since we don’t want to leave a bunch of ECS tasks running if the pipeline fails. This is done by setting "trigger_rule": "all_done" in the arguments of this Airflow task (not to be confused with the ECS task definition).
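A rough sketch of how such a wind-down task could be wired up is shown below. The task id and the helper name are hypothetical, and the Airflow import is deferred into the function so that the argument dictionary can be inspected without Airflow installed.

```python
# Arguments for the wind-down task. "all_done" tells Airflow to run the task once
# every upstream task has finished, whether it succeeded or failed.
WIND_DOWN_ARGS = {
    "task_id": "scale_down_ecs",  # hypothetical task id
    "queue": "light",
    "trigger_rule": "all_done",
}


def add_wind_down_task(dag, heavy_tasks, scale_down):
    """Attach a task that runs `scale_down` after all `heavy_tasks` have finished."""
    try:
        from airflow.operators.python import PythonOperator  # Airflow 2.x path
    except ImportError:
        from airflow.operators.python_operator import PythonOperator  # Airflow 1.10 path

    wind_down = PythonOperator(python_callable=scale_down, dag=dag, **WIND_DOWN_ARGS)
    for task in heavy_tasks:
        task >> wind_down  # wind-down waits for every model task
    return wind_down
```

With the default trigger rule (all_success), a single failed state would cause the wind-down to be skipped and leave the workers running, which is exactly the situation this setting avoids.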
Feel free to reach out if you’re setting up something similar and have questions on anything above. Thank you to Laura Marie Helleckes and Michael Osthege for their help and expertise on running PyMC3 in an Airflow pipeline.