
Basic Transformations in Delta Lake


Cloning Delta Lake Tables

Delta Lake has two options for efficiently copying Delta Lake Tables.

  • DEEP CLONE
  • SHALLOW CLONE

Cloning is a great way to set up tables for testing SQL code while still in development.

What is DEEP CLONE?

  • The Deep Clone operation fully copies both the data and the metadata from a Source Delta Table to a Target Delta Table.
  • The copy occurs incrementally, so executing the command again syncs any new changes from the Source Delta Table to the Target Delta Table.

CREATE OR REPLACE TABLE bronze_table_deep_clone
DEEP CLONE bronze_table;

Output -

DESCRIBE EXTENDED bronze_table_deep_clone;

Output -

Why is DEEP CLONE Slow?

  • Because all the data files must be copied over, a deep clone can take quite a while for large datasets.

What is SHALLOW CLONE?

  • If the goal is to quickly create a copy of a Delta Table to test out some changes, without the risk of modifying the current Delta Table, the Shallow Clone operation can be a good option.
  • The Shallow Clone operation copies only the Delta Transaction Log, meaning that no data files are moved or copied.
  • So, with the Shallow Clone operation, although there are two Delta Tables, a Source Delta Table and a Target Delta Table, both effectively work on the same underlying data files.
CREATE OR REPLACE TABLE bronze_table_shallow_clone
SHALLOW CLONE bronze_table;

Output -

DESCRIBE EXTENDED bronze_table_shallow_clone;

Output -

In this case, no statistics information is shown because the data was not copied.

Updating the Cloned Tables

  • All data modifications applied to a Deep Cloned Table or a Shallow Cloned Table are tracked and stored separately from the Source Delta Table.
  • Even with a Shallow Clone, if updates are run on either table, those modifications are tracked and stored separately for the Shallow Cloned Table, even though both tables initially reference the same data files, as sketched below.
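
For example, a minimal sketch of updating only the shallow clone (the status column is hypothetical): the update writes new data files and log entries for the clone only, and the source table is untouched.

-- Hypothetical `status` column, for illustration only.
UPDATE bronze_table_shallow_clone
SET status = 'validated'
WHERE status IS NULL;

-- The Source Delta Table still returns its original data.
SELECT * FROM bronze_table;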

Complete Overwrite of Delta Table

It is possible to use overwrites to atomically replace all of the data in a table. The complete overwrite is recorded as an operation in the Transaction Log of the Delta Table on which it is performed.

Following are the benefits of overwriting tables instead of deleting and recreating tables -

  • Overwriting a table is much faster because it doesn’t need to list the directory recursively or delete any files.
  • The old version of the table still exists, so the old data can easily be retrieved using Time Travel (see the sketch after this list).
  • Overwriting a table is an atomic operation. Hence, concurrent queries can still read the table while it is being overwritten.
  • Due to ACID Transaction guarantees, if overwriting a table fails, the table remains in its previous state.
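
A minimal sketch of Time Travel after an overwrite (the table name and the version number are placeholders; the real values can be found with DESCRIBE HISTORY):

-- Read the table as it was at an earlier version (placeholder version number).
SELECT * FROM my_overwritten_table VERSION AS OF 1;

-- Or as of a point in time (placeholder timestamp).
SELECT * FROM my_overwritten_table TIMESTAMP AS OF '2021-01-01';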

Spark SQL provides the following two easy methods to accomplish the complete overwrite of a Delta Table -

1. CREATE OR REPLACE TABLE

2. INSERT OVERWRITE

Complete Overwrite of Delta Table Using CREATE OR REPLACE TABLE Statement

  • The CREATE OR REPLACE TABLE statement fully replaces the contents of a Delta Table each time it is executed.
# Create a Spark Configuration Variable holding the directory path of the PARQUET files (run as a Python cell)
spark.conf.set('directory.path.parquet','dbfs:/FileStore/tables/partner-training-folder/data-files/1.Delta-Lake/parquet-files/event-files')

CREATE OR REPLACE TABLE my_events AS
SELECT *
FROM PARQUET.`${directory.path.parquet}`;

How to Display the History of the Transaction Log of a Delta Table?

The DESCRIBE HISTORY <table-name> statement is used to display the history of the Transaction Log of a Delta Table.

Each row in the output of DESCRIBE HISTORY <table-name> is a Transaction Log entry for an action that changed the data of that Delta Table.

The output of DESCRIBE HISTORY <table-name> contains the following important information -

  • version: An integer assigned to each action performed on the data of a Delta Table; it is incremented by 1 every time the data of that Delta Table is changed.
  • timestamp: The time when the change was made to the data of the Delta Table.
  • userId: The ID of the Databricks User who made the change.
  • userName: The name of the Databricks User who made the change.
  • operation: The action that changed the data of the Delta Table.
  • operationParameters: Details of the operation, such as predicates or partitionBy information.
  • job: The Job ID, if the action was triggered by a Databricks Workflow.
  • notebook: The Notebook ID, if the action was run in a Databricks Notebook.
  • clusterId: The ID of the Databricks Cluster on which the action was run.
DESCRIBE HISTORY my_events;

Output -

Complete Overwrite of Delta Table Using INSERT OVERWRITE Statement

The INSERT OVERWRITE statement provides a nearly identical outcome as the CREATE OR REPLACE TABLE statement.

Data in the Target Delta Table will be replaced by the data from the SQL query used in the INSERT OVERWRITE statement.

Following are the features of the INSERT OVERWRITE statement -

  • The INSERT OVERWRITE statement can only overwrite an existing table; it cannot create a new table like the CREATE OR REPLACE TABLE statement.
  • The INSERT OVERWRITE statement can overwrite a table only with new records that match the current schema of the table. Thus, it can be a safer technique for overwriting an existing table without disrupting downstream consumers.
  • The INSERT OVERWRITE statement can also overwrite individual partitions (see the sketch after this list).
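
A minimal sketch of a partition-level overwrite (the partitioned table my_partitioned_events and the source single_day_updates are hypothetical, and dynamic partition overwrite support depends on the Delta Lake / Databricks Runtime version in use):

-- Assumes the runtime supports dynamic partition overwrite for Delta tables.
SET spark.sql.sources.partitionOverwriteMode = dynamic;

-- Only the partitions that receive rows from the query are replaced;
-- all other partitions are left untouched.
INSERT OVERWRITE my_partitioned_events
SELECT * FROM single_day_updates;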

When the INSERT OVERWRITE statement is run on a Delta Table, the table history records the operation as a WRITE, and the mode is recorded as Overwrite in the operationParameters.

INSERT OVERWRITE my_events
SELECT *
FROM PARQUET.`${directory.path.parquet}`;
DESCRIBE HISTORY my_events;

Output -

Difference Between Complete Overwrite of Delta Table Using CREATE OR REPLACE TABLE Statement and the INSERT OVERWRITE Statement

A primary difference between using the CREATE OR REPLACE TABLE statement and using the INSERT OVERWRITE statement has to do with how Delta Lake enforces the schema on write.

  • A CREATE OR REPLACE TABLE statement allows you to completely redefine the schema of the Target Delta Table.
  • An INSERT OVERWRITE statement will fail if you try to change the schema of the Target Delta Table, for example by passing additional columns in the query, unless optional settings are provided.
INSERT OVERWRITE my_events
SELECT *, current_timestamp()
FROM PARQUET.`${directory.path.parquet}`;

Output -

CREATE OR REPLACE TABLE my_events AS
SELECT *, current_timestamp() AS Updated
FROM PARQUET.`${directory.path.parquet}`;
DESCRIBE HISTORY my_events;

Output -

Merge Updates

It is possible to upsert data from a source table, source view, or a source DataFrame into a Target Delta Table using the MERGE statement in Spark SQL.

Delta Lake supports INSERTs, UPDATEs, and DELETEs in the MERGE statement, and supports extended syntax beyond the SQL standard to facilitate advanced use cases.

Following is the MERGE syntax used in Spark SQL -

  • MERGE INTO target a
    USING source b
    ON {merge_condition}
    WHEN MATCHED THEN {matched_action}
    WHEN NOT MATCHED THEN {not_matched_action}

The main benefits of the MERGE statement are the following -

  • UPDATEs, INSERTs, and DELETEs are completed as a single transaction (see the sketch after this list).
  • If the MERGE statement fails, the entire transaction fails. Hence, the data in the Target Delta Table is never left in an unknown state.
  • Multiple conditions can be added in addition to the matching fields in the merge_condition.
  • The MERGE statement provides extensive options for implementing custom logic.
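
A minimal sketch showing all three actions completed atomically in one MERGE statement (the customers and customer_updates tables and their columns are hypothetical, and are assumed to share the same schema):

-- Hypothetical tables/columns, for illustration only.
MERGE INTO customers t
USING customer_updates s
ON t.customer_id = s.customer_id
WHEN MATCHED AND s.is_deleted = true THEN
  DELETE          -- remove rows flagged as deleted in the source
WHEN MATCHED THEN
  UPDATE SET *    -- update all other matching rows
WHEN NOT MATCHED THEN
  INSERT *;       -- insert brand-new rows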

A Small Transformation Using Built-In Spark SQL Function

The column FirstTouchTmstp is in the wrong format. Currently, it holds the number of microseconds elapsed since 1st January 1970, stored as a whole number (BIGINT).

CAST: The CAST keyword allows you to change the data type of a column from one data type to another.
The syntax for the CAST keyword is CAST(column AS data_type).
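
For example, a minimal sketch using a hypothetical microsecond value:

-- 1593878946592107 microseconds / 1e6 gives seconds since 1970-01-01,
-- which CAST turns into a TIMESTAMP and then a DATE.
SELECT CAST(CAST(1593878946592107 / 1e6 AS TIMESTAMP) AS DATE) AS first_touch_date;
-- Returns 2020-07-04 (assuming a UTC session time zone).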

The target is to -

First, use the CAST keyword to convert the data type of the column FirstTouchTmstp to TIMESTAMP.

  • To convert the values into TIMESTAMP, first, the values in microseconds need to be divided by 1 million, i.e., 1e6 to convert the values from microseconds to just seconds.
  • Then, the values in seconds will be converted into TIMESTAMP data type using the CAST keyword.

Finally, again use the CAST keyword to convert the data type of the column FirstTouchTmstp from TIMESTAMP to DATE.

CREATE OR REPLACE TEMP VIEW my_events_view AS
SELECT *, CAST(CAST(FirstTouchTmstp / 1E6 AS TIMESTAMP) AS DATE) AS FirstTouchTmstp_Date, current_timestamp() AS Updated
FROM PARQUET.`${directory.path.parquet}`;

SELECT * FROM my_events_view;

Output -

Use MERGE Statement With Additional Custom Logic in WHEN MATCHED Clause

The target is the my_events_table table, aliased as t, and the source is my_events_view, a temporary view, aliased as s.

The values of the column UserID of both the source and target will be compared in the merge_condition.

  • When the values of the column UserID match in both the source and the target, additional custom logic is provided as a further set of conditions that must also be satisfied for the action of the WHEN MATCHED clause to take effect.
    The custom logic is: only if the value of the column Email is NULL in the target and, simultaneously, NOT NULL in the source, will the values of the columns Email and Updated in the target be updated with the values of the columns Email and Updated from the source.
  • When the values of the column UserID do not match in the source and the target, the values of the columns UserID, FirstTouchTmstp_Date, Email, and Updated from the source will be inserted into the corresponding columns of the target.

First, create the target table my_events_table.

CREATE OR REPLACE TABLE my_events_table
(
UserID STRING NOT NULL,
FirstTouchTmstp_Date DATE NOT NULL,
Email STRING,
Updated TIMESTAMP NOT NULL
)

Now, perform the MERGE operation.

MERGE INTO my_events_table t
USING my_events_view s
ON t.UserID = s.UserID
WHEN MATCHED AND t.Email IS NULL AND s.Email IS NOT NULL THEN
UPDATE SET Email = s.Email, Updated = s.Updated
WHEN NOT MATCHED THEN
INSERT (UserID, FirstTouchTmstp_Date, Email, Updated)
VALUES (s.UserID, s.FirstTouchTmstp_Date, s.Email, s.Updated)

Output -

SELECT * FROM my_events_table;

Output -

INSERT-ONLY MERGE Statement for De-Duplication

  • Many source systems can generate duplicate records. The MERGE statement can help with de-duplication.
  • Sometimes a row of data coming from a source system already exists in the target system. In that case, the duplicate row does not need to be re-inserted into the target system.
  • Re-inserting duplicate rows into the target system can be avoided by using an INSERT-ONLY MERGE statement.
    This optimized command uses the same syntax as a normal MERGE statement, but provides only a WHEN NOT MATCHED clause, and no WHEN MATCHED clause at all.
  • As in the previous example, the target is the my_events_table table, aliased as t, and the source is my_events_view, a temporary view, aliased as s.
  • As in the previous example, the values of the column UserID of both the source and the target are compared in the merge_condition.
    In addition, the values of the column FirstTouchTmstp_Date of both the source and the target are also compared in the merge_condition.
  • When both the UserID and the FirstTouchTmstp_Date values of the source and the target match, the incoming row from the source is a duplicate.
    Since no action is provided for a WHEN MATCHED clause, no insert or update occurs in the target for such duplicate rows.
  • Only when the UserID and FirstTouchTmstp_Date values do not match is the entire source row inserted into the target, using the INSERT action in the WHEN NOT MATCHED clause.
    Additional custom logic can also be provided in the WHEN NOT MATCHED clause.
CREATE OR REPLACE TEMP VIEW my_events_view AS
SELECT *, CAST(CAST(FirstTouchTmstp / 1E6 AS TIMESTAMP) AS DATE) AS FirstTouchTmstp_Date, current_timestamp() AS Updated
FROM PARQUET.`${directory.path.parquet}`;

SELECT * FROM my_events_view;

Output -

MERGE INTO my_events_table t
USING my_events_view s
ON t.UserID = s.UserID AND t.FirstTouchTmstp_Date = s.FirstTouchTmstp_Date
WHEN NOT MATCHED THEN
INSERT (UserID, FirstTouchTmstp_Date, Email, Updated)
VALUES (s.UserID, s.FirstTouchTmstp_Date, s.Email, s.Updated)

Output -

SELECT * FROM my_events_table;

Output -

Omitting and Renaming Columns from Existing Tables

Simple transformations, like changing column names or omitting columns from target tables, can be easily accomplished during table creation.

The following SQL query creates a new table containing a subset of the columns from the my_events_table table. The goal is to intentionally leave out some columns of the source table.

  • Only, the columns UserID, and, Email will be selected from the my_events_table table.

Also, the selected source columns will be renamed in the SQL query below, with the assumption that a downstream system uses a different naming convention than the source data.

  • The column UserID will be renamed as user_id, and, the column Email will be renamed as user_email.
CREATE OR REPLACE TABLE my_events_table_subset AS
SELECT UserID AS user_id, Email AS user_email
FROM my_events_table;
SELECT * FROM my_events_table_subset;

Output -

Declare Schema with Generated Columns

It is possible to create a Delta Table in which some column values are derived automatically, by using a Generated Column.

Generated Column: A Generated Column is a special type of column whose value is automatically generated, based on a user-specified function over other columns in the Delta Table.

Example: The column FirstTouchTmstp_Date is a Generated Column, created based on the column FirstTouchTmstp, which currently holds microseconds as its value.

  • The column FirstTouchTmstp is first divided by 1 million, i.e., 1e6.
  • Then the CAST keyword is used to convert the result to TIMESTAMP data type.
  • Finally, the TIMESTAMP value is converted to DATE data type using the CAST keyword.

The Generated Column is specified at the time of creating a new Delta Table with the following information -

  • The column name and data type, followed by the GENERATED ALWAYS AS keyword.
  • Inside the parentheses, the logic for generating the column. Here, the Generated Column is created to calculate the date from the FirstTouchTmstp value.
  • An optional, descriptive column comment. Usually, the description explains, in plain English (or any other language), the logic used to create the Generated Column so that people can understand it.

When the Generated Column is specified at the time of creating a new Delta Table, only the structure of the Delta Table is created; the table contains no data yet.
Hence, when data that does not already contain a value for the Generated Column is added to the Delta Table, the value is computed automatically.

CREATE OR REPLACE TABLE my_events_tbl_wth_generated_col
(
UserID STRING NOT NULL,
FirstTouchTmstp BIGINT NOT NULL,
Email STRING,
FirstTouchTmstp_Date DATE GENERATED ALWAYS AS
(
CAST(CAST(FirstTouchTmstp / 1E6 AS TIMESTAMP) AS DATE)
)
COMMENT "Generated Based on `FirstTouchTmstp` Column"
)

The Spark configuration variable spark.databricks.delta.schema.autoMerge.enabled needs to be set to true manually, so that when the MERGE statement is used, it can perform the actions that populate the Generated Column in the Delta Table.

The spark.databricks.delta.schema.autoMerge.enabled configuration is not enabled by default.

CREATE OR REPLACE TEMP VIEW my_events_view_for_generated AS
SELECT *, current_timestamp() AS Updated
FROM PARQUET.`${directory.path.parquet}`;

SELECT * FROM my_events_view_for_generated;

Output -

SET spark.databricks.delta.schema.autoMerge.enabled = true;

MERGE INTO my_events_tbl_wth_generated_col t
USING my_events_view_for_generated s
ON t.UserID = s.UserID AND t.FirstTouchTmstp = s.FirstTouchTmstp
WHEN NOT MATCHED THEN
INSERT *;

SELECT * FROM my_events_tbl_wth_generated_col;

Output -

Add a Table Constraint

Because Delta Lake enforces Schema on Write, Databricks can support standard SQL Constraint management clauses to ensure the quality and the integrity of the data added to a Delta Table.

Databricks currently supports two types of Constraints -

  • NOT NULL Constraints
  • CHECK Constraints

For both types of Constraints, it must be ensured that no data violating the Constraint is already in the Delta Table before the Constraint is defined.

Once a Constraint has been added to a Delta Table, any data violating the Constraint will cause the write to fail.
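
For the NOT NULL type, a minimal sketch using the table created above (this assumes the chosen column does not already contain NULL values; otherwise the statement fails):

-- Add a NOT NULL constraint to an existing, currently nullable column.
ALTER TABLE my_events_tbl_wth_generated_col
ALTER COLUMN Email SET NOT NULL;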

  • In the example below, a CHECK Constraint is added on the column FirstTouchTmstp_Date in the table my_events_tbl_wth_generated_col.
  • A CHECK Constraint looks like a standard WHERE clause that might be used to filter a dataset.
ALTER TABLE my_events_tbl_wth_generated_col
ADD CONSTRAINT valid_FirstTouchTmstp_Date CHECK (FirstTouchTmstp_Date > '2020-01-01');
  • Table Constraints are shown in the TBLPROPERTIES field.
DESCRIBE EXTENDED my_events_tbl_wth_generated_col;

Output -
