GumGum Tech Blog

Replacing Apache Druid with Snowflake Snowpipe

Introduction

Photo by Hunter Harritt on Unsplash

Situation

Our Apache Druid cluster gave us a number of operational problems:

  • Schema changes to the source and target have to be applied manually.
  • When a historical node is terminated by AWS, all in-flight queries and sub-queries on it fail, and brokers keep routing to the dead node until ZooKeeper picks up the change.
  • The MiddleManager throws OOM errors when its threads and cores are not sized correctly, causing a loss of real-time data.
  • Some queries throw OutOfMemoryError in the JVM process on the historical nodes, requiring a node restart.
  • Despite Druid being a massively parallel query system, a single query can end up being processed on hundreds of historical nodes, and there is no fault tolerance at all on the query execution path.
  • Network failures between Druid components.
  • When a broker sends sub-queries to historicals, all of those sub-queries must complete successfully before it can return anything to the client; if any sub-query fails, the whole query fails.
Meme: a toddler in front of a burning house, captioned “YOUR DATA PIPELINES” and “IT WAS JUST A ‘TINY SCHEMA CHANGE’ THEY SAID…”
Druid Architecture

Task

Lambda Architecture
Streaming Pipeline

Action

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{col, date_format}
// Write each batch as one gzip-compressed, tab-delimited file per hourly partition on S3
dataFrame.withColumn("datetime", date_format(col("timestamp"), "yyyy-MM-dd-HH"))
  .coalesce(1).write.option("compression", "gzip")
  .option("delimiter", "\t").mode(SaveMode.Append)
  .partitionBy("datetime").csv(s3Path)

Snowflake Streaming Data Ingestion

The Snowpipe Auto Ingest process flow
create or replace TABLE <streaming_table> (
  TIMESTAMP TIMESTAMP_NTZ(9) NOT NULL,
  column_1 VARCHAR(8),
  column_2 VARCHAR(256),
  column_3 NUMBER(38,0),
  LOAD_TIME TIMESTAMP_TZ(9),
  FILE_NAME VARCHAR(16777216),
  FILE_ROW_NUMBER VARCHAR(16777216)
);
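The LOAD_TIME, FILE_NAME, and FILE_ROW_NUMBER columns are populated by the pipe defined below, which makes per-file sanity checks easy. As a quick illustration (a hypothetical query against the table above, not part of the original setup):

-- Rows loaded per source file in the last hour, based on the metadata columns above
select file_name, count(*) as rows_loaded
from <streaming_table>
where load_time >= dateadd(hour, -1, current_timestamp())
group by file_name
order by rows_loaded desc;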
create or replace STAGE <stage_name> url='s3://<bucket_name>/<streaming_table_path>/'
storage_integration = <storage_integration_name>;

create or replace FILE FORMAT S3_TSV_FORMAT TYPE=csv field_delimiter='\t' NULL_IF=('null') FILE_EXTENSION='.gz';
create or replace pipe <streaming_snowpipe_name> auto_ingest=true as
COPY into <streaming_table> (
  timestamp, column_1, column_2, column_3, load_time, file_name, file_row_number )
from (
  select $1, $2, $3, try_to_number($4),
  current_timestamp::timestamp_tz, metadata$filename, metadata$file_row_number
  from @<stage_name>
)
file_format = S3_TSV_FORMAT
on_error = skip_file
PATTERN = '.*.csv.gz';
show pipes;
Creating the Event Notification for Snowpipe
In the S3 bucket's event notification, enter the SQS ARN that Snowflake generates when the Snowpipe is created.
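If the ARN needs to be looked up again later, it is exposed as the notification_channel column in the pipe metadata; for example (using the same placeholder pipe name):

-- notification_channel in the output is the SQS ARN for the S3 event notification
desc pipe <streaming_snowpipe_name>;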
select SYSTEM$PIPE_STATUS('<streaming_snowpipe_name>');
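Files that were already sitting in the stage before the event notification was configured are not picked up automatically. A refresh (standard Snowpipe behaviour; it queues files staged within the last 7 days that have not been loaded yet) can backfill them:

alter pipe <streaming_snowpipe_name> refresh;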
select * from table(validate_pipe_load(
pipe_name=>'<streaming_snowpipe_name>',
start_time=>dateadd(hour, -1, current_timestamp())));
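validate_pipe_load only reports files that hit errors. For a broader per-file view (an optional check using Snowflake's COPY_HISTORY table function, with the same placeholder table name), something like the following works:

select file_name, status, row_count, first_error_message
from table(information_schema.copy_history(
  table_name=>'<streaming_table>',
  start_time=>dateadd(hour, -1, current_timestamp())));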

Conclusion/Result


We’re hiring! Check out https://gumgum.com/engineering
