by Andrew Bass, a software engineer at Zoba.

Zoba provides demand forecasting and optimization tools to shared mobility companies, from micromobility to car shares and beyond.

Scaling Zoba to more cities using Airflow

To help scooter and bike-share operators increase the utilization of their vehicles, Zoba provides demand forecasting and vehicle placement recommendations. In response to increased demand for these micromobility services over the past year, we have been onboarding new customers and expanding into new cities across the globe.

Since the start of the year, Zoba has onboarded well over a hundred cities. However, Zoba has very lean Engineering and Operations teams, and the effort required to set up our product for each new city was becoming a bottleneck to our growth: we needed onboarding a new city to take minutes, not days. To help automate this process, as well as run other scheduled and one-off workflows, we use Apache Airflow.

What is Airflow?

Airflow is an open-source project used to manage and run workflows that have dependencies. It was created to help data engineers manage their pipelines, but today a large set of integrations has made it useful well beyond data engineering.

The different types of work that can be run on Airflow are modeled with “operators”: the MySQL Operator executes a specified query, the Email Operator sends an email, the HTTP Operator makes an HTTP request, and so on.

Airflow users model their workflows by defining specific instances of operators, producing “tasks.” They can then declare dependencies between tasks so that the tasks run in a specific order. A collection of tasks with dependencies is referred to as a “DAG” (directed acyclic graph). Here is a simple DAG that emails a dog picture every day to a list of subscribers.

The get_dog_photo task uses the HTTP Operator to retrieve the URL of a dog photo from an API. The send_dog_email task then uses the Email Operator to send an email embedding the URL from the previous task, so send_dog_email depends on get_dog_photo.
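As a rough sketch, the DAG might be wired up like this in Airflow 2. The API endpoint, connection id, email address, and JSON shape below are placeholders, not anything Zoba actually runs:

from datetime import datetime
from airflow import DAG
from airflow.operators.email import EmailOperator
from airflow.providers.http.operators.http import SimpleHttpOperator

with DAG("dog_of_the_day", start_date=datetime(2021, 1, 1), schedule_interval="@daily") as dag:
    # "dog_api" is a placeholder HTTP connection for a dog-photo API;
    # response_filter extracts the image URL from the (assumed) JSON response.
    get_dog_photo = SimpleHttpOperator(
        task_id="get_dog_photo",
        http_conn_id="dog_api",
        endpoint="breeds/image/random",
        method="GET",
        response_filter=lambda response: response.json()["message"],
    )

    # Assumes an SMTP connection is configured for Airflow's email backend.
    send_dog_email = EmailOperator(
        task_id="send_dog_email",
        to="subscribers@example.com",
        subject="Dog of the day",
        # Pull the URL that get_dog_photo pushed to XCom.
        html_content="<p>Today's dog: {{ ti.xcom_pull(task_ids='get_dog_photo') }}</p>",
    )

    get_dog_photo >> send_dog_email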

At Zoba, we use Django for the bulk of our infrastructure. Since Django is written in Python, we can use Airflow’s PythonOperator to run specified functions in our applications.
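For example, a minimal sketch of wrapping one of our application functions in a PythonOperator might look like this. The module path, function name, and market id are hypothetical, and it assumes Django has already been set up (django.setup()) on the Airflow worker:

from airflow.operators.python import PythonOperator

def _run_demand_forecast(market_id: int) -> None:
    # Hypothetical application code; the import path is illustrative.
    from forecasting.services import run_forecast
    run_forecast(market_id=market_id)

# Defined inside a DAG; the market id is a placeholder.
forecast_task = PythonOperator(
    task_id="run_demand_forecast",
    python_callable=_run_demand_forecast,
    op_kwargs={"market_id": 42},
)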

Zoba and Airflow

Zoba has several multi-step workflows that benefit significantly from Airflow’s ease of configuration and scalability. We use Airflow to sync new ride and maintenance events from our customers on a set interval, and to coordinate all of our optimization runs.

While some DAGs run on a schedule, others are more ad-hoc. For example, the work to configure a new city requires a fixed set of steps to complete. As we grow and add more cities, moving this type of manually triggered workflow to Airflow saves both effort and time.
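In Airflow, the difference is mostly the DAG’s schedule. A minimal sketch (DAG ids and the interval here are illustrative):

from datetime import datetime
from airflow import DAG

# Scheduled: sync ride/maintenance events on a fixed interval.
event_sync_dag = DAG(
    "sync_customer_events",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@hourly",
)

# Ad hoc: onboarding runs only when triggered manually.
onboarding_dag = DAG(
    "onboard_new_market",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
)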

Onboarding a new city

Setting up a city (or “market”) involves both our Operations and Engineering teams. Our configuration takes into account warehouse locations, shift sizes, and van capacities, so our Operations team works directly with market managers to understand how they run their fleet in the city. The Engineering team configures the new markets in our infrastructure. Here are some typical steps:

  • Set up MDS credentials: We typically receive ride and maintenance events from customers through the Mobility Data Specification (MDS) API.
  • Create market models: We store the configuration for each market in Django models.
  • Ingest historical event data: We typically need 3–6 months of historical data to ensure we have enough ridership data to estimate demand accurately.
  • Set up periodic sync of new ride/maintenance events: We store configuration for periodic sync jobs as Django models.
  • Assign market boundaries: We use the historical data to automatically create a geographical boundary for the market. This boundary limits the size of the area for which we need to model demand patterns.

There are some natural dependencies between these tasks. For example, we need to create the market model before creating the periodic event sync models, and we cannot sync historical event data until we have configured credentials. We could represent these dependencies with a graph:

Since each of these steps has predefined dependencies, this workflow is a good fit for Airflow’s task and DAG model.
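As a sketch, the structure might look like the following, using placeholder tasks named after the onboarding steps just to show the dependencies described above (in practice each task is a PythonOperator doing the real work):

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator

with DAG("onboard_new_market", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    # Placeholder tasks, one per onboarding step.
    setup_mds_credentials = DummyOperator(task_id="setup_mds_credentials")
    create_market_models = DummyOperator(task_id="create_market_models")
    create_event_sync_models = DummyOperator(task_id="create_event_sync_models")
    ingest_historical_events = DummyOperator(task_id="ingest_historical_events")
    assign_market_boundaries = DummyOperator(task_id="assign_market_boundaries")

    # The market model must exist before the periodic sync models...
    create_market_models >> create_event_sync_models
    # ...credentials must be configured before we can pull historical events...
    setup_mds_credentials >> ingest_historical_events
    # ...and boundaries are derived from the historical data.
    ingest_historical_events >> assign_market_boundaries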

A closer look

Let’s dive a bit deeper into a specific part of this DAG to see how it works.

The “create_market_models” task has no dependencies, so it can run first. It takes the name of the city, a timezone, and a few other optional or customer-specific parameters. We use the PythonOperator to create a new model for this market in our database. The task also creates a few related models: one to store information about the customer’s warehouse, models that will hold vehicle deployment locations, and so on. Our Airflow worker machines connect to our Postgres database and use the Django ORM directly to create these models.

Once this task completes successfully, “create_event_sync_models” becomes eligible to run. This task also creates a set of Django models, and to link them to the right city we need to pass along the id of the market model created in the previous task. We can do this using Airflow’s XCom system, which lets us pass data between dependent tasks. Essentially, XCom gives us access to a key-value store for communicating between tasks in the same DAG run. Using the TaskFlow API that was recently released as part of Airflow 2.0, a simplified version of this might look like the following:

from airflow.decorators import task

@task
def create_market_models(name: str, timezone: str) -> int:
    market = Market.objects.create(name=name, timezone=timezone)
    return market.id

@task
def create_event_sync_models(market_id: int) -> None:
    EventSync.objects.create(market_id=market_id)

# Inside the DAG definition (argument values are placeholders):
market_id = create_market_models(name="Boston", timezone="America/New_York")
create_event_sync_models(market_id)

When we use this new syntax to pass the return value of one task as a parameter to another, Airflow adds a dependency between the two tasks and makes the appropriate XCom calls for us! There is a longer TaskFlow tutorial in the Apache docs, but in short: for DAGs that make heavy use of the PythonOperator, this syntax is a much more concise way to pass data between tasks.

Benefits of using Airflow

Web interface and monitoring

Managing onboarding workflows in Airflow means we can use the built-in web interface to kick off onboarding for new markets and monitor progress while the DAG is running. Both our Operations team and engineers can see the status of these multi-step DAGs at a glance, and be alerted by email if they fail.

Airflow also exports a series of metrics that we sync to Datadog, where we can set up more custom alerting based on the status of our DAGs.

We can pass parameters to the DAG and trigger it from the Airflow web interface.
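Configuration entered when manually triggering a DAG from the web UI is available to tasks through dag_run.conf. A sketch of reading it (the keys are placeholders, and Market is the Django model from the earlier example):

from airflow.decorators import task
from airflow.operators.python import get_current_context

@task
def create_market_models() -> int:
    # Configuration passed when the DAG was triggered from the web UI.
    conf = get_current_context()["dag_run"].conf
    market = Market.objects.create(name=conf["name"], timezone=conf["timezone"])
    return market.id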

Another benefit of breaking out these onboarding steps into Airflow tasks is that if one of the steps fails, such as the syncing of historical rides, we can retry the DAG starting from the task that failed. Airflow does not need to redo any of the upstream parts of the workflow that ran successfully. This is a huge difference from more traditional scheduling systems, like cron jobs that trigger monolithic scripts.

Each Airflow task can have its own retry behavior, which is helpful when different tasks have different failure patterns. In our case, creating new models in our database is fairly reliable, and a retry is unlikely to succeed if the first attempt fails. However, syncing historical events relies on customer systems, and unfortunately we sometimes see transient failures. To account for this, we increase the retry count of the tasks that interact with external systems.
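With the TaskFlow API this is just a per-task setting. The counts below are illustrative, not our actual values:

from datetime import timedelta
from airflow.decorators import task

@task(retries=0)
def create_market_models(name: str, timezone: str) -> int:
    ...  # database writes: fail fast, a retry is unlikely to help

@task(retries=5, retry_delay=timedelta(minutes=10))
def ingest_historical_events(market_id: int) -> None:
    ...  # calls the customer's MDS API, where transient failures happen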

Rate limiting

Airflow also lets us apply DAG- and task-level rate limiting. If we are onboarding a batch of new markets, the ‘max_active_runs’ parameter limits Airflow to running only a handful of these DAGs at the same time, queuing the excess. We are even more conservative with interactions against customers’ systems. We pull months’ worth of events during the historical sync, and we certainly do not want to be the reason a customer’s MDS server crashes. Using the ‘task_concurrency’ parameter, we can limit this specific task to run for only one or two markets at a time, putting a global cap on how much load the onboarding DAGs place on a customer’s system. Onboarding may take a bit longer, but the added safety and stability are worth it.
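A sketch of both limits together (the specific numbers and ids are illustrative, not our production settings):

from datetime import datetime
from airflow.decorators import dag, task

@dag(
    dag_id="onboard_new_market",
    schedule_interval=None,            # manually triggered
    start_date=datetime(2021, 1, 1),
    max_active_runs=3,                 # at most 3 onboarding runs in flight at once
)
def onboard_market():
    @task(task_concurrency=1)          # one historical sync hitting customer systems at a time
    def ingest_historical_events(market_id: int) -> None:
        ...

    ingest_historical_events(1)        # placeholder market id

onboarding_dag = onboard_market()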

These types of guardrails are even more important to have in DAGs that are triggered manually. It’s easy for someone who is trying to finish onboarding a set of markets before the end of the day to trigger quite a few of the DAGs without realizing the potential impact on other systems. These Airflow parameters let us set limits in code.

Moving forward with Airflow

The core feature set of Airflow was originally designed for data pipelines that run at scheduled intervals. Our use of Airflow for manually triggered workflows goes beyond its original goals; however, we’ve found that moving this work to Airflow has significantly lightened the burden on both our Operations and Engineering teams. It’s made the process of adding new cities automatic and consistently repeatable.

As Zoba grows, finding tools like Airflow that drastically increase our productivity and quality means that we can make better use of our time to improve our core demand models and deliver new features for our customers.

If you are interested in joining our engineering team, please apply at https://www.zoba.com/careers.
