Event-Driven Ingestion with Databricks Workflows and Serverless DBSQL

Databricks SQL SME
DBSQL SME Engineering
10 min read · Dec 13, 2023
Author Screenshot: Successful run of SQL

Author: Julia Hofmeister

Introduction:

Event-driven data ingestion is quickly becoming a requirement for many organizations, with use cases ranging from telemetry and autonomous driving to fraud detection and human resource management. In the past, engineers had to create event buses and message queues to get a notification when a new file was available to be loaded, and then manually kick off the data processing pipeline. Alternatively, they had to run the data pipelines continuously, which was costly and resource-intensive, especially if files arrived irregularly.

Now, Databricks lets you load data into your SQL pipelines as soon as it lands in your cloud storage, without running always-on streams or waiting for clusters to start up. When you combine Databricks Workflows event-driven triggers with Serverless SQL task types, you can ingest data as it comes in without keeping a cluster running. Using the COPY INTO command with Databricks Triggered Workflows is the final piece needed to get your automated and cost-effective event-driven data pipeline up and running!

Note: your workspace must be on the “Premium” plan or above to use Databricks SQL. File arrival triggers on Databricks require Unity Catalog to be set up. Please refer to this documentation for information on upgrading your workspaces to Unity Catalog.

What is DBSQL Serverless?

With a serverless architecture, the underlying compute (what powers your SQL queries) is managed by Databricks and spins up in seconds without you needing to turn on clusters manually. Unlike Databricks Data Engineering clusters, DBSQL Serverless has a single price, instead of two line items for the Databricks cost (DBUs) and the underlying machine cost. Since you are not paying separately for the cluster and the DBUs, the total cost of ownership (TCO) is usually lower than with a Classic or Pro SQL warehouse, especially for infrequent and sporadic data loads. To learn more about serverless and how it differs from other SQL warehouse types, check out our documentation.

What are Databricks Workflows?

Databricks Workflows is an advanced orchestration tool that is similar to Airflow or Azure Data Factory. However, a key difference is that there are no extraneous costs associated with using Workflows: all you pay for is the underlying clusters/warehouses that the jobs use while they are running. With Workflows, you can chain together different notebooks, Delta Live Tables pipelines, SQL queries, dashboard refreshes, Python wheels or scripts, and JARs. You can also add if/else conditions or Spark submit commands. In addition to Databricks-specific products, Workflows integrates with dbt, so you can add a dbt project to your pipeline just as easily as a Databricks notebook. In this post, we will be using Workflows with different SQL queries. Check out our documentation on workflows here.

What is “COPY INTO”?

COPY INTO is a SQL command that lets you load data from cloud storage into a Delta table. It supports many common file formats, including JSON, CSV, Parquet, Avro, and text files. COPY INTO is idempotent by default, so each file is processed only once (you can set the ‘force’ option to ‘true’ if you want to reload the entire source directory). This saves time and cost because your ETL pipeline processes each file once instead of doing a full load on every run. It supports target schema evolution, merging, mapping, and inference. You can define the target schema by creating an empty table with that schema before loading the data, or you can have COPY INTO infer the schema for you by setting COPY_OPTIONS ('mergeSchema' = 'true').
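
To make the idempotency described above concrete, here is a small toy model in Python. It is only a sketch of the behavior, not how Databricks implements it: the class, its method, and the file names are all hypothetical, and nothing here talks to a workspace.

```python
# Toy model only: mimics how COPY INTO skips files it has already loaded.
# The class and method names are hypothetical; no Databricks API is called.


class CopyIntoModel:
    def __init__(self):
        self.loaded_files = set()  # stands in for the load history kept for the table
        self.rows = []             # stands in for the table contents

    def copy_into(self, source_files, force=False):
        """Load rows from source_files (a dict of file name -> list of rows).

        Returns the names of the files that were actually loaded on this run.
        """
        loaded_now = []
        for name, file_rows in sorted(source_files.items()):
            if name in self.loaded_files and not force:
                continue  # already processed, so skip it (the default behavior)
            self.rows.extend(file_rows)
            self.loaded_files.add(name)
            loaded_now.append(name)
        return loaded_now


table = CopyIntoModel()
landing_zone = {"a.csv": [1, 2], "b.csv": [3]}

first = table.copy_into(landing_zone)    # both files load
second = table.copy_into(landing_zone)   # nothing new, so nothing loads
landing_zone["c.csv"] = [4, 5]
third = table.copy_into(landing_zone)    # only the new file loads
forced = table.copy_into(landing_zone, force=True)  # every file reloads, rows duplicate
```

The last call shows why ‘force’ should be used with care: every file is loaded again, so the model's row count doubles.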

Our full documentation on COPY INTO is located on this page.

Now we are going to create a serverless, fully automated, event-driven ingestion pipeline on Databricks with the following steps:

  1. Create a Serverless SQL Warehouse: This can be a new, dedicated warehouse or an existing one.
  2. Define the Ingestion Pipeline: Create the landing table and the COPY INTO ingestion script to be run.
  3. Create a Databricks Workflow: Create a new Job with one task that calls the SQL query created above. The query can be called either from a SQL file or from a Databricks query object. SQL file sources are better for production.
  4. Create Job Trigger: Define a Job Trigger that will automatically run when files are dropped.
  5. Monitor Job Runs: Monitor your event-driven job!

Step 1 — Create a Serverless SQL Warehouse

Your first task is to create a serverless SQL warehouse, either through the UI or the API. When you create a new SQL warehouse in your workspace’s UI, serverless is enabled by default. Choose a cluster size based on your anticipated workload: the more transformations and data throughput you expect, the larger the cluster size should be. In general, Databricks recommends starting with a larger size than you think you may need and scaling down from there. Using Intelligent Workload Management, Databricks SQL (DBSQL) will scale your warehouse up or down as needed and prioritize workloads efficiently. Turn on the “Auto stop” setting if you don’t want the warehouse running 24/7. For scaling, increase the maximum number of clusters if multiple users or queries will use the warehouse at the same time. Make sure the “Serverless” option is selected and then click “Create”. You now have a new serverless SQL warehouse!

For more information on scaling and sizing a serverless SQL warehouse, take a look at our documentation on the subject.
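
If you prefer the API route mentioned above, the request body might look like the sketch below. The field names follow the Databricks SQL Warehouses REST API as I understand it, so verify them against the current API reference before relying on them. The helper function, warehouse name, size, and auto-stop value are illustrative assumptions, not recommendations.

```python
import json


def serverless_warehouse_payload(name, cluster_size="Small",
                                 max_num_clusters=1, auto_stop_mins=10):
    """Build a request body for creating a serverless SQL warehouse.

    Hypothetical helper: it only assembles a dict and sends nothing.
    """
    if max_num_clusters < 1:
        raise ValueError("max_num_clusters must be at least 1")
    return {
        "name": name,
        "cluster_size": cluster_size,          # sizing choice from the UI
        "min_num_clusters": 1,
        "max_num_clusters": max_num_clusters,  # raise for concurrent users/queries
        "auto_stop_mins": auto_stop_mins,      # the "Auto stop" setting
        "warehouse_type": "PRO",               # assumed prerequisite for serverless
        "enable_serverless_compute": True,     # the "Serverless" option
    }


payload = serverless_warehouse_payload("event-driven-ingest", max_num_clusters=2)
print(json.dumps(payload, indent=2))
```

A body like this would be sent as an authenticated POST to your workspace's SQL warehouses endpoint. The sketch deliberately stops short of the HTTP call, since the host and token are specific to your environment.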

Author Screenshot: DBSQL Warehouse Creation with Serverless

Step 2 — Define the Ingestion Pipeline

Your next step is to create the SQL query to ingest files from cloud storage. In your Databricks workspace, click ‘New’ and then ‘Query’ under the ‘SQL’ section of the menu. All of the available options are listed here in our documentation.

In the example below, we create the pipeline with two statements, CREATE TABLE and COPY INTO:

CREATE TABLE IF NOT EXISTS main.serverless_sql_test.real_estate (
id BIGINT,
url STRING,
region STRING,
region_url STRING,
price LONG,
type STRING,
sqfeet INT
);

COPY INTO main.serverless_sql_test.real_estate
FROM 'abfss://domain.dfs.core.windows.net/' -- or s3/gcs
FILEFORMAT = CSV
FORMAT_OPTIONS ('header'='true', 'inferSchema'='true','mergeSchema'='true')
COPY_OPTIONS ('mergeSchema'='true');

If you don’t know the schema of your files, that is OK too. Instead, you can define a schema-less table for ingestion with something like this:

CREATE TABLE IF NOT EXISTS main.serverless_sql_test.real_estate;

COPY INTO main.serverless_sql_test.real_estate
FROM 'abfss://domain.dfs.core.windows.net/' -- or s3/gcs
FILEFORMAT = CSV
FORMAT_OPTIONS ('header'='true', 'inferSchema'='true','mergeSchema'='true')
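COPY_OPTIONS ('mergeSchema'='true', 'force'='true');
-- Note: 'force'='true' reloads every file on each run; remove it for incremental, idempotent loads.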

Note a few interesting format and copy options:

  1. FILEFORMAT: I chose CSV, but COPY INTO supports many file formats, documented here.
  2. FORMAT_OPTIONS: These depend on the file format above, but inferSchema and mergeSchema are the most common and most useful. They let you either infer the schema dynamically as data comes in or be more rigid about how schema changes are handled.
  3. COPY_OPTIONS: These let you specify other generic options, such as how to handle corrupt or missing files, and whether to start over with ‘force’ = ‘true’. There are many other copy options available here.

Since COPY INTO is idempotent by default, you can run the query once to do a full load of any files currently in the cloud storage location, and then use that same query in your workflow later to load new files as they arrive in the location designated in the query. You can set COPY_OPTIONS ('force' = 'true') if you want to disable idempotency and have every data file loaded into the table regardless of whether it has been processed before.

The example queries above include some CSV-specific options because we know the files being ingested have a header, and we want the query to infer the schema so that each field does not default to a string type. The ‘mergeSchema’ option is added to both the format and copy options so that incoming data can be loaded into the table even if there is a schema mismatch; when ‘mergeSchema’ is set to true, the schema of the table evolves with the arriving data. If your use case requires a very specific schema, leave these options at their default of false, but be aware that the query will fail if the incoming data does not match the target schema.
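
To see how the format and copy options discussed above fit together, the following Python sketch renders a COPY INTO statement from two option dictionaries. The helper is hypothetical and is meant for trusted, hand-written inputs only, since it does no escaping. The table name and storage path are the placeholders used in the queries above.

```python
def render_copy_into(table, source, file_format,
                     format_options=None, copy_options=None):
    """Render a COPY INTO statement as text (hypothetical helper, no escaping)."""

    def render_options(options):
        return ", ".join(f"'{key}' = '{value}'" for key, value in options.items())

    lines = [
        f"COPY INTO {table}",
        f"FROM '{source}'",
        f"FILEFORMAT = {file_format}",
    ]
    if format_options:
        lines.append(f"FORMAT_OPTIONS ({render_options(format_options)})")
    if copy_options:
        lines.append(f"COPY_OPTIONS ({render_options(copy_options)})")
    return "\n".join(lines) + ";"


TABLE = "main.serverless_sql_test.real_estate"
SOURCE = "abfss://domain.dfs.core.windows.net/"  # placeholder path from the article

# Strict variant: mergeSchema is left at its default, so a mismatch fails the query.
strict = render_copy_into(
    TABLE, SOURCE, "CSV",
    format_options={"header": "true", "inferSchema": "true"},
)

# Evolving variant: mergeSchema is set in both places, as in the queries above.
evolving = render_copy_into(
    TABLE, SOURCE, "CSV",
    format_options={"header": "true", "inferSchema": "true", "mergeSchema": "true"},
    copy_options={"mergeSchema": "true"},
)

print(strict)
print(evolving)
```

Keeping the two variants side by side makes it easier to decide per table whether schema drift should be absorbed or should stop the load.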

Make sure to save your SQL query and give it a name so that you can use it in subsequent steps. Now that we have a working ingestion pipeline with the COPY INTO statement, let’s make it run as a serverless event-driven job.

Step 3 — Create a Databricks Workflow with the SQL Task Type

Now it is time to create the Databricks Workflow we will use with the query created in step two. In the Databricks workspace, click ‘New’ and then ‘Job’ under the ‘Data Engineering’ section of the menu. Add a name for the new job and for the task, and then select ‘SQL’ under the ‘Type’ menu. In the ‘SQL task’ dropdown, select ‘Query’ and then enter your saved query’s name.

If you look through the menu choices, you will see options to add a SQL dashboard, alert, or file to a workflow. For example, you can load data into a table with a SQL query and then run an alert on that table to check that a column’s values do not exceed a certain threshold or that the number of records has not exceeded a specified value. With Workflows, you can chain all kinds of tasks together to create as elaborate a data pipeline as you need.

Author Screenshot: New Job with a SQL Type and “Query” task source

Click ‘Create’ and you will see a page like this:

Author Screenshot: Successfully created SQL Task Type on a Serverless Warehouse

One especially useful feature is the “file” option in the SQL Task type source. This allows you to connect to a Git repo and run the SQL task directly from a source-controlled file, which is recommended for production ETL jobs. This version would look like this:

Author Screenshot: SQL Task Type from File

For the ‘SQL Warehouse’, either select the one we created in step one or use an existing one in your workspace. The query that was created in step two does not have parameters but you can add them in your own queries — check out our documentation on query parameters here. When you are testing Workflows out with your query pipelines, feel free to experiment with notifications or retries.

There are lots of settings you can play around with on this page. When running ETL pipelines, make sure to add tags for better visibility into your workflows. You can add tags for development/UAT/production, business units, or however your organization separates out costs. These tags can then be utilized in the Databricks System Tables and other operational logs for cost and operational monitoring.
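
For readers who script their jobs, the settings from this step (a single SQL task pointing at a saved query, a warehouse, and some tags) could be captured as a job definition like the one sketched below. The field names follow the Databricks Jobs API as I understand it and should be checked against the current reference. The helper, the job and task names, the tag values, and the placeholder IDs are all assumptions for illustration.

```python
import json


def sql_job_payload(job_name, task_key, warehouse_id, query_id, tags=None):
    """Build a job definition with one SQL task that runs a saved query.

    Hypothetical helper: it only assembles a dict and sends nothing.
    """
    return {
        "name": job_name,
        "tags": dict(tags or {}),  # e.g. environment or business unit for cost tracking
        "tasks": [
            {
                "task_key": task_key,
                "sql_task": {
                    "warehouse_id": warehouse_id,     # the warehouse from step one
                    "query": {"query_id": query_id},  # the saved query from step two
                },
            }
        ],
    }


job = sql_job_payload(
    job_name="real-estate-ingest",
    task_key="copy_into_real_estate",
    warehouse_id="<your-warehouse-id>",  # placeholder, not a real ID
    query_id="<your-query-id>",          # placeholder, not a real ID
    tags={"env": "dev", "business_unit": "analytics"},
)
print(json.dumps(job, indent=2))
```

Keeping the definition in code makes it straightforward to stamp the same tags onto every job, which helps when you later slice costs by tag.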

Step 4 — Create Job Trigger

Now that you have a Workflow created, click the ‘Add trigger’ button and this menu will show up:

Author Screenshot: Job Trigger Page

Select the ‘File arrival’ trigger type and make sure the ‘Trigger Status’ is ‘Active’.

Add the storage location used in the query you created during step two — feel free to test the connection to make sure the Unity Catalog external location is configured properly.

In the ‘Advanced’ section, you can configure the minimum time between triggers and how long to wait after the last file change before a run starts, which controls the sensitivity of your ingestion pipeline. After adding the cloud storage location you want the Workflow to monitor for new files, save the trigger and the Workflow.

Databricks is now watching for any new data added to the location. There are a few limitations to file arrival triggers:

  1. Only 50 jobs per workspace can have the file arrival trigger type.
  2. The file arrival trigger only works for external locations with up to 10,000 files. If there are more, the location will not register new files and the trigger will not work. This makes it best suited to temporary file landing zones or ingestion pipelines with a small number of files. If your use case requires larger limits, please reach out to us directly.
  3. File arrival triggers will not work on external locations that are secured by Azure Storage firewalls.
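
The trigger configured in this step can also be written down as a settings fragment, which is handy if you manage jobs through the API rather than the UI. The sketch below uses field names from the Databricks Jobs API as I understand them; treat them as assumptions to verify. The helper itself and the timing values are hypothetical.

```python
def file_arrival_trigger(url, min_time_between_triggers_seconds=None,
                         wait_after_last_change_seconds=None, active=True):
    """Build the trigger section of a job definition (hypothetical helper)."""
    file_arrival = {"url": url}  # the monitored storage location
    if min_time_between_triggers_seconds is not None:
        # 'Advanced' setting: minimum gap between two triggered runs
        file_arrival["min_time_between_triggers_seconds"] = min_time_between_triggers_seconds
    if wait_after_last_change_seconds is not None:
        # 'Advanced' setting: quiet period after the last file change
        file_arrival["wait_after_last_change_seconds"] = wait_after_last_change_seconds
    return {
        # Assumed mapping: 'Active' in the UI corresponds to UNPAUSED here
        "pause_status": "UNPAUSED" if active else "PAUSED",
        "file_arrival": file_arrival,
    }


trigger = file_arrival_trigger(
    "abfss://domain.dfs.core.windows.net/",  # placeholder path from the article
    min_time_between_triggers_seconds=120,   # illustrative value
    wait_after_last_change_seconds=60,       # illustrative value
)
print(trigger)
```

Raising the wait period is a simple way to batch several files that arrive close together into a single run instead of starting one run per file.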

Try uploading a new file to the container/bucket. Within thirty seconds, a new job run should show up.

Author Screenshot: Automatically run job triggered from a file upload!

You can click into the ‘Start time’ link to see the query that was run and how many records were inserted.

Author Screenshot: Successful run of SQL

That is it! We have created an event-driven pipeline. Add more queries, notebooks, files, alerts, and other tasks to your workflows to create an end-to-end serverless data ingestion pipeline that is triggered by new files arriving.

Please let us know if you found this helpful, and reach out if there is any specific content you would like to see from the DBSQL SME group. Thanks for reading!
