Background Processing in Elixir with GenStage

By Nitin Misra

At Scripbox, we recently started working on a new sub-system that needed to be highly concurrent and distributed. We stumbled on Elixir and found it to be the best tool for the job. We got started quickly with the awesome Phoenix framework for Elixir.

But we soon realised we needed a background processing system backed by a persistent queue (like Redis). After a quick search for existing packages, we zeroed in on Exq and Verk.

We decided to use Exq and it worked really well for us. But we soon hit a roadblock when we had to add rate-limiting to several third-party integrations we use. Our initial solution was to add rate-limiting with the ex_rated package.

Here is how we built the initial solution:

  • We added ex_rated to each package we use for communicating with third-party APIs and defined an ETS bucket, a time scale, and a maximum number of API calls for each.
  • ex_rated starts a timer for each ETS bucket and only allows the configured number of API calls within the time scale. It raises an exception when the API limit is exceeded.
  • Our Phoenix app receives API calls from other sub-systems and queues jobs via Exq to be processed in the background.
  • The job worker makes an API call to one of these third-party services, which are rate-limited by ex_rated.
  • If we exceed the third-party rate limit, the job fails and is retried with exponential backoff.
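As a rough illustration of that flow, the sketch below guards a third-party call with an ex_rated check. `ExRated.check_rate/3` returns `{:ok, count}` while the bucket has capacity and `{:error, limit}` once it is full; the sketch raises in the second case so that the job fails and gets retried. The `ThirdParty` module, the `RateLimitExceeded` exception, and the bucket name and limits in the usage line are assumptions made for the example, not the code we ran in production.

```elixir
# Assumes {:ex_rated, "~> 2.0"} is listed in mix.exs and its application is started.
defmodule RateLimitExceeded do
  defexception message: "third-party rate limit exceeded"
end

defmodule ThirdParty do
  # Runs `fun` only if `bucket` has seen fewer than `limit` calls in the
  # current `scale_ms` window. Otherwise raises, so the background job
  # fails and the queue retries it later.
  def call(bucket, scale_ms, limit, fun) when is_function(fun, 0) do
    case ExRated.check_rate(bucket, scale_ms, limit) do
      {:ok, _count} -> fun.()
      {:error, _limit} -> raise RateLimitExceeded
    end
  end
end
```

A worker would then wrap its request, for example `ThirdParty.call("sms_gateway", 1_000, 5, fn -> send_sms(job) end)`, where `send_sms/1` is a hypothetical function. Every integration needs its own wrapper like this, which is the duplication described in the issues that follow.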

This approach worked well initially, but it had some serious issues:

  • Unnecessary calls to Redis: Under high traffic, the rate limit could easily be exceeded, causing far too many retries. This puts unwanted load on system resources and generates unnecessary network calls to Redis.
  • Duplicate logic for rate-limiting: We would need to add rate-limiting logic to each third-party API integration we use.

We needed a cleaner solution to address these issues. That is when we decided to build our own background processing system in Elixir.

These were the main design goals of the new background system:

  1. It should use Redis as the persistent queue system. (This was a personal preference)
  2. It should support categorising jobs in multiple pipelines/queues.
  3. It should support specifying the rate-limit for each pipeline.
  4. It should process jobs in each pipeline with a back-pressure mechanism. The process producing jobs should not put pressure on the process consuming those jobs. The consuming process should specify how many jobs it can process.
  5. It should allow specifying concurrency (number of workers) for each pipeline.
  6. It should provide the other common features of a background processing system, such as job scheduling, retries with exponential backoff, a web UI, etc.
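To make goals 2, 3 and 5 concrete, a per-pipeline configuration could look something like the fragment below. The `rate_limit_count`, `rate_limit_scale` and `concurrency` keys are the settings named in this post; the `:background_jobs` application name, the pipeline names, the values, and the choice of milliseconds as the unit are assumptions made for illustration.

```elixir
# config/config.exs (the :background_jobs application name is hypothetical)
import Config

config :background_jobs, :pipelines,
  email: [
    rate_limit_count: 60,      # jobs allowed per window
    rate_limit_scale: 60_000,  # window length, assumed to be in milliseconds
    concurrency: 4             # number of consumer processes
  ],
  sms: [
    rate_limit_count: 10,
    rate_limit_scale: 1_000,
    concurrency: 2
  ]
```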

This is when we came across "Announcing GenStage", a post by José Valim. We decided we could leverage GenStage to develop the back-pressure mechanism between the producers and consumers of jobs.

  • GenStage is a new Elixir behaviour for exchanging events with back-pressure between Elixir processes.
  • This means that the process consuming jobs will not get overloaded by the process producing jobs.
  • A GenStage pipeline normally has several stages.
  • Stages are computation steps that send and/or receive data from other stages.
  • When a stage sends data, it acts as a producer.
  • When a stage receives data, it acts as a consumer.
  • A stage may take both the producer and the consumer role at once, in which case it is called a producer-consumer stage.
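The three roles can be seen in a small generic example, modelled on the counter example from the GenStage documentation rather than on our own system. The `Demo` module names are made up for this sketch. The producer only emits as many events as were demanded, which is where the back-pressure comes from.

```elixir
# Assumes {:gen_stage, "~> 1.0"} is listed in mix.exs.
defmodule Demo.Counter do
  use GenStage

  @impl true
  def init(start), do: {:producer, start}

  # Emit exactly as many integers as the downstream stage asked for.
  @impl true
  def handle_demand(demand, counter) when demand > 0 do
    events = Enum.to_list(counter..(counter + demand - 1))
    {:noreply, events, counter + demand}
  end
end

defmodule Demo.Doubler do
  use GenStage

  @impl true
  def init(:ok), do: {:producer_consumer, :ok}

  # Receive events from upstream and pass transformed events downstream.
  @impl true
  def handle_events(events, _from, state) do
    {:noreply, Enum.map(events, &(&1 * 2)), state}
  end
end

defmodule Demo.Printer do
  use GenStage

  @impl true
  def init(:ok), do: {:consumer, :ok}

  # A consumer never emits events, hence the empty list.
  @impl true
  def handle_events(events, _from, state) do
    IO.inspect(events, label: "consumed")
    {:noreply, [], state}
  end
end

defmodule Demo do
  # Wires the three stages together; the printer then runs until stopped.
  def start do
    {:ok, counter} = GenStage.start_link(Demo.Counter, 0)
    {:ok, doubler} = GenStage.start_link(Demo.Doubler, :ok)
    {:ok, printer} = GenStage.start_link(Demo.Printer, :ok)

    {:ok, _} = GenStage.sync_subscribe(doubler, to: counter, max_demand: 10)
    {:ok, _} = GenStage.sync_subscribe(printer, to: doubler, max_demand: 10)
    :ok
  end
end
```

Calling `Demo.start()` in an IEx session prints batches of doubled integers. Lowering `max_demand` on the last subscription slows the whole chain down, because the counter is never asked for more than the printer can take.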

We create three stages for each GenStage pipeline:

  • Stage 1 — The Producer: The producer pulls jobs from Redis. We create one producer process for each pipeline.
  • Stage 2 — The ProducerConsumer: The producer_consumer demands jobs from the Producer and passes them downstream to the consumer stage. It only demands the configured number of jobs (rate_limit_count) at a configured time interval (rate_limit_scale) to ensure it does not go over the pipeline’s rate-limit. We create one producer_consumer process for each pipeline.
  • Stage 3 — The Consumer: The consumer stage has max_demand of 1, so that it processes only one job at a time. We create multiple consumer processes as per the concurrency config setting.

Our Implementation (the important parts)

Below is the example code for our GenStage stages. For brevity, we have left out much of the error handling and the code related to pipeline metrics.

The Producer
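A minimal sketch of a producer along these lines, not our full implementation. To keep it independent of any particular Redis client, the Redis read is represented by a hypothetical `fetch_jobs` function passed in the options; `Pipeline.Producer`, the `pending` and `polling?` state keys and the one-second poll interval are likewise assumptions. The sketch remembers demand it could not satisfy and polls again later, since the queue may hold fewer jobs than were asked for.

```elixir
# Assumes {:gen_stage, "~> 1.0"} is listed in mix.exs.
defmodule Pipeline.Producer do
  use GenStage

  # Hypothetical: how long to wait before re-checking a queue that ran dry.
  @poll_interval_ms 1_000

  def start_link(opts) do
    pipeline = Keyword.fetch!(opts, :pipeline_name)
    GenStage.start_link(__MODULE__, opts, name: :"#{pipeline}_producer")
  end

  @impl true
  def init(opts) do
    # `fetch_jobs` is a hypothetical 1-arity function standing in for the
    # Redis read: given a count, it returns a list of at most that many jobs.
    state = %{fetch_jobs: Keyword.fetch!(opts, :fetch_jobs), pending: 0, polling?: false}
    {:producer, state}
  end

  # Downstream asked for `demand` more jobs: add it to the unmet demand.
  @impl true
  def handle_demand(demand, state) do
    dispatch(%{state | pending: state.pending + demand})
  end

  # Poll timer fired: try again to satisfy the unmet demand.
  @impl true
  def handle_info(:poll, state), do: dispatch(%{state | polling?: false})

  defp dispatch(%{pending: 0} = state), do: {:noreply, [], state}

  defp dispatch(state) do
    jobs = state.fetch_jobs.(state.pending)
    remaining = state.pending - length(jobs)
    {:noreply, jobs, schedule_poll(%{state | pending: remaining})}
  end

  # Schedule at most one poll at a time, and only while demand is unmet.
  defp schedule_poll(%{pending: pending, polling?: false} = state) when pending > 0 do
    Process.send_after(self(), :poll, @poll_interval_ms)
    %{state | polling?: true}
  end

  defp schedule_poll(state), do: state
end
```

Because jobs are only fetched in response to demand, the producer never reads more from Redis than the rest of the pipeline has asked for.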

The ProducerConsumer
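A minimal sketch of a rate-limiting producer_consumer, adapted from the rate limiter example in the GenStage documentation rather than copied from our code base. It switches its upstream subscription to manual demand and, once per `rate_limit_scale` interval, asks only for as many jobs as it received since the last tick, so outstanding demand never exceeds `rate_limit_count`. The module name and the state keys are assumptions, as is treating `rate_limit_scale` as milliseconds.

```elixir
# Assumes {:gen_stage, "~> 1.0"} is listed in mix.exs.
defmodule Pipeline.ProducerConsumer do
  use GenStage

  def start_link(opts) do
    pipeline = Keyword.fetch!(opts, :pipeline_name)
    GenStage.start_link(__MODULE__, opts, name: :"#{pipeline}_producer_consumer")
  end

  @impl true
  def init(opts) do
    pipeline = Keyword.fetch!(opts, :pipeline_name)

    state = %{
      producer: nil,
      # Demand we are allowed to send at the next tick.
      pending: Keyword.fetch!(opts, :rate_limit_count),
      # Tick length, assumed to be in milliseconds.
      interval: Keyword.fetch!(opts, :rate_limit_scale)
    }

    {:producer_consumer, state, subscribe_to: [:"#{pipeline}_producer"]}
  end

  # Upstream subscription: take over demand handling and start the ticker.
  @impl true
  def handle_subscribe(:producer, _opts, from, state) do
    send(self(), :ask)
    {:manual, %{state | producer: from}}
  end

  # Downstream subscriptions keep GenStage's default demand handling.
  def handle_subscribe(:consumer, _opts, _from, state), do: {:automatic, state}

  # Forward jobs unchanged and note how much demand can be re-issued.
  @impl true
  def handle_events(jobs, _from, state) do
    {:noreply, jobs, %{state | pending: state.pending + length(jobs)}}
  end

  # Once per interval, ask upstream for the jobs we are allowed to take.
  @impl true
  def handle_info(:ask, state) do
    if state.pending > 0, do: GenStage.ask(state.producer, state.pending)
    Process.send_after(self(), :ask, state.interval)
    {:noreply, [], %{state | pending: 0}}
  end
end
```

Since `subscribe_to` refers to the producer by its registered name, the producer for the pipeline has to be started before this stage.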

The Consumer
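A minimal sketch of a consumer that takes one job at a time. The job shape used here (a map with `:module` and `:args`, run by calling the module's `perform` function) is a hypothetical convention chosen for the example, and error handling and retries are left out.

```elixir
# Assumes {:gen_stage, "~> 1.0"} is listed in mix.exs.
defmodule Pipeline.Consumer do
  use GenStage

  # Consumers are not registered under a name, so several can run per pipeline.
  def start_link(opts), do: GenStage.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    pipeline = Keyword.fetch!(opts, :pipeline_name)
    upstream = :"#{pipeline}_producer_consumer"

    # max_demand: 1 means this process holds at most one job at a time.
    {:consumer, :ok, subscribe_to: [{upstream, min_demand: 0, max_demand: 1}]}
  end

  @impl true
  def handle_events(jobs, _from, state) do
    # With max_demand: 1 this list holds a single job.
    Enum.each(jobs, &perform/1)
    {:noreply, [], state}
  end

  # Hypothetical job shape: %{module: SomeWorker, args: [...]}.
  defp perform(%{module: module, args: args}), do: apply(module, :perform, args)
end
```

Starting as many of these processes as the pipeline's concurrency setting gives that many jobs in flight at once, while the rate limit is still enforced upstream.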

The producer process registers itself under a name based on the pipeline_name (e.g. email_producer). This lets the producer_consumer stage subscribe to its respective producer stage. The producer_consumer stages follow the same naming convention (e.g. email_producer_consumer) so that the consumer stages can subscribe to their respective producer_consumer stage.
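The naming convention boils down to two small functions. `Pipeline.Names` is a hypothetical helper module, shown only to make the convention explicit.

```elixir
defmodule Pipeline.Names do
  # Pipeline names are assumed to come from static configuration, so turning
  # them into atoms does not risk exhausting the atom table.
  def producer(pipeline), do: :"#{pipeline}_producer"
  def producer_consumer(pipeline), do: :"#{pipeline}_producer_consumer"
end
```

For the email pipeline, `Pipeline.Names.producer("email")` gives `:email_producer` and `Pipeline.Names.producer_consumer("email")` gives `:email_producer_consumer`. These are the names passed as `name:` when starting a stage and as `subscribe_to:` in the stage below it.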

This setup works really well for us, for a number of reasons:

  • The producer stage won’t overload the consumer stage as the producer_consumer stage will only ask for events it can process.
  • We can have multiple stages for different pipelines with different rate-limit and concurrency settings.
  • An error or crash in one pipeline's stages will not affect other pipelines.
  • No unnecessary network traffic to Redis.

There are other features we have built into the background system like job retry, job scheduling, metrics, web-ui, etc.

We are planning to open source our background processing library soon. Watch this space.

Summary

At Scripbox we have been very happy using Elixir/OTP as a core technology for our backend services. We feel privileged to be part of the awesome Elixir community and would like to contribute more to make it more awesome.


We are unlocking engineering challenges, the kind that come frequently, and with scale. If you think these are the kind of challenges that you would love to solve, do check out our open positions.
