Elixir-Powered Event Metrics

Dimitri Tishchenko
Published in Making Change.org
Sep 26, 2020

Knowing what previous actions a user has taken on our site allows for experiment targeting and custom user interfaces, but requires storing data points on the actions of millions of users. The solution we built combines the speed and concurrency of Elixir with the NoSQL capabilities of Postgres. It regulates high-volume writes with Broadway, stores semi-structured data via JSONB, and exposes an API via Phoenix for high-volume reads. At Change we call it the Activity Metrics Service.

Event Metric Aggregations

Let’s say we have a stream of events, each representing a social media share by a user. Each event contains a user id and the share channel that was used (e.g., Facebook, WhatsApp). We maintain a record associated with the user id and increment an entry in that record to reflect that a share occurred and on what channel. Alongside every metric we also store a timestamp of its last occurrence; this timestamp is updated on every increment. This aggregated share data allows you to answer questions like “How many times has this user shared to Facebook, and when was the last time that happened?” and “What is this user’s most used share channel?” A typical activity metric record looks something like the following.
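
The exact shape of the stored document is internal to the service, so this sketch is a reconstruction from the description above (plus the scalar metrics covered below); the field and metric names are illustrative:

    {
      "share.facebook": { "count": 12, "last_updated": "2020-09-01T14:03:22Z" },
      "share.whatsapp": { "count": 3, "last_updated": "2020-08-15T09:41:07Z" },
      "share.email": { "count": 5, "last_updated": "2020-07-30T18:20:45Z" },
      "contribution": { "count": 2, "sum": 35.0, "last_value": 10.0,
                        "last_updated": "2020-06-02T11:09:54Z" }
    }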

The service allows for two types of aggregations: simple and scalar. Simple metrics record the occurrence of an event. They maintain a count and a last-updated timestamp. For example, in the data above a simple metric named “share.email” counts the number of times a user has shared to the email share channel and when that last happened.

Scalar aggregations allow for recording events that have a numeric value associated with them (e.g., a financial transaction amount). These more complex aggregations maintain the count and last-updated timestamp, as with simple metrics, and additionally the cumulative sum and the last value seen.

With these aggregated data points we can answer the questions above. To find the most popular share channel for a user, we find the entry with the highest count. Similarly, to find the most recently used share channel, we find the entry with the maximum last_updated timestamp.
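
As a concrete illustration, here is a minimal Elixir sketch of both lookups, assuming the record has been fetched and decoded into a map of metric name to counter fields (the module and field names are hypothetical):

    defmodule MetricsQueries do
      # Most popular share channel: the "share.*" entry with the highest count.
      def most_used_share_channel(metrics) do
        metrics
        |> Enum.filter(fn {name, _fields} -> String.starts_with?(name, "share.") end)
        |> Enum.max_by(fn {_name, fields} -> fields["count"] end, fn -> nil end)
      end

      # Most recently used share channel: the "share.*" entry with the latest
      # last_updated value (ISO 8601 strings in UTC compare correctly as text).
      def most_recent_share_channel(metrics) do
        metrics
        |> Enum.filter(fn {name, _fields} -> String.starts_with?(name, "share.") end)
        |> Enum.max_by(fn {_name, fields} -> fields["last_updated"] end, fn -> nil end)
      end
    end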

The Service

The service is written in Elixir. It is deployed to Kubernetes using Terraform. Each Kubernetes pod is behind an AWS Elastic Load Balancer (ELB) and hosts a Phoenix endpoint to fetch metrics for a given user. Each pod is limited to one vCPU and has the “+sbwt none” and “+S 1” VM arguments set to disable scheduler busy waiting and limit the number of schedulers started. These flags make Elixir a “better neighbour” when running in Kubernetes alongside other pods (this is no longer a concern on Erlang 23).
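
With a release, these flags would typically live in a vm.args file; a minimal sketch, assuming that setup:

    # vm.args (illustrative)
    +sbwt none   # disable scheduler busy waiting
    +S 1         # start a single scheduler thread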

Activity Metrics Service is an umbrella project with three apps: Metrics API, Metrics Ingestor, and Repository. The Metrics API app is a Phoenix application that exposes an endpoint for looking up metrics for a given user id. Metrics Ingestor is a Broadway application used for processing messages coming from AWS SQS. The Repository app uses Ecto to communicate with Postgres.
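
Assuming a conventional umbrella layout, the project looks roughly like this (directory names are illustrative):

    activity_metrics/
      apps/
        metrics_api/        # Phoenix: read endpoint
        metrics_ingestor/   # Broadway: SQS message processing
        repository/         # Ecto: Postgres access and stored procedure calls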

Our message bus consists of an SNS topic and a series of SQS queues with filter subscriptions for each service. Messages flow from our front end through the message bus to listening services. When Broadway processes an event from its SQS queue, it calls a stored procedure to increment a metric in the user’s record.
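
A minimal sketch of what that pipeline could look like, assuming the BroadwaySQS producer, JSON-encoded messages, and a hypothetical Repository.increment_metric/2 that wraps the stored procedure (the queue URL and message fields are placeholders):

    defmodule MetricsIngestor.Pipeline do
      use Broadway

      def start_link(_opts) do
        Broadway.start_link(__MODULE__,
          name: __MODULE__,
          producer: [
            module: {BroadwaySQS.Producer, queue_url: "https://sqs.example.com/activity-metrics"}
          ],
          processors: [default: [concurrency: 10]]
        )
      end

      @impl true
      def handle_message(_processor, message, _context) do
        # Each SQS message carries a user id and the metric to increment.
        %{"user_id" => user_id, "metric" => metric} = Jason.decode!(message.data)
        :ok = Repository.increment_metric(user_id, metric)
        message
      end
    end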

The metrics are available through a Phoenix API endpoint. The client provides a user id and the service returns a record with metrics for that user, or a 404 error if no metrics are available. When building our UI, our front-end servers call this endpoint to get metrics for the current user.
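
A controller action for that lookup might look like the following sketch, assuming a conventional Phoenix web module and a hypothetical Repository.get_metrics/1:

    defmodule MetricsAPIWeb.MetricsController do
      use MetricsAPIWeb, :controller

      def show(conn, %{"user_id" => user_id}) do
        case Repository.get_metrics(user_id) do
          nil -> send_resp(conn, 404, "")
          metrics -> json(conn, metrics)
        end
      end
    end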

The Database

We store the activity metrics in Postgres, taking advantage of its NoSQL capabilities to use it as a document store. This is done with a table containing an id, a JSONB column, and a pair of timestamps that track creation and last modification. The id is the user’s id and is the unique key on the table. The JSONB column is updated through a series of stored procedures that perform atomic upsert increments on the metrics. “Atomic upsert increment”? Let’s break that down. Atomic as in all modifications to the same record are applied sequentially, which ensures accurate counts. Upsert means that if the record does not exist it is initialized with a count of one and the current date, and if it does exist the count is incremented and the date associated with the metric is updated.
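
To make that concrete, here is a sketch of such a stored procedure written as an Ecto migration; the table, column, and function names are assumptions, and the real procedures also cover scalar metrics:

    defmodule Repository.Migrations.AddIncrementMetricFunction do
      use Ecto.Migration

      def up do
        execute """
        CREATE OR REPLACE FUNCTION increment_metric(p_user_id bigint, p_metric text)
        RETURNS void AS $$
          -- Upsert: initialize the record if absent, otherwise bump the counter.
          -- ON CONFLICT DO UPDATE makes the increment atomic per record.
          INSERT INTO user_metrics (user_id, metrics, inserted_at, updated_at)
          VALUES (
            p_user_id,
            jsonb_build_object(p_metric,
              jsonb_build_object('count', 1, 'last_updated', now())),
            now(), now()
          )
          ON CONFLICT (user_id) DO UPDATE SET
            metrics = jsonb_set(
              user_metrics.metrics,
              ARRAY[p_metric],
              jsonb_build_object(
                'count',
                COALESCE((user_metrics.metrics #>> ARRAY[p_metric, 'count'])::int, 0) + 1,
                'last_updated', now()
              )
            ),
            updated_at = now();
        $$ LANGUAGE sql;
        """
      end

      def down do
        execute "DROP FUNCTION IF EXISTS increment_metric(bigint, text);"
      end
    end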

The Backfill

Before we started the service in production, we decided it would be most effective with lots of historical data. At the time we had 680 million events in JSON files in S3, representing three years of user share events. The challenge was to get the data out of S3 and into Postgres as fast as possible. We wrote a back-pressured backfill script that read files from S3 and wrote them to Postgres using a configurable number of workers. It ran the production RDS cluster at 100% CPU utilization for just over three and a half days as it imported all of the event data from S3. After the script finished, the database had over 182 million records. All this processing was done from a single pod running as a triggered job in our Kubernetes cluster.

The backfill script follows what Dave Thomas describes as “hungry consumers” in chapter 19 of the excellent book Programming Elixir. A central coordinator GenServer process uses a dynamic supervisor to create a set of worker processes. These processes call out to an S3 file lister that is responsible for fetching pages of file paths from an S3 bucket. When initialized, each worker calls the S3 lister to receive a file and parses its contents to get a list of events. The backfill script calls the same metric-incrementing stored procedures used by Broadway to process each event. When a worker finishes processing a file, it makes a call to get the next file. When the S3 lister has run out of files, it responds with a special message that stops the worker from asking for more.
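
Condensed into a sketch, a hungry-consumer worker could look like this; Backfill.S3Lister, Backfill.fetch_and_parse/1, and Repository.increment_metric/2 are hypothetical stand-ins for the pieces described above:

    defmodule Backfill.Worker do
      use GenServer

      def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

      @impl true
      def init(opts) do
        send(self(), :work) # ask the lister for a first file immediately
        {:ok, opts}
      end

      @impl true
      def handle_info(:work, state) do
        case Backfill.S3Lister.next_file() do
          :done ->
            # The lister is out of files; stop asking.
            {:stop, :normal, state}

          {:ok, path} ->
            # Read the S3 object and run every event through the same
            # stored procedure Broadway uses.
            for %{"user_id" => id, "metric" => metric} <- Backfill.fetch_and_parse(path) do
              Repository.increment_metric(id, metric)
            end

            send(self(), :work) # pull model: only ask for more when done
            {:noreply, state}
        end
      end
    end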

The flow of data is back-pressured because it follows a pull model: the end consumer dictates the pace of the entire pipeline. If inserts into the database slow down, the workers slow down with them, waiting for their transactions to complete before asking for additional files to process.

With this many workers running in parallel, we ran into issues with S3 rate limiting. Luckily, ExAws comes with an exponential backoff retry mechanism that makes workers wait before retrying S3 calls when the volume of calls is too high. We also found that starting the workers one second apart helped with the initial contention for S3 resources. Since this was a long-running backfill, the maximum time for the exponential backoff was set to an hour so that calls would eventually resolve.
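
ExAws exposes this retry behaviour through application config; a sketch matching the setup described above (the max_attempts value is an assumption):

    # config/config.exs
    import Config

    config :ex_aws, :retries,
      max_attempts: 100,            # assumption: a generous retry budget
      base_backoff_in_ms: 10,
      max_backoff_in_ms: 3_600_000  # cap backoff at one hour, per the backfill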

Performance

[Figure: New Relic service response times]

Despite the large number of records in the database, this service has an average response time of 1.2 ms at 10k rpm (requests per minute). During the peak traffic periods of March 2020, the response time was still under 3 ms while the service was handling 40–50k rpm.

Elixir ❤️

Elixir was the perfect fit for this service and is becoming my favourite programming language. Broadway’s high-level abstraction let us get the service up quickly. OTP’s primitives let us build a robust data transfer script that saturated the write capacity of a production AWS RDS cluster, transferring the data as fast as possible with very little code, all from a single server.
