Building a single source of truth using Serverless and NoSQL

We recently worked with a retail customer to help them accelerate their cloud journey from on-prem to Microsoft Azure. As with all our engagements, we learned a good number of lessons and have some findings to share. My colleague Dag König and I summarized the outcomes and presented them at the Stockholm Azure Meetup (March 2018). In this post, we’ll dive deeper into how we built an event-driven data ingestion pipeline, leveraging Serverless compute and NoSQL databases.


The Challenge

The customer has a large and complicated data repository which is maintained by multiple systems — let’s call it Masterdata. Changes in Masterdata are synced between many systems. Their current on-prem solution suffers from consistency problems, lost messages, and poor performance at peak load. The customer wants to build a highly scalable, cloud-hosted “single source of truth” repository, including a mechanism for pushing Masterdata changes to 3rd-party consumers. The solution needs to support an unpredictable load of up to 1,500 requests/sec.


We approached the problem by dividing it into these logical concerns;

Data ingestion

When it comes to large-scale data ingestion in Azure, Event Hubs provides a data-streaming platform capable of receiving and capturing millions of events per second. However, the customer had enterprise high-value messaging requirements such as transactions, ordering, and dead-lettering. Luckily, we have such a service at our disposal in Azure Service Bus. We could have easily skipped Service Bus and persisted data directly to storage; however, the queue-based load-leveling pattern allows us to decouple ingestion from storage, adds resilience in the form of retries, and enables asynchronous processing of the ingested data.

Instead of directly exposing Service Bus to the source systems, we wanted a REST API with a friendly URL. Azure Functions allows us to develop event-driven Serverless applications by providing a multitude of triggers and bindings. Every function has exactly one trigger, which defines how the function is invoked. In just a few lines below, we were able to create an HTTP trigger function which inserts a JSON-serialized product entity into the Service Bus queue called productsQueue.
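A minimal sketch of such a function is shown below; the route, authorization level, and the ServiceBusConnectionString setting name are illustrative choices, while the productsQueue name comes from our design. The Service Bus output binding takes care of sending the message, so the function body stays small.

```csharp
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;

public static class ProductIngestFunction
{
    [FunctionName("IngestProduct")]
    public static async Task<HttpResponseMessage> Run(
        [HttpTrigger(AuthorizationLevel.Function, "post", Route = "products")] HttpRequestMessage req,
        [ServiceBus("productsQueue", Connection = "ServiceBusConnectionString")] IAsyncCollector<string> productsQueue)
    {
        // Read the JSON-serialized product from the request body...
        string productJson = await req.Content.ReadAsStringAsync();

        // ...and hand it to the Service Bus output binding, which enqueues it on productsQueue.
        await productsQueue.AddAsync(productJson);

        return req.CreateResponse(HttpStatusCode.Accepted);
    }
}
```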

(Thanks to Adam Craven for pointing this out.) If the output binding to Service Bus fails, an exception is raised from the above function. Currently it is handled in the client application (not shown in this post but represented by the load generator) with retries using Polly. If the issue becomes more prominent in the future, we plan to implement dead-lettering in the above function.

Data storage

Our next big challenge was to decide where to store the data. Based on prior experience with RDBMS, the customer highlighted the challenges they had faced, including maintaining a normalized schema, scaling issues, and complex queries. There was some temptation around graph databases; however, the customer didn’t require traversing a network of relationships — something graph databases are exceptionally good at.

Due to the changing nature of the data as a consequence of evolving business requirements, we chose a document database. Document databases in Azure can be provisioned as IaaS (MongoDB Atlas or mLab) as well as PaaS (Azure Cosmos DB). The customer’s affinity towards PaaS, together with the elastic-scale, low-latency capabilities, tipped the decision in Cosmos DB’s favor.

Continuing our pipeline from Service Bus, we connected another function which gets triggered when a product appears in the Service Bus queue and inserts that product into Cosmos DB.
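A sketch of that function could look like the following; the function name is ours, and the Cosmos DB write is factored into an UpsertProductAsync helper that is shown a little further down.

```csharp
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Newtonsoft.Json.Linq;

public static partial class ProductStoreFunctions
{
    [FunctionName("StoreProduct")]
    public static async Task Run(
        [ServiceBusTrigger("productsQueue", Connection = "ServiceBusConnectionString")] string productJson,
        TraceWriter log)
    {
        // Parse the product received from the queue...
        JObject product = JObject.Parse(productJson);

        // ...and upsert it into Cosmos DB via the shared static DocumentClient (shown below).
        await UpsertProductAsync(product);

        log.Info($"Stored product {product["id"]}");
    }
}
```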

The function above is mostly self-explanatory, with the notable exception that DocumentClient avoids the improper instantiation anti-pattern. Even though DocumentClient implements the IDisposable interface, it is intended to be instantiated once and reused throughout the application’s lifetime. The use of a static client is further explained by our colleague Matías Quaranta in this Cookbook.

Here is how we instantiated the DocumentClient
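Something along these lines, assuming the endpoint and key live in app settings named CosmosDbEndpoint and CosmosDbAuthKey:

```csharp
using System;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;

public static partial class ProductStoreFunctions
{
    // Created once and reused for the lifetime of the function app,
    // avoiding the improper instantiation anti-pattern.
    private static readonly DocumentClient client = new DocumentClient(
        new Uri(Environment.GetEnvironmentVariable("CosmosDbEndpoint")),
        Environment.GetEnvironmentVariable("CosmosDbAuthKey"),
        desiredConsistencyLevel: ConsistencyLevel.ConsistentPrefix);
}
```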

A noteworthy parameter to the DocumentClient constructor is ConsistencyLevel.ConsistentPrefix. Consistent prefix guarantees that reads never see out-of-order writes. If products arrive in the order p1, p2, p3, a client sees either p1, or p1,p2, or p1,p2,p3, but never an out-of-order sequence like p1,p3 or p2,p1,p3 — a reasonable compromise between strong and eventual consistency.

Looking deeper into how we upsert data into Cosmos DB;
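A sketch of the upsert helper is below; the masterdata database and products collection names are illustrative, and the TelemetryClient call is just one way to record the request charge as a custom metric named product_RU.

```csharp
using System.Threading.Tasks;
using Microsoft.ApplicationInsights;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Newtonsoft.Json.Linq;

public static partial class ProductStoreFunctions
{
    private static readonly TelemetryClient telemetry = new TelemetryClient();

    public static async Task UpsertProductAsync(JObject product)
    {
        // Copy productGroupId into an explicit partitionKey property, so that the
        // collection's partition key resolves to an evenly distributed value.
        product["partitionKey"] = product["productGroupId"];

        var collectionUri = UriFactory.CreateDocumentCollectionUri("masterdata", "products");
        ResourceResponse<Document> response = await client.UpsertDocumentAsync(collectionUri, product);

        // Log the request charge (RUs) of the upsert as a custom metric.
        telemetry.TrackMetric("product_RU", response.RequestCharge);
    }
}
```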

In terms of throughput and storage, Cosmos DB collections can be created as fixed or unlimited. Fixed collections have a maximum limit of 10 GB and 10,000 RU/s of throughput. To keep our design future-proof, we chose unlimited collections. To create an unlimited collection, we must specify a partition key — an important decision that can’t be changed after the collection is created. We must pick a property with a wide range of values so that data is evenly distributed. For this reason, we explicitly enhanced our data with a new property, partitionKey, and populated it with productGroupId (assuming this field has a wide range of evenly distributed values).

Note the last statement in the above method: every response from Cosmos DB includes a custom header, x-ms-request-charge, which contains the RUs consumed by the request. This header is also accessible through the Cosmos DB .NET SDK as the RequestCharge property. We logged this value for telemetry purposes (more on this in the measuring performance section below).

Data distribution

Once we had our data ingested and stored, we wanted to be notified as documents were inserted or modified in storage. The Change Feed support in Cosmos DB feels like a match made in heaven for this very scenario — triggering a notification whenever a document is operated on. Here is how we created a change feed trigger function.
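A basic sketch of the trigger is below; the database, collection, and lease collection settings are illustrative.

```csharp
using System.Collections.Generic;
using Microsoft.Azure.Documents;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;

public static class ProductChangeFeedFunction
{
    [FunctionName("ProductChangeFeed")]
    public static void Run(
        [CosmosDBTrigger("masterdata", "products",
            ConnectionStringSetting = "CosmosDbConnectionString",
            LeaseCollectionName = "leases",
            CreateLeaseCollectionIfNotExists = true)] IReadOnlyList<Document> changedProducts,
        TraceWriter log)
    {
        // The change feed hands us the complete documents that were inserted or modified.
        log.Info($"Change feed delivered {changedProducts.Count} changed product(s)");
    }
}
```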

The next step was to forward these events to tens of consumers through HTTP webhooks. The customer required the ability to read consumer URLs and HTTP headers from a table, so that consumers could be dynamically added and configured.

Logic Apps is an extremely productive service when it comes to building data integrations and B2B communication in the cloud. In fact, in our first attempt it took less than an hour and 6 Logic App actions to build the entire workflow: picking up changes from the change feed, reading consumer configuration from Azure Table storage, and finally dispatching those changes to webhooks with retry logic. However, the per-action pricing model of Logic Apps meant 400k changes * 6 actions * 50 consumers = roughly $3,000!

Fortunately, we have another way of defining stateful Serverless workflows in Azure besides Logic Apps — Durable Functions! This extension to Azure Functions lets us define a workflow in code using an orchestrator function. Orchestrator functions can only be triggered via an OrchestrationTrigger, and the best place to start the orchestration in our case was from inside the change feed function.
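Extending the change feed function from above, starting the orchestration might look roughly like this (the orchestrator name NotifyConsumers is an illustrative choice):

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;

public static class ProductChangeFeedFunction
{
    [FunctionName("ProductChangeFeed")]
    public static async Task Run(
        [CosmosDBTrigger("masterdata", "products",
            ConnectionStringSetting = "CosmosDbConnectionString",
            LeaseCollectionName = "leases",
            CreateLeaseCollectionIfNotExists = true)] IReadOnlyList<Document> changedProducts,
        [OrchestrationClient] DurableOrchestrationClient starter,
        TraceWriter log)
    {
        foreach (Document product in changedProducts)
        {
            // Start one orchestration per changed product, passing only the
            // properties needed to read the complete document again later.
            string instanceId = await starter.StartNewAsync("NotifyConsumers",
                new { id = product.Id, partitionKey = product.GetPropertyValue<string>("partitionKey") });

            log.Info($"Started orchestration {instanceId} for product {product.Id}");
        }
    }
}
```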

The change feed gives us a list of the complete documents that were inserted/modified. Notice that instead of sending the entire document to the orchestrator function, we only send the id and partitionKey. By default, all Durable Functions call payloads are serialized into Azure Queue storage, which means the maximum message size limit of 64 KB applies. We overcame this limit by sending only enough properties to retrieve the complete document later. (Large message support has been acknowledged by the Azure Functions team and the issue is being tracked here.)

Here is the OrchestrationTrigger function;
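A sketch of the orchestrator is below; the DispatchChange activity name, the ConsumerEndpoints setting, and the small POCOs are illustrative.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;

public static class NotifyConsumersOrchestrator
{
    [FunctionName("NotifyConsumers")]
    public static async Task Run([OrchestrationTrigger] DurableOrchestrationContext context)
    {
        // The change feed function passed us just enough to find the document again.
        ProductReference changedProduct = context.GetInput<ProductReference>();

        // For simplicity, consumer endpoints are a pipe-delimited app setting here;
        // in reality they reside in Table Storage.
        string[] consumers = Environment.GetEnvironmentVariable("ConsumerEndpoints").Split('|');

        // Retry each webhook dispatch a few times before giving up on that consumer.
        var retryOptions = new RetryOptions(TimeSpan.FromSeconds(5), 3);

        var notifications = new List<Task>();
        foreach (string consumerUrl in consumers)
        {
            notifications.Add(NotifyConsumer(context, consumerUrl, changedProduct, retryOptions));
        }

        await Task.WhenAll(notifications);
    }

    private static async Task NotifyConsumer(DurableOrchestrationContext context,
        string consumerUrl, ProductReference product, RetryOptions retryOptions)
    {
        try
        {
            await context.CallActivityWithRetryAsync("DispatchChange", retryOptions,
                new DispatchRequest { ConsumerUrl = consumerUrl, Product = product });
        }
        catch (FunctionFailedException)
        {
            // Retries exhausted: temporarily ban this problematic consumer (omitted here).
        }
    }
}

public class ProductReference
{
    public string Id { get; set; }
    public string PartitionKey { get; set; }
}

public class DispatchRequest
{
    public string ConsumerUrl { get; set; }
    public ProductReference Product { get; set; }
}
```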

We deserialize the Cosmos DB properties sent earlier from the change feed, then read the list of consumers. (For simplicity we’ve chosen to store consumer HTTP endpoints as a pipe-delimited string in Application Settings; in reality they reside in Table Storage.) After that, we call an activity function per consumer. ActivityTrigger enables us to author stateless functions that can be asynchronously called by orchestrator functions, and the Azure Functions Scale Controller automatically scales them out by adding VMs (on the consumption plan). When calling activity functions we can specify a retry policy for error handling (similar to Polly), as we’ve done above with the retryOptions object.

We’ve deliberately wrapped the activity call in another method so that once retries are exhausted, we can temporarily ban problematic consumers. (In fact, the consumer URLs point to an HTTP trigger function which randomly returns 500 Internal Server Error.) As you’d expect, here is what goes on in the activity function;
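A sketch of the activity function, which reads the complete document back from Cosmos DB and POSTs it to the consumer’s webhook (names and settings are again illustrative):

```csharp
using System;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.WebJobs;

public static class DispatchChangeFunction
{
    private static readonly HttpClient httpClient = new HttpClient();

    // Same static-client pattern as in the storage function app.
    private static readonly DocumentClient client = new DocumentClient(
        new Uri(Environment.GetEnvironmentVariable("CosmosDbEndpoint")),
        Environment.GetEnvironmentVariable("CosmosDbAuthKey"));

    [FunctionName("DispatchChange")]
    public static async Task Run([ActivityTrigger] DispatchRequest request)
    {
        // DispatchRequest and ProductReference are the POCOs from the orchestrator above.
        // Read the complete document back using the id and partitionKey we were given.
        var documentUri = UriFactory.CreateDocumentUri("masterdata", "products", request.Product.Id);
        ResourceResponse<Document> response = await client.ReadDocumentAsync(documentUri,
            new RequestOptions { PartitionKey = new PartitionKey(request.Product.PartitionKey) });

        // Push the change to the consumer's webhook; a non-success status code throws,
        // which surfaces to the orchestrator's retry policy.
        var content = new StringContent(response.Resource.ToString(), Encoding.UTF8, "application/json");
        HttpResponseMessage webhookResponse = await httpClient.PostAsync(request.ConsumerUrl, content);
        webhookResponse.EnsureSuccessStatusCode();
    }
}
```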

Load Testing

It’s all fun and games until the inevitable question: “But will it scale?” The only way to find out is by measuring. Our colleague Ville Rantala has already done a great job of load testing with Azure Container Instances and wrk. Leveraging his work, we created a simple bash script to generate load against the HTTP trigger function created earlier for data ingestion.

The script uses a containerized version of wrk2 (wrk2 is wrk modified to produce a constant-throughput load). The WRK_OPTIONS we specified make sure the benchmark runs for 2 minutes, using a single thread, keeping 100 HTTP connections open, at a throughput of ~1,500 requests per second. In addition to TARGET_URL, the container also takes SCRIPT_URL as an environment variable, which points to a LuaJIT script that allows us to generate a dynamic HTTP request body.

Measuring performance

With trivial effort, we were able to add Application Insights to all of the Azure Function Apps, giving us a powerful tool for measuring the performance of the entire flow. Application Insights provides live metrics streaming, designed to have minimal latency.

Live metrics stream from Data ingestion Azure Functions App

Through the use of Live Metrics, we discovered issues and made useful enhancements to our solution:

  • Azure Functions can be hosted in two different modes: Consumption plan and App Service plan. The Consumption plan automatically scales out as necessary to handle load, hence we initially chose it for all function apps. Although the Azure Functions team has made significant improvements to the auto-scaling of HTTP trigger functions on the Consumption plan, we required an even more aggressive ramp-up and instead chose to host the HTTP function on an App Service plan (for 1,500 req/sec, 7 S3 instances were chosen). A Consumption plan function app can be migrated to an App Service plan with a short Azure PowerShell snippet.
  • As our colleague Martin Šimeček pointed out, disabling Azure Functions’ built-in portal logging by removing the AzureWebJobsDashboard setting improves performance!
  • To achieve higher Service Bus throughput, we created the queue with the Express option and partitioning enabled.
  • So that telemetry collection doesn’t compromise solution performance, we enabled Application Insights sampling in the Azure Functions host.json file.
  • As mentioned in the data storage section, we logged the Cosmos DB RequestCharge as a custom metric, product_RU, and used an Application Insights Analytics query to chart this value. (This data is also available in the Cosmos DB metrics blade, albeit with some delay.)
  • By default, all Cosmos DB data is indexed. When dealing with large documents, it’s best to employ a custom indexing policy which only indexes select properties. Indexing only productText reduced our RU cost; a sketch of such a policy follows this list.
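Such an indexing policy can be applied when the collection is created. Here is a sketch expressed through the .NET SDK, with illustrative database, collection, and throughput values:

```csharp
using System.Collections.ObjectModel;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;

public static class CollectionSetup
{
    public static async Task CreateProductsCollectionAsync(DocumentClient client)
    {
        var indexingPolicy = new IndexingPolicy();

        // Index only the productText property...
        indexingPolicy.IncludedPaths.Add(new IncludedPath
        {
            Path = "/productText/?",
            Indexes = new Collection<Index> { new RangeIndex(DataType.String) { Precision = -1 } }
        });

        // ...and exclude every other path from indexing.
        indexingPolicy.ExcludedPaths.Add(new ExcludedPath { Path = "/*" });

        var collection = new DocumentCollection { Id = "products", IndexingPolicy = indexingPolicy };
        collection.PartitionKey.Paths.Add("/partitionKey");

        await client.CreateDocumentCollectionIfNotExistsAsync(
            UriFactory.CreateDatabaseUri("masterdata"),
            collection,
            new RequestOptions { OfferThroughput = 10000 }); // illustrative throughput
    }
}
```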

E2E Architecture

With all components of our design in place, we ended up with this overall architecture;


Conclusion

The entire code is available on GitHub, and we’ve deliberately kept it as generic as possible so that it can serve as a reusable solution. For single-click, repeatable deployments, there is also an ARM template available.