Azure Cosmos DB : Change Feed processor for Java

Inbaraj S
Walmart Global Tech Blog
9 min read · Dec 14, 2020
Photo Credit: Anrita

I recently started working on a project which involved integration of several different systems, some legacy and some new. In the course of figuring out the plumbing and various nitty-gritty of the entire solution, Azure Cosmos DB was chosen as the intermediate storage at one of the middle layers. In this post I will be specifically discussing how our team used the Azure Change Feed Processor to process millions of events from Azure Cosmos to the Data Sink.

Problem at hand

We had a producer base of thousands of users generating millions of events per day into our Cosmos DB. The event producers are spread across different geographies. The events come from store devices firing API calls and from Kafka events published by multiple systems. In addition, we had multiple dependent service integrations which were expecting the data in a processed format in near real time. We were required to validate the data, process it, save it back to the database and push it to the dependent systems. We also had to provide an API that reports on the processed data.

Problem statement visual representation

The Research

I came across Azure’s change feed processor pattern, which has been used across multiple platforms to read a feed, process it and repeat at massive scale. But I wanted to understand how it differs from existing systems in the way it does this.

You can work with the Azure Cosmos DB change feed using either a push model or a pull model. With a push model, the change feed processor pushes work to a client that has the business logic for processing this work, while the complexity of checking for work and storing state for the last processed work is handled by the change feed processor. With a pull model, the client asks for work and has to track that state itself.

The pull model’s continuation tokens and the change feed processor’s lease container are both “bookmarks” for the last processed item (or batch of items) in the change feed.

Using the change feed processor (CFP) library or Azure Functions, you get a push model, where your code sits and waits for changes that get pushed to it by Azure Cosmos DB in real time. In a pull model, you control the pace of consumption: similar to the (non-recommended) direct approach, your code queries the change feed to “pull” the changes from it, rather than waiting for changes to be pushed.

While the push model usually provides a better approach, there are some cases where the pull model can be easier to work with. For one, it has the unique ability to query for changes by partition key. Also, by assuming control of change feed consumption, it’s easier for those one-time data migration scenarios as well.
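
To make the "bookmark" idea concrete, here is a minimal sketch of a pull loop in plain Java. It does not use the Cosmos DB SDK: the in-memory feed, the `Page` type and the integer continuation token are all hypothetical stand-ins, chosen only to show that in a pull model the client owns the bookmark and decides when to ask for more.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory stand-in for a change feed; NOT the Cosmos DB SDK. */
class PullModelSketch {

    /** One page of changes plus the bookmark to resume from (hypothetical type). */
    static final class Page {
        final List<String> items;
        final int continuationToken;

        Page(List<String> items, int continuationToken) {
            this.items = items;
            this.continuationToken = continuationToken;
        }
    }

    private final List<String> feed = new ArrayList<>();

    /** Appends a change to the feed, in write order. */
    void write(String item) {
        feed.add(item);
    }

    /** Returns up to maxItems changes recorded after the given token. */
    Page readNext(int continuationToken, int maxItems) {
        int end = Math.min(feed.size(), continuationToken + maxItems);
        return new Page(new ArrayList<>(feed.subList(continuationToken, end)), end);
    }

    public static void main(String[] args) {
        PullModelSketch changeFeed = new PullModelSketch();
        for (int i = 1; i <= 5; i++) {
            changeFeed.write("event-" + i);
        }

        int token = 0; // the client owns and persists this bookmark
        while (true) {
            Page page = changeFeed.readNext(token, 2);
            if (page.items.isEmpty()) {
                break; // caught up; a real client would wait and poll again
            }
            System.out.println("processing " + page.items);
            token = page.continuationToken; // advance only after processing succeeds
        }
    }
}
```

In the push model, the lease container plays the role of `token`, and the processor library runs the loop for you.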

Using the change feed processor [default implementation] over the pull model comes with many advantages, as listed below:

Photo credit : Official Website

The change feed processor library offers an “at least once” guarantee, ensuring that you won’t miss processing any events.

A few downsides of the change feed processor were as follows:

  • The at-least-once guarantee also means the same message can be processed more than once. We have seen duplicates after system failures, lease switches between partitions and some application failures.
  • The batch size was a delicate number to set: too high a value causes a lot of duplicate processing on failure, and too low a value adds latency, since there is a feed poll delay between batches.
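
One common way to live with at-least-once delivery is to make the handler idempotent. The following is a minimal sketch in plain Java, not the approach from our codebase: the `Event` type and the in-memory set of processed IDs are assumptions made for illustration. In a real service that set would need durable storage, or the sink write would be an upsert keyed by the event ID.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Illustrative only: plain Java, no Cosmos DB SDK types. */
class IdempotentHandlerSketch {

    /** Hypothetical event shape: a unique id plus a payload. */
    static final class Event {
        final String id;
        final String payload;

        Event(String id, String payload) {
            this.id = id;
            this.payload = payload;
        }
    }

    // Assumption: kept in memory for the sketch; production code needs durable state.
    private final Set<String> processedIds = new HashSet<>();
    private final List<String> sink = new ArrayList<>();

    /** Handles one batch and returns how many events were actually processed. */
    int handleBatch(List<Event> batch) {
        int processed = 0;
        for (Event event : batch) {
            if (!processedIds.add(event.id)) {
                continue; // already seen: a redelivered duplicate, skip it
            }
            sink.add(event.payload);
            processed++;
        }
        return processed;
    }

    List<String> sink() {
        return sink;
    }

    public static void main(String[] args) {
        IdempotentHandlerSketch handler = new IdempotentHandlerSketch();
        List<Event> batch = Arrays.asList(new Event("1", "a"), new Event("2", "b"));
        System.out.println("first delivery processed: " + handler.handleBatch(batch));
        System.out.println("redelivery processed: " + handler.handleBatch(batch));
    }
}
```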

We went ahead with the push model of change feed processor for multiple reasons:

  • We had multiple logical partitions, and the data size would eventually result in multiple physical partitions as well, so the CFP push model keeps partition management clean.
  • To avoid tedious lease (cursor) management.
  • To avoid acknowledging every document manually.
  • CFP checkpoint management ensured that no data was lost in case of exceptions.

Azure Cosmos Change Feed processor

The article on Microsoft's official website can be summarised as follows.

The Azure Cosmos DB change feed exposes the database's change log outside Cosmos DB, so consumers learn about changes shortly after they happen. It captures all inserts and updates. The change feed enables efficient processing of large datasets with a high volume of writes, and it offers an alternative to querying an entire dataset to identify what has changed.

Photo credit: Official Page

Components of change feed processor

  • Monitored container: All inserts and updates are made to this container. Change feed is generated from this container.
  • Lease Container: It acts as a state storage and coordinates processing of change feed across multiple workers.
  • The Host: It is an application which listens to the changes in the change feed. Multiple host instances with same lease configuration can run in parallel but each instance should have a different host name.
  • The Delegate: The delegate is the code that defines what you, the developer, want to do with each batch of changes that the change feed processor reads.

Implementation

We will be using the Azure SDK for Java, version 4.8.0, for this blog.

We would need the following basic parameters set from Azure Cosmos DB:

  • hostname: We can use a simple name such as ‘host-1’.
  • feedContainer: The monitored container in our Cosmos DB instance. Make a note of its partition key.
  • leaseContainer: Same as above.
  • The Delegate: The delegate is the function you define to process each batch of changes that the change feed processor feeds in.

The change feed processor needs a client connected to Cosmos DB. The client builder offers several options, including the consistency level.
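
A minimal sketch of such a client, assuming the `azure-cosmos` v4 dependency is on the classpath. The class name `CosmosClientFactory` is made up for this example, the endpoint and key are passed in by the caller, and SESSION is only an example consistency level.

```java
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosClientBuilder;

/** Hypothetical helper class; the name is not from the original project. */
public class CosmosClientFactory {

    /**
     * Builds an async client. The endpoint and key come from your Cosmos DB
     * account; how you load them (env vars, vault, config) is up to you.
     */
    public static CosmosAsyncClient create(String endpoint, String key) {
        return new CosmosClientBuilder()
            .endpoint(endpoint)
            .key(key)
            // Example choice only; must not be stronger than the account's level.
            .consistencyLevel(ConsistencyLevel.SESSION)
            // Return the written document in write responses.
            .contentResponseOnWriteEnabled(true)
            .buildAsyncClient();
    }
}
```

The feed and lease containers can then be obtained with `client.getDatabase(...).getContainer(...)`.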

Consistency level - Represents the consistency levels supported for Azure Cosmos DB client operations in the Azure Cosmos DB service.

Note: The requested consistency level must match or be weaker than the one provisioned for the database account.
Consistency levels in order of strength are STRONG, BOUNDED_STALENESS, SESSION, CONSISTENT_PREFIX and EVENTUAL.

Sample code for creating a simple change feed processor:
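
A minimal sketch, assuming the `azure-cosmos` v4 builder API and Jackson's `JsonNode` as the document type. The class name `ChangeFeedProcessorFactory` is hypothetical, and the delegate here only prints each document's `id`; real processing logic would go in its place.

```java
import com.azure.cosmos.ChangeFeedProcessor;
import com.azure.cosmos.ChangeFeedProcessorBuilder;
import com.azure.cosmos.CosmosAsyncContainer;
import com.fasterxml.jackson.databind.JsonNode;

import java.util.List;

/** Hypothetical helper class; the name is not from the original project. */
public class ChangeFeedProcessorFactory {

    public static ChangeFeedProcessor create(String hostName,
                                             CosmosAsyncContainer feedContainer,
                                             CosmosAsyncContainer leaseContainer) {
        return new ChangeFeedProcessorBuilder()
            .hostName(hostName)             // must be unique per running instance
            .feedContainer(feedContainer)   // the monitored container
            .leaseContainer(leaseContainer) // state storage and coordination
            .handleChanges((List<JsonNode> docs) -> {
                // The delegate: called once per batch of changes.
                for (JsonNode doc : docs) {
                    System.out.println("Received change for id: " + doc.get("id"));
                }
            })
            .buildChangeFeedProcessor();
    }
}
```

The processor does nothing until it is started, for example with `processor.start().subscribe()`, and it can be shut down with `processor.stop().block()`.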

The change feed processor is a convenient way to add change feed processing to your module. We can use the following settings to control how it interacts with the Azure Cosmos DB containers:

Note: Be careful when resetting processing to start from the beginning. The start-from-beginning option only takes effect when:
- Lease items are not initialised; the setting is ignored if the lease items exist and have a valid continuation token.
- The start continuation token option is not specified.
- The start time option is not specified.
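
A minimal sketch of these settings, assuming the `azure-cosmos` v4 `ChangeFeedProcessorOptions` class. The class name `ChangeFeedOptionsFactory` is hypothetical, and the numbers are the ones from the performance test described later in this post rather than recommended defaults.

```java
import com.azure.cosmos.models.ChangeFeedProcessorOptions;

import java.time.Duration;

/** Hypothetical helper class; the name is not from the original project. */
public class ChangeFeedOptionsFactory {

    public static ChangeFeedProcessorOptions create() {
        ChangeFeedProcessorOptions options = new ChangeFeedProcessorOptions();
        options.setMaxItemCount(100);                               // items per page / batch
        options.setFeedPollDelay(Duration.ofSeconds(5));            // wait between polls when idle
        options.setLeaseRenewInterval(Duration.ofSeconds(17));
        options.setLeaseAcquireInterval(Duration.ofSeconds(13));
        options.setLeaseExpirationInterval(Duration.ofSeconds(60));
        // false = start from "now" when no lease state exists yet.
        options.setStartFromBeginning(false);
        return options;
    }
}
```

The options object is passed to the processor through the builder's `options(...)` method.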

Performance Testing

At peak, our solution demands a high number of reads and writes that have to be accommodated in parallel. As part of our performance testing strategy, we wanted to check this behaviour.

To test parallel reads and writes on Cosmos DB using the change feed processor, we pushed data into a Cosmos DB container for 15 minutes, measured feed consumption while the writes were in progress, and measured how long the pending feed took to finish in batch processing. We also noted the RU consumption.

Cosmos DB configuration:
- Deployment: multi-region
- Consistency: STRONG
- Maximum scale count: unlimited
- Max RU of Cosmos DB container: 4000

Change feed processor configuration:
- Maximum items per page or FeedResponse: 100
- Lease renew interval: 17 seconds
- Lease acquire interval: 13 seconds
- Lease expiration interval: 60 seconds
- Feed poll delay: 5 seconds

Source container — Container used to write the test data for subsequent reading within the change feed processor.
Success Container — Container used to store data post processing it in change feed processor handle method.

Steps
- Use JMeter to fire 40 requests per second from 4 virtual users for 15 minutes, with a payload size of 3 KB per request.
- Receive the feed in the change feed processor, process it and store the result in the success container.

Test Summary and findings
- Inserted 56,000 records into the source container in 23 minutes.
- While the inserts were running (parallel write and read), the change feed processor processed about 32,000 records in the same 23 minutes and stored them in the success container.
- 32,360 requests in 23 minutes is about 23.4 requests processed per second, with writes in progress on the same container.
- RU consumption stayed within 50% of the maximum allocated RU, so RU usage was efficient.
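
The arithmetic behind these numbers can be checked with a few lines of Java. The rates come straight from the figures above. The drain-time estimate is an assumption on our part: it supposes the processing rate stays the same once the writes stop, which the test did not measure.

```java
/** Back-of-the-envelope arithmetic for the test figures; illustrative only. */
class ThroughputSketch {

    /** Average items per second over a window given in minutes. */
    static double perSecond(long count, long minutes) {
        return count / (minutes * 60.0);
    }

    /** Seconds needed to work through a backlog at a constant rate. */
    static double drainSeconds(long backlog, double ratePerSecond) {
        return backlog / ratePerSecond;
    }

    public static void main(String[] args) {
        double insertRate = perSecond(56_000, 23);   // writes into the source container
        double processRate = perSecond(32_360, 23);  // items handled by the processor
        long backlog = 56_000 - 32_360;              // still pending when writes stopped

        System.out.printf("insert rate:  %.2f/s%n", insertRate);
        System.out.printf("process rate: %.2f/s%n", processRate);
        // Assumption: the rate does not change after the writes stop.
        System.out.printf("backlog of %d drains in about %.0f s%n",
            backlog, drainSeconds(backlog, processRate));
    }
}
```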

RU usage for Source container

Notable findings

  • Maximum items per page defines how large a batch you fetch from the change feed for processing. It can become a bottleneck, because the change feed processor waits for the current batch to be marked as processed before moving on to the next one.
    Setting this value very high makes failures costly. Assume we set it to 200 instead of 100 and, during processing, the 109th item fails: the entire batch then fails and is delivered again. With a large items-per-page count we need a way to handle failures and retries.
  • Lease fetch delay: we saw it take around 2 to 3 seconds to get the next batch of feed pages once the previous batch finished. The default poll delay is 5 seconds, and setting it below 3 seconds is redundant, since acquiring the partition lease for the next batch takes around 3 seconds anyway. Also keep in mind the time required to process each batch of documents.
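
The cost of a large batch on failure can be illustrated with a small simulation in plain Java. This is a simplified model, not the SDK: it assumes the checkpoint only advances after a whole batch succeeds, and that a single item fails exactly once before succeeding on redelivery.

```java
/** Simplified model of batch redelivery; NOT the Cosmos DB SDK. */
class BatchRetrySketch {

    /**
     * Counts how many items are successfully processed more than once when
     * item number failAt (1-based) throws once and its batch is redelivered.
     */
    static int duplicates(int totalItems, int batchSize, int failAt) {
        int successes = 0;
        int checkpoint = 0;          // items confirmed as processed
        boolean failedOnce = false;

        while (checkpoint < totalItems) {
            int end = Math.min(totalItems, checkpoint + batchSize);
            boolean batchOk = true;
            for (int item = checkpoint + 1; item <= end; item++) {
                if (item == failAt && !failedOnce) {
                    failedOnce = true;
                    batchOk = false;
                    break;           // exception: this batch is not checkpointed
                }
                successes++;
            }
            if (batchOk) {
                checkpoint = end;    // move the bookmark past the whole batch
            }
        }
        return successes - totalItems;
    }

    public static void main(String[] args) {
        System.out.println("batch of 200: " + duplicates(200, 200, 109) + " duplicates");
        System.out.println("batch of 100: " + duplicates(200, 100, 109) + " duplicates");
    }
}
```

In this model the same failure at item 109 costs 108 reprocessed items with a batch of 200, but only 8 with a batch of 100.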

Things to watch

Parallel processing

  • Multiple change feed processor instances can distribute the work evenly by using the leasing mechanism.
  • If there are fewer change feed processor instances than physical partitions, each instance will have exclusive ownership of multiple partitions.
  • If there are more change feed processor instances than physical partitions, the additional instances will sit idle.
  • If multiple change feed processors need to process the same events, a lease prefix is used. Each subscriber uses a different lease prefix to process the same events.
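
The first three points can be pictured with a small sketch in plain Java. It shows only the steady state that balancing tends towards; the round-robin assignment is an assumption made for illustration, since the real processor gets there through lease acquisition and stealing, not through a central assigner.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Steady-state picture of lease ownership; NOT how the SDK assigns leases. */
class LeaseDistributionSketch {

    /** Maps each host to the partitions it owns. Assumes hosts is not empty. */
    static Map<String, List<Integer>> distribute(int partitions, List<String> hosts) {
        Map<String, List<Integer>> owned = new LinkedHashMap<>();
        for (String host : hosts) {
            owned.put(host, new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            // One owner per partition: a lease is never shared between hosts.
            owned.get(hosts.get(p % hosts.size())).add(p);
        }
        return owned;
    }

    public static void main(String[] args) {
        // Fewer hosts than partitions: each host owns several partitions.
        System.out.println(distribute(4, Arrays.asList("host-1", "host-2")));
        // More hosts than partitions: the extra host owns nothing and sits idle.
        System.out.println(distribute(2, Arrays.asList("host-1", "host-2", "host-3")));
    }
}
```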

Scalability

The number of change feed processor instances can grow and shrink, and the change feed processor will dynamically adjust the load by redistributing accordingly.

  • If a processor crashes, hangs or loses its network connection, its leases will expire and be distributed among the other available change feed processor instances.
  • If one processor owns all leases and another one comes up, the newcomer will start stealing leases from the first, so that leases are eventually divided equally between the running hosts.

Resiliency

If your delegate implementation has an unhandled exception, the thread processing that particular batch of changes will be stopped, and a new thread will be created. The new thread will check which was the latest point in time the lease store has for that range of partition key values, and restart from there, effectively sending the same batch of changes to the delegate. This behaviour will continue until your delegate processes the changes correctly and it’s the reason the change feed processor has an “at least once” guarantee, because if the delegate code throws an exception, it will retry that batch.

To prevent the change feed processor from going into a continuous retry loop, we can write a more defensive delegate function on the receiving end. We added a basic resilience4j-based second level of resilience to further strengthen our system. I will be writing another blog on it soon.
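
As an illustration of a more defensive delegate, here is a minimal sketch in plain Java. It is not our resilience4j setup: the retry limit and the dead-letter list are assumptions chosen for the example. The idea is that no exception escapes the delegate, so one bad item cannot make the processor redeliver the same batch forever.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Illustrative delegate wrapper in plain Java; not resilience4j. */
class ResilientDelegateSketch {

    static final int MAX_ATTEMPTS = 3; // hypothetical limit

    /**
     * Processes every item in the batch. Items that still fail after
     * MAX_ATTEMPTS are returned as dead letters instead of being rethrown.
     */
    static <T> List<T> handleBatch(List<T> batch, Consumer<T> processor) {
        List<T> deadLetters = new ArrayList<>();
        for (T item : batch) {
            boolean done = false;
            for (int attempt = 1; attempt <= MAX_ATTEMPTS && !done; attempt++) {
                try {
                    processor.accept(item);
                    done = true;
                } catch (RuntimeException e) {
                    // Swallowed on purpose: retry this item, keep the batch alive.
                }
            }
            if (!done) {
                deadLetters.add(item); // park it for later inspection or replay
            }
        }
        return deadLetters;
    }

    public static void main(String[] args) {
        List<String> dead = handleBatch(Arrays.asList("a", "bad", "c"), item -> {
            if (item.equals("bad")) {
                throw new IllegalStateException("cannot process " + item);
            }
        });
        System.out.println("dead letters: " + dead);
    }
}
```

The trade-off is that dead-lettered items are no longer retried by the change feed processor, so they need their own storage and replay path.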

Cost

Along with system behaviour, we also monitored our cost during testing. We provisioned 11K RUs to test different scenarios, which cost us about $500 per month. We then scaled the provisioned throughput back to 4K RUs, and since then our system has always stayed well within 2K RUs. Increased load will lead to increased cost, so continuous monitoring is recommended.

What Next?

We have seen the advantages of the Azure change feed processor, along with how to set it up in code and use it. In this post you have seen how easily it can be done by adding the Azure Cosmos SDK and implementing the Cosmos change feed processor. A sample codebase is available for your reference (https://github.com/Inbaraj-S/ChangeFeedProcessor_Demo). The same can also be done with Azure Functions and a Cosmos DB trigger, which is a topic in itself for another blog.
The Azure change feed processor has very little support online for Java developers. So if you ever try out something on it, do share it with the community. Happy coding :)
