Data Ingestion: File Source on Databricks

Ryan Chynoweth
DBSQL SME Engineering
7 min readMar 25, 2024

Introduction

Databricks offers many data connectors, enabling it to seamlessly read from and write to various systems. For example, one can read data out of an application database, generate product recommendations, then write those predictions back to the application database (or online table) to surface to customers in the web interface. This capability not only positions Databricks as a centralized lakehouse for analytics but allows it to integrate effectively within an organization’s operational framework by orchestrating data movement and embedding analytics. By connecting directly to data sources, Databricks can efficiently pull data into memory and save it to tables registered in its catalog.

While direct connectivity to data sources is available, there are instances where customers opt not to establish direct connections and instead utilize external tools to push data into cloud storage as files. Databricks can ingest files incrementally as a batch or stream process. For instance, consider a scenario where an organization, for various reasons, do not establish hybrid connectivity between Databricks and an on-premises database. In such cases, they may resort to running scripts locally to publish changed data to a cloud storage account then load the data into Databricks tables for further analysis.

In this article, I will cover how to organize the bronze (or raw) zone of the lakehouse to support file ingestion. Furthermore, I will explore a few best practices for processing file-level data from the bronze zone into silver and gold delta tables.

Unity Catalog

Whether it is a new pipeline or the first one, integrating the workload with Unity Catalog (UC) will streamline the ingestion process. If you are an existing Databricks customer and have not activated UC, I highly recommend doing so as it is a non-breaking change; it simply expands your ability to create additional catalogs while leaving your existing hive_metastore catalog and workloads unaltered. Therefore, if you have a requirement for silver and gold level tables to be registered in your legacy hive metastore for user consumption, this can still be achieved if you’re sourcing data from a Unity Catalog object.

With Unity Catalog, you will be able to leverage Volumes which is a simplified method of accessing data and files stored in cloud object storage. To create a volume please refer to the Databricks documentation. File ingestion workloads will typically have three different volumes:

  • Source Data: This volume will contain the files being published to the cloud as an append-only source. If you have update and delete operations then you can mark those in the data itself to support merge operations.
  • Schema: This volume maintains the specified or inferred schemas of my source data. It is used with Auto Loader to enforce schema on read. For example, if I have a silver table my_catalog.my_schema.my_table then it will have a schema at the /Volumes/schemas/my_catalog/my_schema/my_table location.
  • Checkpoints: When using Auto Loader, data is read as a stream which means when you write data to a table you need to store a checkpoint. I will map checkpoints associated to tables using their fully qualified object name in the file path. For example, if I have a silver table my_catalog.my_schema.my_table then it will have a checkpoint in the /Volumes/checkpoints/my_catalog/my_schema/my_table location.

The added benefit is that volumes are associated with the schema object in Unity Catalog so the volume and tables will be governed under the same structure.

Structure of the Ingestion Zone

Let’s consider a scenario where data is ingested from the popular Microsoft demo database, AdventureWorks, which includes a table named reporting.sales. Our objective is to collect change data every hour for ingestion into Databricks. With this in mind, assume the following key-value pairs:

  • Database Name: adventureworks
  • Schema Name: reporting
  • Table: sales
  • Ingestion Frequency: 1 file per table per an hour (deletes, updates, and inserts)
  • File Type: Apache Parquet
  • Current DateTime: March 18, 2024 3:00:00pm UTC

For this process let’s ensure that related data objects are properly organized, therefore, the file above should map to the following location in a volume: /Volume/adventureworks/reporting/sales/2024/03/18/ adventureworks_reporting_sales_20210331150000.parquet

Data is being published once an hour and the goal is to incrementally load the file data to transform and save to a table. The file structure should be organized as shown in the image below.

Example directory structure

Notice that each file has a unique name and path, with the datetime that the file was published to cloud storage within the name and as directories. Furthermore, each file is organized in a hierarchical manner where each day has an associated set of files. The name of the files should be unique, and in this case attaching the published timestamp to the file name achieves that goal.

Bronze Sub-Directories

Sub-directories in the bronze layer are organized with temporal partitions: year/month/day/hh/mm/ss. While our incremental process should only read a single file each hour, it is important to not go too granular in your folder structure because to discover which files have not been published requires listing directories and comparing checkpoints. The more directories mean more list commands which impacts performance. The depth of your ingestion zone is dependent on the file “publish rate”. The publish rate is the frequency of which data and/or files are arriving in the data lake.

For example, if files are added to the data lake from the source system once an hour, then the sub-directories should stop at the month or day level. If it is at the day level then each directly will have at most 24 files and at the month level would be at most 744 files which is manageable in a single directory. Creating a directory structure that stops one or two levels above the publish rate is ideal and should maintain optimal performance overtime. I recommend you check out the small files problem as described here.

Databricks Auto Loader

Auto Loader is a Spark Structured Streaming data source that enables users to incrementally process files that are published to cloud storage. There are two modes to track which files have been processed: file notification and directory listing. File notification is a process used when there is a high volume of files; however, since it requires additional cloud resources to support tracking files in storage users will default to the directory listing mode. Directory listing will issue API calls to list the current state of the directory and compare that state against the previous state to determine which files to process.

The structure of your lakehouse ingestion zone is not only important for organization, but when using directory listing mode it has an impact on performance as well. Check out the documentation that shows the number of list commands required to discover new files. The more list commands required, the longer it will take to load new data.

One benefit of structuring the ingestion zone as described in this article, is that it helps maintain the minimum number of list commands required. Additionally, using the temporal directories enables customers to dynamically filter the source directory so that older partitions are completely ignored.

Using our previous example where files are published once an hour, the solution will create a new directory every day to store the files: /Volumes/adventureworks/reporting/sales/2024/03/18. The command to read the files could look something like the following:

df = (spark.readStream.format("cloudFiles") 
.option("cloudFiles.format", "parquet")
.trigger(availableNow=True) # incremental batch mode
.schema(<schema>)
.load('/Volumes/adventureworks/reporting/sales/*/*/*/*.parquet')
)

The above commands will always look to list and read files from all years, months, and days available. Overtime this could add up and impact performance. This is also unnecessary as there is no need to list files from 2021 as it is unlikely that anything new is being saved there. So editing your code to evaluate only the required directories will maintain optimal performance overtime.

from datetime import datetime
current_year = datetime.now().year
current_month = datetime.now().month
df = (spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
.trigger(availableNow=True) # incremental batch mode
.schema(<schema>)
.load(f'/Volumes/adventureworks/reporting/sales/{current_year}/{current_month}/*/*.parquet')
)

The code block above will only look at the current month’s of data, maintaining optimal performance overtime instead of continuously attempting to list all files from history. Please note that you may want to adjust to handle date cut over events. For example, at the start of every year or month you will want to ensure that the previous period has completely been processed before halting listing for that directory.

By default Auto Loader will open a continuous stream of data, however, since data is published once an hour it is recommended to use .trigger(availableNow=True) to load newly available data. Furthermore, users can leverage file triggers to trigger the data load when data arrives. Check out my previous blog on Lakehouse triggers!

Ingestion with COPY INTO

Let’s say that you are heavily using Databricks SQL for ETL, then you will likely want to leverage the SQL version of Auto Loader which is in the form of a copy into command. By default the command will perform incremental processing but it is possible to force a full reload of the data as well.

CREATE TABLE IF NOT EXISTS adventureworks.reporting.sales;

COPY INTO adventureworks.reporting.sales
FROM '/Volumes/adventureworks/reporting/sales/*/*/*/*.parquet'
FILEFORMAT = PARQUET
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

Conclusion

Planning and organizing the structure of the ingestion zone to align with target schemas and tables facilitates efficient data management and governance. Unity Catalog plays a critical role in streamlining and providing resources to make the process easier. Leveraging these tools and practices not only enhances the reliability and scalability of data pipelines but also lays a solid foundation for advanced analytics and decision-making on the Lakehouse platform.

Disclaimer: these are my own thoughts and opinions and not a reflection of my employer.

--

--

Ryan Chynoweth
DBSQL SME Engineering

Senior Solutions Architect Databricks — anything shared is my own thoughts and opinions