Open Format in an Omni-cloud World: From EDW targets to Structured Streaming on Delta Lake
Motivation
My team at Carvana, Next Generation Communication Platform, is responsible for customer communication, engagement, and retention. Our services offer important insight into AI and workforce-management metrics. Since the service was not initially designed with data use cases in mind, GCP’s Log Router provides a convenient turn-key solution: simple manifests stream logs out for analytics. The natively supported destinations are:
- BigQuery (current solution)
- Cloud Storage (GCS)
- Pub/Sub
- Cloud Logging (log bucket)
Over time, streaming log entries into BigQuery has presented several challenges:
A. Target Inefficiency
When using BigQuery as a log-routing target, the table can at best be partitioned by log timestamp. We have no ability to partition on event type (a nested field) to enable filter pushdown, nor can we cluster on message or conversation ID to improve query performance. Our legacy services are monolithic, and their logs can contain hundreds of event types. Consequently, each transformation job resulted in full table scans ($5/TB). Updating metrics computed on a 2 TB table every 5 minutes had been costing us $300,000/year.
B. Event Availability
To avoid expensive queries, we deployed many filtered sinks. Making a new event available for analysis in the data warehouse became an involved process: senior developers need to be involved to change and deploy manifests for new sinks; a manual backfill is then performed, matching hundreds of nested fields’ schemas; finally, new statements are added to the already complex SQL transformation logic. The process is time consuming and prone to errors. We need to reduce the deployment burden and optimize the analytics event tables.
C. Schema Management
Since many events share one sink, it is not transparent to users which event types should contain which fields; NULLs have caused great confusion. We want to fan the tables out by event type so that each table contains only the information users need to query.
When using BigQuery as a sink, the event schema is determined by the first record seen, with no ability to evolve. We need a way to accommodate new schemas and prevent errors as the platform services are updated.
D. Data Export
Customer communication data needs to live outside its own data warehouse and cloud for other teams’ consumption. Selecting from large tables, plus Storage API costs, plus network egress ($0.12/GB) from BigQuery turned out to be prohibitively expensive: $200,000/yr for a 2 TB daily export and aggregation. We need a way to store data in an open format, transform it, and make it available at minimal cost.
Architecture
A few alternatives to filtered BQ sinks were evaluated. Based on timeliness, reliability, automation, ease of consumption, and cost, I implemented an event group -> Cloud Pub/Sub -> Cloud Storage -> Spark Structured Streaming architecture.
What is Apache Beam?
An open source programming model for building batch and streaming data pipelines. Beam streaming is real time.
What is Apache Spark?
An open source, distributed computing framework that hides the complexity of parallelization and partitioning from users. The code written is parallel by nature. Spark streaming is microbatch and near real time.
What is Dataflow?
GCP’s serverless, NoOps platform for running Beam jobs.
On a high level, the components are:
- One log sink per service, using Pub/Sub as the destination.
- One Dataflow (Beam) job per topic, exporting data from Pub/Sub to Cloud Storage.
- One Dataflow snapshot per month per job, needed for disaster recovery.
- One Autoloader (Spark) job per service, creating a Bronze table in Delta Lake.
- One notebook job per event/group, creating Silver tables in Delta Lake for query.
- Ad hoc notebook jobs as needed, exporting data from Delta Lake.
Why do we need the Beam job? Because Pub/Sub isn’t yet supported as a source by Spark Structured Streaming. It is on the roadmap; we may no longer need the Dataflow job in a year.
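For illustration, here is a minimal sketch of that Dataflow step: a Beam pipeline that reads Pub/Sub messages and writes them as windowed files to Cloud Storage. The topic and bucket names are placeholders, and the production job differs in details (file format, sharding, error handling).
import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows


def run():
    # The Dataflow runner, project, and region options are supplied on the command line.
    options = PipelineOptions(streaming=True)
    with beam.Pipeline(options=options) as p:
        (
            p
            | "ReadFromPubSub" >> beam.io.ReadFromPubSub(
                topic="projects/my-project/topics/comm-events")  # placeholder topic
            | "Decode" >> beam.Map(lambda msg: msg.decode("utf-8"))
            | "Window" >> beam.WindowInto(FixedWindows(60))      # one-minute file windows
            | "WriteToGCS" >> fileio.WriteToFiles(
                path="gs://my-bucket/raw/comm-events",           # placeholder bucket
                sink=lambda dest: fileio.TextSink())
        )


if __name__ == "__main__":
    run()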
Produce Events to Pub Sub
Ease of Deployment [Solves A & B]
Deploying my proposed streaming stack requires only one deployment per service:
- Deploy a filtered log sink in the legacy project with Pub/Sub as the destination. Grab the unique writer identity generated for the cross-project destination.
- Deploy the Pub/Sub topic. Ensure the Pub/Sub and Kubernetes Config Connector service accounts have the required permissions. (A sketch of these first two steps follows the snapshot notes below.)
- Deploy the Dataflow job exporting Pub/Sub messages to Cloud Storage.
- Create a Dataflow job snapshot every 30 days to save the state of the streaming pipeline.
Snapshot limitations:
- Works only for a Pub/Sub source
- Jobs can only be restarted from a snapshot via the CLI or Python/Java code
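For illustration, the sink and topic steps look roughly like the following when done with the GCP client libraries (in production we deploy Config Connector manifests instead). The project, sink, topic names, and filter are placeholders.
from google.cloud import logging, pubsub_v1

# Placeholder names; in production these come from Config Connector manifests.
legacy_project = "legacy-project"
data_project = "data-project"
topic_id = "comm-events"

# 1. Filtered log sink in the legacy project, with a Pub/Sub topic as destination.
log_client = logging.Client(project=legacy_project)
sink = log_client.sink(
    "comm-events-sink",
    filter_='resource.type="k8s_container"',  # placeholder filter
    destination=f"pubsub.googleapis.com/projects/{data_project}/topics/{topic_id}",
)
sink.create(unique_writer_identity=True)
# This writer identity must be granted roles/pubsub.publisher on the topic.
print(sink.writer_identity)

# 2. The Pub/Sub topic in the data project.
publisher = pubsub_v1.PublisherClient()
publisher.create_topic(request={"name": publisher.topic_path(data_project, topic_id)})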
Consume Events by Delta Live Tables
DLT is a new framework for building scalable and testable pipelines with the medallion architecture. With simple, declarative syntax, data engineers and analysts can mix Python and SQL, batch and streaming jobs.
DLT manages task orchestration, data quality and error handling in one place. All table properties, optimization, data definition and business logic can be documented in code.
DLT uses Autoloader as the streaming backend to ensure exactly-once consumption. A watermarking strategy can be used to deduplicate events delivered at least once by Pub/Sub.
Having Analytics events stored as Delta tables enables us to use their change data feed to export data. [Solves D]
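As an illustration, assuming a Bronze table named bronze_events and a Silver table named silver_events with the change data feed enabled, the deduplication and export patterns look roughly like this:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Deduplicate events delivered at least once by Pub/Sub; the watermark bounds
# the state kept for deduplication.
deduped = (
    spark.readStream.table("bronze_events")
    .withWatermark("event_datetime", "10 minutes")
    .dropDuplicates(["event_id", "event_datetime"])
)

# Export incrementally via the Delta change data feed (requires the
# delta.enableChangeDataFeed table property on silver_events).
changes = (
    spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")
    .table("silver_events")
)
(
    changes.writeStream
    .format("parquet")
    .option("checkpointLocation", "gs://my-bucket/_checkpoints/export_silver_events")
    .trigger(availableNow=True)
    .start("gs://other-team-bucket/exports/silver_events")
)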
Data Lineage [Solves B & C]
Our DLT pipeline separates events into several data layers in the Delta Lake:
- Bronze: Raw router logs deduplicated by id
- Silver: Filtered, cleansed analytics events with type casting and expectations
- Gold: Business level rollups and aggregates
- Expectations: Data quality checks and actions on violation (see the sketch below)
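A minimal sketch of a Silver table with expectations, assuming a hypothetical bronze_events source with event_id and event_type columns:
import dlt
from pyspark.sql.functions import col


@dlt.table(comment="Filtered, cleansed analytics events")
@dlt.expect_or_drop("valid_event_id", "event_id IS NOT NULL")  # drop rows that violate
@dlt.expect("known_event_type", "event_type IS NOT NULL")      # record violations only
def silver_message_events():
    return dlt.read_stream("bronze_events").where(col("event_type") == "message_sent")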
Infer Column Type [Solves B & C]
Set cloudFiles.inferColumnTypes to true to star-explode jsonPayload. This creates one column per nested field in one function call.
We can add schema hints for datetime columns and other exceptions.
Automatic Schema Evolution [Solves C]
A DLT pipeline can upcast data types and automatically recover when a schema change is detected.
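A minimal Autoloader sketch, under assumed paths and column names, combining type inference, schema hints, and schema evolution:
# `spark` is the ambient SparkSession in a Databricks notebook.
raw = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.inferColumnTypes", "true")
    # Schema inference and evolution need a schema location to track versions.
    .option("cloudFiles.schemaLocation", "gs://my-bucket/_schemas/comm-events")
    # Hint exceptions such as datetime columns.
    .option("cloudFiles.schemaHints", "jsonPayload.event_datetime TIMESTAMP")
    # Default mode: add new columns and restart the stream when they appear.
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("gs://my-bucket/raw/comm-events")
    .select("timestamp", "jsonPayload.*")  # star-explode the nested payload
)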
Declarative and Elegant Syntax
We use the following code to find the latest change per record. The merge function needs to account for out-of-order delivery, i.e., receiving a delete before an insert.
// Merge function: upsert the latest change per record and handle tombstones.
def mergeFunc(
    microBatchChangeData: DataFrame,
    batchId: Long
): Unit = {
  DeltaTable
    .forName(tableName)
    .as("t")
    .merge(
      microBatchChangeData
        .transform(
          findLatestChangeCustomOrder(
            windowColName = pkColName,
            strTimeColName = strTimeColName
          )
        )
        .withColumn("premature_delete", lit(false))
        .as("c"),
      s"c.${pkColName} = t.${pkColName}"
    )
    // Tombstone for an existing record: delete it.
    .whenMatched("c.Op == 'D'")
    .delete()
    // Newer change for an existing record: update all columns.
    .whenMatched("c.latest > t.latest")
    .updateAll()
    // Tombstone arriving before the insert (out of order): remember it.
    .whenNotMatched("c.Op == 'D'")
    .insertExpr(
      Map(
        s"${pkColName}" -> s"c.${pkColName}",
        "premature_delete" -> "true",
        "latest" -> "c.latest"
      )
    )
    // Brand new record: insert all columns.
    .whenNotMatched("c.Op != 'D'")
    .insertAll()
    .execute()
}
// The change data is then read with Autoloader and merged into the target table:
val changeData = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.allowOverwrites", "false")
  .option("cloudFiles.format", "parquet")
  .option("cloudFiles.inferColumnTypes", "true")
  .load(s3Path)
changeData.writeStream
  .format("delta")
  .foreachBatch(
    mergeFunc(
      …
    ) _
  )
  .outputMode("update")
  .option("checkpointLocation", checkpointPath)
  .option("mergeSchema", "true")
  .trigger(Trigger.AvailableNow())
  .start()
Below is the equivalent snippet in DLT. Tombstones are already handled, and the simple, declarative syntax greatly reduces the barrier to entry for writing streaming jobs.
import dlt
from pyspark.sql.functions import col, expr

dlt.create_streaming_live_table(
    name="silver",
    comment="log events",
    partition_cols=["event_date"],
    path=s3_path,
)
dlt.apply_changes(
    target="silver",
    source="bronze",
    keys=["event_id"],
    sequence_by=col("event_datetime"),
    apply_as_deletes=expr("Op = 'D'"),  # tombstones handled declaratively
    stored_as_scd_type=1,
)
Optimization [Solves A]
- Enzyme is useful for streaming aggregations, joins, and unions.
- Enhanced autoscaling is enabled in DLT to reduce streaming costs.
- The Bronze table is partitioned by event date and type to enable filter pushdown and faster queries.
- Silver tables can be further Z-ordered on high-cardinality columns based on usage (see the sketch below).
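For example, an ad hoc Z-order run on a hypothetical Silver table and column looks like this:
# Table and column names are placeholders; run ad hoc or on a schedule.
spark.sql("OPTIMIZE silver.message_events ZORDER BY (conversation_id)")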
Conclusion
An assortment of low-code connectors today boasts the ability to make data available for analytics in real time. However, many such turn-key solutions face the same challenges: cost-inefficient EDW targets, inability to evolve schemas, and prohibitively expensive data exports due to cloud and vendor lock-in. The alternative: an open data lake that unifies batch and streaming workloads. Delta Lake decouples data storage from proprietary formats, dramatically reducing data extraction costs.
Bronze landing zones in Delta format build the foundation for a medallion architecture. Spark Structured Streaming provides a unified ingestion strategy for both batch and streaming workloads. Streaming should mean the ability to automatically handle new data and avoid reprocessing historical data; real time is optional, depending on the use case. Autoloader is such a technique: it discovers newly arrived data and ensures exactly-once, incremental processing. DLT further simplifies streaming jobs, accelerating the development cycle.