Cosmos DB: a versatile tool for real-time (spatial) queries.

Guusje Boomgaard
NS-Techblog
Published in
14 min readJul 7, 2023

General introduction­

In the Netherlands every day many people travel by train. The way the train system is deployed, reservations are unavailable. This means that every day you can choose freely which train you will take to your destination and at what time. People often use a travel planner to explore different options for train travel and subsequent transfers by bus, tram and metro transfers. A travel planner will give you information about which connections you can take to reach your destination, but also more specific information about your trip, such as the indicator how crowded a train is and the average punctuality of a train. Travellers who are more flexible, i.e. who don’t mind taking a train earlier or later, can use this detailed information to increase their comfort during their trip. Within NS there are different information processes to provide this information to the travel planner. In this blog we will take a closer look at how Azure Cosmos DB is involved in these processes. For more context, you can also read a Microsoft customer story that presents our case (see this link). This blog goes into detail about the technical considerations when integrating Cosmos DB into your daily processes.

The blog consists out of two parts, the first part is about using Cosmos DB for powering APIs, the second part is about using Cosmos DB as a state store in real time processes.

Our APIs are built in the Azure cloud using the FastAPI framework within an Azure Function. Using Azure Cosmos DB has allowed us to not only store our daily batch predicted crowdedness, but also enables us to do streaming updates throughout the day and perform spatial queries in real time.

Geospatial APIs

Goal: Given a trip in a travel planner, query the corresponding crowdedness for that trip based on approximate spatial coordinates and departure time.

As a new feature for a travel planner application that includes tram, buses, and metros we want to add some information about the expected crowdedness. Therefore, the goal is to provide an API to query the predicted crowdedness for a tram, bus or metro between a given two stops (I will call this a `trip` from now on). This does not sound too complicated, namely, you just query a specific tram that travels from stop A to stop B. However, there is a small catch. The stop identifiers in our database do not match with the identifiers in the travel planner. The travel planner contains different operators (companies that are responsible for the operating of the vehicle) operating throughout the country, with different types of vehicles (buses, metros, trams, ferries). As there are millions of trips per day that can be queried, an efficient matching strategy between the two data sources is crucial. To find the appropriate level of crowdedness, we need to know that we have the correct two stops for a given trip at the correct time of the day. Looking at the data, some ideas come to mind for matching the data on stop name, the location, the line, arrival, and departure time.

Example of a travel planner

Stop Names. As a first attempt, we check to see if stop names match. Unfortunately, we conclude that there is no common naming convention between the two sources and that there are too many variations to attempt a string match, including some exceptions. As you can see in the database example item below (“Amsterdam street” vs. “A’dam Str.”) is one of the many variations we find.

GPS coordinates. Next, we compare the GPS coordinates of a sample of stops, only to find that they don’t match (see our database example compared to the travel planner: 52.36319 vs 52.36266015923805). The difference may seem small, but given that for example, bus stops are often placed close together, one for each direction, the location of the stop must be matched carefully.

Departure/arrival times. Again, no exact match (2023–05–28T09:07:00.000000Z vs. 2023–05–28T09:05:00.000000Z). This is understandable as the crowdedness indicators are set in advance and the exact departure/arrival times may change over time.

Cosmos DB item representing a trip

So, our initial explorations of the two datasets tell us that we need to combine multiple values to match the two data sources. However, if we’re going to do this in real-time, approximate matching on string, time and/or coordinates needs to be efficient. Imagine how happy were to find out that Cosmos DB supports geospatial queries! (see documentation here)

Graphical represtentation of the geographical matching of stops from the two different data sources.

Instead of writing a complicated query to determine a bounding box around the coordinates, we can use Cosmos DB’s built-in ST_DISTANCE() function to find a match within a given radius of the stop in the travel planner. All we need to do is store the locations of the stops as a Point datatype in our Cosmos database, using the GeoJSON format. We decide to match stops from both sources if they are within 20 meters of each other (i.e. a 20m margin of error).

In the image above shows how this matching is achieved. The stops from both sources (travel planner in black, stop database in yellow) are shown in the figure above. A unique trip from the travel planner can now be efficiently identified using its operator, line number, departure time and its departure stop. A tram trip from stop A to stop B in the travel planner is queried in the API to retrieve its predicted crowdedness. The database is queried for the relevant operator line, approximate departure time and then for stops within 20 m of stop A.

Without this feature, we would have had to query a set of approximate matches, and then find the closest match by creating some logic in our code or writing a query with some sort of bounding box around the coordinates. However, because Cosmos DB solves this problem for us, our code can be simplified and the number of items retrieved from our Cosmos database can be limited to one per query.

Query to retrieve a trip from our Cosmos DB

Performance

In terms of performance, querying crowdedness at a stop took 31 ms on average. This is quite fast considering that both the coordinates and the departure time are approximately the same.

Results of a performance test (Locust) on the API that retrieves crowdedness for a trip from the Cosmos DB.

Partitioning

To take advantage of Cosmos DB’s scalability and performance optimizations, the choice of your logical partition is very important. In Cosmos DB there are two important types of partitions, which are logical partitions — this is the partition key that you assign to your data — and horizontal partitions — these are the physical partitions that Cosmos DB uses to partition the data. You do not have to do anything for this second type of partitioning, Cosmos DB manages it for you, but understanding how this works will help you choose your logical partition key. In the most optimal scenario, these horizontal partitions are similar in size and there are no so-called hot partitions (partitions that are queried significantly more than the others).

This is important, because when your database receives many queries, Cosmos DB will scale to keep latency low by dynamically updating the physical partitions based on the throughput and the amount of stored data, with the goal of distributing the throughput evenly among the physical partitions. If one of the physical partitions were to get all the throughput, this scaling would not be optimal and this will increase your latency.

It is important to note that:

  • Each logical partition can store a maximum of 20GB and each physical partition can store a maximum of 50GB of data.
  • There is no limit to the number of physical partitions in your database.

Key takeaway: choose your logical partitions so that they are evenly distributed in size and query frequency. Use Azure metrics in the portal for visual feedback.

Horizontal scaling

The allocated RUs for a container are evenly distributed across its physical partitions, so it’s important to choose a logical partition key that distributes throughput consumption evenly. For fast results you want your queries to target a single partition at a time and avoid cross-partition queries. You can achieve this by not having too many small partitions. On the other hand, you also do not want one big partition to get all the requests (i.e., hot partitions) since assigned RUs are distributed evenly across partitions. If that partition gets 90% of the queries, the chances of hitting the RU limit for that partition increase quickly.

Graphical representation of logical partitions (6) divided over (3) physical partitions. The colors are indicating that logical partitions 1, 2 and 5 are queried with a relatively higher frequency than the others.

Imagine that in the above situation 30.000 RU/s are evenly distributed across the three physical partitions. Let’s say the left two logical partitions (1 & 2) receive so many requests that 10.000 RU/s is almost reached, and the rate needs to be limited so as not exceed the 10.000 RU/s. And say partition 5 gets a lot of requests. Cosmos DB will then rearrange its physical partitions to spread the load more evenly and make better use of its total of 30.000 RU/s. For example, if logical partitions 1 and 3 are switched, the left physical partition will be relieved of some of its load and will be further away from its assigned limit of 10.000 RU/s. On the other hand, if partition 1 had received all the requests or partition 1 & 2 combined, Cosmos DB would not be able to relieve this physical partition by rearranging and your latency will increase.

Partition key choices

How we implemented our partition key with this knowledge was partly by reasoning and partly by trial and error. Our dataset consisted of multiple agencies, which were not evenly distributed. Using the agency as a logical partition key would therefore not evenly distribute the query load. In the end, we decided to create a constructed partition key consisting of the date, agency, and line number. This divides each agency down into smaller partitions and limits the skewness of the distribution. When you are figuring out your optimal partition key, it can be helpful to look at some partition metrics in the Azure Portal (Cosmos resource -> monitoring -> metrics and Cosmos resource -> monitoring -> insights). Here you can see the RU/s per partition and how much data is stored per partition. This will give you a feel and feedback on how your users are interacting with your database and how to optimize the partitioning.

Indexing policy

Each container in Cosmos DB has an indexing policy that describes how an item should be indexed. By default, every property of an item is indexed. An indexing policy can be further optimized by setting up an indexing mode (Consistent | None) and by including or excluding property paths.

We set the indexing mode to Consistent, as None is particularly useful for speeding up bulk operations, and in our use case, we upload a new batch daily, which is still relatively small.

The indexing policy is set to automatic, which means that each new item is automatically indexed as it is written.

Including and excluding property paths ensures that only specifically specified properties are indexed, rather than all, based on your expected usage. Property paths are paths that describe how to traverse the nested json structure of an item, to get to the desired property. For example, we can see that the property departure_time_utc is nested inside the start_stop in the crowdedness_message. Therefore, its property path becomes /crowdedness_message/start_stop/departure_time_utc/?. In our case, our API had an endpoint that queried the database for a combination of agency, line, departure time and spatial coordinates. It also required either the included or excluded paths to contain a “/*” path. Taking this into account, we optimized our indexing policy by excluding all paths (“/*”) and including only agency, line and departure_time_utc. To enable our spatial query, we set spatial indexes on the stop locations.

Composite indexes are required when queries contain an ORDER BY clause with two or more properties. However, even when this is not the case, composite indexes are recommended to improve performance when filters are applied to multiple properties. In our query, multiple filters are used, so we add the composite index on agency, line and departure_time_utc. It is recommended that properties that are used in range queries are defined last in the composite index, so, we add de departure_time_utc to the bottom of the list.

Indexing policy of our Cosmos database to enable spatial queries

Note that for our composite indexes, the property paths do not end with /?. This is because it is required that composite indexes point to scalar values (i.e. /?) and therefore it is implicitly included in every composite index.

During this project, we learned a lot about the flexibility of using cosmos as a backend for our API. However this is not the only use case we have for using Cosmos DB.

Key takeaway: apply indexes only to properties that are used to query the data, and remember that the order in which you define your composite indexes matters.

Update state store

One of our current projects is to predict train crowdedness in the Netherlands. To do this, we run a large nightly batch and make updates to the system during the day. This nightly batch contains the predicted crowdedness for the scheduled trains with the latest information that is known at the moment of creating this batch prediction. Updates are needed during the day to take into account the changes to the train schedule. This is reflected in the number of trains and seats available on a given route. This update stream works as shown in the image below. This is an event-based approach on calculating the updates. In these systems each change is pushed to the receiver using Azure Event Hubs.

Current architecture of our state store

The use of Cosmos DB to store the current state of the number of seats and passengers allows for the use of low latency checking and updating of events. The source of events that contain train schedule updates is the travel information of NS. These events do not only provide the composition of a train but also the time delay or the route changes. Therefore, there are many duplicate updates for the trains type. Luckily, there is a unique identifier for trains, namely a train number. This train number is a unique number for a route on a so-called “traffic date”. (A traffic date is the date on which a train is scheduled to run, these traffic dates go from 00.00 till 2.00 the next day (so a 26h day). This prevents issues with trains crossing the date boundary.) To reduce the number of duplicate events it is important to check if the new event contains a different train, because at the moment we’re only interested in updates that have an impact on the predicted crowdedness, such as an increase or decrease in the number of seats. Therefore, we filter events that match the current state in our Cosmos DB. This filter greatly reduces the number of events, making them easier to process further down the line.

Configuration of cosmos

In the earlier part of this blog, we discussed how to configure your Cosmos database for different use cases. Using Cosmos DB for a state store requires some changes to those best practices. The idea of the state store is that all documents are uniquely identifiable by just the id and partitioning key. This means that you can turn off all indexing policies, which makes writing extremely fast. If the id and partitioning key are known in the process, this means that reading is also fast because you can just retrieve the correct document.

Current state update

Currently, our Cosmos database is a state lookup entry, separate from storage of the event flow of updates. To interact with the state, Azure Functions uses SQL queries to retrieve the document that matches the current the event. It then updates the document to reflect the changes from the current event and pushes it back to the database as well as to the Event Hubs downstream for further processing. This makes it difficult to extend the process as it is difficult to manage latency between multiple event streams that are updating fields. To enable these multiple event streams, a new architecture is required, one of microservices, so that the different streams don’t need to be aware of each other.

What is stored in the state store will be everything required to provide a crowdedness event towards the outside world. To do this the documents contain a crowdedness message as a document, with metadata surrounding the message. This metadata is useful for the query process but also for the upgrades towards the microservice architecture. In the query methodology you can use multiple metadata fields, or actual data fields to retrieve the correct document.

Cosmos DB item template

Microservice Architecture

Planned future improvements to the architecture will be to move to microservices to take full advantage of Cosmos DB and allow for easier extensibility and maintainability. To achieve this, we need to move to the change feed architecture. This means that the Cosmos DB state store is updated by several microservices after which the database generates an overview of these changes in the form of events which can be processed by Azure Functions towards Azure Event Hubs.

Our current data storage needs to be redesigned because querying the documents is no longer a good solution for the microservice approach. It is much better and faster to use the unique ids per partition key for the lookup of documents and perform updates. To achieve this, we need to figure out something that is known in the different events and is unique. Currently the idea is in train number, traffic date, and the start station identifier. This should be known but may need to change in the future.

The resulting new architecture is shown in the diagram below. The main advantage is mostly the one-way flows in the stream, combined with the Cosmos DB to manage update times and merges. This means that less complicated logic needs to be built downstream to manage conflicting information.

Proposed microservices architecture

Conclusion

We hope that sharing our experiences will inspire you to consider Cosmos DB for your solutions, or, if you are already using it, to explore the many features it has to offer. We are currently continuing with the architecture updates we mentioned above. In the future, we plan to write an update on our experiences with this and subsequent projects in Azure.

Our experience has shown us that Cosmos DB is a powerful database technology for scaling applications and working with events, but it has its very own peculiarities. This means that considerations for Cosmos DB are completely different from those for other database types. Using the technology to its full potential can save you a lot of management time in the development process. In our experience, some early experiments with different settings is the easiest way to start using this technology.

Credits

This blog is written by Yorick Fredrix and Guusje Boomgaard; we are both data engineers working at NS working on short term crowdedness predictions for travellers. This team is part of the Data department at NS.

--

--