Data Pipeline Optimizations: Implementing a View Caching Layer in an ELT Platform

Philip Catterall
ActionIQ Tech Blog
Published in
9 min readApr 24, 2024

By Patrick Conway, Senior Backend Engineer, and Philip Catterall, Senior Product Manager

ActionIQ’s ELT Pipelines run over 15 thousand Spark jobs every day to load and prepare our customers’ data for querying. For our customers to have the most up-to-date data to power their customer experiences, it’s critical that our pipelines are reliable and always produce correct results. But materializing hundreds of terabytes of data every day is expensive—over 70% of our AWS EC2 compute bill for our Spark clusters comes from ingest pipeline usage. Since ActionIQ’s pricing is based on compute usage, neither us nor our customers benefit from inefficiencies in the pipelines.

In this post, we’ll explore a new view caching strategy we implemented to significantly reduce the compute usage of ActionIQ’s ELT pipelines.

How ActionIQ Pipelines ingests and transforms data

ActionIQ’s customer data platform lets enterprise brands connect all their customer data together — either by ingesting datasets into the platform or by querying their data warehouse directly — and gives marketers easy and secure ways to activate data anywhere in the customer experience.

ActionIQ’s Pipelines product allows users to create scalable ELT pipelines to prepare raw customer data for marketing use cases. After ingesting raw customer data from source systems and normalizing to our internal table format, the transformation step in the pipeline allows our customers to define SQL-based views on top of their raw source data to create a cleansed and comprehensive 360 view of their customers. They can also create views based on other views to keep transformation logic modular and reusable. We materialize these views at ingest-time to improve downstream performance when users run queries for insights and when we export data to downstream channels.

Figure 1: A high-level view of ActionIQ’s ELT pipeline process

Understanding the Problem

All Pipelines can be scheduled independently of each other based on when different sources have new data available. As an example, take the pipeline in Figure 2, represented as a directed acyclic graph (DAG). We’ll use the notion of medallion tables to represent the stages of the pipeline from raw source data (bronze) to normalized, cleaned data available for marketing users in ActionIQ (gold).

Below is an example of a typical dependency graph for a portion of a customer’s ingest pipelines:

Figure 2: An example pipeline DAG that will be used as a reference for the rest of the post

Here, we have three source datasets: src_users_us, src_users_intl, and src_clickstream_events. These three sources are configured to be ingested at three different times during the day. A requirement is that gold tables must be up-to-date with the latest source data they reference. To meet this requirement, whenever new source data is ingested, we determine what gold tables reference the updated source and run jobs in our Spark clusters to materialize the gold tables.

Imagine that we ingest a new version of src_users_us. You can see that gold tables users__unioned and dim_users depend on it, so we need to run jobs to recompute those materialized views.

Figure 3: Gold tables that depend on src_users_us

To materialize dim_users , we need compute the following views:

  • stg_users_us
  • stg_users_intl
  • users__unioned
  • stg_clickstream_events_normalized

Later that day, we ingest src_clickstream_events, which gold tables fct_clickstream_events and dim_users reference.

Figure 4: Gold tables that depend on src_clickstream_events

You can start to see where a lot of redundant work is introduced: there are branches of the DAG that have not changed but are still being recomputed. For example, when we ingest new data for src_clickstream_events but there is no new data for src_users_us or src_users_intl, we do not need to recompute users__unioned or anything else in that branch of the DAG.

This is where our opportunity can be found. If our system knew when it didn’t need to recompute an already up-to-date view, we could greatly reduce the work required to materialize the final gold tables.

Getting Theoretical

To estimate the potential savings, let’s define a model for the cost of materializing a view based on the number of Spark executor vCPU cores-seconds used by the job and the cost-per-core for the EC2 instances those executors run on:

Materializing a view when also recursively calculating the inputs in it’s upstream DAG gives us the following:

For example, for the DAG from the section above, the cost of users__unioned could be calculated as:

Since we also run materialization DAGs for the other two gold tables, the total cost becomes:

If we were to compute each view only once, we would save the equivalent of:

We ran this analysis on all the table dependencies in our existing pipelines, which resulted in a projected a theoretical savings of 40%. That was significant enough for us to prioritize the implementation of a caching solution.

Identifying the Solution

Ideally, we would only perform one of each view computation. To achieve this, we boiled the solution down to three pieces:

  1. Persist all intermediate views so that the system can attempt to reuse them
  2. Ensure that that order of job execution enables possibility of reusing existing results
  3. Track the versions of inputs when computing a view to know when we can return a cached result vs needing to recompute it

The Pipelines platform is maintained by our Data Platform engineering team, but the transformations themselves are defined and configured by customers, our consulting partners, and ActionIQ’s own Professional Services Data Engineers. We wanted our solution to be a drop-in replacement, completely transparent to our end users so they would not need to redefine any transformations or configurations and still get the same results.

Design and Implementation

Persist Intermediate State

When we compute a materialized view for any node in a DAG, we persist that intermediate table to S3 as well as track execution metadata, like version, in MySQL. Tracking all intermediate state puts us in a position to potentially re-use this partial computation in future executions, rather than recomputing the same view from scratch each run.

Persisting state does not come with zero costs. Physically materializing a dataframe and writing the results to parquet files in S3 can add latency. However, we assume that in most cases, the additional I/O cost is less than the benefit we’d get from potentially duplicate computations in the future. We will dive deeper into this in a later section.

Orchestrating Execution Order

When it is time for a job to run, we create a DAG of all the inputs to materialize a view. If we wanted to materialize fct_clickstream_events, we would first need to compute stg_clickstream_events__normalized. So, we add a request to materialize stg_clickstream_events__normalized, and the job to materialize fct_clickstream_events will wait until the call to stg_clickstream_events__normalized returns.

We can generalize this logic to a recursive function:

def getLatestView(view):
if (view uses other views as inputs)
for each inputView in inputs
getLatestView(inputView)

if (most recent version of view is up-to-date relative to its inputs)
return cachedResults
else
materialize(view)

We need all recursive inputs to complete successfully in order to materialize a view. If any upstream job fails, then our target collection will fail as well.

Deduplicating Jobs

Figure 5: Views that need to be recomputed when dim_users is materialized are highlighted in blue

Imagine that we want to materialize dim_users in the example above. When the materialization request is submitted, we will perform the following checks:

  1. What is the newest version of user__unioned in our data store?
  2. Which version of user__unioned was used the last time a materialized view for dim_users was created?

If (1) has a more recent timestamp than (2) then we know that dim_users must be out-of-date relative to the newest available data for user__unioned. On the other hand, if (2) is equal to (1) then we know that user__unioned must be up-to-date relative to dim_users.

We apply the same logic to all the inputs to the view. All of this logic happens in our application services before a job is submitted to our Spark clusters.

Results

Across the board, we saw a dramatic reduction in Spark core-seconds, and thus cost, to run our customers’ Pipelines. Globally, we’ve seen an almost a 50% reduction in core-seconds, which translates to about $108k/month.

Figure 6: Total Spark application core-seconds before and after memoization

Customers with particularly complex pipelines saw the most improvement, with one customer seeing a 90% reduction in core-seconds used:

Figure 7: Total Spark application core-seconds for one customer with particularly complex pipelines

The overall number of jobs run and the failure rate (represented as red bars) has remained relatively stable.

Figure 8: Materialization Spark application runs by success vs failure

With this optimization, we’re making an explicit tradeoff between storage and compute by caching more intermediary views. However, we are seeing that the total amount of data being stored has remained relatively flat. This may be because a less-eager materialization process results in less versions of the final gold tables being stored since more of the calls to materialize them result in no-ops.

Figure 9: Aggregated object storage in GBs relative to memoization rollout

Challenges and Future Opportunities

Testing Framework

Tooling to be able to test and compare before/after results is absolutely necessary, but not easy. Any non-determinism in transformations (e.g. time-based filtering) can lead to inconsistent testing results, but does not mean that there is an issue. While verifying changes along the way, we had to pay close attention to whether or not there was any non-determinism in transformations to validate the results.

Ideally, we would have built a testing harness that would have replaced any non-determinism with deterministic behavior. However, I could do a good enough job validating results without spending the time to do this. Also, there is a philosophical question of “are you sure that replacing non-determinism will not affect your auditing ability?”

DAG state persistence and operator observability

Currently, the DAG is constructed in memory of the service that manages in-flight jobs. This means that if the service were to crash, we would lose our state, but since each intermediary table is cached to S3, it limits the redundant computation required if a job fails.

Additionally, the DAG that we construct is implicit in the call-stack. This limits the observability of the job for operators since there’s no visual representation of “what is the progress of my DAG?”.

This was a deliberate tradeoff given we wanted to limit the surface area of the change. Ideally, the DAG would be orchestrated and durably persisted in a dedicated workflow engine like Temporal.

Conclusion

Overall, this project was a big success. We were able to drastically reduce compute consumption without negatively impacting the frequency, scale, latency, or correctness of their data. This translated to substantial cost savings for both us and our customers.

This is just one example of the constant improvements we’re making to continue to evolve ActionIQ’s purpose-built data layer to be best-in-class for customer experience applications.

ActionIQ is a new kind of customer data platform for enterprise brands, giving marketers easy and secure ways to activate data anywhere in the customer experience. ActionIQ’s unique composable architecture means data can stay securely where it lives, and marketing teams only use the tools they need. We are backed by top-tier VCs Andreessen Horowitz, Sequoia Capital, and March Capital. Enterprise brands such as Autodesk, Bloomberg, The Washington Post, e.l.f. Beauty, Atlassian and many more use our composable CDP to achieve growth through better customer experiences.

Learn more at https://www.actioniq.com/
Read our product blog at https://www.actioniq.com/blog/
Explore our career openings at https://www.actioniq.com/careers/
Want to get in touch? Contact us: https://www.actioniq.com/contact-us/

--

--