An Introduction to Streaming ETL on Azure Databricks using Structured Streaming & Databricks Delta — Part I
In my previous role I developed and managed a large near real-time data warehouse using proprietary technologies for CDC (change data capture), data replication, ETL (extract-transform-load) and the RDBMS (relational database management system) components. To be precise, our process was E-L-T, which meant that for a real-time data warehouse, the database was continuously running hybrid workloads which competed fiercely for system resources, just to keep the dimensional models up to date. At times frustrated by sub-optimal performance, latency challenges and vendor lock-in, I had often considered migrating to an ETL process built using open source / big data technologies (known as ETL-offloading) and wondered whether this would deliver on the promise of true horizontal scalability. Naturally I was wary of the fundamental differences between the technologies, the learning curve involved and the development time required.
Recently however, I was fortunate enough to work on a POC which required a streaming ETL pipeline using Azure Databricks, so I had a rare glimpse into what this migration challenge might have been like, the major differences and whether there would be any significant trade-offs.
In this blog I want to share my experiences by building a more “traditional” yet real-time ETL pipeline using the latest and greatest from Azure Databricks. I say “traditional” because the result should represent a star schema in a data warehouse, specifically Azure SQL Data Warehouse, although in streaming mode for low latency between source and target. I also wanted to incorporate the popular concept of a data lake, which is a cost-effective, scalable storage option. Fortunately Azure Data Lake Storage Gen2 went GA at the beginning of the month and the connector for Databricks is fully supported.
The first part of this blog will cover the main concepts and components, as well as the overall architecture used for the demonstration. Part II goes through the complete setup in Azure, and Part III walks through the ETL pipeline developed in a series of Databricks notebooks. However if you’re eager to dive right into the detail, you will be able to run the pipeline with minimal setup in Azure.
But if you feel like a light-hearted diversion before we begin, here’s a semi-fictional journey through time from an ETL developer’s perspective…
Databricks was founded by the creators of Apache Spark and offers a unified platform designed to improve productivity for data engineers, data scientists and business analysts. The Databricks platform provides an interactive and collaborative notebook experience out-of-the-box, and due to its optimised Spark runtime, frequently outperforms other Big Data SQL platforms in the cloud.
Azure Databricks is a fully managed service which provides powerful ETL, analytics, and machine learning capabilities. Unlike other vendors’ offerings, it is a first-party service on Azure which integrates seamlessly with other Azure services such as Event Hubs and Cosmos DB.
As you would expect with such a service, it’s quick and simple to spin up clusters which can auto-scale and auto-terminate, so your cluster will shut down a specified amount of time after your jobs have completed. These two features make the service cost-effective, as you’re only billed while your cluster is running, based on the number and size of the nodes (VMs) in your cluster. More information on billing can be found in Part II. If you want to run the demonstration with the notebooks provided in Part III, you can provision a 14-day free trial and you will only pay for the underlying Azure VMs. Once the resource is created in Azure, a Databricks workspace will be provisioned for you, which essentially stores all your Databricks assets, such as notebooks and libraries. Even without any running clusters you can still access the workspace, view notebooks, schedule jobs etc. and you won’t be charged until you spin up a cluster. It’s worth noting that a streaming scenario, or one which serves a BI tool, requires a dedicated cluster, although it could be set to auto-scale depending on the workload.
Structured Streaming & Databricks Delta Lake
Together with Azure Databricks, the two key components that in my opinion really unlock a true ETL / data warehousing use-case are Spark Structured Streaming and Databricks Delta (now known as Delta Lake). With these improvements, mere mortals can now attempt to develop (both batch and streaming) ETL and analytics pipelines in Spark. Why so? Before Structured Streaming, there was Spark Streaming, which used RDDs & DStreams (based on relatively low-level operations on Java/Python objects), and before Databricks Delta, concurrent reads/writes, DML operations and performance optimisations were limited and complex.
Structured Streaming on Azure Databricks provides a reliable, exactly-once, fault-tolerant streaming platform, using a simple set of high-level APIs. You can treat an incoming stream as an unbounded table and it allows you to handle late or out-of-order data. More importantly, through a feature called watermarking, you can join streaming data (with static or other streaming data) and instruct the engine how long to “wait” for records to join. This is a significant departure from how our RDBMS-based ELT process ran, whereby it would run a number of mappings but only on one change set (CDC table) at a time. It was done this way because you could not rely on the master-detail events being recorded in the same (micro-batch) time window.
One may ask why the Spark engine couldn’t simply wait indefinitely for these late-arriving records to appear. Well, it could, but to do so requires keeping unbounded state, in other words, storing the keys in memory without any threshold at which they can be purged, which will ultimately exhaust all available memory and lead to out of memory (OOM) failures.
So whilst the ability to handle late-arriving data may be very useful in a near real-time ETL scenario, it is not without its limits and consequences, so carefully evaluated watermarking thresholds will need to be defined to limit state. Crucially, data arriving beyond this threshold would then need to be handled in a batch process.
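To make the trade-off concrete, here is a minimal sketch in plain Python (not the Spark API) of how a watermark keeps join state bounded. It is a toy model under my own assumptions: only adverts are buffered, event times are integer seconds, and the function name and the `delay` parameter are hypothetical. The real engine is more sophisticated, but the principle is the same: anything older than "latest event time seen minus the delay" is purged from state, and records arriving behind that line are set aside.

```python
def join_with_watermark(events, delay):
    """Join impressions to buffered adverts, purging state behind the watermark.

    events: iterable of (kind, advert_id, event_time) in arrival order,
            where kind is "advert" or "impression" and event_time is in seconds.
    delay:  how far (in seconds) the watermark trails the latest event time seen.
    Returns (matches, dropped, number_of_adverts_still_buffered).
    """
    state = {}      # advert_id -> event_time of the buffered advert
    max_seen = 0    # latest event time observed so far
    matches, dropped = [], []

    for kind, advert_id, event_time in events:
        watermark = max_seen - delay
        if event_time < watermark:
            # Too late: the state it could have joined with may already be gone.
            dropped.append((kind, advert_id, event_time))
            continue

        max_seen = max(max_seen, event_time)
        if kind == "advert":
            state[advert_id] = event_time
        elif advert_id in state:
            matches.append((advert_id, event_time))

        # Purge adverts that have fallen behind the new watermark;
        # this is what keeps the state bounded.
        watermark = max_seen - delay
        state = {k: t for k, t in state.items() if t >= watermark}

    return matches, dropped, len(state)


events = [
    ("advert", "a1", 100),
    ("impression", "a1", 105),   # joins with a1
    ("advert", "a2", 200),       # moves the watermark to 140, so a1 is purged
    ("impression", "a1", 120),   # older than the watermark: left for batch
    ("impression", "a2", 201),   # joins with a2
]
matches, dropped, buffered = join_with_watermark(events, delay=60)
```

With a delay of 60 seconds, the late impression for `a1` ends up in `dropped`, which is the data that would have to be picked up by a batch process. A larger delay would have caught it, at the cost of holding more adverts in memory.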
On a more positive note, the code changes between batch and streaming using Spark’s structured APIs are minimal, so once you have developed your ETL pipelines in streaming mode, running them in batch requires very little re-coding. (An example of this is provided in the final notebook.)
Turning our attention now to Databricks Delta, there are certain “exclusive” features which simplify and optimise streaming (and batch) ETL scenarios. Essentially it’s an optimized Spark table with a number of SQL-like features:
- ACID transactions — Multiple writers can simultaneously modify a data set and see consistent views. Writers do not affect readers.
- DELETES/UPDATES/UPSERTS — Writers can modify a data set without interfering with jobs reading the data set. Merge operations are well supported too.
- Statistics, data skipping and ZORDER clustering — Reads are 10–100x faster when statistics are tracked about the data in each file, allowing Delta to avoid reading irrelevant information.
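The data skipping idea in the last bullet can be sketched in a few lines of plain Python. This is an illustration of the principle only, not Delta’s implementation: the `FileStats` class, the file names and the `advert_id` column are all hypothetical. If the minimum and maximum value of a column are known for each file, any file whose range cannot overlap the query’s filter never has to be opened.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FileStats:
    path: str      # hypothetical file name
    min_id: int    # smallest advert_id stored in the file
    max_id: int    # largest advert_id stored in the file


def files_to_scan(files, lower, upper):
    """Return the paths whose [min_id, max_id] range overlaps [lower, upper]."""
    return [f.path for f in files if f.max_id >= lower and f.min_id <= upper]


table_files = [
    FileStats("part-0.parquet", 1, 100),
    FileStats("part-1.parquet", 101, 200),
    FileStats("part-2.parquet", 201, 300),
]

# A filter such as "advert_id BETWEEN 150 AND 180" only needs one file.
selected = files_to_scan(table_files, 150, 180)
```

Skipping works best when each file covers a narrow, mostly non-overlapping range of values, which is the layout that ZORDER clustering aims for.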
With Delta you can write batch and streaming data into the same table, and other notebooks and clusters can read from the same table and get a consistent up-to-date view. Additionally, Delta can improve data access speeds by organizing data into large files that can be read efficiently. This is done by coalescing small files into larger ones.
So essentially it’s a transactional storage layer designed specifically to harness the power of Apache Spark and the Databricks filesystem (DBFS). The underlying data is stored as Parquet files in DBFS but Delta maintains a transaction log that efficiently tracks changes to the table. You can read and write data stored in Databricks Delta using the same familiar Apache Spark SQL batch and streaming (structured) APIs.
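As a rough mental model of that transaction log, the sketch below (plain Python, with hypothetical file names and a simplified log layout of my own) treats each commit as a list of "add" and "remove" file actions. Replaying the commits in order gives the set of data files that make up the table at a given version, which is why readers see a consistent view while writers append or compact files.

```python
def snapshot(log, version=None):
    """Replay commits up to and including `version`; return the live data files."""
    live = set()
    commits = log if version is None else log[: version + 1]
    for commit in commits:
        for action, path in commit:
            if action == "add":
                live.add(path)
            elif action == "remove":
                live.discard(path)
    return live


log = [
    # version 0: initial write of two small files
    [("add", "part-0.parquet"), ("add", "part-1.parquet")],
    # version 1: a streaming append
    [("add", "part-2.parquet")],
    # version 2: compaction replaces the two small files with one larger file
    [("remove", "part-0.parquet"), ("remove", "part-1.parquet"),
     ("add", "part-3.parquet")],
]

latest = snapshot(log)        # what a reader starting now would see
original = snapshot(log, 0)   # what a reader pinned to version 0 would see
```

A reader that started at version 0 keeps working against the two original files, while a new reader only sees the compacted layout.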
So is Databricks Delta the perfect solution to all ETL challenges perhaps?
Whilst the gap between the two technologies does seem to be closing, one still needs to be aware of the fundamental differences between Delta and an RDBMS. Delta works with files as opposed to records, and with a columnar rather than row-based storage format, hence certain point operations like merges or updates (although possible) are unlikely to run as fast, mainly due to the lack of real indexes. This is even more true in streaming scenarios, so this is where you may need to consider alternatives, like Cosmos DB, if you have workloads which depend on running these kinds of operations in a time-sensitive manner. Hopefully by running the demonstration in Part III yourself, you can evaluate these considerations.
Dataframes, Datasets, transformations & lazy evaluation
If you are new to Spark and coming from a SQL background, I’d recommend we cover a few “foreign” concepts which you’ll need to get your head around: dataframes, transformations, actions and lazy evaluation. I’ll not go into any great detail so if you fancy some good bed-time reading, I recommend “Spark: The Definitive Guide: Big Data Processing Made Simple” written by Matei Zaharia, who started Spark in 2009 and co-founded Databricks, and Bill Chambers, who is a PM at Databricks.
Dataframes and Datasets are both distributed, immutable table-like data structures which can span thousands of computers. Because they’re immutable, we cannot change them in place; instead we perform transformations on them and store the result in another dataframe. A transformation can be anything from casting a column to a different type, to applying a built-in or custom function (UDF), to joining dataframes. So it’s common to “chain” multiple dataframes, each with their own transformations, before achieving the output you require. Spark will form an optimal execution plan, but only execute it when you specify an “action”, such as displaying the count in the notebook or writing to an output sink. Hence the notion of lazy evaluation.
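If lazy evaluation feels abstract, the following toy class in plain Python may help. It is not Spark’s API: `LazyFrame`, `with_column` and `collect` are names I’ve made up for illustration, and unlike Spark it makes no attempt to optimise the plan. It only shows the two ideas above: every transformation returns a new object rather than changing the old one, and nothing runs until an action is called.

```python
class LazyFrame:
    """Toy immutable table: transformations are recorded, not executed."""

    def __init__(self, rows, plan=()):
        self._rows = rows
        self._plan = plan

    def filter(self, predicate):
        # Transformation: returns a new LazyFrame with a longer plan.
        return LazyFrame(self._rows, self._plan + (("filter", predicate),))

    def with_column(self, name, fn):
        # Transformation: adds or replaces a column computed from each row.
        return LazyFrame(self._rows, self._plan + (("with_column", (name, fn)),))

    def collect(self):
        # Action: only now is the recorded plan run against the data.
        rows = [dict(row) for row in self._rows]
        for op, arg in self._plan:
            if op == "filter":
                rows = [row for row in rows if arg(row)]
            else:
                name, fn = arg
                rows = [{**row, name: fn(row)} for row in rows]
        return rows


calls = []


def is_monitored(row):
    calls.append(row["advert_id"])   # record that the predicate really ran
    return row["brand"] == "brand_a"


adverts = LazyFrame([
    {"advert_id": 1, "brand": "brand_a"},
    {"advert_id": 2, "brand": "brand_b"},
])
monitored = adverts.filter(is_monitored).with_column(
    "label", lambda row: f"{row['brand']}-{row['advert_id']}"
)

calls_before_action = len(calls)   # still 0: nothing has executed yet
result = monitored.collect()       # the action triggers the whole chain
```

Note that `adverts` itself is untouched by the chain; `monitored` is a separate object that merely remembers which steps to apply.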
You can express business logic (in R, Python, Scala, or Java) as a set of logical transformations in SQL, DataFrames or Datasets, and Spark will transparently compile that logic down to RDDs and generate an underlying execution plan.
To begin with, you’re likely to only use Dataframes rather than Datasets, but if you’re wondering about the difference between the two, it’s that Datasets are strongly typed, with types checked at compile time (which is why they are only available in Scala and Java), whereas Dataframe types are only checked at runtime.
Lastly Spark supports a subset of the ANSI SQL 2003 standard, so you can develop many parts of your pipeline in SQL-like notation and you can even interoperate between dataframes and SQL as you need.
Whilst the following demo is loosely based on the POC I mentioned earlier, it has been altered for the purposes of confidentiality, particularly the data set. The scenario I’ve decided to use is based on the frequently cited Ad Tech use-case, although for simplicity I’ve ignored certain metrics like clicks and conversions which are fairly well covered already.
Assume the business is in the advertising sector and monitors the volume of adverts displayed (impressions) on certain websites. They are only interested in adverts displaying particular brands. The business is not interested in the usual clicks and conversions. In this scenario, adverts and impressions arrive as a continuous stream / feed of JSON messages (could be files) from a 3rd party. Adverts contain metadata such as when the advert was created, a unique identifier, a name and the brand it pertains to. Impressions record when, and on which site, an advert was displayed, including the session and user information. Fortunately we have an advert ID in both to link them; however, we don’t have any sort of unique key for each impression.
There are some static operational/reference sources which contain slowly changing data:
- a SQL DB which contains a list of brands being monitored
- a web application which the operational team uses to maintain a list of domains that are being monitored.
For convenience, assume that both of these data sets are already pre-filtered by a 3rd party based on these specific brands and domains. Adverts are an important source of reference data and individual adverts need to be easily accessible by the operational application.
The set of source tables and the target star schema we need to generate is as follows:
Note, I’ve excluded the crucial time dimension for brevity, and you’ll also notice I’ve included a batch ETL scenario to showcase how we could implement a Type II slowly changing dimension requirement.
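For readers who haven’t met a Type II slowly changing dimension before, here is a minimal sketch of the idea in plain Python. It is not the code from the notebooks: the function name, the column names (`valid_from`, `valid_to`, `is_current`) and the sample brands are assumptions of mine, and surrogate keys are left out for brevity. When a tracked attribute changes, the current row is closed off and a new current row is added, so history is preserved rather than overwritten.

```python
def apply_scd2(dimension, incoming, key, tracked, effective_date):
    """Return a new dimension with Type II changes applied (input is not modified)."""
    result = [dict(row) for row in dimension]
    current = {row[key]: row for row in result if row["is_current"]}

    for record in incoming:
        existing = current.get(record[key])
        if existing is not None and all(existing[c] == record[c] for c in tracked):
            continue  # nothing changed, keep the current row as it is

        if existing is not None:
            # Close off the old version instead of overwriting it.
            existing["valid_to"] = effective_date
            existing["is_current"] = False

        new_row = {**record, "valid_from": effective_date,
                   "valid_to": None, "is_current": True}
        result.append(new_row)
        current[record[key]] = new_row

    return result


dimension = [
    {"brand_id": 1, "brand_name": "Acme", "valid_from": "2019-01-01",
     "valid_to": None, "is_current": True},
]
incoming = [
    {"brand_id": 1, "brand_name": "Acme Ltd"},   # renamed brand
    {"brand_id": 2, "brand_name": "Globex"},     # brand not seen before
]
updated = apply_scd2(dimension, incoming, key="brand_id",
                     tracked=["brand_name"], effective_date="2019-03-01")
```

After the call, `updated` holds three rows: the expired "Acme" row, the current "Acme Ltd" row and the new "Globex" row.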
Given the scenario we’ll implement the following architecture:
The streaming source section might seem overly complicated, so it deserves a little explanation. To generate streams of data we will use a utility to generate JSON data for both adverts and impressions. The utility supports defining a schema of your choice but unfortunately only supports sending data to IoT Hub. We’ll need to use Stream Analytics to consume the stream and route each message, based on its contents, either to Event Hubs or Cosmos DB.
Looking at the store section now, you may wonder why or how we can send a stream to Cosmos DB. In the scenario description above, it was mentioned that adverts need to be accessible by an operational application, from which individual adverts need to be quickly accessed. This is a good fit for Cosmos DB, which provides scalability and single-digit-millisecond latency. Stream Analytics can easily send a stream to Cosmos DB and we can read it back again as a stream using the superb change feed feature. Stream Analytics will route impressions to Event Hubs and Databricks will read both of these streams, run the ETL pipeline and stream the results to Azure SQL Data Warehouse. There will also be various pipelines run in batch to provide a more comprehensive set of scenarios.
For a more realistic demonstration, I wanted to showcase how we take two streams, join them, as well as join with other slowly changing reference data. In fact, one of the advantages of Databricks (more specifically Spark) is that you can read from many different data sources and data formats with relative ease, perform joins and transformations, and then write the output to a multitude of targets. This is why it has become a popular choice for data lake analytics.
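The routing step is simple content-based routing. In the real setup it is done by a Stream Analytics job using its own SQL-like query language, so the plain Python below is only an illustration of the logic. The field names and the rule "a message with a `site` field is an impression" are assumptions of mine, not the schema used in the demo.

```python
import json


def route(raw_messages):
    """Split a mixed feed into adverts and impressions by message content."""
    adverts, impressions = [], []
    for raw in raw_messages:
        message = json.loads(raw)
        # Assumption: only impressions carry a "site" field.
        if "site" in message:
            impressions.append(message)   # would be sent to Event Hubs
        else:
            adverts.append(message)       # would be sent to Cosmos DB
    return adverts, impressions


feed = [
    '{"advert_id": "a1", "name": "Spring sale", "brand": "brand_a"}',
    '{"advert_id": "a1", "site": "example.com", "session_id": "s-42"}',
]
adverts, impressions = route(feed)
```

Both outputs keep the `advert_id`, which is what lets Databricks join the two streams again further down the pipeline.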
Next, in the 2nd part of this blog, we’ll build out the above architecture but if you’re eager just to run the pipeline there will be an option described in part III to achieve this with minimal setup required.