Snowflake, the Anchor Model, ELT, and how we deal with it in ManyChat

Anton Poliakov
Manychat Tech Blog

--

Hey! My name is Anton Poliakov, and I develop the analytical data warehouse and ELT processes at ManyChat. In the world of big data, there are several major players that people look to when choosing tools and approaches for analytical systems. Today I will tell you how we decided to deviate from the classic and somewhat boring OLAP solutions such as Vertica or Exasol and instead tried out the less common but very attractive Snowflake cloud DWaaS (Data Warehouse as a Service) as the basis for our storage.

From the very outset, we were faced with the question of choosing tools for working with the database and building ELT processes. We didn't want to use bulky and obvious ready-made solutions like Airflow or NiFi, so we went down the path of fine-grained customization. This has been a protracted leap into the unknown, which continues to this day and is proving quite successful.

Overview of ManyChat data

ManyChat is a platform for companies to communicate with their clients through instant messengers. Our product is used by over 1.8 million businesses around the world, which communicate with 1.5 billion subscribers.

My team is developing a storage and ELT platform for collecting and processing all available data for further analytics and decision-making.

We get most of the data from our own application: user button clicks, popups, events, and changes to backend models (user, subscriber, templates, interaction with our API, and dozens of others). We also get information from logs and historical data from Postgres databases.

We also receive some data from external services, which we interact with via webhooks. For now, that means Intercom, Wistia, and Stripe, but the list is growing.

Data for analysts

ManyChat analysts work with data from the DDS layer (Detail Data Store), where data is stored in sixth normal form (6NF). Analysts know the data structure in Snowflake well and decide for themselves how to join and manipulate the datasets using SQL.

In their daily work, analysts write queries against dozens of tables of different sizes, which take the DBMS a certain amount of time to process. Thanks to its architecture, Snowflake is well suited for big data analytics and complex SQL queries. Here are some specific numbers:

  • Large tables range in size from 6 to 21 billion rows;
  • The average number of micro-partitions scanned in a single analytical query is 1,052;
  • The ratio of queries served from local SSD cache to queries with no local disk data is 48/52;
  • Our data volume in Snowflake is approximately 25 TB; uncompressed, it is around 125 TB (a compression factor of about 5x).

The table below shows the performance of real queries over the last month, depending on the number of objects used in them. All of these queries were executed on an S-size warehouse (queries from ELT processes were excluded from these calculations).

Queries that executed in less than 1 second were moved into a separate group. This lets us separate queries served from locally cached data on SSDs from those that have to read most of their data from slow HDDs.

The more objects a query references, the more work it takes to process.

In this example, queries were analyzed by searching for the names of existing tables in the SQL text of analyst queries, which gives an approximate count of the objects used.
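Roughly speaking, the analysis could look like the short sketch below: it takes the SQL text of a query and a list of known table names and counts how many of them appear. The table names here are made up for illustration.

import re

# A sketch of the object-counting analysis described above. The table names
# below are purely illustrative and not part of our real schema.
KNOWN_TABLES = ["DDS.E_USER", "DDS.A_USER_EMAIL", "DMA.V_DAILY_SIGNUPS"]

def count_objects(sql_text, known_tables=KNOWN_TABLES):
    """Return the approximate number of known objects referenced in a query."""
    found = set()
    for table in known_tables:
        # Word-boundary-style match so that partial names do not count.
        if re.search(rf"(?<![\w.]){re.escape(table)}(?![\w.])", sql_text, re.IGNORECASE):
            found.add(table)
    return len(found)

print(count_objects("select * from DDS.E_USER u join DDS.A_USER_EMAIL e on e.user_id = u.user_id"))  # 2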

Anchor model

When laying out data in storage, we use the classic anchor model. This model allows you to flexibly respond to changes in stored data or the addition of new data. It also makes it possible to compress data more efficiently and work with it faster.

For example, to add a new attribute to an existing entity, you simply create another table and let the analysts know they need to join to it.
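To make that concrete, here is a minimal sketch of what adding such an attribute table might look like through the Snowflake Python connector; the connection parameters and all table and column names are illustrative, not our real schema.

import snowflake.connector

# A minimal sketch of adding a new attribute under the anchor model.
# Connection parameters and all table/column names are illustrative only.
conn = snowflake.connector.connect(
    account="my_account", user="etl_user", password="***",
    warehouse="ELT_XS", database="DWH", schema="DDS",
)
cur = conn.cursor()

# The entity (anchor) table already exists and holds the surrogate keys.
# A new attribute simply gets its own narrow table keyed by that surrogate key.
cur.execute("""
    create table if not exists DDS.A_USER_COUNTRY (
        user_id  integer       not null,  -- surrogate key of the User entity
        country  varchar       not null,  -- the new attribute value
        load_dt  timestamp_ntz not null   -- when the value was loaded
    )
""")

# Analysts then pick the new attribute up with one extra join:
cur.execute("""
    select e.user_id, a.country
    from DDS.E_USER e
    left join DDS.A_USER_COUNTRY a on a.user_id = e.user_id
""")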

A little bit about Snowflake

Different warehouse sizes are available in Snowflake

The DBMS emphasizes on-demand computing capacity, much like many AWS products: money is spent only when you use the capacity provided for computation, and you are charged for every second a warehouse is running. In other words, in the absence of queries you only pay for data storage.

For simple queries, you can use the cheapest warehouse. For ELT processes, depending on the amount of data being processed, we spin up a warehouse of a suitable size (XS/S/M/L/XL/2XL/3XL/4XL, just like clothing sizes). After loading and/or processing, you can turn the warehouse off so as not to waste money. The auto-suspend time is configurable, from 'suspend as soon as the query finishes' to 'never suspend'.

Hardware allocated for each warehouse size and cost per second of work

Read more about Snowflake warehouses here.
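As an illustration of the pay-per-second model, here is a hedged sketch of how a warehouse with an auto-suspend policy can be created and temporarily resized through the Python connector; the names and timeout values are illustrative, not our production setup.

import snowflake.connector

# A sketch of creating a warehouse with an auto-suspend policy and resizing it
# for a one-off heavy job. Names and timeout values are illustrative.
conn = snowflake.connector.connect(account="my_account", user="etl_user", password="***")
cur = conn.cursor()

# An XS warehouse that suspends 60 seconds after the last query and resumes
# automatically when a new query arrives.
cur.execute("""
    create warehouse if not exists ELT_XS
        warehouse_size = 'XSMALL'
        auto_suspend = 60
        auto_resume = true
        initially_suspended = true
""")

# Temporarily scale up for a heavy backfill, then scale back down.
cur.execute("alter warehouse ELT_XS set warehouse_size = 'XLARGE'")
cur.execute("alter warehouse ELT_XS set warehouse_size = 'XSMALL'")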

Currently, ManyChat uses 9 different warehouses:

  • 2 X-Small (XS) for ELT processes with small datasets of up to a billion records.
  • 4 Small (S) for queries from Tableau and for ELT processes that require large joins and heavy calculations, for example filling in a string attribute. This size is also used as the default for analysts' work.
  • 1 Medium (M) for data materialization (view materialization).
  • 1 Large (L) for working with large data volumes.
  • 1 X-Large (XL) for one-time uploads/edits of huge historical datasets.

Snowflake features

Architecture

All warehouses in the system work in isolation. The architecture of the Snowflake solution is presented in three layers:

  1. Data storage layer
  2. Query processing layer
  3. Service layer for authentication, metadata, etc.

Snowflake architecture illustration

Snowflake works with hot and cold data. Data stored in S3 on ordinary HDDs (remote disk) is considered 'cold'. When requested, it takes longer to read and is loaded onto fast SSDs separately for each warehouse. While the warehouse is running, the data remains available on its local SSD (local disk), which speeds up queries several times over compared to working with cold data.

In addition, there is a common cache for query results for all warehouses (result cache) covering the previous 24-hour period. If the data hasn’t changed during this time it will not be read again when repeat queries are run on any of the warehouses. More details can be found here.

Micro-partitions

One of Snowflake's more interesting features is that it works with dynamic micro-partitions. Databases usually use static partitions, but in some cases, such as when data skew occurs, the data can be distributed unevenly between partitions, which complicates and slows down query processing.

Snowflake stores all tables in very small partitions containing 50 to 500 MB of uncompressed data. The DBMS stores information about all the rows in each micro-partition in metadata, including:

  • The range of values for each column of the partition;
  • The number of distinct values;
  • Additional options.

This approach makes it possible to work with incredibly large tables containing millions, even hundreds of millions, of micro-partitions, because queries touch only the micro-partitions whose data matches the required conditions. The details and nuances of data partitioning in Snowflake can be found here.
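One hedged way to observe this pruning is to run a selective query and then compare the number of partitions scanned against the total in the account usage views (which are populated with some delay). The table, column, and filter values below are illustrative.

import snowflake.connector

# A hedged way to observe partition pruning: run a selective query, then
# compare partitions scanned vs. total in the account usage views.
# Table, column, and filter values are illustrative.
conn = snowflake.connector.connect(account="my_account", user="analyst", password="***",
                                   warehouse="ANALYST_S", database="DWH")
cur = conn.cursor()

# A filter on a column that correlates with load order should touch only the
# micro-partitions whose metadata ranges overlap the predicate.
cur.execute("""
    select count(*)
    from DDS.A_SUBSCRIBER_CREATED_DT
    where created_dt >= '2021-01-01'
""")

cur.execute("""
    select query_text, partitions_scanned, partitions_total
    from snowflake.account_usage.query_history
    order by start_time desc
    limit 10
""")
for text, scanned, total in cur:
    print(f"{scanned}/{total} partitions -> {str(text)[:60]}")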

ManyChat ELT pipelines

This is what data streams, storage layers, and processing in ManyChat look like:

Data comes to the DWH (data warehouse) from several sources:

  • PHP backend — events and data model changes;
  • External APIs — Intercom, Wistia, Facebook, and others;
  • ManyChat frontend — events from the frontend;
  • WebHooks (services that send data through webhooks).

Let’s see how this scheme works, using an example of an event from the backend:

  1. The PHP backend dispatches an event about the creation of a new ManyChat account.
  2. Redis accepts the data and queues it up (see the sketch after this list).
  3. A separate Python process reads this queue and stores the data in a temporary JSON file, which is later loaded into Snowflake.
  4. In Snowflake, we run the data through all the necessary layers using Python ELT processes and, as a result, split it up according to the anchor model.
  5. Analysts use the DDS and SNP data layers to assemble aggregated data marts in the DMA layer.
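For illustration, here is a hedged sketch of steps 1 and 2 using redis-py instead of the real PHP producer; the bus name and payload fields are assumptions.

import json
import redis

# Steps 1 and 2, sketched with redis-py instead of the real PHP producer.
# The bus name and payload fields are illustrative assumptions.
r = redis.Redis(host="localhost", port=6379)

event = {
    "model": "account",                     # name of the data model
    "event": "account_created",
    "account_id": 123456,
    "created_at": "2021-03-01T12:00:00Z",
}

# The backend appends the JSON document to the end of the bus list;
# the RedisReader processes will pick it up from the other end.
r.rpush("ModelEvent", json.dumps(event))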

The SA* layer abbreviations stand for staging area (archive/loading/extract):

  • SNP is a layer for storing aggregated historical data from backend databases.
  • SAE is a layer for storing raw data from Redis as a single variant column.
  • SAA is a layer for storing enriched data from Redis with the addition of service columns with dates and load IDs.
  • SAL is a more detailed data layer with typed columns. Its tables store only current data; each time the load script starts, the table is truncated.
  • DDS — 6NF for storing data in the form “1 SAL column ⇒ 1 DDS table”.
  • DMA is an analytical layer that stores views, materializations, and analyst research based on DDS.

Statistics on objects in the schemas

Using 6NF, the DDS allows large amounts of data to be stored very compactly. All connections between entities go through integer surrogate keys, which compress well and are processed very quickly even by the weakest XS warehouse.

SAA takes up over 80% of our storage due to its unstructured variant data (raw JSON). Once a month, the SAA layer offloads its data into a historical schema.

This is only the beginning of the journey, however: we plan to increase the number of sources, and hence the amount of incoming data, several times over each year.

ELT pipeline

We separated the entire process into several important independent parts.

Extract and Load

  • Reading data from Redis into the Snowflake data lake. Loading data from Redis should be as simple as possible: the code performs a single function and does not affect the rest of the system.

Transform

  • Data transformation inside Snowflake. This means loading data from the SAA layer into SAL, collecting statistics, maintaining the load history, and notifying Slack when new models and/or new fields appear in the models.
  • Building the DDS: many parallel worker processes loading the data.

Extract data to one place

Redis is a data exchange bus used widely at ManyChat, and our project is no exception. For a quick and painless start, Python was chosen as the language for the ELT engine, and Postgres for storing logs and statistics. In our architecture, Redis serves as temporary storage for incoming information from all sources. Data in Redis is stored as lists of JSON documents.

ManyChat data storage structure in Redis

Each list can contain from 1 to N different data models. Models are grouped into lists by meaning. For example, all user clicks go into one list regardless of the source, but they can have different data models (sets of fields).

The keys of the lists in Redis are descriptive names that reflect the models they contain.

An example of some bus names and models:

  • EmailEvent (events occurring with email)
    — email
    — email_package_reduce
  • SubscriberEvent (when a subscriber account is created or modified, it appears in this queue)
    — subscriber
  • ModelEvent (data models from the backend and their events)
    — account_user
    — pro_subscription
    — wallet_top_up

There are thousands of different models!
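As a hedged illustration of how one bus holds several models at once, the sketch below peeks at a list and counts payloads per model; the payload field names are assumptions.

import json
from collections import Counter
import redis

# Peek at one bus and count the payloads per model; the payload field names
# are assumptions, not the real ManyChat format.
r = redis.Redis(host="localhost", port=6379)

counts = Counter()
for raw in r.lrange("ModelEvent", 0, 999):          # inspect up to 1,000 items
    payload = json.loads(raw)
    counts[payload.get("model", "unknown")] += 1

print(counts)   # e.g. Counter({'account_user': 612, 'pro_subscription': 48})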

The entire ELT is built in Python using multiprocessing. The hardware for the whole ManyChat ELT is a single AWS m5.2xlarge instance:

  • 32 GB RAM
  • Xeon® Platinum 8175M CPU @ 2.50GHz

Load data to Snowflake

RedisReader.py is a script for continuously reading a Redis bus. For each queue, we run multiple readers under supervisord, which keeps the required readers running at all times.

The script continuously monitors a given Redis bus for new data.

  • If there is no data in the bus, it waits for several seconds and tries to read again.
  • If data has appeared, the script reads it with blpop() and appends it to a JSON file.

When the number of lines in the file reaches a certain threshold or the specified time has elapsed, RedisReader stops writing to the file and starts loading it into Snowflake.

All actions in RedisReader are multiprocessing-safe and are designed to make loading as safe as possible while multiple processes read a single Redis queue simultaneously.
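Below is a simplified, single-process sketch of what such a reader does according to the description above: block on the queue, buffer documents to a JSON-lines file, and flush when a row or time threshold is hit. The thresholds, paths, and the load_to_snowflake() helper are illustrative, and the real multiprocessing-safety machinery is omitted.

import time
import redis

# A simplified, single-process sketch of RedisReader.py. Thresholds, paths,
# and the load_to_snowflake() helper are illustrative; the real script's
# multiprocessing-safety machinery is omitted.
BUS_NAME = "ModelEvent"
MAX_ROWS = 10_000       # flush the buffer file after this many rows...
MAX_SECONDS = 60        # ...or after this much time has passed

def load_to_snowflake(path):
    """Placeholder for the PUT + COPY INTO step shown in the next section."""
    print(f"loading {path} into Snowflake")

def run_reader():
    r = redis.Redis(host="localhost", port=6379)
    rows, started = 0, time.time()
    path = f"/tmp/{BUS_NAME}_{int(started)}.json"
    out = open(path, "w")

    while True:
        item = r.blpop(BUS_NAME, timeout=5)     # wait a few seconds for new data
        if item is not None:
            _, raw = item
            out.write(raw.decode() + "\n")      # one JSON document per line
            rows += 1

        # Flush once either threshold is reached, then start a new buffer file.
        if rows >= MAX_ROWS or time.time() - started >= MAX_SECONDS:
            out.close()
            if rows:
                load_to_snowflake(path)
            rows, started = 0, time.time()
            path = f"/tmp/{BUS_NAME}_{int(started)}.json"
            out = open(path, "w")

run_reader()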

The received data is first loaded into several Snowflake SAE tables in parallel and is then inserted into the SAA table in a single process with a simple INSERT INTO command, without locking the existing data (see the sketch after this list). The data is mapped to the following columns:

  • model — the name of the loaded data model
  • event_dt — the date of data entry
  • raw — the data itself in JSON format (variant)
  • launch_id — internally generated value
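A hedged sketch of the Snowflake side of this step: the buffered file is staged, copied into an SAE table with a single variant column, and then appended to SAA together with the service columns above. The object names, the stage, and the launch_id value are illustrative.

import snowflake.connector

# A sketch of the load step: the buffered file is staged, copied into an SAE
# table with a single variant column, and appended to SAA together with the
# service columns listed above. Object names, the stage, and the launch_id
# value are illustrative.
conn = snowflake.connector.connect(account="my_account", user="etl_user", password="***",
                                   warehouse="ELT_XS", database="DWH")
cur = conn.cursor()

# 1. Upload the local buffer file to a user stage.
cur.execute("put file:///tmp/ModelEvent_1614600000.json @~/sae_stage auto_compress=true")

# 2. Copy the raw JSON documents into the SAE table (one variant column).
cur.execute("""
    copy into SAE.MODEL_EVENT (raw)
    from (select $1 from @~/sae_stage)
    file_format = (type = 'json')
""")

# 3. Append to SAA without touching the existing rows, adding service columns.
cur.execute("""
    insert into SAA.EVENTS (model, event_dt, raw, launch_id)
    select raw:model::varchar, current_timestamp(), raw, %s
    from SAE.MODEL_EVENT
""", (42,))   # 42 stands in for an internally generated load identifier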

Transform data inside Snowflake

The SAA layer is the data lake in our architecture. Loading data onward into SAL is accompanied by logging, collecting statistics for all fields, and creating a new SAL table if necessary.

  1. The first step is to get a list of data that has not yet been processed. The script determines the number of unprocessed rows for each model.
  2. After that, statistics are collected for each model. We store the min/max values of each column, data types, the number of non-null entries, and many other auxiliary characteristics.
  3. Next, the data is loaded from the SAA layer into SAL. Only data mapped by engineers, with a field description, the correct type, and a name, makes it into SAL (see the sketch after this list).
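For one model, steps 2 and 3 might look roughly like the sketch below: a few per-field statistics are computed over the variant column, then the typed, mapped columns are reloaded into a truncated SAL table. All object and field names are illustrative.

import snowflake.connector

# A sketch of steps 2 and 3 for a single model: gather a few per-field
# statistics over the variant column, then refill the typed SAL table.
# All object and field names are illustrative.
conn = snowflake.connector.connect(account="my_account", user="etl_user", password="***",
                                   warehouse="ELT_XS", database="DWH")
cur = conn.cursor()

# Per-field statistics over the raw JSON (min/max values, non-null counts, ...).
cur.execute("""
    select
        min(raw:created_at::timestamp_ntz) as min_created_at,
        max(raw:created_at::timestamp_ntz) as max_created_at,
        count(raw:account_id)              as non_null_account_ids
    from SAA.EVENTS
    where model = 'account'
""")
print(cur.fetchone())

# SAL keeps only the current batch: truncate, then insert the mapped, typed columns.
cur.execute("truncate table SAL.ACCOUNT")
cur.execute("""
    insert into SAL.ACCOUNT (account_id, created_at, launch_id)
    select raw:account_id::integer,
           raw:created_at::timestamp_ntz,
           launch_id
    from SAA.EVENTS
    where model = 'account'
""")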

Building DDS

Data ingestion into the DDS layer is based on the data in the SAL schema. We have added some useful features: a selectable method for tracking data changes (slowly changing dimensions, SCD0/SCD1), as well as faster non-blocking table inserts.

Data is loaded into each table in the DDS layer using a separate process. This allows us to work with multiple tables in parallel and not waste time on sequential data processing.

Uploading to DDS is divided into 2 stages:

  1. Entities are first loaded to form a surrogate key;
  2. The attributes and relationships are then loaded.

Loading entities

Loading entities means loading only unique values into tables named DDS.E_{EntityName}, where EntityName is the name of the entity being loaded.

Since the entities are not related to each other in any way, they can easily be loaded in parallel using Python's built-in multiprocessing functionality.
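A minimal sketch of such parallel loading with multiprocessing is shown below; the MERGE statement and all names are illustrative of inserting only values that are not yet present.

from multiprocessing import Pool

import snowflake.connector

# A sketch of loading several independent entities in parallel. The MERGE
# inserts only values that are not yet present in the entity table; all names
# are illustrative.
ENTITIES = ["USER", "SUBSCRIBER", "TEMPLATE"]

def load_entity(name):
    conn = snowflake.connector.connect(account="my_account", user="etl_user",
                                       password="***", warehouse="ELT_XS", database="DWH")
    conn.cursor().execute(f"""
        merge into DDS.E_{name} tgt
        using (select distinct {name.lower()}_id as natural_key from SAL.{name}) src
        on tgt.natural_key = src.natural_key
        when not matched then insert (natural_key) values (src.natural_key)
    """)
    conn.close()
    return name

if __name__ == "__main__":
    with Pool(processes=len(ENTITIES)) as pool:
        for entity in pool.map(load_entity, ENTITIES):
            print(f"entity {entity} loaded")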

Loading relationships and attributes

Loading relationships and attributes is implemented in a similar way. The only difference is that more joins and more data validation are carried out when inserting data into the DDS schema.

Relationships and attributes do not depend on each other, only on the previously loaded entities, so we can safely load them in parallel as well.

Pseudocode of one of the loaders

Each time it runs, the loader checks the launch options provided in the DEFAULT_OPTIONS variable. If they are specified and all checks pass, the loader starts transferring data from SAA to the SAL layer and fills all the specified DDS tables.
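Since the original pseudocode is shown as an image, here is a hedged Python sketch of how such a loader might be organized; the option names and helper functions are assumptions, not the real implementation.

# A hedged sketch of a loader along the lines of the pseudocode. The option
# names and helper functions are assumptions, not the real implementation.
DEFAULT_OPTIONS = {
    "model": "account",                      # which SAA model to process
    "scd_type": "SCD1",                      # how to track changes: SCD0 or SCD1
    "dds_tables": ["E_USER", "A_USER_COUNTRY"],
}

def checks_passed(options):
    """Validate the launch options before touching any data."""
    return bool(options.get("model")) and options.get("scd_type") in ("SCD0", "SCD1")

def load_saa_to_sal(model):
    print(f"loading model '{model}' from SAA into SAL")      # stands in for the real SQL

def fill_dds_table(table, scd_type):
    print(f"filling DDS table {table} ({scd_type})")         # stands in for the real SQL

def run_loader(options=DEFAULT_OPTIONS):
    if not checks_passed(options):
        return                               # do nothing without valid options
    load_saa_to_sal(options["model"])
    for table in options["dds_tables"]:
        fill_dds_table(table, options["scd_type"])

run_loader()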

Conclusion

We are currently loading data from more than 30 Redis queues on an ongoing basis. As soon as data appears in them, it lands in the SAA layer and waits its turn to be processed and brought into the DDS. On average, we see 1,500 events per second, ranging from 100 to 5,000 depending on the time of day and seasonality.

Our pipeline has allowed us to reduce the amount of manual work done by engineers to a minimum and to establish development processes and interaction with the entire company.

At the same time, many related processes were put in place, including data quality, data governance, and view materialization.

In fact, adding a new loader now boils down to filling in the fields in the Google Sheet where we store all metadata and building a model for future tables in the DDS schema.

Feel free to contact me by email at plaha.anton@gmail.com if you have any questions.

Anton Poliakov
Manychat Tech Blog

Lead Data Engineer at ManyChat. DWH architect and Python developer.