How to speed up data marts with view materialization

Anton Poliakov · Manychat Tech Blog · Dec 21, 2021

Hey! My name is Anton Poliakov, and I develop the analytical data warehouse and ELT processes at ManyChat.

Several years ago, we chose Snowflake as the service to support our data platform. As data volumes grew to hundreds of millions of rows (spoiler alert: and later tens of billions), we asked ourselves: “How can we spend less time computing the queries behind our daily reporting?” The ideal option was view materialization, a technique that lets you access the pre-computed results of data marts much faster than querying the original data.

We could have handed control of our pipeline to popular data transformation tools such as dbt, Matillion, or Dataform. At the beginning of 2020, however, none of them offered the fine-tuning we needed for Snowflake and ManyChat. Plus, we didn’t want to pay for yet another third-party tool. So we decided to reinvent the wheel and build our own way of working with data materializations. Let me explain how this works in a little more detail.

The problem of large views

At ManyChat, we use the anchor model. The distributed data layer already contains over 150 TB of data in 1,500+ tables. Every day, about 30 people run hundreds of analytical queries against our Snowflake storage, and they are not only analysts, but also developers, HR, and accounting.

Number of tables in the anchor model layer

The sixth normal form used in the anchor model inevitably requires a large number of joins, so analytical queries become more and more complex over time.

Very often, building a report requires joining more than 10 tables, and despite micro-partitions, filters, and proper table clustering, this slows queries down. On the one hand, it is easy to scale up a warehouse in Snowflake; on the other hand, each size step also doubles the price, which is not very appealing to a young startup.

To simplify the work of analysts, overly long queries are decomposed into small views that can be reused in different reports. This makes it easier to organize code and standardize development by providing a single data source for derived views.

Each analytical query against a view resumes the warehouse, starts consuming Snowflake credits, and can be queued under high load. In addition, because views are built on continuously updated data, their results change throughout the day, and the warehouse has to recalculate them again and again. Even though some of the data is already in the warehouse cache, this process can be time-consuming.

If the data in derived objects (derivative data) disappears or turns out to be inconsistent, we can always recalculate it from the sources (primary data) without loss. But every time someone queries a view that aggregates data, the system effectively re-runs the entire process of reading and transforming the data.

But what do you do when, as data volumes grow, each view takes tens of minutes to calculate instead of a couple of seconds?

Complete materialization

In today’s world, it is difficult to imagine that someone could be waiting for the results of the previous month’s report for 10, 20, or even 30 minutes. That is why the result of the entire pipeline is usually materialized.

A materialization is a database object that contains the result of a query.

We decided to start by adding a create or replace table command to all existing views and updating them on a daily basis.

The overwhelming majority of analytical reports do not need real-time data: near real-time is enough, for example with a one-day delay. For most analytical reports, we therefore decided to take all data in the storage as of the end of the previous day.

All data, starting from any past date up to the end of the previous day, will fall into the fully materialized view.

Complete materialization fully overwrites the existing data without keeping history. This method is suitable for small tables with few joins, ordering, grouping, or window functions. On a size-M warehouse, such a query should take no more than 5–10 minutes.
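To make this concrete, here is a minimal sketch of what a complete materialization job can look like when driven from Python with the Snowflake connector. The view, table, and column names, the ELT_WH_M warehouse, and the connection details are illustrative placeholders, not our actual framework code.

```python
# A minimal sketch of complete materialization (all names are illustrative).
import snowflake.connector

VIEW_NAME = "analytics.daily_subscribers_view"      # hypothetical source view
TARGET_TABLE = "analytics_mart.daily_subscribers"   # hypothetical target table

# Fully rebuild the target table: take everything up to the end of yesterday.
MATERIALIZE_SQL = f"""
create or replace table {TARGET_TABLE} as
select *
from {VIEW_NAME}
where event_date < current_date()
"""

conn = snowflake.connector.connect(
    account="...", user="...", password="...",
    warehouse="ELT_WH_M",  # the size-M warehouse used for materializations
)
try:
    conn.cursor().execute(MATERIALIZE_SQL)
finally:
    conn.close()
```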

To access data from set B, the system only needs to read the materialized object A. All other datasets, materialized or not, can be excluded from the query calculation.

This approach made it possible to update the result of the views once a day, using a fairly cheap warehouse of size M. It also became possible to use this data in other views and reports without reprocessing.

Before: Calculations of analytical views were performed on the S warehouse and consumed a sizable number of credits daily. In addition, at peak times the warehouse was overloaded and many queries went to the queue (orange), even with a second cluster running in multi-cluster mode.

The horizontal axis is the time of day; the vertical axis is the number of credits used. Each bar represents 5 minutes of warehouse operation.

After: Creation of fully materialized views was moved to a warehouse of size M. Analytic queries on the collected data remained on a single-cluster S warehouse.

The size-M warehouse is more expensive to run, but because the updates are carried out in one batch, one after another, the warehouse does not stand idle and only runs for a limited period of time. This means money is no longer wasted on resuming the warehouse, warming up its cache, or idle timeouts before it shuts down.

Queries on the S warehouse began to consume fewer resources, queues disappeared, and query execution time decreased.

The total credit consumption of the two single-cluster warehouses S and M was reduced by about 20% compared to the original consumption of the auto-scaled multi-cluster S warehouse. The waiting time for the result of analytical reports has decreased from tens of minutes to 0–5 minutes.

Incremental materializations

Complete materialization worked great on small amounts of data. When joining tables of hundreds of millions of rows, everything ran quickly, and each query took no more than 5 minutes. But we are growing very fast, and within a few months the volume of data increased tenfold.

Joins of tables with tens of billions of rows began to reveal the lack of performance of a warehouse of size M. We could still handle such volumes, but the time to create tables was growing. It sometimes took more than 30 minutes to materialize a single report, and with 30–40 materializations per day, the time for assembling all data marts reached 5–7 hours.

Of course, we could solve the problem by increasing the size of a warehouse to L or even XL (and further to XXXXL). But initially, we chose the path of economizing and consuming resources in a measured way. This led us to the logical development of our materialization system: materialization with an increment.

Incremental materialization works on a different principle than complete materialization: instead of rebuilding the entire target table, only the part of it we are interested in is updated. Only data that has changed or been added since the last materialization is written to the target table. This lets you work with a smaller set of data, reducing both the execution time of materialization queries and their cost.

Complete materialization: the old target table is dropped, and a new one with up-to-date data is created in its place.

Incremental materialization: the old materialization table is kept, and only the records that are not yet in it are added.

This type of materialization incrementally updates the target table by appending new data (insert, or delete + insert) to the existing data using a specified partition key.

Any column can serve as the increment partition key; in most cases, it is the date of the event or of the model update. This approach made it possible to reduce the amount of data that has to be processed to obtain the increment for the view.
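As an illustration, here is a minimal sketch of an incremental refresh using the delete + insert pattern over a date partition key, again via the Snowflake Python connector. All object names, the event_date column, and the three-day lookback window are assumptions for the example, not ManyChat’s actual code.

```python
# A minimal sketch of incremental materialization: re-load only the last few
# days of data, identified by a partition key (event_date here).
import snowflake.connector

TARGET_TABLE = "analytics_mart.daily_subscribers"   # hypothetical target table
SOURCE_VIEW = "analytics.daily_subscribers_view"    # hypothetical source view
LOOKBACK_DAYS = 3                                   # illustrative refresh window

STATEMENTS = [
    "begin",
    # Drop the partitions that may have changed since the last run...
    f"""
    delete from {TARGET_TABLE}
    where event_date >= dateadd(day, -{LOOKBACK_DAYS}, current_date())
    """,
    # ...and re-insert them from the source view, up to the end of yesterday.
    f"""
    insert into {TARGET_TABLE}
    select *
    from {SOURCE_VIEW}
    where event_date >= dateadd(day, -{LOOKBACK_DAYS}, current_date())
      and event_date < current_date()
    """,
    "commit",
]

conn = snowflake.connector.connect(account="...", user="...", password="...",
                                   warehouse="ELT_WH_M")
try:
    cur = conn.cursor()
    for statement in STATEMENTS:
        cur.execute(statement)
finally:
    conn.close()
```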

The running time of the incremental materialization was reduced by 70–80% compared to a full rewrite of the existing table. Despite the growth in data, the approach of taking only the data over the past few days to refresh the materialization has helped freeze infrastructure spending at one tier.

How do we manage our materializations?

Materializations are updated every morning by our Python ELT framework in a dependency-aware order. Each view is assigned a grade based on how connected it is to other objects in the storage, and the full data lineage of each view is calculated recursively, from the sources to the materialized result. Taking into account the number of dependencies and their weight, a DAG (directed acyclic graph) is built, and the views are materialized one by one. Some of this happens asynchronously: if there is no direct relationship between two views, they can run in parallel, as in the sketch below.
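As a rough illustration of the scheduling idea, here is a minimal sketch that topologically orders a toy dependency graph and runs independent views in parallel, using Python’s standard graphlib and a thread pool. The view names and the materialize stub are hypothetical; the real framework additionally accounts for dependency weights, as described above.

```python
# A minimal sketch of dependency-aware, partially parallel materialization.
from concurrent.futures import ThreadPoolExecutor, as_completed
from graphlib import TopologicalSorter

# view -> set of views it depends on (illustrative names)
DEPENDENCIES = {
    "subscribers_mart": {"users_view", "events_view"},
    "revenue_mart": {"payments_view"},
    "summary_mart": {"subscribers_mart", "revenue_mart"},
    "users_view": set(),
    "events_view": set(),
    "payments_view": set(),
}

def materialize(view: str) -> str:
    # Placeholder for running the actual materialization SQL in Snowflake.
    print(f"materializing {view}")
    return view

sorter = TopologicalSorter(DEPENDENCIES)
sorter.prepare()

with ThreadPoolExecutor(max_workers=4) as pool:
    futures = {}
    while sorter.is_active():
        # Views whose dependencies are already done can run in parallel.
        for view in sorter.get_ready():
            futures[pool.submit(materialize, view)] = view
        # Wait for at least one running materialization to finish, then mark
        # it done so that its dependents become ready on the next pass.
        finished = next(as_completed(futures))
        sorter.done(futures.pop(finished))
```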

ManyChat materialization dependency graph

Conclusion

Thanks to view materialization, we reduced the assembly time for daily reporting several times over. Our new pipeline allowed us to cut warehouse runtime and reduce the cost of our Snowflake infrastructure by tens of percent. A year has passed since we implemented the materialization system, and in all that time credit consumption has remained stable, while understanding how the Snowflake optimizer works has allowed us to write fast queries and apply many tricks through the Python code of our framework.

Ask me about the nuances of our ELT processes or aspects of working with Snowflake in the comments: I’ll be sure to get back to you.

Anton Poliakov
Lead Data Engineer at ManyChat. DWH architect and Python developer.