Real-Time Event Stream Reconciliation Pattern

Rajesh Ojha
Apr 7, 2022


Context:

Many software systems today rely on real-time event stream processing. In such systems, data reconciliation is an important process for ensuring data completeness and integrity.

This is especially true for applications subject to SOX, where data completeness and data integrity play a key role in demonstrating compliance.

Problem Statement:

Many organizations need to reconcile fast-data (high-volume) event streams in real time. Identifying gaps and reconciling huge volumes of data through manual or batch processes is neither timely nor efficient. In addition, there are often business or system requirements to perform specific actions based on the time elapsed until successful reconciliation; this time window can range from seconds to multiple hours.

What is Data reconciliation?

Data reconciliation is the process of correlating one event with another based on a shared event key. In the current problem context:

A message is deemed successfully reconciled once the Reconciliation Engine has received a message for a given key from all of the sources.

If a message for a given key is not received from one or more sources within a predefined business criterion (e.g. time elapsed since the first message), the key is marked as mismatched/unreconciled.

In the diagram below, message K1A1 represents a message with key K1 and payload A1 from source Source_A. For key K1 to be reconciled, every source needs to emit a message with key K1 and its source-specific payload.
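To make this definition concrete, here is a minimal Python sketch (Python is used purely for illustration throughout this post). The source names follow the diagram; the deadline constant and the layout of the `pending` dictionary are assumptions made for the sketch, not part of any specific product.

```python
import time

EXPECTED_SOURCES = {"source_A", "source_B", "source_C"}  # source names from this post
MATCH_DEADLINE_SECONDS = 60 * 60                         # assumed business window (1 hour)

# In-flight state: key -> {"payloads": {source: payload}, "first_seen": timestamp}
pending: dict[str, dict] = {}

def on_event(key: str, source: str, payload: str) -> str | None:
    """Record an event; return "matched" once all sources have reported for the key."""
    entry = pending.setdefault(key, {"payloads": {}, "first_seen": time.time()})
    entry["payloads"][source] = payload
    if EXPECTED_SOURCES.issubset(entry["payloads"]):
        del pending[key]                      # fully reconciled, release the state
        return "matched"
    return None

def sweep_mismatches() -> list[str]:
    """Flag keys whose deadline elapsed before all sources reported."""
    now = time.time()
    expired = [k for k, e in pending.items()
               if now - e["first_seen"] > MATCH_DEADLINE_SECONDS]
    for k in expired:
        del pending[k]                        # report these as mismatched/unreconciled
    return expired
```

Calling on_event("K1", "source_A", "A1") and then the matching events from source_B and source_C returns "matched" on the last call; sweep_mismatches() flags keys whose time window expired first.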

Potential approaches to reconcile data in real time:

Real-time reconciliation of fast data can be broken into two broad steps:

1. Identify missing messages based on a given message key

2. Reconcile the data with the source using the identified missing keys

In this post, we will focus on the first step: identifying missing messages in real time. Possible approaches for the second step will be discussed in my next post. Let's analyze three patterns used to identify missing data, namely:

1. Database Based Identification

2. Event Sourcing — In Memory State Identification Approach

3. Event Sourcing with CQRS — In Memory State Identification Approach

1. Database Based Identification:

This is the most commonly used approach, where the database is relied on heavily for state management. As shown in the diagram below, messages from all sources are ingested into a reconciliation database, which holds the full reconciliation state.

Typically, the event from the first source is inserted into the database. Events from other sources with a matching key are then appended/upserted onto the existing record; an upsert involves a read followed by an update. A scheduled process running at a fixed interval fetches records that do or do not have all the required sources (source_A, source_B and source_C) to determine which keys are complete. A sketch of this flow follows.
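The sketch below uses an in-memory SQLite database as a stand-in for the reconciliation database; the table name, column names and per-source flag columns are illustrative assumptions, not a prescribed schema.

```python
import sqlite3
import time

# In-memory SQLite stands in for the reconciliation database.
db = sqlite3.connect(":memory:")
db.execute("""CREATE TABLE recon (
    msg_key  TEXT PRIMARY KEY,
    state    TEXT,      -- combined payloads appended so far
    has_a    INTEGER DEFAULT 0,
    has_b    INTEGER DEFAULT 0,
    has_c    INTEGER DEFAULT 0,
    first_ts REAL)""")

SOURCE_FLAG = {"source_A": "has_a", "source_B": "has_b", "source_C": "has_c"}

def upsert_event(key: str, source: str, payload: str) -> None:
    """Read-modify-write: fetch the existing state, append the payload, set the source flag."""
    row = db.execute("SELECT state FROM recon WHERE msg_key = ?", (key,)).fetchone()
    if row is None:
        db.execute(f"INSERT INTO recon (msg_key, state, {SOURCE_FLAG[source]}, first_ts) "
                   "VALUES (?, ?, 1, ?)", (key, payload, time.time()))
    else:
        db.execute(f"UPDATE recon SET state = state || '|' || ?, {SOURCE_FLAG[source]} = 1 "
                   "WHERE msg_key = ?", (payload, key))
    db.commit()

def scheduled_check(deadline_seconds: float):
    """Periodic job: matched keys have every flag set; expired keys missed the deadline."""
    matched = db.execute(
        "SELECT msg_key FROM recon WHERE has_a = 1 AND has_b = 1 AND has_c = 1").fetchall()
    expired = db.execute(
        "SELECT msg_key FROM recon WHERE NOT (has_a = 1 AND has_b = 1 AND has_c = 1) "
        "AND first_ts < ?", (time.time() - deadline_seconds,)).fetchall()
    return matched, expired
```

The read-then-write shape of upsert_event and the periodic scans in scheduled_check are exactly the costs discussed below.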

Reconciliation Table:

From a time-sequence standpoint, the table below represents the database contents at successive sequence numbers.

TS* = Event Received Time Stamp

The table above shows the sequence in which events are inserted into the database. The attributes typically stored are the event message key, the message state, and a flag per source indicating that data from that source has been received.

As shown above, the cost of updating the latest state grows with each new source, and the state column can eventually become very large. The time and space cost increases because, for every new record, the entire combined state is fetched and then re-written after the incoming message is appended.

The table below describes the time complexity of the operations above:

* An upsert is typically about twice as costly as a simple write. Moreover, since it reads the entire combined state for a given key and writes it back again, it is not an efficient operation.

On top of this, the scheduled process itself puts load on the database, so its interval becomes a trade-off between DB cost and detection latency: shorter intervals increase the DB cost, while longer intervals delay the identification of reconciled messages.

Pros of this solution:

i. It enables a stateless application, with state management handled solely at the database layer.

ii. It is lightweight at the application layer and heavy at the database layer, which makes the application code easy to maintain.

Cons of this solution:

i. To keep the application lightweight, most of the complexity is pushed to the database layer, which requires a highly performant and more expensive database.

ii. Under cloud database pricing models, cost is directly proportional to the number of reads and upserts, so this DB-heavy approach can be expensive.

iii. Index sizes might grow large and need careful planning to keep write latency and DB cost under control.

iv. A shorter scheduled-process interval incurs a high DB cost, while a longer interval delays the identification of matched/mismatched messages. Finding the right balance between the interval and the acceptable identification delay can be difficult.

v. Because each incoming record is appended to the existing state in the DB, the resulting record can grow very large depending on the number of sources and the payload sizes. The claim-check pattern can mitigate this by ensuring that only key information is passed to the state manager.

2. Event Sourcing — In Memory State Identification Approach:

In the first approach, we saw a lot of database activity dedicated to reconciliation. With the approach shown below, we can reduce this activity and choose selectively when to hit the database. We do this by implementing event sourcing together with distributed application persistence, which maintains the per-key state in distributed application memory. (Several frameworks support this out of the box; Akka's event-sourcing support is one of them.)

In this approach, every message is persisted as an event in the database, and the in-memory state is updated to reflect it. The in-memory state for an event key is retained until the matched/mismatched business criterion has been met. Once a key is identified as matched or mismatched, the needed events can be published to the output and the in-memory state discarded. In the event of a disaster, the in-memory state can be recomputed from the events already stored in the event-sourced write database; the state can also be snapshotted to reduce recovery time. A framework-agnostic sketch of this flow is shown below.
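In this sketch (plain Python, not Akka-specific), an append-only JSON-lines file stands in for the event-sourced write database, and the source names are reused from earlier in the post; the file path and function names are illustrative assumptions.

```python
import json
import time

EXPECTED_SOURCES = {"source_A", "source_B", "source_C"}   # source names from this post

# The append-only file stands in for the event-sourced write database.
EVENT_LOG_PATH = "recon_events.jsonl"
state: dict[str, dict] = {}            # in-memory state: key -> {source: payload}

def handle_event(key: str, source: str, payload: str) -> None:
    # 1. Persist the raw event (append-only write: no read, no upsert).
    with open(EVENT_LOG_PATH, "a") as log:
        log.write(json.dumps({"key": key, "source": source,
                              "payload": payload, "ts": time.time()}) + "\n")
    # 2. Apply the event to the in-memory state.
    entry = state.setdefault(key, {})
    entry[source] = payload
    # 3. Publish and release the state once the key is fully reconciled.
    if EXPECTED_SOURCES.issubset(entry):
        publish_matched(key, entry)
        del state[key]

def publish_matched(key: str, payloads: dict) -> None:
    print(f"reconciled {key} from sources {sorted(payloads)}")

def recover() -> None:
    """Disaster recovery: rebuild the in-memory state by replaying the event log.
    Snapshots or completion markers would normally skip already-matched keys."""
    state.clear()
    with open(EVENT_LOG_PATH) as log:
        for line in log:
            event = json.loads(line)
            state.setdefault(event["key"], {})[event["source"]] = event["payload"]
```

Note that the hot path is a single append plus an in-memory update; reads only happen inside recover().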

From a time-complexity standpoint, the following operations are executed whenever a message with a given key arrives in this pattern:

As we can see, this solution considerably reduces the time complexity of reconciliation and scales better as more sources are added. However, it works best when the focus is on matched messages; identifying long-term mismatches can increase the solution complexity.

Pros of this solution:

i. Time and space complexity is reduced considerably, since only write operations are performed and no reads are needed unless disaster recovery is triggered. This approach also avoids upserts, which are typically the costliest operations.

ii. As it is lightweight at the database layer, cloud cost is considerably lower than in the first approach.

iii. Because the application handles the reconciliation logic, the database can be a simple, massively horizontally scalable store, giving a lot of flexibility along with performance and cost advantages.

iv. It delivers higher performance, as operations are done in memory, avoiding DB and network latency.

Cons of this solution:

i. It creates a stateful application, which typically requires more careful design.

ii. It is lightweight at the database layer and heavy at the application layer, which increases maintenance effort.

iii. This solution works best when the proportion of matched messages is high, since in-memory state is released as records get reconciled. Long-term mismatches can increase the in-memory state considerably unless additional measures (see the third approach below) are taken to handle them.

3. Event Sourcing with CQRS — In Memory State Identification Approach:

One disadvantage of the second approach is that the in-memory state can sometimes grow large. In that case, we can extend approach 2 by combining event sourcing with the CQRS (Command Query Responsibility Segregation) pattern. This enables clean domain modelling where command and query concerns are separated, and query models can be extended to handle multiple use cases as requirements dictate. The updated diagram for this approach is:

As shown above, this approach never has to read the entire combined state and write it back. The query database or the application state can be used selectively (hence the dotted lines from the two sources) to identify reconciled and mismatched messages in real time, without worrying about the granularity of a scheduled-process interval. Command and query logic are isolated, enabling separation of concerns and independent scaling. A minimal sketch of the split is shown below.
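In this sketch, an in-process list stands in for the bus between the write and read sides, and a dictionary stands in for the query database; all names and the shape of the read model are illustrative assumptions.

```python
import time
from collections import defaultdict

EXPECTED_SOURCES = {"source_A", "source_B", "source_C"}   # source names from this post

# --- Command (write) side: append-only event store, no read-modify-write ---
event_store: list[dict] = []
projection_queue: list[dict] = []      # stand-in for the bus feeding the read side

def handle_command(key: str, source: str, payload: str) -> None:
    event = {"key": key, "source": source, "payload": payload, "ts": time.time()}
    event_store.append(event)          # persist the event
    projection_queue.append(event)     # publish it for the query-side projection

# --- Query (read) side: projection maintains an eventually consistent read model ---
read_model: dict[str, dict] = defaultdict(lambda: {"sources": set(), "first_ts": None})

def run_projection() -> None:
    """Drain published events and update the read model."""
    while projection_queue:
        event = projection_queue.pop(0)
        entry = read_model[event["key"]]
        entry["sources"].add(event["source"])
        entry["first_ts"] = entry["first_ts"] or event["ts"]

def unreconciled_older_than(seconds: float) -> list[str]:
    """Query-side view: keys still missing at least one source after the window."""
    cutoff = time.time() - seconds
    return [key for key, entry in read_model.items()
            if not EXPECTED_SOURCES <= entry["sources"] and entry["first_ts"] < cutoff]
```

The command side stays write-only, while the projection answers questions such as "which keys are still unreconciled after N seconds" without scanning the write store.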

Pros of this solution:

i. Command and query concerns are separated, so the query model does not have to deal with the write-side state.

ii. Independent scaling is possible, along with the flexibility to choose a different type of database for each side based on the use case.

iii. Reads are more performant, and since reads outnumber writes, the overall system is better optimized.

Cons of this solution:

i. Two databases are created, which may require more maintenance.

ii. Consistency between the read and write models has to be designed carefully, including how eventual consistency is handled.

Conclusion:

All of the approaches above have their own benefits and challenges. Depending on the requirement:

The first approach may be better if we are not dealing with huge amounts of data and prefer a stateless application. However, scaling the database for large data volumes can be difficult given the latency and cost involved.

The second approach may be better if we want to process large volumes of data quickly, with lower latency and DB cost, and are focused primarily on reconciled (matched) messages.

The third approach suits cases where we have to process large volumes of data quickly and need to handle both matched and mismatched messages dynamically. However, it may require more maintenance effort.

These are some of the approaches I have experimented with while reconciling fast data. Feel free to comment or email with feedback or suggestions on any of them; I would love to hear additional insights.
