Load Data into Delta Tables
Querying Files Not Registered as Data Objects
- It is possible to run queries on a directory of files that are not currently registered as any kind of data object (e.g., a Table).
- It is possible to run queries on many types of data files, such as — PARQUET, CSV, JSON etc.
- In most companies, the users will need to access data from External Cloud Storage Locations. Hence, a Workspace Administrator will be responsible for configuring access to these External Cloud Storage Locations.
Reading Data Files Using Databricks
- It is possible to read files of formats such as PARQUET, CSV, JSON etc. from Databricks using SQL Commands or Python Syntax.
- In the SQL Command, after writing SELECT * FROM, instead of providing a qualified Table Name, the actual path where a file resides in an External Cloud Storage Location is provided.
Read a PARQUET File from a Directory
- The sub-directory parquet-files in the directory path dbfs:/FileStore/tables/partner-training-folder/data-files/1.Delta-Lake/parquet-files/ contains a collection of PARQUET files.
One such PARQUET file is demo_parquet_file.parquet, which contains the following data -
- Another PARQUET file present in the same sub-directory parquet-files of the above-mentioned directory path is demo_parquet_file_2.parquet with the following data -
- It is possible to query these PARQUET files as if each of these files were a Table in a Relational Database Management System.
- To read the data of all the PARQUET files present in the sub-directory parquet-files of the above-mentioned directory path at once using a SQL Command -
Step 1: Write the SQL syntax SELECT * FROM.
Step 2: Then write the type of the file Databricks would expect to find, i.e., PARQUET in this case.
Step 3: Provide a Dot (.).
Step 4: Finally, write the directory path, along with the sub-directory, where Databricks would find the specified file type, i.e., PARQUET in this case.
- Although the output of the SQL Command would look like Rows of a Table, those would just be the data present in the underlying PARQUET files.
# Create a Spark Configuration Variable for Directory Path for PARQUET Files
spark.conf.set('directory.path.parquet','dbfs:/FileStore/tables/partner-training-folder/data-files/1.Delta-Lake/parquet-files')
SELECT * FROM
PARQUET.`${directory.path.parquet}` LIMIT 10;
Output -
Read a CSV File from a Directory
- The sub-directory csv-files in the directory path dbfs:/FileStore/tables/partner-training-folder/data-files/1.Delta-Lake/csv-files/ contains a collection of CSV files. Each of the CSV files has a Header row that contains the Column Names and is delimited with a | (PIPE) character.
One such CSV file is demo_csv_file.csv, which contains the following data -
- Another CSV file present in the same sub-directory csv-files of the above-mentioned directory path is demo_csv_file_2.csv with the following data -
- It is possible to query these CSV files as if each of these files were a Table in a Relational Database Management System.
- To read the data of all the CSV files present in the sub-directory csv-files of the above-mentioned directory path at once using a SQL Command -
Step 1: Write the SQL syntax SELECT * FROM.
Step 2: Then write the type of the file Databricks would expect to find, i.e., CSV in this case.
Step 3: Provide a Dot (.).
Step 4: Finally, write the directory path, along with the sub-directory, where Databricks would find the specified file type, i.e., CSV in this case.
- Although the output of the SQL Command would look like Rows of a Table, those would just be the Rows present in the underlying CSV files.
# Create a Spark Configuration Variable for Directory Path for CSV Files
spark.conf.set('directory.path.csv','dbfs:/FileStore/tables/partner-training-folder/data-files/1.Delta-Lake/csv-files')
SELECT * FROM
CSV.`${directory.path.csv}` LIMIT 10;
-- In this case, the headers of the CSV file are read as data values as well
Output -
Limitation of Reading a CSV File from a Directory
- In the output of the above Cell, it can be clearly seen that the schema is not inferred correctly while reading the data from all the CSV files at once, present in the sub-directory csv-files of the directory path dbfs:/FileStore/tables/partner-training-folder/data-files/1.Delta-Lake/csv-files/, using the SQL Command.
- The data of all the columns in each row is displayed in a single column, separated by a | (PIPE). The single column does not have any name, and is displayed as _c0.
- The column headers are also displayed as data values, separated by a | (PIPE).
- So, although the values of all the columns in each row are displayed, they appear in a single column separated by a | (PIPE); the columns are not displayed distinctly.
- Hence, more information needs to be provided in order to read data from the CSV files correctly.
Using read_files () Table-Valued Function to Overcome the Limitation of Reading a CSV File from a Directory
While reading the data from a file of any format, like — CSV, JSON etc., if Databricks can’t generate the correct schema by reading that file, the users can provide additional information that tells Databricks what kind of data it should be looking for in order to create the correct schema.
So, while reading the data from a file of any format, the additional information can be provided into the Built-In Table-Valued Function, i.e., read_files ().
To read a CSV file with the correct schema, the following Options are passed to read_files () -
- 1. format => "csv" : Since the data files are in the CSV format.
- 2. sep => "|" : Since the data fields are separated by the | (PIPE) character.
- 3. header => true : Since the first row of data should be used as the column names.
- 4. mode => "FAILFAST" : This option will cause the SQL statement to throw an error if there is any malformed data.
_rescued_data : By default, the column _rescued_data will be provided by read_files () to rescue any data that doesn't match the schema that Databricks would infer.
SELECT * FROM read_files(
'${directory.path.csv}',
format => "csv",
sep => "|",
header => true,
mode => "FAILFAST"
);
Output -
Read a JSON File from a Directory
- The sub-directory json-files in the directory path dbfs:/FileStore/tables/partner-training-folder/data-files/1.Delta-Lake/json-files/ contains a collection of JSON files.
One such JSON file is demo_json_file.json, which contains the following data -
- Another JSON file present in the same sub-directory json-files of the above-mentioned directory path is demo_json_file_2.json with the following data -
- It is possible to query these JSON files as if each of these files were a Table in a Relational Database Management System.
- To read the data of all the JSON files present in the sub-directory json-files of the above-mentioned directory path at once using a SQL Command -
Step 1: Write the SQL syntax SELECT * FROM.
Step 2: Then write the type of the file Databricks would expect to find, i.e., JSON in this case.
Step 3: Provide a Dot (.).
Step 4: Finally, write the directory path, along with the sub-directory, where Databricks would find the specified file type, i.e., JSON in this case.
- Although the output of the SQL Command would look like Rows of a Table, those would just be the Objects present in the underlying JSON files.
# Create a Spark Configuration Variable for Directory Path for JSON Files
spark.conf.set('directory.path.json','dbfs:/FileStore/tables/partner-training-folder/data-files/1.Delta-Lake/json-files')
SELECT * FROM
JSON.`${directory.path.json}`;
-- In this case, the opening bracket of the JSON file is read as a data value as well
Limitation of Reading a JSON File from a Directory
- In the output of the above Cell, it can be clearly seen that the schema is not inferred correctly while reading the data from all the JSON files at once, present in the sub-directory json-files of the directory path dbfs:/FileStore/tables/partner-training-folder/data-files/1.Delta-Lake/json-files/, using the SQL Command.
- The JSON file is a Multi-Line file. The opening bracket ([) and the closing bracket (]) are also displayed as data in the column _corrupt_record, which is not correct.
- Hence, more information needs to be provided in order to read data from the JSON files in the Multi-Line mode.
Using read_files () Table-Valued Function to Overcome the Limitation of Reading a Multi-Line JSON File from a Directory
While reading the data from a file of any format, like — CSV, JSON etc., if Databricks can’t generate the correct schema by reading that file, the users can provide additional information that tells Databricks what kind of data it should be looking for in order to create the correct schema.
So, while reading the data from a file of any format, the additional information can be provided into the Built-In Table-Valued Function, i.e., read_files ().
To read data from a Multi-Line JSON file with the correct schema, the following Options are passed to read_files () -
- 1. format => "json" : Since the data files are in the JSON format.
- 2. multiline => true : Since the file is a Multi-Line JSON file.
- 3. mode => "FAILFAST" : This option will cause the SQL statement to throw an error if there is any malformed data.
_rescued_data : By default, the column _rescued_data will be provided by read_files () to rescue any data that doesn't match the schema that Databricks would infer.
SELECT * FROM read_files(
'${directory.path.json}',
format => "json",
multiline => true,
mode => "FAILFAST"
);
Output -
How to Create a Table from the Data of Files in Databricks?
- It is possible to create some Metadata for the data present in files of formats such as PARQUET, CSV, JSON etc. and work with that data as if each of those files were a Table in a Relational Database Management System.
Create Table by Using CREATE TABLE AS SELECT (CTAS) Statement
The CREATE TABLE AS SELECT, i.e., CTAS statement is used to create and populate a Delta Table using the data, retrieved from an input SQL query.
Using the CREATE TABLE AS SELECT statement, it is possible to create and populate a Delta Table with data at the same time.
The CREATE TABLE AS SELECT statement automatically infers schema information from the SQL query result, and, does not support manual schema declaration.
The CREATE TABLE AS SELECT statement is useful for External Data Ingestion from sources with well-defined schema, such as — PARQUET file, Table etc.
There is another form of the CREATE TABLE AS SELECT, i.e., CTAS statement, which is CREATE OR REPLACE TABLE AS SELECT statement.
- Suppose a Cell in a Notebook has a CREATE TABLE AS SELECT, i.e., CTAS statement that creates a Delta Table, say myTable, by reading a PARQUET file, and another Cell in the same Notebook has a CREATE OR REPLACE TABLE AS SELECT statement that creates the same Delta Table, i.e., myTable, by reading the same PARQUET file. In this case, Databricks will not throw any error when the latter Cell runs.
This happens because, if the Delta Table, i.e., myTable, already exists when the latter Cell runs, the statement would just replace the definition of the already created Delta Table, i.e., myTable.
The USING DELTA clause is not necessary in the CREATE TABLE AS SELECT, i.e., CTAS statement, because the default type of Table that is created when a CTAS statement is run in Databricks is a Delta Table.
- The USING DELTA can be used in the CREATE TABLE AS SELECT, i.e., CTAS statement essentially to call out the fact that the Table to be created is going to be a Delta Table.
- If the USING DELTA is not provided in the CREATE TABLE AS SELECT, i.e., CTAS statement, the effect would still be the same, i.e., the Table to be created is going to be a Delta Table.
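A minimal sketch illustrating the last few points, assuming the directory.path.parquet Spark Configuration Variable set earlier and a hypothetical Table Name demo_ctas_table -
-- CTAS without USING DELTA; the created Table is still a Delta Table
CREATE TABLE demo_ctas_table AS
SELECT * FROM PARQUET.`${directory.path.parquet}`;
-- Running the plain CREATE TABLE again would fail, because demo_ctas_table already exists;
-- CREATE OR REPLACE TABLE simply replaces the definition of the existing Delta Table
CREATE OR REPLACE TABLE demo_ctas_table AS
SELECT * FROM PARQUET.`${directory.path.parquet}`;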
Create Managed Table by Using CREATE TABLE AS SELECT (CTAS) Statement
- The source of data for the Delta Table, created using the CREATE TABLE AS SELECT, i.e., CTAS statement, would be based on the result of the input SQL query, i.e., the SELECT * FROM query.
This type of Delta Table would be a Managed Delta Table. That means, the data is going to be copied from the provided directory in the input SQL query into the Default Location of the Metastore that is configured by the Databricks Administrator for the Databricks Workspace.
CREATE OR REPLACE TABLE myTable
USING DELTA AS
SELECT * FROM PARQUET.`${directory.path.parquet}`;
SELECT * FROM myTable;
Output -
Create External Table by Using CREATE TABLE Statement
External Tables are tables where the data is stored outside of the Managed Storage Location specified for the Metastore, Catalog, or, Schema.
External Tables should be used only when it is required to directly access the data outside of the Databricks Clusters, or, Databricks SQL Warehouses.
In order to provide access to an External Storage Location, once the External Storage Location is properly configured by a Cloud Administrator, a Databricks User with the necessary privileges can create an External Table using code like the following -
CREATE TABLE table_name (
  <column_names_with_data_types>
)
USING CSV
OPTIONS (
  header = "true",
  delimiter = "|"
)
LOCATION ""
One thing to note is that the options are passed with keys as unquoted text, and, values in quotes.
The LOCATION keyword is used to point to the path to the pre-configured External Location.
When the DROP TABLE command is run on an External Table, Unity Catalog does not delete the underlying data.
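As a hedged illustration only, an External Table over pipe-delimited CSV files like the ones used earlier might look like the sketch below. The table name, the column list, and the LOCATION path are all hypothetical placeholders, not objects configured in this walkthrough -
-- Hypothetical External Table; the column list must match the actual CSV header
CREATE TABLE demo_external_csv_table (
  id INT,
  name STRING
)
USING CSV
OPTIONS (
  header = "true",
  delimiter = "|"
)
-- Placeholder path to a pre-configured External Location
LOCATION "abfss://<container>@<storage-account>.dfs.core.windows.net/csv-files";
-- Dropping the External Table removes only the Metadata; the CSV files remain in the External Location
-- DROP TABLE demo_external_csv_table;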
NOTE: Depending on the Databricks Workspace Settings, a Databricks User might need the assistance of a Databricks Administrator to load the Libraries and configure the requisite Security Settings for some data sources.
How to Load Data into a Delta Table Incrementally?
- It is possible to create a Delta Table and then load data into that table incrementally.
- One option for loading data into a Delta Table incrementally is using the COPY INTO statement.
COPY INTO Statement
- The COPY INTO statement loads data from data files into a Delta Table. This is a Retriable and Idempotent operation, meaning that the data files in the Source Location, be it an External System or DBFS, that have already been loaded are skipped.
- This option is potentially much cheaper than Full Table Scans for data that grows predictably.
- Following are the steps describing how the COPY INTO statement works -
Step 1: An empty Delta Table is created.
Step 2: The COPY INTO statement loads the data for the first time into the empty Delta Table by inferring the schema of the existing files in the Source Location.
Step 3: The COPY INTO statement copies the data from the new files that were added since the last time the COPY INTO statement ran.
When to Use COPY INTO Statement?
- When it is desired to capture new data but not re-ingest the files that have already been read, it is preferable to use the COPY INTO statement.
What are the Pre-Requisites to Using COPY INTO Statement?
- The following expectations should be met in order to use the COPY INTO statement -
- Schema of the incoming data should be consistent.
- Duplicate records should be excluded, or handled downstream.
How is the COPY INTO Statement Idempotent?
- The COPY INTO statement keeps track of the files it has ingested previously.
After running the COPY INTO statement for the first time, if the COPY INTO statement is run again, it would not ingest any additional data if there is no change in the data files in the Source Location, as demonstrated right after the COPY INTO example in the next sub-section.
Create a Delta Table and Load Data from the PARQUET Files into It by Using COPY INTO Statement
The following options are specified with the COPY INTO statement to load a Delta Table by reading data from a source with PARQUET files -
- FROM <path> : The path to the data files in the Source Location.
In the below example, the data is located in the DBFS. This would not normally be the case in the real world. In the real world, data files would be located in another location, such as an Object Store, or a Databricks Volume.
- FILEFORMAT: The type of the data files. In this case, it would be parquet.
- COPY_OPTIONS: There are a number of key-value pairs that can be used. Out of those, in the below example, the option of merging the schema of the data is specified as a key-value pair.
Currently, the empty Delta Table to be created has no schema, as no column is defined with an assigned data type.
The COPY INTO statement would merge any schema, it finds in the files present in the path in the Source Location, into the Delta Table, which will essentially create the schema of the Delta Table for the first time based on the current format of the files in the Source Location.
DROP TABLE IF EXISTS bronze_table;
CREATE TABLE IF NOT EXISTS bronze_table;
COPY INTO bronze_table
FROM '${directory.path.parquet}'
FILEFORMAT = parquet
COPY_OPTIONS ('mergeSchema' = 'true');
SELECT * FROM bronze_table;
Output -
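To see the idempotency described earlier, the same COPY INTO statement can be run once more. Since no new files have been added to the Source Location, nothing new should be ingested (a minimal sketch, reusing bronze_table and the directory.path.parquet variable from above) -
-- Re-running the same COPY INTO statement; the files in the Source Location
-- have already been ingested, so no additional rows are expected to be loaded
COPY INTO bronze_table
FROM '${directory.path.parquet}'
FILEFORMAT = parquet
COPY_OPTIONS ('mergeSchema' = 'true');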
Limitation of Tables Created On Top of External Data Sources
- The CREATE TABLE AS SELECT, i.e., CTAS statement, and the COPY INTO statement take any old standard PARQUET or CSV file that is not a Delta Format file, and essentially read the data present in those files to make it available in the Delta Lake format within the Databricks Data Intelligence Platform.
Now, the different capabilities of Delta Lake, like ACID Transactions, Time Travel etc., are available to be used on the Delta Lake data read from a PARQUET or CSV file by the CTAS statement and the COPY INTO statement, which would never have been available on the data of any old standard PARQUET or CSV file.
- Hence, using the CREATE TABLE AS SELECT, i.e., CTAS statement, and the COPY INTO statement allows taking advantage of the performance guarantees and optimization guarantees associated with Delta Lake and the Databricks Data Intelligence Platform.
- Whereas, if a Table or a Query is defined against an External Data Source, it cannot be expected that the Table or the Query can take advantage of the performance guarantees and optimization guarantees associated with Delta Lake and the Databricks Data Intelligence Platform.
Example: While Delta Lake Tables will guarantee that always the most recent version of the source data is queried, Tables that are registered against External Data Sources may represent older cached versions.
Can Databricks File System (DBFS) be Used to Store Data for Delta Tables?
- Databricks does not recommend creating a MANAGED table, or an EXTERNAL table, using data in the Databricks File System (DBFS), because using the Databricks File System (DBFS) bypasses the Unity Catalog Data Governance Model.
- If an attempt is made to create an EXTERNAL table using data in the Databricks File System (DBFS), an error will be raised.
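As a hedged sketch only, such an attempt would look like the statement below (demo_dbfs_table is a hypothetical name); in a Unity Catalog-governed Workspace, this is expected to fail rather than create the table -
-- Hypothetical attempt to register an EXTERNAL table on a DBFS path;
-- expected to raise an error under Unity Catalog governance
CREATE TABLE demo_dbfs_table
USING PARQUET
LOCATION 'dbfs:/FileStore/tables/partner-training-folder/data-files/1.Delta-Lake/parquet-files';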
How to View the Metadata of the Delta Table?
Using DESCRIBE
- By running DESCRIBE <table-name>, it is possible to see the Column-Level Metadata of a Delta Table, i.e., the Column Names and the corresponding Data Types of each of the Columns.
DESCRIBE myTable;
Using DESCRIBE EXTENDED
By running DESCRIBE EXTENDED <table-name>, it is possible to see the following important Metadata of a Delta Table -
- 1. Column-Level Metadata of a Delta Table, i.e., the Column Names and the corresponding Data Types of each of the Columns
- 2. Catalog & Database: The names of the Catalog and the schema (Database), where a Delta Table got created.
- 3. Created Time: When a Delta Table is created.
- 4. Type & Location: Whether a Delta Table is created as a MANAGED or EXTERNAL table (Type), and, the path (Location) to the underlying data (Default Location of Metastore, if MANAGED).
- 5. Provider: Whether the created table is a Delta Table, or, PARQUET Table.
- 6. Owner: The Owner of a Delta Table. Usually, the Owner of a Delta Table is the one who creates it, unless the Owner is changed explicitly.
DESCRIBE EXTENDED myTable;
Output -
How to Create a Temporary View from the Data of Files in Databricks?
- A Temporary View is a data object that is not registered in the schema.
So, a Temporary View is available in the Notebook in which it is defined, in the Current Session of the Cluster.
- Since a Temporary View is not registered in the schema, it is only available as long as the Notebook in which it is defined is connected to the Current Session of the Cluster and continues to run.
CREATE OR REPLACE TEMP VIEW myTempView AS
SELECT * FROM CSV.`${directory.path.csv}`;
SELECT * FROM myTempView LIMIT 10;
Output -