Replacing Apache Druid with Snowflake Snowpipe
Over the last decade, real-time reporting has become essential for making decisions based on the latest data. Customers and our product team expect reports to contain real-time data so they can make up-to-the-minute, informed decisions. GumGum uses real-time data to (1) make rapid decisions while serving our campaigns, (2) monitor our ad lines in real time for anomalies and serving issues, and (3) supplement and verify our batch data in a Lambda architecture. We currently use Apache Druid, a time-series database, to accomplish this.
However, real-time reporting is one of the most difficult infrastructures for a Data Engineering team to manage. It is costly, demands constant maintenance, and requires significant infrastructure resources. As data volume grows at the ingestion point, more distributed clusters need to be built. It is even harder when your data is semi-structured: that data needs additional transformation at the streaming layer before it is stored in the database and made available for reporting. Whichever system you choose for real-time reporting will face these challenges, so it is important that the tool is flexible and scalable.
Many data warehouses support real-time reporting, but meeting reporting SLAs is just a small part of the full picture. Data accuracy, cost efficiency, scalability, and — most importantly for us — maintainability all seemed to matter much more than the difference between 5- and 15-second latency measurements.
GumGum has been using Apache Druid as its primary real-time and historical datastore for 8 years. We self-managed the cluster — running hundreds of nodes, serving hundreds of TBs with a replication factor of 2, and processing 10+ incoming streaming jobs. The performance was excellent! Users were happy to build reports and make decisions based on that data to serve our business functions.
But managing this reporting system became difficult. As we started receiving more feature-rich reporting requests, we needed joins against new datasets and support for ad hoc queries, both of which are difficult to accommodate in Druid. Also, as our data volume scaled, there was continuous pressure to keep the cluster up and running. Even with a Lambda architecture that replaced streaming data with offline batch data, we still faced several issues, described below. While the technical SLAs were still real-time, nothing about feature development or maintenance felt fast.
Common issues we saw on a weekly basis:
- Schema changes to the source had to be manually applied to the target.
- When AWS terminates a historical node, all in-flight queries and sub-queries on it fail, and brokers continue to route to the destroyed node until ZooKeeper picks up the change.
- The MiddleManager throws OOM errors when threads and cores are not tuned correctly, causing loss of real-time data.
- Some queries throw OutOfMemoryError in the JVM process on historical nodes, requiring a node restart.
- Despite Druid being a massively parallel query system, a single query could end up being processed on hundreds of historical nodes; it completely lacks any fault tolerance on the query execution path.
- Network failures between components of Druid.
- When a broker sends sub-queries to historicals, all of those sub-queries must complete successfully before it can return anything to the client. If any sub-query fails, the whole query fails.
Druid is a fantastic tool — but the complex architecture with many moving pieces makes it a large task to manage or automate. We were looking for a fully managed solution that would give us near-zero maintenance as we rapidly scale our business and dataset volume.
We were already using Snowflake for our batch datasets as part of our Lambda architecture (we were previously using Redshift — read more on our Redshift to Snowflake journey here). When looking for our real-time replacement, we found Snowflake’s Snowpipe technology could be a good option again, resulting in a unified data warehouse for ALL data. Having one reporting source makes it easier to build common tool sets, and lets BI developers create reports based on both real-time and batch data in a single database. Most importantly, having a single data warehousing tool makes it easy for engineers to automate and maintain our data architecture.
Snowpipe is Snowflake’s continuous data ingestion service, which loads data into Snowflake tables in near real time: when new files land in an S3 bucket, Snowpipe runs a COPY statement to load them.
At GumGum, we have Spark streaming jobs running on Databricks that consume data from Kafka for different data structures. We sink that output data to S3 in small batches; Snowpipe then loads that data into tables, where it is ready for reporting from Snowflake.
Our engineering task was to start sinking the Spark Streaming output data to S3 (instead of sinking data to the Druid Tranquility node — a.k.a. middle manager). To push S3 data into Snowflake, we created a table matching the output data, a Snowpipe definition, a stage pointing at the S3 bucket path, and a file format.
To store the data in S3 we chose CSV/TSV format because it is optimal for Snowflake to ingest. The Spark Streaming job sinks the data to S3 in small batches (we configured the Spark streaming batch duration as 10–25s depending on job load). The S3 path is partitioned by a datetime value derived from the timestamp column:
dataFrame.withColumn("datetime", date_format(col("timestamp"), "yyyy-MM-dd-HH"))
Snowflake Streaming Data Ingestion
Now the Snowflake work starts. First, we create a TABLE whose columns match the S3 data and its types. We also added several metadata columns (e.g. LOAD_TIME, FILE_NAME, FILE_ROW_NUMBER) to better trace each row back to its source file.
create or replace TABLE <streaming_table> (
TIMESTAMP TIMESTAMP_NTZ(9) NOT NULL,
-- ... data columns matching the S3 files ...
LOAD_TIME TIMESTAMP_TZ,
FILE_NAME VARCHAR,
FILE_ROW_NUMBER NUMBER
);
After this, we need to create a STAGE, which the Snowpipe will reference to fetch the data.
create or replace STAGE <stage_name> url='s3://<bucket_name>/<streaming_table_path>/';
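To sanity-check that the stage points at the right location, you can list the files Snowflake sees through it (a quick verification step, using the placeholder stage name from above):

```sql
-- Lists files visible through the stage: name, size, md5, last_modified
list @<stage_name>;
```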
Before we create the Snowpipe we have to create a FILE FORMAT, which defines the field delimiter and the file extension we use when storing files in S3 (a tab delimiter, in our case).
create or replace FILE FORMAT S3_TSV_FORMAT TYPE=csv field_delimiter='\t' NULL_IF=('null') FILE_EXTENSION='.gz';
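A file format can be tested before wiring it into a pipe by querying the stage directly. This is a sketch using the stage and format defined above:

```sql
-- Preview the first rows Snowflake parses from the staged TSV files
select t.$1, t.$2, t.$3
from @<stage_name> (file_format => 'S3_TSV_FORMAT') t
limit 10;
```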
Now it is time to create a PIPE:
create or replace pipe <streaming_snowpipe_name> auto_ingest=true as
COPY into <streaming_table>(
timestamp, column_1, column_2, column_3, column_4, column_5, load_time, file_name, file_row_number )
from (
select $1, $2, $3, try_to_number($6), $7, try_to_number($8),
current_timestamp::timestamp_tz, metadata$filename, metadata$file_row_number
from @<stage_name> )
file_format = S3_TSV_FORMAT
on_error = skip_file;
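Note that a newly created pipe only loads files that arrive after its creation. If files already sit in the stage path, they can be queued manually with a standard Snowflake command (shown with the placeholder pipe name):

```sql
-- Queue files already present in the stage location for ingestion
alter pipe <streaming_snowpipe_name> refresh;
```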
By specifying the file_format when creating the pipe, Snowflake splits each row by the delimiter, and we can select individual columns while inserting the data. try_to_number() converts the column value to a number during the insert, returning NULL instead of failing when the value cannot be cast. on_error=skip_file skips the whole file if an error is found while inserting the data. The AUTO_INGEST=true parameter tells the pipe to read event notifications sent from an S3 bucket to an SQS queue when new data is ready to load.
To enable AUTO_INGEST, we need to grant permission on the S3 bucket by adding the SQS queue ARN: configure event notifications on the S3 bucket to notify Snowpipe when new data is available to load. The auto-ingest feature relies on SQS queues to deliver event notifications from S3 to Snowpipe. We can get the ARN once the Snowpipe has been created.
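The queue ARN to use in the S3 event notification appears in the notification_channel column of SHOW PIPES output:

```sql
-- The notification_channel column holds the SQS queue ARN for auto-ingest
show pipes like '<streaming_snowpipe_name>';
```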
After configuring the event notification and creating the Snowpipe, data is auto-ingested from S3 into the Snowflake table. To monitor the pipe, the following command can be used. It returns the execution state, lastIngestedTimestamp, and many more fields describing the state of the pipeline.
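The status fields mentioned above (executionState, lastIngestedTimestamp) come from SYSTEM$PIPE_STATUS:

```sql
-- Returns a JSON object with executionState, lastIngestedTimestamp, pendingFileCount, etc.
select system$pipe_status('<streaming_snowpipe_name>');
```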
This next command returns the errors that occurred in a pipeline over a given window, such as parse errors when the data does not match the table schema:
select * from table(validate_pipe_load(
pipe_name => '<streaming_snowpipe_name>',
start_time => dateadd(hour, -1, current_timestamp())));
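Load activity can also be reviewed per table with the COPY_HISTORY table function (same one-hour window as above, using the placeholder table name):

```sql
-- File-level load history: rows loaded, errors seen, and load status for the last hour
select *
from table(information_schema.copy_history(
  table_name => '<streaming_table>',
  start_time => dateadd(hour, -1, current_timestamp())));
```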
We now have more than ten Snowpipes running on auto-ingest, storing TBs of data every day. Removing Druid is the end of an era for GumGum. A simple architecture and low maintenance help the engineering team scale to meet business demands, and make our overall data easier for new engineers to understand and adopt. We have achieved our goal of simplifying our real-time architecture: in Druid we had many infrastructure components, and now we have simple Snowpipe DDL statements. We still have a long way to go in automating the maintenance of our Snowflake real-time systems, but we now have a simple and scalable data platform to build on.
Shutting down all of our Druid clusters saved significant cost and maintenance time. We reduced ingestion cost by 10x. Additionally, we used to perform maintenance weekly, and in the last 6 months we have only had to perform maintenance once (due to an unexpected schema change). In the migration we lost Druid’s main strength: aggregating time-series data with low latency. But for us, Snowpipe performance is excellent and still allows us to make decisions for the business in real time.