Building single source of truth using Serverless and NoSQL
We recently worked with a retail customer to help them accelerate their cloud journey from on-prem to Microsoft Azure. As with all our engagements, we learned a good deal of lessons and have some findings to share. My colleague Dag König and I summarized the outcomes and presented them at the Stockholm Azure Meetup (March 2018). In this post, we'll dive deeper into how we built an event-driven data ingestion pipeline, leveraging Serverless compute and NoSQL databases.
Table of Contents
- The Challenge
- Data ingestion
- Data storage
- Data distribution
- Load Testing
- Measuring performance
- E2E Architecture
The Challenge
The customer has a large and complicated data repository maintained by multiple systems; let's call it Masterdata. Changes in Masterdata are synced between many systems. The current on-prem solution suffers from consistency problems, lost messages, and poor performance at peak load. The customer wants to build a highly scalable, cloud-hosted "single source of truth" repository, including mechanisms for pushing Masterdata changes to third-party consumers. The solution needs to support an unpredictable load with a maximum of 1500 requests/sec.
We approached the problem by dividing it into these logical concerns: data ingestion, data storage, data distribution, load testing, and measuring performance.
Data ingestion
When it comes to large-scale data ingestion in Azure, Event Hubs provides a data streaming platform capable of receiving and capturing millions of events per second. However, the customer had enterprise high-value messaging requirements such as transactions, ordering, and dead-lettering. Luckily, we have such a service at our disposal in Azure Service Bus. We could have easily skipped Service Bus and persisted data directly to storage, but the queue-based load-leveling pattern allows us to decouple ingestion from storage, adds resilience in the form of retries, and enables asynchronous processing of ingested data.
Instead of directly exposing Service Bus to the source systems, we wanted a REST API with a friendly URL. Azure Functions allows us to develop event-driven Serverless applications by providing a multitude of triggers and bindings. Every function has exactly one trigger, which defines how the function is invoked. In just a few lines, we were able to create an HTTP-triggered function that inserts a JSON-serialized product entity into a Service Bus queue.
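The original function isn't preserved in this excerpt; below is a minimal sketch of what it looked like. The queue name `products`, the route, and the `ServiceBusConnection` setting are illustrative, not the customer's actual names.

```csharp
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;

public static class IngestProduct
{
    [FunctionName("IngestProduct")]
    public static async Task<HttpResponseMessage> Run(
        // The HTTP trigger gives source systems a friendly REST endpoint
        [HttpTrigger(AuthorizationLevel.Function, "post", Route = "products")] HttpRequestMessage req,
        // Output binding: anything added to the collector lands on the Service Bus queue
        [ServiceBus("products", Connection = "ServiceBusConnection")] IAsyncCollector<string> queue)
    {
        // The request body is the JSON-serialized product entity
        string productJson = await req.Content.ReadAsStringAsync();
        await queue.AddAsync(productJson);
        return req.CreateResponse(HttpStatusCode.Accepted);
    }
}
```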
(Thanks Adam Craven for pointing out) If the output binding to Service Bus fails, an exception is raised from the function above. Currently it's handled in the client application (not shown in this post, but represented by the load generator) with retries using Polly. If the issue becomes more prominent in the future, we plan to implement dead-lettering in the function above.
Data storage
Our next big challenge was to decide where to store the data. From prior experience with RDBMS, the customer highlighted the challenges they had faced, including the maintenance of a normalized schema, scaling issues, and query complexity. There was some temptation around graph databases; however, the customer didn't require traversing a network of relationships, which is what graph databases are exceptionally good at.
Due to the changing nature of the data as a consequence of evolving business requirements, we chose a document database. Document databases in Azure can be provisioned as IaaS (MongoDB Atlas or mLab) as well as PaaS (Azure Cosmos DB). The customer's affinity towards PaaS and the elastic-scale, low-latency capabilities tipped this decision in Cosmos DB's favor.
Continuing our pipeline from Service Bus, we connected another function, which is triggered when a product appears in the Service Bus queue and inserts that product into Cosmos DB.
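That function isn't preserved here either; a sketch, with the queue name and `Product` shape as illustrative assumptions (the real upsert logic is covered in the storage discussion below, so it is stubbed out):

```csharp
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

public static class PersistProduct
{
    public class Product
    {
        public string Id { get; set; }
        public string ProductGroupId { get; set; }
    }

    [FunctionName("PersistProduct")]
    public static async Task Run(
        // Fires for every message that appears on the queue
        [ServiceBusTrigger("products", Connection = "ServiceBusConnection")] string productJson,
        ILogger log)
    {
        Product product = JsonConvert.DeserializeObject<Product>(productJson);
        log.LogInformation($"Upserting product {product.Id}");
        await UpsertProductAsync(product, log);
    }

    // Placeholder: the actual upsert against Cosmos DB is discussed later in the post
    private static Task UpsertProductAsync(Product product, ILogger log) =>
        Task.CompletedTask;
}
```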
The function above is self-explanatory, with the notable exception of the static `DocumentClient`, which avoids the improper instantiation anti-pattern. Even though `DocumentClient` implements the `IDisposable` interface, it's intended to be instantiated once and reused throughout the application lifetime. The use of a static client is further explained by our colleague Matías Quaranta in this Cookbook.
Here is how we instantiated the `DocumentClient`. A noteworthy parameter to the `DocumentClient` constructor is `ConsistencyLevel.ConsistentPrefix`. Consistent prefix guarantees that reads never see out-of-order writes. If products arrived in the order `p1, p2, p3`, then a client sees either `p1`, or `p1,p2`, or `p1,p2,p3`, but never an out-of-order sequence like `p2,p1,p3`. It's a reasonable compromise between strong and eventual consistency.
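The instantiation itself is missing from this excerpt; a minimal sketch, assuming the endpoint and key live in app settings (setting names are illustrative):

```csharp
using System;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;

public static class CosmosClientHolder
{
    // Created once and reused for the lifetime of the application,
    // per the improper-instantiation guidance above
    private static readonly DocumentClient client = new DocumentClient(
        new Uri(Environment.GetEnvironmentVariable("CosmosDbEndpoint")),
        Environment.GetEnvironmentVariable("CosmosDbAuthKey"),
        new ConnectionPolicy
        {
            ConnectionMode = ConnectionMode.Direct,
            ConnectionProtocol = Protocol.Tcp
        },
        // Reads never see out-of-order writes
        ConsistencyLevel.ConsistentPrefix);

    public static DocumentClient Instance => client;
}
```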
Let's look deeper into how we upsert data into Cosmos DB.
In terms of throughput and storage, Cosmos DB collections can be created as fixed or unlimited. Fixed collections have a maximum size of 10 GB and a throughput limit of 10,000 RU/s. To future-proof our design, we chose unlimited collections. Creating an unlimited collection requires specifying a partition key, an important decision that can't be changed after the collection is created. We must pick a property with a wide range of values so that data is evenly distributed. For this reason, we explicitly enhanced our data with a new property, `partitionKey`, and populated it with `productGroupId` (assuming this field has a wide range of evenly distributed values).
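The upsert method isn't preserved in this excerpt; a sketch under the assumptions that `client` is the static `DocumentClient` discussed above, `Product` carries a `PartitionKey` property, and the database/collection names are illustrative:

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Microsoft.Extensions.Logging;

public static partial class ProductRepository
{
    private static async Task UpsertProductAsync(Product product, ILogger log)
    {
        // Populate the synthetic partition key from a well-distributed field
        product.PartitionKey = product.ProductGroupId;

        Uri collectionUri =
            UriFactory.CreateDocumentCollectionUri("masterdata", "products");

        // client is the static DocumentClient instantiated once per app lifetime
        ResourceResponse<Document> response =
            await CosmosClientHolder.Instance.UpsertDocumentAsync(collectionUri, product);

        // Last statement: log the RU cost (x-ms-request-charge) as a custom metric
        log.LogMetric("product_RU", response.RequestCharge);
    }
}
```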
Note the last statement in the method above: every response from Cosmos DB includes a custom header, `x-ms-request-charge`, which contains the RUs consumed by the request. This value is also accessible through the Cosmos DB .NET SDK as the `RequestCharge` property. We logged it for telemetry reasons (more on this in the measuring performance section below).
Data distribution
Once we had our data ingested and stored, we wanted to be notified as CRUD operations happened on the storage. The Change Feed support in Cosmos DB feels like a match made in heaven for this very scenario: triggering a notification when a document is operated on. Here is how we created a change feed trigger function.
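The function body is lost from this excerpt; a minimal sketch, with database, collection, and lease names as illustrative assumptions:

```csharp
using System.Collections.Generic;
using Microsoft.Azure.Documents;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

public static class ProductsChangeFeed
{
    [FunctionName("ProductsChangeFeed")]
    public static void Run(
        // Fires with batches of inserted/updated documents from the change feed
        [CosmosDBTrigger(
            databaseName: "masterdata",
            collectionName: "products",
            ConnectionStringSetting = "CosmosDbConnection",
            LeaseCollectionName = "leases",
            CreateLeaseCollectionIfNotExists = true)] IReadOnlyList<Document> changedDocuments,
        ILogger log)
    {
        log.LogInformation($"Change feed delivered {changedDocuments.Count} document(s)");
    }
}
```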
The next step was to forward these events to tens of consumers through HTTP webhooks. The customer required the ability to read consumer URLs and HTTP headers from a table, so that consumers can be dynamically added and configured.
Logic Apps is an extremely productive service when it comes to building data integrations and B2B communication in the cloud. In fact, on our first attempt it took less than an hour and six Logic App actions to build the entire workflow: picking changes from the change feed, reading consumer configuration from Azure Table storage, and finally dispatching those changes to webhooks with retry logic. However, the per-action pricing model of Logic Apps meant:
400k changes * 6 actions * 50 consumers = $3,000!
Fortunately, we have another way of defining stateful Serverless workflows in Azure besides Logic Apps: Durable Functions! This extension to Azure Functions lets us define a workflow in code using an orchestrator function. Orchestrator functions can only be triggered via an `OrchestrationTrigger`, and the best place to start one in our case was from inside the change feed function.
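Revisiting the change feed function to start the orchestration might look like the sketch below (names as before are illustrative; `NotifyConsumers` is an assumed orchestrator name):

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Azure.WebJobs;

public static class ProductsChangeFeedStarter
{
    [FunctionName("ProductsChangeFeed")]
    public static async Task Run(
        [CosmosDBTrigger(
            databaseName: "masterdata",
            collectionName: "products",
            ConnectionStringSetting = "CosmosDbConnection",
            LeaseCollectionName = "leases")] IReadOnlyList<Document> changedDocuments,
        // Client binding used to start new orchestration instances
        [OrchestrationClient] DurableOrchestrationClient starter)
    {
        foreach (Document doc in changedDocuments)
        {
            // Send only enough properties to retrieve the complete document
            // later, not the whole document (queue payloads are capped at 64 KB)
            await starter.StartNewAsync("NotifyConsumers", new
            {
                id = doc.Id,
                partitionKey = doc.GetPropertyValue<string>("partitionKey")
            });
        }
    }
}
```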
The change feed gives us a list of the complete documents that were inserted or modified. Notice that instead of sending the entire document to the orchestrator function, we send only the `partitionKey` and an identifier. By default, all Durable Functions call payloads are serialized into Azure Queue storage, which means the maximum message size limit of 64 KB applies. We overcame this limit by sending only enough properties to retrieve the complete document later. (Large message support has been acknowledged by the Azure Functions team, and the issue is being tracked here.)
Here is the orchestrator function.
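The orchestrator code is missing from this excerpt; a sketch under stated assumptions (function and type names are illustrative, and the pipe-delimited app setting stands in for the real Table storage lookup, as the post explains):

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;

public static class NotifyConsumers
{
    public class ProductChange
    {
        public string Id { get; set; }
        public string PartitionKey { get; set; }
    }

    [FunctionName("NotifyConsumers")]
    public static async Task Run(
        [OrchestrationTrigger] DurableOrchestrationContext context)
    {
        // Deserialize the Cosmos DB properties sent from the change feed
        ProductChange change = context.GetInput<ProductChange>();

        // For simplicity, consumer endpoints are a pipe-delimited app setting;
        // in reality they reside in Table storage
        string[] consumers =
            Environment.GetEnvironmentVariable("ConsumerUrls").Split('|');

        // Retry policy applied to each activity call (values illustrative)
        var retry = new RetryOptions(
            firstRetryInterval: TimeSpan.FromSeconds(5),
            maxNumberOfAttempts: 3);

        // Fan out: one activity call per consumer
        await Task.WhenAll(consumers.Select(url =>
            CallConsumerActivity(context, url, change, retry)));
    }

    // Wrapped in a method so that, once retries are exhausted,
    // we can temporarily ban the problematic consumer
    private static Task CallConsumerActivity(
        DurableOrchestrationContext context, string url,
        ProductChange change, RetryOptions retry)
    {
        return context.CallActivityWithRetryAsync(
            "DispatchToConsumer", retry, new { url, change });
    }
}
```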
We deserialize the Cosmos DB properties sent earlier from the change feed, then read the list of consumers. (For simplicity, we've chosen to store consumer HTTP endpoints as a pipe-delimited (`|`) string in application settings; in reality they reside in Table storage.) After that, we create an `ActivityTrigger` call per consumer.
`ActivityTrigger` enables us to author stateless functions that can be asynchronously called by orchestrator functions. The Azure Functions scale controller automatically scales them out by adding VMs (on the Consumption plan). When calling activity functions, we can specify a retry policy for error handling (similar to Polly), as we've done above with a retry-options object.
We’ve deliberately wrapped the creation of
ActivityTrigger in another method so that once retries are exhausted, we can temporarily ban problematic consumers. (In-fact consumer URLs point to an HTTP trigger function which randomly returns
500 Internal Server Error). As expected, here is what goes on in activity function;
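The activity function is lost from this excerpt; a sketch (the `Notification` type mirrors the assumed orchestrator payload and is illustrative):

```csharp
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Newtonsoft.Json;

public static class DispatchToConsumer
{
    public class Notification
    {
        public string Url { get; set; }
        public object Change { get; set; }
    }

    // Shared client: avoids socket exhaustion when scaled out under load
    private static readonly HttpClient http = new HttpClient();

    [FunctionName("DispatchToConsumer")]
    public static async Task Run(
        [ActivityTrigger] Notification notification)
    {
        var content = new StringContent(
            JsonConvert.SerializeObject(notification.Change),
            Encoding.UTF8, "application/json");

        HttpResponseMessage response =
            await http.PostAsync(notification.Url, content);

        // A non-success status throws, which surfaces to the orchestrator
        // and engages the retry policy
        response.EnsureSuccessStatusCode();
    }
}
```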
Load Testing
It's all fun and games until the inevitable question: "But will it scale?" The only way to find out is by measuring. Our colleague Ville Rantala has already done a great job of load testing with Azure Container Instances and wrk. Leveraging his work, we created a simple bash script to generate load against the HTTP trigger function created earlier for data ingestion.
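The script itself isn't preserved here; a sketch along the lines of Ville's setup, where the container image, URLs, and resource names are all placeholders:

```shell
#!/bin/bash
# Run containerized wrk2 in Azure Container Instances against the ingestion endpoint.
# Benchmark: 2 minutes, 1 thread, 100 open connections, ~1500 requests/sec.
WRK_OPTIONS="-d2m -t1 -c100 -R1500"
TARGET_URL="https://<function-app>.azurewebsites.net/api/products"
# LuaJIT script that generates a dynamic request body
SCRIPT_URL="https://<storage-account>.blob.core.windows.net/scripts/product.lua"

az container create \
  --resource-group loadtest-rg \
  --name wrk2-load \
  --image "<your-wrk2-image>" \
  --restart-policy Never \
  --environment-variables \
      WRK_OPTIONS="$WRK_OPTIONS" \
      TARGET_URL="$TARGET_URL" \
      SCRIPT_URL="$SCRIPT_URL"
```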
The script above uses a containerized version of wrk2 (wrk2 is wrk modified to produce a constant-throughput load). The `WRK_OPTIONS` specified above make sure the benchmark runs for 2 minutes, using a single thread, keeping 100 HTTP connections open, at a throughput of ~1500 requests per second. In addition to `TARGET_URL`, the container also takes `SCRIPT_URL` as an environment variable, which points to a LuaJIT script that lets us generate a dynamic HTTP request body.
Measuring performance
With trivial effort, we were able to add Application Insights to all the Function Apps and gain a powerful tool for measuring the performance of the entire flow. Application Insights provides live metrics streaming, designed to have minimal latency.
Through the use of Live Metrics, we discovered and made several useful enhancements to our solution:
- Azure Functions can be hosted in two different modes: Consumption plan and App Service plan. The Consumption plan automatically scales out as necessary to handle load, hence we initially chose it for all Function Apps. Although the Azure Functions team has made significant improvements in the auto-scaling of HTTP trigger functions on the Consumption plan, we required an even more aggressive ramp-up and instead chose to host the HTTP function on an App Service plan (for 1500 req/sec, 7 S3 instances were chosen). A function app can be migrated from the Consumption plan to an App Service plan with a short Azure PowerShell snippet.
- As our colleague Martin Šimeček pointed out, disabling Azure Functions' built-in portal logging by removing the `AzureWebJobsDashboard` setting improves performance!
- To achieve higher Service Bus throughput, we created the queue with the Express option and partitioning enabled.
- So that telemetry collection would not compromise solution performance, we enabled Application Insights sampling in the Azure Functions `host.json` configuration.
- As mentioned in the data storage section, we logged the Cosmos DB `RequestCharge` as a custom metric, `product_RU`. A simple Application Insights Analytics query lets us project this value over time. (This data is also available in the Cosmos DB metrics blade, albeit with some delay.)
- By default, all Cosmos DB data is indexed. When dealing with large documents, it's best to employ a custom indexing policy that indexes only selected properties, which reduces our RU cost per write.
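The actual policy isn't shown in this excerpt; a sketch via the .NET SDK, where the paths are illustrative (the real policy would include the specific properties the solution queries on):

```csharp
using System.Collections.ObjectModel;
using Microsoft.Azure.Documents;

public static class CollectionSetup
{
    // Illustrative only: exclude everything by default, then index
    // just the properties that queries actually filter on.
    public static DocumentCollection BuildProductsCollection()
    {
        var collection = new DocumentCollection { Id = "products" };
        collection.PartitionKey.Paths.Add("/partitionKey");

        collection.IndexingPolicy.Automatic = true;
        collection.IndexingPolicy.ExcludedPaths.Add(
            new ExcludedPath { Path = "/*" });
        collection.IndexingPolicy.IncludedPaths.Add(new IncludedPath
        {
            Path = "/partitionKey/?",
            Indexes = new Collection<Index>
            {
                // Precision -1 gives maximum precision for range queries
                new RangeIndex(DataType.String) { Precision = -1 }
            }
        });
        return collection;
    }
}
```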
E2E Architecture
With all components of our design in place, we ended up with the overall architecture below.