Databricks Autoloader for Simplified Data Ingestion

Savitha Gunasekaran · Published in Version 1 · May 9, 2024

Databricks Autoloader is a powerful feature that automatically ingests and loads raw data files into Delta Lake tables from cloud storage. It uses a Structured Streaming source called “cloudFiles” to monitor input directories for new files in various formats. As soon as new files arrive, Autoloader loads them into a Delta Lake table. It supports both batch and streaming ingestion in Scala, Python and SQL, and can handle large-scale data migration or real-time loading efficiently without manual intervention.

How does Databricks Autoloader work, and why does it simplify data ingestion?

As files are discovered in the cloud storage, their metadata is persisted in a scalable key-value store in the checkpoint location of your Auto Loader pipeline. This key-value store ensures that data is processed exactly once.

In case of failures, Auto Loader can resume from where it left off using the information stored in the checkpoint location, and it continues to provide exactly-once guarantees when writing data into Delta Lake. You don’t need to maintain or manage any state yourself to achieve fault tolerance or exactly-once semantics.

In the past, when a file arrived in cloud storage (for example, AWS S3), a complex setup was required. S3 file notifications needed to be enabled, which would send a message to either SNS or SQS. AWS Lambda would then consume this message and trigger the Databricks job. However, this pipeline had additional monitoring requirements to ensure it was functioning correctly. Unfortunately, there was still a possibility of downtime, which could lead to discrepancies in the target data.

Auto Loader simplifies this process significantly. It requires minimal setup and is fault-tolerant. Additionally, it effectively manages schema evolution, making it a powerful tool for ingesting data efficiently. A minimal example is shown below.
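
The following sketch shows what a basic Auto Loader pipeline can look like in Python; the source path, schema and checkpoint locations, and target table name are illustrative assumptions, not a prescribed setup.

```python
# Minimal Auto Loader sketch: read new JSON files from cloud storage into a Delta table.
df = (spark.readStream
      .format("cloudFiles")                                                   # Auto Loader source
      .option("cloudFiles.format", "json")                                    # source file format
      .option("cloudFiles.schemaLocation", "/tmp/autoloader/schemas/orders")  # where the inferred schema is stored
      .load("s3://my-bucket/raw/orders/"))                                    # hypothetical input directory

(df.writeStream
   .option("checkpointLocation", "/tmp/autoloader/checkpoints/orders")        # enables exactly-once processing and restarts
   .trigger(availableNow=True)                                                # process all pending files, then stop (batch-style)
   .toTable("bronze.orders"))                                                 # hypothetical target Delta table
```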

Here are some of the notable key capabilities of Databricks Autoloader:

1) Incremental and Efficient Data Ingestion

Databricks Autoloader can incrementally and efficiently process new data files as they arrive in cloud storage without any additional setup. It can load data files from S3, ADLS (Gen1 + Gen2), GCS, Azure Blob Storage, and DBFS.

2) Support for Multiple File Formats

Databricks Autoloader can ingest a wide range of file formats, including JSON (JavaScript Object Notation), CSV (comma-separated values), Parquet, ORC (Optimized Row Columnar), Apache Avro, text files, and binary files.

3) Highly Scalable

Databricks Autoloader can process billions of files to migrate or backfill a table, supporting near real-time ingestion of millions of files per hour.

4) Exactly-Once Processing

Databricks Autoloader tracks ingestion progress by persisting file metadata in a cloud-based checkpoint location using a scalable key-value store (RocksDB). This ensures that each file is processed exactly once, providing fault tolerance and allowing the ingestion pipeline to resume from the last successful checkpoint in case of failures or interruptions.

5) Schema Inference and Evolution

Autoloader can automatically infer the schema of the ingested data, enabling table initialization without manually defining the schema upfront. It can also handle schema evolution by detecting changes in the data schema and automatically updating the target Delta Lake table schema to accommodate new columns or data types.

6) Handling Corrupted or Malformed Data

Databricks Autoloader can handle faulty or malformed data by capturing it in a designated _rescued_data column within the target Delta Lake table, ensuring that no data is lost or ignored during the ingestion process.
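
For instance, once data has been ingested, records that did not match the expected schema can be inspected via the rescued-data column. The sketch below assumes the hypothetical bronze.orders table from the earlier example and the default column name _rescued_data.

```python
# Inspect rows whose fields could not be parsed into the inferred or declared schema.
bad_rows = spark.read.table("bronze.orders").where("_rescued_data IS NOT NULL")
bad_rows.select("_rescued_data").show(truncate=False)
```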

7) Integration with Delta Live Tables

Autoloader supports both Python and SQL in Delta Live Tables, enabling autoscaling of compute resources, data quality checks, automatic schema evolution handling, and monitoring via metrics in the event log.
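
As a rough illustration, an Auto Loader source can be declared inside a Delta Live Tables pipeline as in the sketch below; the input path, table name, and expectation are assumptions for the example.

```python
import dlt

@dlt.table(comment="Raw orders ingested incrementally with Auto Loader")
@dlt.expect_or_drop("valid_order_id", "order_id IS NOT NULL")  # simple data quality check
def orders_bronze():
    # Delta Live Tables manages checkpoints and schema tracking for the stream.
    return (spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "json")
            .load("s3://my-bucket/raw/orders/"))  # hypothetical input directory
```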

8) Cost-Effective File Discovery

Databricks Autoloader’s file discovery cost scales with the number of files being ingested rather than the number of directories, reducing cloud storage API costs for large-scale ingestion pipelines.

9) File Notification Mode

Databricks Autoloader supports a file notification mode, where it automatically configures and subscribes to notification and queue services for file events from the input directory, enabling efficient ingestion of high volumes of files as they arrive.

10) Backfill Support

Databricks Autoloader supports backfilling operations, ensuring that all files are processed, even if they were missed during the initial ingestion run.

Databricks Autoloader supports a wide range of file formats:

  1. JSON (JavaScript Object Notation)
  2. CSV (comma-separated values)
  3. Parquet
  4. ORC (Optimized Row Columnar)
  5. Apache Avro
  6. Text files
  7. Binary files (binaryFile)

Databricks Autoloader provides two modes for identifying new files in the cloud storage location:

1) Directory Listing mode:

Directory Listing Mode is the default mode used by Databricks Autoloader. In this mode, Autoloader identifies new files by periodically listing the contents of the input directory on the cloud storage. This mode allows you to quickly start Autoloader streams without any additional permission configurations, as long as you have access to the data on cloud storage.

  • How it works: Autoloader optimizes the process of listing directories by leveraging cloud storage APIs to retrieve a flattened response, reducing the number of API calls required and, consequently, the associated cloud costs.
  • Incremental Listing (deprecated): For lexicographically ordered files (e.g., date-partitioned or versioned files), Autoloader can leverage incremental listing, which involves listing only the recently ingested files rather than the entire directory. This optimization is available for ADLS Gen2, S3, and GCS.
  • Lexical Ordering of Files: To take advantage of incremental listing, files need to be uploaded in a lexicographically increasing order, such as versioned files (e.g., Delta Lake transaction logs), date-partitioned files, or files generated by services like AWS DMS.
  • Backfill Mechanism: To ensure eventual completeness of data, Autoloader automatically triggers a full directory listing after completing a configured number of consecutive incremental listings (default is 7).
  • Performance: Directory Listing mode is suitable for small to medium-sized directories or when the volume of incoming files is moderate. For large directories or high volumes of files, File Notification mode is recommended.

2) File Notification Mode:

File Notification Mode is an alternative mode that Autoloader can use to automatically set up a notification service and queue service that subscribes to file events from the input directory. This mode is more performant and scalable for large input directories or a high volume of files but requires additional cloud permissions.

  • How it works: Autoloader automatically sets up a notification service (e.g., AWS SNS, Azure Event Grid, Google Pub/Sub) and a queue service (e.g., AWS SQS, Azure Queue Storage, Google Pub/Sub) that subscribe to file events from the input directory.
  • Cloud Resources: Autoloader creates and manages the required cloud resources (notification service, queue service) on your behalf, provided you have the necessary permissions.
  • Scalability: File Notification mode can scale to ingest millions of files per hour, making it suitable for high-volume data ingestion scenarios.
  • Permissions: To use File Notification mode, you need to grant Databricks Autoloader elevated permissions to automatically configure the required cloud infrastructure (notification service, queue service).
  • Limitations: Changing the input directory path is not supported in File Notification mode. If the path is changed, Autoloader may fail to ingest files already present in the new directory.

Both modes provide exactly-once data processing guarantees, and you can switch between them at any time while maintaining the ingestion state. The choice between the two depends on specific requirements, such as the volume of incoming data, the size of the input directory, and the desired performance and scalability. The sketch below shows the option that controls which mode is used.
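
The following sketch toggles between the two modes with a single option; the input path and schema location are illustrative assumptions, and File Notification mode additionally requires the cloud permissions described above.

```python
# cloudFiles.useNotifications = "false" (default) -> Directory Listing mode
# cloudFiles.useNotifications = "true"            -> File Notification mode
events = (spark.readStream
          .format("cloudFiles")
          .option("cloudFiles.format", "csv")
          .option("cloudFiles.useNotifications", "true")
          .option("cloudFiles.schemaLocation", "/tmp/autoloader/schemas/events")
          .load("s3://my-bucket/raw/events/"))  # hypothetical input directory
```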

CloudFiles Parameters:

Here is a list of cloudFiles parameters that you can configure for working with Databricks Autoloader.

Common Databricks Autoloader options (valid for both Directory listing and File notification):

The following are some commonly used cloudFiles parameters that you can configure in either Directory Listing or File Notification mode; a short configuration sketch follows the list:

  • cloudFiles.allowOverwrites — This Boolean parameter determines whether to allow modifications to input directory files to overwrite existing data. It is available in Databricks Runtime 7.6 and above. The default value is false.
  • cloudFiles.backfillInterval — This parameter accepts an interval string (e.g., “1 day” or “1 week”) and allows you to configure Autoloader to trigger asynchronous backfills at the specified interval. This option ensures that all files eventually get processed, as file event notification systems do not guarantee 100% delivery of all uploaded files. It is available in Databricks Runtime 8.4 and above.
  • cloudFiles.format — This required parameter specifies the data file format in the source path. Allowed values include “avro”, “binaryFile”, “csv”, “json”, “orc”, “parquet”, and “text”.
  • cloudFiles.includeExistingFiles — This Boolean parameter determines whether to include existing files in the stream processing input path or to only process new files arriving after initial setup. The default value is true.
  • cloudFiles.inferColumnTypes — This Boolean parameter specifies whether to infer exact column types when leveraging schema inference. By default, columns are inferred as strings when inferring JSON and CSV datasets.
  • cloudFiles.maxBytesPerTrigger — This parameter specifies the maximum number of new bytes to be processed in every trigger. You can specify a byte string (e.g., “10g”) to limit each microbatch to a specific data size.
  • cloudFiles.maxFilesPerTrigger — This parameter specifies the maximum number of new files to be processed in every trigger. When used together with cloudFiles.maxBytesPerTrigger, Autoloader consumes up to the lower limit of either parameter.
  • cloudFiles.partitionColumns — This parameter accepts a comma-separated list of Hive-style partition columns that you would like inferred from the directory structure of the files.
  • cloudFiles.schemaEvolutionMode — This parameter specifies the mode for evolving the schema as new columns are discovered in the data.
  • cloudFiles.schemaHints — This parameter allows you to provide schema information to Autoloader during schema inference.
  • cloudFiles.schemaLocation — This required parameter (when inferring the schema) specifies the location to store the inferred schema and subsequent changes.
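
The sketch below combines several of these common options in a single stream definition; the format, paths, and values are illustrative assumptions rather than recommended settings.

```python
df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")                          # required: source file format
      .option("cloudFiles.schemaLocation", "/tmp/autoloader/schemas/sales")
      .option("cloudFiles.inferColumnTypes", "true")               # infer typed columns instead of strings
      .option("cloudFiles.includeExistingFiles", "true")           # also pick up files already in the path
      .option("cloudFiles.maxFilesPerTrigger", 1000)               # cap the number of files per micro-batch
      .option("cloudFiles.backfillInterval", "1 day")              # periodic backfill to catch missed files
      .load("s3://my-bucket/raw/sales/"))                          # hypothetical input directory
```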

Directory listing options

  • cloudFiles.useIncrementalListing: This deprecated parameter specifies whether to use incremental listing rather than full listing in Directory Listing mode. Databricks recommends using File Notification mode instead.

File notification options

  • cloudFiles.fetchParallelism — This parameter specifies the number of threads to use when fetching messages from the queuing service in File Notification mode.
  • cloudFiles.pathRewrites — This parameter is required if you specify a queueUrl that receives file notifications from multiple S3 buckets and you want to leverage mount points configured for accessing data in these buckets. It allows you to rewrite the prefix of the bucket/key path with the mount point.
  • cloudFiles.resourceTag — This parameter allows you to specify a series of key-value tag pairs to help associate and identify related resources across different cloud providers.
  • cloudFiles.useNotifications — This Boolean parameter specifies whether to use File Notification mode (true) or Directory Listing mode (false) to determine when there are new files.

Advanced Databricks Autoloader Features:

1) Schema inference:

Databricks Autoloader can automatically infer the schema of the data being ingested, even if the schema changes over time. This is particularly useful when dealing with semi-structured or evolving data formats like JSON or CSV. To enable schema inference, you need to specify the cloudFiles.schemaLocation option, which determines the location to store the inferred schema and subsequent changes.
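
A minimal sketch of schema inference is shown below; the path and schema location are assumptions for the example.

```python
# Infer the schema from the incoming JSON files and persist it at schemaLocation.
customers = (spark.readStream
             .format("cloudFiles")
             .option("cloudFiles.format", "json")
             .option("cloudFiles.schemaLocation", "/tmp/autoloader/schemas/customers")
             .option("cloudFiles.inferColumnTypes", "true")  # otherwise JSON/CSV columns are inferred as strings
             .load("s3://my-bucket/raw/customers/"))          # hypothetical input directory
```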

2) Schema evolution:

In addition to schema inference, Databricks Autoloader supports schema evolution, which means it can handle changes in the schema of the ingested data over time. This is achieved by using the cloudFiles.schemaEvolutionMode option, which specifies the mode for evolving the schema as new columns are discovered in the data.

The default schema evolution mode is “addNewColumns” when a schema is not provided. This mode allows Autoloader to add new columns to the existing schema as they are discovered in the data. If a schema is provided, the default mode is “none”, which means that Autoloader will not automatically evolve the schema, and any new columns in the data will be ignored.

You can also provide cloudFiles.schemaHints to guide Autoloader during schema inference, allowing you to specify additional schema information or override the inferred types.
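
The sketch below combines schema evolution with schema hints; the hinted columns, paths, and target table are illustrative assumptions.

```python
customers = (spark.readStream
             .format("cloudFiles")
             .option("cloudFiles.format", "json")
             .option("cloudFiles.schemaLocation", "/tmp/autoloader/schemas/customers")
             .option("cloudFiles.schemaEvolutionMode", "addNewColumns")                     # add newly discovered columns
             .option("cloudFiles.schemaHints", "amount DECIMAL(18,2), event_ts TIMESTAMP")  # override inferred types
             .load("s3://my-bucket/raw/customers/"))

(customers.writeStream
   .option("checkpointLocation", "/tmp/autoloader/checkpoints/customers")
   .option("mergeSchema", "true")  # let the target Delta table absorb newly added columns
   .toTable("bronze.customers"))
```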

Conclusion:

Databricks Auto Loader simplifies data ingestion, provides fault tolerance, and seamlessly integrates with Delta Live Tables for efficient data pipelines. It’s a valuable tool for managing large-scale data workflows in the cloud.

About the Author:
Savitha Gunasekaran is a Databricks Engineer at Version 1.
