Research computing in a startup with Celery

Gailin Pease
Published in Singularity
5 min read · Mar 16, 2023

Why Celery?

At Singularity, we work with large and complex data about the electricity grid, and we need to be able to quickly iterate on our analysis and modeling. Sometimes this data is huge; a recent project requires us to run multiple models, each producing more than 500 GB of data, and synthesize the results into actionable insights.

Here’s how we use Celery to make that possible.

In an academic research setting, computationally intensive research tasks like this would usually be handled on a dedicated research computing cluster, with a job manager (for example, Slurm) and networked storage. Jobs too large to run on a single machine could use a message passing interface (for example, MPI). We needed similar capabilities, but because we’re a startup, we also wanted a system that we could eventually transform into a production system.

Enter Celery. We use Celery in our production applications to run slower jobs in the background, where they won’t affect the user experience. Celery is a distributed task queue with a simple interface for assigning work to many machines, which is exactly what large research jobs need. We wanted to make Celery work for our research as well. In some cases, we were able to reuse our production Celery infrastructure directly. In other cases, our research needs required us to build new patterns.

Making Celery work for us

Celery with ECS

We use SQS as our Celery message broker. This means that we can leverage the same infrastructure management tools for research computing as we use in production, including GitHub Actions and Terraform. Separately managing a research computing cluster, even one hosted by AWS, would have introduced overhead, as each team member working on our research code would have had to learn another set of tools.
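
For readers who haven’t paired Celery with SQS, a configuration module might look roughly like the sketch below. The setting names are Celery’s own; the module name, region, timeout, polling interval, and queue name are placeholder assumptions, not Singularity’s actual values.

```python
# celeryconfig.py (hypothetical module name), loaded with
# app.config_from_object("celeryconfig") on a Celery app.

# Use SQS as the broker; AWS credentials are read from the environment
# or from the IAM role attached to the worker.
broker_url = "sqs://"

broker_transport_options = {
    "region": "us-east-1",  # assumed region
    # Seconds a message stays hidden after a worker receives it. It should
    # exceed the longest task; SQS allows at most 12 hours.
    "visibility_timeout": 12 * 60 * 60,
    "polling_interval": 5,  # seconds between polls of the queue
}

# Long research tasks: have each worker process reserve one task at a time.
worker_prefetch_multiplier = 1

# Assumed queue name, kept separate from any production queue.
task_default_queue = "research"
```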

A key benefit of deploying research tasks using Celery is the ability to run enormous jobs in parallel. Since we don’t want to pay for the large instances needed to run research tasks all the time, we use autoscaling rules to create more instances as needed and remove them when a job is finished. Currently, we’re using AWS’s built-in CPU-based autoscaling rules. In the future, we plan to write custom autoscaling rules, which will allow us to scale based on queue size and customize scaling to meet our research needs in a cost-effective way.
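
As a rough illustration of what a queue-based rule could look like, the sketch below maps the number of queued tasks to a worker count. The function name, parameters, and defaults are all hypothetical; in practice the queue depth might come from the `ApproximateNumberOfMessages` attribute that SQS reports for a queue.

```python
import math


def desired_worker_count(queue_depth, tasks_per_worker=4, min_workers=0, max_workers=20):
    """Return how many workers to run for a given number of queued tasks.

    All parameter names and defaults are illustrative assumptions.
    """
    if queue_depth < 0:
        raise ValueError("queue_depth must be non-negative")
    # Enough workers that each one handles at most `tasks_per_worker` queued tasks.
    needed = math.ceil(queue_depth / tasks_per_worker)
    # Clamp: scale down to the minimum when idle, never exceed the cost cap.
    return max(min_workers, min(max_workers, needed))


print(desired_worker_count(0))    # 0
print(desired_worker_count(10))   # 3
print(desired_worker_count(500))  # 20
```

Unlike a CPU-based rule, a rule like this can scale to zero when the queue is empty and scale up before existing workers are saturated.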

Capturing shared dispatch functionality

With many research jobs running remotely and in parallel, some of which are dependent on others, it’s important to have consistent rules around logging and error handling. To handle this, we were able to adapt a pattern from our production infrastructure (which demonstrates how useful it can be to use the same tools across our organization!).

If we wrote error handling and logging in each task, it might look like this (in pseudocode):

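import logging

from celery.exceptions import MaxRetriesExceededError

logger = logging.getLogger(__name__)

def my_task(model_name, weather_year):
    init_logger()

    try:
        run_analysis()
    except MaxRetriesExceededError:
        # Log info about the task that exceeded its retries
        logger.error("Retries exceeded: %s, %s", model_name, weather_year)
    except NetworkRelatedError:
        # Log and re-raise the error so Celery retries
        logger.warning("Network error: %s, %s", model_name, weather_year)
        raise
    except DeterministicMathError:
        # Log the error but don't retry, since a retry won't fix it
        logger.error("Math error: %s, %s", model_name, weather_year)
    except Exception:
        # Log and re-raise any other error
        logger.exception("Unexpected error: %s, %s", model_name, weather_year)
        raise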

In most cases, the types of errors and their handling are consistent across tasks. The errors we see in a research environment, and the way we want to handle them, are different from those in our production environment: in research, we expect some jobs to fail, perhaps because a matrix is not invertible or some data was zero that shouldn’t have been. We don’t want to retry in these cases, because we know they would always fail. We can capture all of this logic in a wrapper:

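import logging
from functools import wraps

from celery.exceptions import MaxRetriesExceededError

logger = logging.getLogger(__name__)

def resilient_job(fn):
    @wraps(fn)
    def wrapper(*args, **kwargs):
        init_logger()

        try:
            return fn(*args, **kwargs)
        except MaxRetriesExceededError:
            # Log info about the task that exceeded its retries
            logger.error("Retries exceeded in %s", fn.__name__)
        except NetworkRelatedError:
            # Log and re-raise the error so Celery retries
            logger.warning("Network error in %s", fn.__name__)
            raise
        except DeterministicMathError:
            # Log the error but don't retry, since a retry won't fix it
            logger.error("Math error in %s", fn.__name__)
    return wrapper

@worker.task()
@resilient_job
def task(model_name, weather_year):
    run_analysis()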
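
To see the pattern in isolation, here is a minimal sketch that runs without Celery. The two exception classes are stand-ins defined only for this example, and `invert` and `fetch` are hypothetical jobs; the point is that a deterministic failure is logged and swallowed while a transient one is re-raised.

```python
import logging
from functools import wraps

logger = logging.getLogger("research")


class NetworkRelatedError(Exception):
    """Stand-in for a transient failure that is worth retrying."""


class DeterministicMathError(Exception):
    """Stand-in for a failure that would recur on every retry."""


def resilient_job(fn):
    @wraps(fn)
    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except NetworkRelatedError:
            # Re-raise so that the caller (or the task queue) can retry.
            logger.warning("transient failure in %s; re-raising", fn.__name__)
            raise
        except DeterministicMathError as err:
            # Swallow the error: a retry would fail in exactly the same way.
            logger.error("%s failed deterministically: %s", fn.__name__, err)
            return None
    return wrapper


@resilient_job
def invert(determinant):
    if determinant == 0:
        raise DeterministicMathError("matrix is not invertible")
    return 1 / determinant


@resilient_job
def fetch(url):
    raise NetworkRelatedError(f"could not reach {url}")


print(invert(4))  # 0.25
print(invert(0))  # None (the error is logged, not raised)
try:
    fetch("s3://example-bucket/data")
except NetworkRelatedError:
    print("re-raised for retry")
```

With Celery, a re-raised exception only triggers a retry if the task is configured for it, for example through the `autoretry_for` option of the task decorator.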

Managing file storage in Celery

Research file storage has to be extremely flexible. We don’t know at the start of a project what our data access patterns will be; often, we don’t even know what types of data we’ll end up producing and consuming. As we experiment, we want to be able to quickly create, inspect, and consume new types of data.

Our first approach was simple: each Celery task downloaded the data it needed and uploaded its results at the end. In pseudocode, this looks like:


def task(model_name, weather_year):
    download(model_name)
    download(weather_year)

    elec = run_analysis()

    write(elec)
    upload(elec)

    clean_up(model_name)
    clean_up(weather_year)

While this works the first time, once you’ve scaled to half a dozen jobs, many of which use the same inputs and outputs, you end up with a lot of repeated upload and download code, much of which contains hard-coded logic about where data is stored.

We address this problem by using a wrapper that declares what files a task will output and detects from task parameters what files it needs:

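from functools import wraps

def job_file_supplier(job_creates):
    def wrap_job(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            # Inputs are detected from the keyword arguments, so tasks
            # must be called with keyword arguments
            for file_needed in kwargs.values():
                download(file_needed)

            result = fn(*args, **kwargs)

            for created in job_creates:
                upload(created)

            for file_needed in kwargs.values():
                clean_up(file_needed)
            return result
        return wrapper
    return wrap_job

# elec_data identifies the file that write(elec) produces
@job_file_supplier(job_creates=[elec_data])
def task(model_name, weather_year):
    elec = run_analysis()
    write(elec)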

Now the wrapper handles file upload and download in one place. As long as task arguments encode the files they need in the same way across tasks, the wrapper can handle each data type uniformly. Of course, this is still not as robust as a production database, but it gives us flexibility. In our research, that flexibility is a necessity, but it doesn’t need to come at the cost of unmaintainable, hard-coded file access patterns.
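
The following self-contained sketch shows the same idea end to end. It makes several assumptions that are not part of our setup: two temporary local directories stand in for remote storage and a worker’s scratch disk, file names are passed as keyword arguments, and cleanup runs in a `finally` block so that scratch files are removed even when a task fails.

```python
import shutil
import tempfile
from functools import wraps
from pathlib import Path

# Local directories stand in for remote storage and the worker's scratch disk.
REMOTE = Path(tempfile.mkdtemp(prefix="remote_"))
SCRATCH = Path(tempfile.mkdtemp(prefix="scratch_"))


def job_file_supplier(job_creates):
    def wrap_job(fn):
        @wraps(fn)
        def wrapper(**kwargs):
            # Every keyword argument value names an input file.
            needed = list(kwargs.values())
            for name in needed:
                shutil.copy(REMOTE / name, SCRATCH / name)  # "download"
            try:
                result = fn(**kwargs)
                for name in job_creates:
                    shutil.copy(SCRATCH / name, REMOTE / name)  # "upload"
                return result
            finally:
                # Remove scratch copies even if the task fails.
                for name in [*needed, *job_creates]:
                    (SCRATCH / name).unlink(missing_ok=True)
        return wrapper
    return wrap_job


@job_file_supplier(job_creates=["elec.txt"])
def task(model_name, weather_year):
    model = (SCRATCH / model_name).read_text()
    weather = (SCRATCH / weather_year).read_text()
    (SCRATCH / "elec.txt").write_text(f"{model}+{weather}")


# Seed the fake remote store, then run the task with keyword arguments.
(REMOTE / "model_a.txt").write_text("model")
(REMOTE / "weather_2020.txt").write_text("weather")
task(model_name="model_a.txt", weather_year="weather_2020.txt")

print((REMOTE / "elec.txt").read_text())          # model+weather
print(sorted(p.name for p in SCRATCH.iterdir()))  # []
```

The task body never mentions where files live; swapping the two `shutil.copy` calls for real storage calls would change every task at once.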

What’s next

In this post we’ve covered how we currently support our research computing needs. At a startup like Singularity, though, our requirements are constantly changing, so our systems will keep adapting. If you’re interested in what we’re doing, or if you think you could help us do it better, keep an eye out for when we’re hiring.
