Why You Should Use Databricks Autoloader

Leigh Robertson
5 min read · Dec 7, 2022


If you aren't already using Databricks Autoloader for your file ingestion pipelines, you might be wasting compute or, worse, missing late-arriving data. Introduced around the beginning of 2020, Databricks Autoloader has become a staple in my file ingestion pipelines. It provides a highly efficient way to incrementally process new data while also guaranteeing each file is processed exactly once. I will cover the core Autoloader concepts in this article, but for a more detailed explanation I would suggest watching this video.

The Problem

Let's say we work at an e-commerce company called wesellstuffonline.com and are interested in understanding user behavior on our homepage. Specifically, we would like to understand how often each button on the homepage is clicked. For simplicity's sake, there are three possible buttons to click: "my_profile", "my_cart", and "view_item". Let's also say that someone else at the company has done the work so that the data, illustrated below, arrives in our cloud storage account in .csv format. (Note: .csv was chosen out of familiarity, but Autoloader can handle many different file types like .json, .parquet, etc.) It is now the responsibility of the data engineering team to set up a pipeline that can process and make sense of this data. An additional requirement is to keep compute costs low, so any solution should incrementally process only new files.
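As a rough illustration, a few rows of one of these .csv files might look something like the following, where each row captures an event timestamp, a user identifier, and which of the three buttons was clicked (the column layout and values are made up purely for illustration):

2022-11-07 09:14:52, user_1001, my_profile
2022-11-07 09:15:10, user_1002, view_item
2022-11-07 09:16:33, user_1001, my_cart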

Below I have mocked up a diagram of a simple ELT solution for this approach. It shows the whole sample pipeline, but this article is specifically concerned with steps 1 and 2, the raw ingestion using Autoloader.

High-level diagram showing the proposed design for this pipeline

Possible Solutions Not Using Autoloader

Given that the raw data arrives nightly, let's explore non-Autoloader solutions to process and interpret it. These files contain all the clicks that occurred on the website the prior day. The data can land in one of two ways: without any organization, or partitioned by something like a date, say YYYY/MM/DD/*. Without going into implementation details, let's conceptualize two possible ways to consume only the new files.

Potential Solution #1: In the first scenario, where there is no partitioning, one solution might be to scan the entire bucket and only process the latest files based on the modification date that comes from cloud storage. While this approach is valid, it is hard to scale because it requires a full scan of the whole bucket every run just to figure out which files are new. As the number of files grows, this solution quickly becomes infeasible.
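To make this concrete, a naive version of that scan might look something like the sketch below. The bucket path, the persisted last_processed_ts value, and the use of dbutils.fs.ls (which exposes a modificationTime field on recent Databricks runtimes) are all assumptions for illustration:

# Naive approach: list the ENTIRE bucket on every run and keep only the files
# modified since the previous run. last_processed_ts would need to be persisted
# between runs (e.g. in a small Delta table); it is shown here as a literal.
last_processed_ts = 1667779200000  # epoch millis of the previous run (illustrative)

all_files = dbutils.fs.ls("s3://wesellstuffonline/website_log_data/")  # scans the whole bucket
new_files = [f.path for f in all_files if f.modificationTime > last_processed_ts]

if new_files:
    new_clicks_df = spark.read.csv(new_files)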

Potential Solution #2: Now let's say the data arrives partitioned by YYYY/MM/DD/*, and that the data should arrive daily by 3 AM for the day prior. For example, website activity from 11/7 lands in the cloud storage account on 11/8 at 3 AM. With this folder structure, it might seem simple to just parameterize the path and read the latest partition dynamically. That is true, but what happens if some of the data arrives after 3 AM? In this pattern the late-arriving data would most likely never be processed, because the job only looks back one day. This could be overcome by reprocessing the last two days, or some fixed lag, but that comes at the cost of additional compute. Another approach could be to combine these two solutions and scan the previous X days of partitions to check whether any files arrived late. This could work, but if the daily partitions are very large then solution #2 runs into the same scaling problem as solution #1.
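A sketch of that partitioned, fixed-lookback variant might look roughly like this (the path and the two-day lookback are illustrative):

from datetime import date, timedelta

# Read yesterday's partition plus a small fixed lookback to catch files that
# land after the 3 AM cutoff. Too small a window misses late data; too large
# a window reprocesses files and wastes compute.
lookback_days = 2  # illustrative
paths = [
    "s3://wesellstuffonline/website_log_data/{:%Y/%m/%d}/*".format(date.today() - timedelta(days=d))
    for d in range(1, lookback_days + 1)
]
daily_clicks_df = spark.read.csv(paths)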

How Autoloader Solves This Problem

Don't worry! Autoloader avoids scanning the whole bucket and easily handles late-arriving data.

For this example, the data arrives in the cloud environment in a folder structure like website_log_data/YYYY/MM/DD. To set up an Autoloader job, a path to the files is required. In this example, I plan to point to the root directory of website_log_data and not to an individual year or date, so that all new files are captured regardless of which partition they arrive in or when. After the directory is selected, Autoloader sets up a queue and adds all the files currently in the bucket to it. The first time the job runs, it processes everything in the queue and writes its progress out to two metadata folders, called commits and offsets. The commits folder keeps track of which files and data have been loaded, which makes recovering from job failures painless. The offsets folder tells Autoloader where to start reading from on the next run, kind of like a bookmark. I will touch on these checkpoint features in a subsequent post, but for now know that they are used to track progress.
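To make that slightly more concrete, after a successful run the checkpoint location (configured later in the write code) typically contains something along these lines; the exact layout varies by runtime version:

<checkpointLocation>/
    commits/     one marker file per successfully completed micro-batch
    offsets/     the "bookmark" telling the next run where to start reading
    sources/     Autoloader's internal file-tracking state
    metadata     the unique id of the stream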

After the initial processing, only new files get added to that queue, which means there is no need to scan the whole bucket or burn unnecessary compute to guarantee that no files have been missed! The next time the job is triggered, Autoloader checks the queue for only the files that arrived since the last run. Finally, we can incrementally process new data without scanning the entire bucket like in solution #1, and we are also protected against missing late-arriving data like in solution #2.

Autoloader Code

Below is the code needed to set up Autoloader on a directory. I have included some common options and some non-standard ones as well. The full documentation is located here. I have intentionally left some options out, such as schemaEvolutionMode, as I plan to cover those in a future post. An option I have included and highly recommend is pathGlobFilter. This option lets you specify the expected suffix of the files in the bucket, ensuring that any unexpected file type, like a .csv when .gz is expected, gets ignored.

autoloader_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", "autoloader_meta_data/adobe_analytics/website_traffic/schemas/")
    .option("cloudFiles.maxFilesPerTrigger", 20000)  # cap the number of files pulled into each micro-batch
    .option("inferSchema", "true")
    .option("pathGlobFilter", "*.gz")  # ignore anything that is not a .gz file
    .option("header", "false")
    .load("website_log_data/")  # point at the root directory, not an individual date partition
)
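Since autoloader_df is just a streaming DataFrame at this point, it can optionally be enriched before being written out. For example, on recent Databricks runtimes (10.5+) the _metadata column can be used to record which file each row came from; this is an optional pattern, not something Autoloader requires:

from pyspark.sql import functions as F

# Optional enrichment before the write: record each row's source file and load time.
autoloader_df = autoloader_df.select(
    "*",
    F.col("_metadata.file_path").alias("source_file"),
    F.current_timestamp().alias("load_ts"),
)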

Below is the code for writing this DataFrame out to a Delta table. Most of the options are standard, but there are two settings worth discussing in more depth. The first is maxFilesPerTrigger, which for Autoloader is set as cloudFiles.maxFilesPerTrigger on the read side (as shown in the read code above). It is very important for limiting the size of each micro-batch and reducing the likelihood of an out-of-memory exception; once that limit is reached, Autoloader processes that batch of files and then picks up where it left off. The second is passing availableNow=True to the trigger, which allows Autoloader to run in a batch fashion: it processes everything that is currently available and then stops.

(
    autoloader_df.writeStream.format("delta")
    .outputMode("append")
    .trigger(availableNow=True)  # process everything currently available, then stop
    .option("mergeSchema", "true")
    .option("checkpointLocation", "autoloader_meta_data/adobe_analytics/website_traffic/checkpoints/")
    .partitionBy("date_column")  # assumes the incoming data contains a date_column
    .toTable(delta_table)  # delta_table holds the name of the target Delta table
    .awaitTermination()
)

Conclusion

I hope this article has shown you how and why you should use Databricks Autoloader for file ingestion pipelines. Please like this post if you found it helpful! I have only touched on a few of the benefits that Databricks Autoloader can provide. As mentioned above, I look forward to writing additional blog posts on features like microbatching, checkpointing, and schema evolution mode. Let me know in the comments if there are any other Databricks features you are using or would like me to cover.
