Bounded Batch Architecture

Nalin Bhan
Capillary Technologies
13 min read · Dec 15, 2023

Capillary Technologies’ Analytics platform, Insights+, operates on the backbone of a resilient multi-tenant ETL pipeline. The growth of Capillary has been synonymous with an increasing number of tenants and, as a natural consequence, a surge in the underlying data volume that this ETL pipeline must handle. This dynamic landscape demands an ongoing dedication to adaptation and innovation to meet the mounting data challenges head-on. In our preceding blog, we thoroughly explored the initial design of the transform phase within the ETL pipeline. In this second part, we seamlessly resume the narrative from where we concluded, delving deeper into the ongoing evolution of this critical component.

Let’s recap what was covered in the previous blog to set the context. Our journey commenced with the acknowledgment of analysts’ requirement for derived columns obtained by joining different facts. Since this often led to performance issues and resource choking, we made a strategic decision to enrich the facts by incorporating the Key Performance Indicators (KPIs) desired by analysts as part of our ETL. This approach aimed to eliminate the need for additional post-processing on the existing facts. However, executing fact-to-fact joins for KPI derivation is computationally expensive, and it introduced significant delays in our ETL run. This computational intensity also posed scalability challenges to our system, particularly as the data volume increased.

We discussed strategies that were instrumental in optimizing the cross-fact joins. However, we didn’t shy away from discussing the pitfalls associated with these strategies. Part II takes a closer look at the challenges posed in the first leg of our exploration and, more importantly, how we overcame them.

The optimization we covered focused on leveraging the efficiency of the batch pipeline to reduce the facts being joined, using user_id. The transformation was then applied to the reduced facts, and its output was merged into the history table. The diagram below depicts the same…

This approach, while highly effective when it was implemented, had certain flaws that became more glaring as Capillary continued to grow and a multitude of new clients were onboarded onto Insights+. With some clients bringing extensive and sizable datasets, the advantages of our initial strategy diminished noticeably, and it struggled to adapt to the rapid changes in data patterns.

To summarize the drawbacks of this strategy in a few words: it relied heavily on certain patterns in the data to be efficient. One example is the percentage of end users in the delta data compared to the entire user database. Any fluctuation in these patterns over time led to a corresponding fluctuation in ETL runtimes. A few examples of these fluctuations:

Bulk Jobs affect millions of users and thus increase the percentage of end users in delta data

  1. Sending campaigns to more than 3% of end users
  2. Points Expiry jobs at the month’s end affect millions of users.

Nature of Tenant Business

  1. In the case of FMCG verticals, customers transact very frequently (once every 3 days or so), so the percentage of end users affected is expected to be high.

The variation in the percentage of users for different verticals

We could not afford to live with these drawbacks in the long term. As more enterprise brands were onboarded, each with very strict SLA criteria for ETL completion, missing an SLA would lead to a significant loss in business.

We needed an alternate approach that could adapt to these fluctuations in data patterns. We had to revisit the initial scenario and reconsider the analysts’ use cases. The optimization should not only work, it should always work, whether with fluctuating data patterns or with a high volume of data as more enterprise brands are onboarded to our platform.

Analyzing different scenarios for Fact-to-Fact Joins

Key Performance Indicators (KPIs) within our system are created from SQL queries involving joining two or more facts. To introduce a more abstract term, we refer to these as transformations. A comprehensive evaluation of all transformations within our system led to their categorization into two distinct buckets:

Stateless Transformations

In this category, transformations are characterized by their independence from the previous set of events within the system. The events can be seamlessly mapped from one fact to another without relying on any historical context. To illustrate, let’s revisit the example from our previous blog about computing TotalPoints awarded on a bill. Here, the computation of TotalPoints is entirely self-contained within the current bill and does not require any reference to the previous bill by the end user. This type of transformation operates in a stateless manner, focusing solely on the immediate set of events.

Stateful Transformations

Unlike stateless transformations, stateful transformations involve a dependency on the order and details of the previous set of events that entered the system. The chronological sequence of these events becomes crucial for accurate computation. Consider the scenario where we aim to rank transactions at the end-user level. In this case, the computation of a bill’s rank depends on information about the rank of the preceding bill conducted by the same customer. The stateful nature of these transformations highlights the importance of historical context and the sequential order of events for accurate processing.
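The difference between the two buckets can be illustrated with a minimal Python sketch. The sample rows, column layout, and function names below are hypothetical and only meant for illustration; they are not taken from the actual pipeline.

```python
from collections import defaultdict

# Hypothetical sample data, for illustration only.
transactions = [  # (transaction_id, user_id, event_date)
    ("t1", "u1", "2023-04-02"),
    ("t2", "u1", "2023-04-09"),
    ("t3", "u2", "2023-04-05"),
]
points = [  # (transaction_id, points)
    ("t1", 10), ("t1", 5), ("t3", 7),
]

def total_points(transaction_id, points_rows):
    """Stateless: needs only the rows that belong to this one transaction."""
    return sum(p for tid, p in points_rows if tid == transaction_id)

def rank_transactions(transaction_rows):
    """Stateful: a bill's rank depends on the same user's earlier bills."""
    by_user = defaultdict(list)
    for tid, user_id, event_date in transaction_rows:
        by_user[user_id].append((event_date, tid))
    ranks = {}
    for bills in by_user.values():
        # Chronological order matters here, unlike in total_points.
        for rank, (_, tid) in enumerate(sorted(bills), start=1):
            ranks[tid] = rank
    return ranks

stateless = {tid: total_points(tid, points) for tid, _, _ in transactions}
stateful = rank_transactions(transactions)
```

Here `total_points` can be evaluated for any transaction in isolation, while `rank_transactions` must see the complete history of each user before it can assign a rank.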

Optimizing Computation of Stateless Transformations

In this blog, our primary focus will be directed toward the intricate handling of stateless transformations. We do this using Bounded Batch Architecture, an in-house solution that draws inspiration from streaming engines, employing advanced techniques like watermarking and state management. To demystify its inner workings, let’s revisit the familiar example explored in our previous blog, centering around two crucial facts: Transactions and Points.

Transaction refers to a transaction done by the end-user at one of the brands.

Points represents any event in which loyalty points were credited to or debited from an end user’s account.

Though Transactions and Points are different facts, there is a correlation between them. An end-user may be awarded points on doing a transaction or he may redeem points to use as currency for a specific transaction. Hence, each transaction can be associated with a certain number of points. Our objective is to add a column called TotalPoints to the Transactions fact table, representing the total points awarded for each transaction. To achieve this, we need to derive the values from the Points fact table. The SQL query for this calculation is given below:

SELECT
  trans.*,
  nvl(pts.totalPoints, 0) AS TotalPoints
FROM
  Transactions trans
  LEFT JOIN (
    -- Sum the points attached to each transaction
    SELECT
      t.Id,
      SUM(nvl(p.Points, 0)) AS totalPoints
    FROM
      Transactions t
      LEFT JOIN Points p ON t.Id = p.TransactionId
    GROUP BY
      t.Id
  ) pts ON trans.Id = pts.Id;

The optimization discussed in our previous blog primarily centered around reducing the tables in the context of batch computation and subsequently joining them. The Bounded Batch architecture, however, tackles this challenge through an entirely different approach.

Let’s explore some key ideas that shape the core of Bounded Batch Architecture:

Limit computation to most dynamic data

What is a “Fact” again? A Fact is essentially a collection of events streaming into the system, with every event carrying a timestamp, called “event_date”, that indicates when it occurred. From the event_date column, we can derive the event’s year or month. To enhance our fact tables, let’s introduce an event_month column. We’ll call this the “event partition” column.
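Deriving the event partition from event_date could look like the following sketch. It assumes event_date is an ISO-formatted timestamp string; the helper name `event_month` and the sample events are hypothetical.

```python
from collections import Counter
from datetime import datetime

def event_month(event_date: str) -> str:
    """Return the month partition (YYYY-MM) for an ISO-formatted event_date."""
    return datetime.fromisoformat(event_date).strftime("%Y-%m")

# Hypothetical events: (event_id, event_date)
events = [
    ("e1", "2023-11-03 10:15:00"),
    ("e2", "2023-11-21 18:40:00"),
    ("e3", "2023-10-30 09:00:00"),
]

# Count how many events land in each event partition.
events_per_partition = Counter(event_month(d) for _, d in events)
```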

The incoming event data falls into two categories: Inserts (new data) and Updates (modifications to existing data). When assessing the activity across event partitions, the ‘most active’ region, characterized by the highest entropy, typically resides in the latest partition, where most inserts and updates occur. As we move to the older partitions, the level of activity starts reducing significantly. Taking the example of the Transactions fact, recent transactions are more likely to undergo updates, such as customers modifying details or returning items. However, as transactions age, the probability of updates decreases substantially.

A diagram depicting a decrease in entropy as partitions become older

The image above illustrates the decline in entropy as we progress to older partitions. These partitions can be broadly categorized into two zones:

  • Hot Partitions: Regions with the highest entropy and frequent updates.
  • Cold Partitions: Regions with almost negligible changes in state.

Now, determining the optimal number of partitions that make up the hot partitions involves considering various factors:

Nature of the Fact

Let’s consider a hypothetical fact where events are immutable, i.e., there are only inserts. In this scenario, since there will be no change in the older partitions, the hot-partition window reduces to only the latest partition, leading to faster computation. Conversely, for a fact with very frequent updates, the hot-partition window expands to encompass multiple older partitions. The number of event partitions that should be included in the hot-partition window depends heavily on the nature of the fact in question.

Analysts’ Requirements

The level of data freshness required by analysts plays a pivotal role. Take, for instance, the transactions-to-points attribution scenario. In the depicted example, we’ve grouped 6 partitions (spanning from Jun 2023 to Nov 2023) as hot partitions. Assuming these 6 partitions capture 99% of updates, the resulting data post-attribution achieves a 99% freshness level. However, if analysts find 90% freshness satisfactory, the window can be narrowed to the last 3 partitions (Sep 2023 to Nov 2023), significantly boosting computational speed.

There is a clear trade-off here. The higher the freshness required by the analysts, the more partitions the hot-partition window encompasses, and the greater the computational expense and the time needed to complete the computation.
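One way to reason about this trade-off is to pick the smallest window that covers the required share of updates. The sketch below assumes we know how many updates each partition received; the counts are made up so that they match the 90% and 99% figures used above, and `hot_window_size` is a hypothetical helper.

```python
def hot_window_size(updates_per_partition, target_percent):
    """Smallest number of latest partitions covering target_percent of updates.

    updates_per_partition is ordered newest partition first.
    """
    total = sum(updates_per_partition)
    covered = 0
    for size, updates in enumerate(updates_per_partition, start=1):
        covered += updates
        # Integer arithmetic avoids floating-point rounding at the boundary.
        if covered * 100 >= target_percent * total:
            return size
    return len(updates_per_partition)

# Hypothetical update counts for Nov 2023, Oct 2023, ..., May 2023.
updates = [600, 200, 100, 50, 30, 10, 10]

window_for_90 = hot_window_size(updates, 90)
window_for_99 = hot_window_size(updates, 99)
```

With these illustrative counts, a 90% target needs the last 3 partitions and a 99% target needs the last 6, so the stricter target doubles the number of partitions to recompute.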

Move towards a bounded computational model

Even though our scope of computation is reduced to hot partitions, these may still span months, if not years, for some use cases. On checking the attribution logic of most existing KPIs, we observed that in the majority of scenarios we want to directly map a column from one fact to another. In most of these scenarios, the corresponding events in the two facts also lie around the same date.

The highlighted red partitions indicate the likelihood of points for the transaction being present

Let’s understand this idea with the provided illustration. In April 2023, a transaction took place. When looking for the month in the points dataset that contains records for this transaction, the most probable match is April 2023. As a result, the transformation originally applied to the entire hot-partitions zone can now be carried out on individual partitions instead. This change marks a shift from an unbounded computation to a bounded one. The same transformation can be applied to each partition independently and in parallel, without any interdependence.
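A minimal sketch of this per-partition execution, in plain Python rather than Spark, might look as follows. The sample rows and helper names are hypothetical, and a thread pool merely stands in for whatever parallelism the cluster provides.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

# Hypothetical rows, already tagged with their event_month partition.
transactions = [("t1", "2023-04"), ("t2", "2023-04"), ("t3", "2023-05")]
points = [("t1", "2023-04", 10), ("t1", "2023-04", 5), ("t3", "2023-05", 7)]

def group_by_partition(rows, partition_index):
    grouped = defaultdict(list)
    for row in rows:
        grouped[row[partition_index]].append(row)
    return dict(grouped)

def attribute_partition(trans_rows, point_rows):
    """TotalPoints per transaction, using only rows from a single partition."""
    totals = defaultdict(int)
    for transaction_id, _, pts in point_rows:
        totals[transaction_id] += pts
    return {tid: totals[tid] for tid, _ in trans_rows}

trans_by_part = group_by_partition(transactions, 1)
points_by_part = group_by_partition(points, 1)
partitions = sorted(trans_by_part)

# Each partition is an independent, bounded unit of work, so they can run in parallel.
with ThreadPoolExecutor() as pool:
    results = dict(zip(partitions, pool.map(
        lambda part: attribute_partition(trans_by_part[part],
                                         points_by_part.get(part, [])),
        partitions,
    )))
```

Because `attribute_partition` only ever sees one month of each fact, its input size stays bounded no matter how far back the hot-partition window reaches.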

What is the advantage of doing this?

In a Spark cluster, optimal job execution relies on the dataset fitting into the session memory. A substantial dataset may lead to data spillage to disk, causing slower performance, while an excessively small dataset results in suboptimal cluster utilization.

Optimizing the Spark cluster for peak performance becomes feasible when the dataset’s size is predictable. By partitioning the entire dataset into month-wise segments, we effectively create datasets with predictable sizes for the cluster to process efficiently.

Handle the challenges that come with executing a multi-tenant workflow

Achieving this optimization is not without its challenges, especially considering our ETL operates in a multi-tenant fashion. The workflow execution is designed to ensure that the ETL pipeline executes independently for each tenant, preventing issues in one tenant’s pipeline from affecting others. The diversity among Capillary’s tenants, ranging from large enterprises to self-employed startups, results in significant differences in data volume. With tenant-specific pipelines, the datasets mentioned earlier are also tenant-specific. Consequently, a configuration suitable for enterprise brands may not work as effectively for smaller brands due to variations in data volume.

To address this, one workaround is to change the event partition column. In previous examples, we used event_month as the partition column. For tenants with less data volume, using event_year as the partition key can be beneficial. This adjustment increases the dataset size per job by roughly a factor of 12 and reduces the number of jobs by the same factor. It helps keep the dataset size in the sweet spot for performance regardless of which tenant it belongs to.
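As a rough sketch, the choice of partition column per tenant could be driven by a simple volume threshold. The threshold, the function names, and the row counts below are all hypothetical; the real criteria would depend on cluster sizing.

```python
def choose_partition_column(avg_rows_per_month, min_rows_per_job):
    """Pick the event partition column so each job gets a reasonably sized dataset."""
    if avg_rows_per_month >= min_rows_per_job:
        return "event_month"
    return "event_year"

def jobs_in_window(hot_window_months, partition_column):
    """Per-partition jobs needed for a window aligned to partition boundaries."""
    months_per_partition = 12 if partition_column == "event_year" else 1
    return -(-hot_window_months // months_per_partition)  # ceiling division

# Hypothetical tenants: a large enterprise brand and a small brand.
enterprise_column = choose_partition_column(5_000_000, min_rows_per_job=1_000_000)
small_brand_column = choose_partition_column(20_000, min_rows_per_job=1_000_000)

enterprise_jobs = jobs_in_window(24, enterprise_column)
small_brand_jobs = jobs_in_window(24, small_brand_column)
```

For a 24-month hot window, the small tenant runs 2 yearly jobs instead of 24 monthly ones, each over about 12 times more data.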

Fitting all the pieces together…

Based on the above three principles there is a clear idea of how the transformation will pan out in the new architecture.

  1. Suppose we have a target table where we want to refresh the data. This target table will also have an event_month/event_year as a partition column.
  2. Add an event partition column to all the input fact tables involved. Should the event_partition be month or year? That depends on the tenant this computation is being done for.
  3. Limit the scope of computation only to Hot Partitions. How many event partitions can be considered Hot Partitions? That depends on the Analysts’ use case and the nature of the fact. Analysts have the freedom to decide what suits their needs.
  4. Apply the transformation query on each partition to ensure we are always doing a bounded computation.
  5. Overwrite all partitions in the target table.
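The steps above can be sketched end to end in plain Python. This is only an illustration under stated assumptions: the target table is modelled as a dictionary keyed by partition, event_month is used as the partition column, only the recomputed hot partitions are overwritten, and all names and sample rows are hypothetical.

```python
from collections import defaultdict

def month_of(event_date):
    return event_date[:7]  # "YYYY-MM-DD" -> "YYYY-MM"

def total_points(trans_rows, point_rows):
    totals = defaultdict(int)
    for tid, _, pts in point_rows:
        totals[tid] += pts
    return {tid: totals[tid] for tid, _ in trans_rows}

def refresh(target, transactions, points, hot_window):
    # Step 2: tag every input row with its event partition.
    trans_by_part, points_by_part = defaultdict(list), defaultdict(list)
    for row in transactions:
        trans_by_part[month_of(row[1])].append(row)
    for row in points:
        points_by_part[month_of(row[1])].append(row)
    # Step 3: keep only the latest `hot_window` partitions.
    hot = sorted(trans_by_part, reverse=True)[:hot_window]
    for part in hot:
        # Step 4: bounded computation on one partition at a time.
        # Step 5: overwrite that partition in the target (assumption: hot ones only).
        target[part] = total_points(trans_by_part[part], points_by_part.get(part, []))
    return target

# Hypothetical inputs: (transaction_id, event_date) and (transaction_id, event_date, points).
transactions = [("t0", "2023-01-10"), ("t1", "2023-04-02"), ("t2", "2023-05-20")]
points = [("t0", "2023-01-10", 3), ("t1", "2023-04-02", 10),
          ("t1", "2023-04-03", 5), ("t2", "2023-05-20", 7)]

# Step 1: an existing target whose April partition is stale.
target = {"2023-01": {"t0": 3}, "2023-04": {"t1": 10}}
refresh(target, transactions, points, hot_window=2)
```

After the refresh, the April and May partitions hold recomputed totals, while the cold January partition is left untouched.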

The need for watermarking

This architecture operates under the assumption that events in associated facts will generally have similar dates, allowing them to reside in the same partition. However, as it turns out, this assumption does not always hold. Take the Transactions to Points attribution, for instance: a transaction with a backdated timestamp may enter the system. The transaction date then falls in an earlier month than the date on which points are awarded for that transaction, and the transformation produces incorrect totalPoints values for these backdated transactions.

While such scenarios may be considered corner cases in the Transaction to Points attribution, there are use cases where they become more prevalent. Let’s explore another example to illustrate this point.

Capillary’s Engage+ sends targeted messages to end users as part of campaigns to increase customer engagement. Let’s consider a fact ContactInfo in which each event refers to a message sent to an end user. Now, Analysts may be interested in finding the effectiveness of these campaigns. The way to do so would be to calculate the Hit Rate, i.e., the number of customers that responded to a message. Whether a customer responded to a message or not would be identified via a transaction done by the customer following the campaign.

This is another fact-to-fact join use case, where we would want to join the ContactInfo and Transactions facts to get details of customers who responded to sent messages. Applying bounded batch architecture to this transformation poses a problem. To understand why, please refer to the illustration below.

The highlighted red partitions indicate the likelihood of transactions being done in response to sent messages

Suppose a campaign was executed in April 2023. A customer contacted via the campaign may respond within a few days or in the coming months. Consequently, the likelihood of the corresponding transaction event existing within the same partition diminishes significantly. If bounded batch architecture is applied in this context, it results in a substantial error percentage in attribution, which is unacceptable.

The solution to this challenge is the use of watermarking, a concept adopted from the streaming engines.

Watermarking in streaming engines refers to a mechanism used to track the progress of event processing and manage event time in a data stream. It is crucial in scenarios where events in a stream are not strictly ordered by their occurrence time or where there might be delays in the arrival of events.

In batch computation, we implement watermarking by establishing a look-ahead or look-behind window for each table partition engaged in attribution. The dimensions of this window should be configured to encompass the majority of events pertinent to the use case and align with the requirements of Analysts.

In the ContactInfo to Transaction attribution, we’ll look ahead 2 months while executing for each partition.
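A simplified sketch of such a look-ahead window is shown below. It works at month granularity only (it does not check that a transaction happened after the message within the same month), and the sample rows and helper names are hypothetical.

```python
def add_months(partition, months):
    """Shift a 'YYYY-MM' partition forward by a number of months."""
    year, month = map(int, partition.split("-"))
    index = year * 12 + (month - 1) + months
    return f"{index // 12:04d}-{index % 12 + 1:02d}"

def responders(contact_partition, contacts, transactions, look_ahead):
    """Users contacted in a partition who transacted within the look-ahead window."""
    window = {add_months(contact_partition, i) for i in range(look_ahead + 1)}
    contacted = {user for user, part in contacts if part == contact_partition}
    transacted = {user for user, part in transactions if part in window}
    return contacted & transacted

# Hypothetical rows: (user_id, event_month)
contacts = [("u1", "2023-04"), ("u2", "2023-04"), ("u3", "2023-04")]
transactions = [("u1", "2023-04"), ("u2", "2023-06"), ("u3", "2023-08")]

same_partition_only = responders("2023-04", contacts, transactions, look_ahead=0)
with_watermark = responders("2023-04", contacts, transactions, look_ahead=2)
```

Without a look-ahead, only u1 is attributed. With a 2-month look-ahead, u2 is captured as well, while u3 still falls outside the window, which is the portion of accuracy traded away for bounded computation.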

As we can imagine, using watermarking degrades performance since we’re increasing the bounds of our computation. Again there is a clear trade-off here:

As the size of the watermarking window increases, the computed data becomes fresher but we move from bounded to unbounded lookup which in turn increases the computational expense.

Conclusion

The bounded batch architecture introduces a paradigm shift in our approach to data processing, challenging the longstanding assumption that data must always be entirely fresh. The traditional pursuit of optimizations prioritizing real-time freshness is reevaluated as we acknowledge that analysts, utilizing our data warehouse, are more flexible in their expectations.

The key insight lies in providing them with tools like the Hot-Partition window and watermarking, empowering them to navigate and control the delicate balance between data freshness and computation time. This flexibility enables analysts to make informed tradeoffs, opting for faster computation when necessary or prioritizing data freshness based on defined SLAs. Bounded batch architecture not only challenges the conventional norms but also equips analysts with the means to tailor their data processing approach according to specific needs, ushering in a new era of efficiency and adaptability in the realm of data analytics.

Coming Up Next

In our next blog, we’ll explore how Bounded Batch Architecture tackles the complexities of stateful transformations. Stay engaged for a deeper dive into this fascinating aspect!

Acknowledgment

Special thanks to Satish Tvv for designing, developing, and contributing to different parts of this project.
