Sync Delta Tables Stored in DBFS (Managed or External) To Unity Catalog

Wahid Atoui
4 min read · May 5, 2023


To seamlessly migrate from the Hive metastore to Unity Catalog, Databricks provides a synchronization process that can be run in one shot (or on a schedule) using the Upgrade button in the UI or the SYNC command.
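
For tables the built-in process already supports, for example external tables whose data sits in cloud storage outside DBFS, the SYNC command alone does the job. Here is a minimal sketch, assuming placeholder catalog and schema names:

# Preview the upgrade first, then run the actual sync
# (works only for external tables whose data is NOT in DBFS)
display(spark.sql("SYNC SCHEMA first_catalog.first_schema FROM hive_metastore.default DRY RUN"))
display(spark.sql("SYNC SCHEMA first_catalog.first_schema FROM hive_metastore.default"))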

However, a couple of scenarios are not yet handled automatically by the synchronization process, so you need to find a custom way to get the work done. Two of them are the subject of this short article:

  • Managed Delta tables
  • External Delta tables stored in DBFS

For these use cases, Databricks recommends recreating the tables in Unity Catalog and copying the data using a deep clone. But if you're not a fan of copying or moving data, you can follow my humble demo, where I'll show you how I work around it.
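
A minimal sketch of that deep clone route, with placeholder catalog, schema and table names, would be:

# Recreate the table in Unity Catalog and copy the data files (deep clone)
spark.sql("""
  CREATE TABLE IF NOT EXISTS first_catalog.first_schema.my_table
  DEEP CLONE hive_metastore.default.my_table
""")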

The second type of Delta table cloning is the shallow clone, which is much cheaper to create because it does not copy the data files. However, shallow cloning Delta tables directly into Unity Catalog is not yet supported.

To bypass this limitation, we shallow clone the tables to a bridge external location and then synchronize the result of that shallow clone into Unity Catalog.
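
For a single table, the pattern boils down to two statements (the table names and the bridge location below are placeholders); the rest of this article simply parameterizes and loops over them:

# [a] Shallow clone the DBFS-backed table to a bridge external location (mounted cloud storage)
spark.sql("""
  CREATE OR REPLACE TABLE hive_metastore.default_shallow_clone.my_table
  SHALLOW CLONE hive_metastore.default.my_table
  LOCATION '/mnt/lake/lab/clones/my_table'
""")

# [b] Sync the clone, now an external table outside DBFS, into Unity Catalog
display(spark.sql("SYNC TABLE first_catalog.first_schema.my_table FROM hive_metastore.default_shallow_clone.my_table"))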

First things first: parameterize your notebook.

dbutils.widgets.dropdown("SourceSchema", "default", [database[0] for database in spark.catalog.listDatabases()])
dbutils.widgets.text("TargetCatalog", "first_catalog")
dbutils.widgets.text("TargetSchema", "first_schema")
dbutils.widgets.text("ShallowCloneLocation", "/mnt/lake/lab/clones")

Then create a temporary Hive metastore database to manage the structure of your source Delta tables (the ones to be synchronized) while keeping the same table names.

source_schema = dbutils.widgets.get("SourceSchema")
spark.sql(f'create database if not exists hive_metastore.{source_schema}_shallow_clone')

Now we loop through [1] the source tables (provider: delta, type: managed/external, location: DBFS) in the source schema, [2] shallow clone them, and [3] synchronize the result of each shallow clone into the target catalog/target schema.

[1] Get source tables


from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col, regexp_extract

# List the non-temporary tables in the source schema
tablesDF = spark.sql(f'show tables in hive_metastore.{source_schema}').where("isTemporary = false")

schema = StructType([
    StructField('Catalog', StringType(), True),
    StructField('Database', StringType(), True),
    StructField('Table', StringType(), True),
    StructField('Provider', StringType(), True),
    StructField('Type', StringType(), True),
    StructField('Location', StringType(), True)
])
tabledetailsDF = spark.createDataFrame([], schema)

# Extract each table's details (provider, type, location, ...) from SHOW TABLE EXTENDED
for row in tablesDF.collect():
    df = (
        spark
        .sql(f"SHOW TABLE EXTENDED IN hive_metastore.{source_schema} LIKE '{row.tableName}';")
        .select(col('information'))
    )
    detailsDF = (
        df
        .withColumn('Catalog', regexp_extract(col('information'), 'Catalog: (.*)', 1))
        .withColumn('Database', regexp_extract(col('information'), 'Database: (.*)', 1))
        .withColumn('Table', regexp_extract(col('information'), 'Table: (.*)', 1))
        .withColumn('Provider', regexp_extract(col('information'), 'Provider: (.*)', 1))
        .withColumn('Type', regexp_extract(col('information'), 'Type: (.*)', 1))
        .withColumn('Location', regexp_extract(col('information'), 'Location: (.*)', 1))
        .drop(col('information'))
    )
    tabledetailsDF = tabledetailsDF.union(detailsDF)

# Keep only Delta tables (managed or external) whose data lives in DBFS
tabledetailsDF = tabledetailsDF.where("lower(provider) = 'delta' and lower(type) in ('managed','external') and lower(location) like 'dbfs%'")

[2][3] Shallow clone and synchronize source tables

target_catalog = dbutils.widgets.get("TargetCatalog")
target_schema = dbutils.widgets.get("TargetSchema")
shallow_clone_location = dbutils.widgets.get("ShallowCloneLocation")

print(f'The list of delta tables in (source catalog: hive_metastore, source schema: {source_schema}) to sync into (target catalog: {target_catalog}, target schema: {target_schema})')
display(tabledetailsDF)

for row in tabledetailsDF.collect():
    # [2] Shallow clone the source table to the bridge external location
    print(f'Shallow clone the table hive_metastore.{source_schema}.{row.Table} ...')
    location = shallow_clone_location + '/' + row.Table
    print(f"create or replace table hive_metastore.{source_schema}_shallow_clone.{row.Table} shallow clone hive_metastore.{source_schema}.{row.Table} location '{location}'")
    spark.sql(f"create or replace table hive_metastore.{source_schema}_shallow_clone.{row.Table} shallow clone hive_metastore.{source_schema}.{row.Table} location '{location}'")

    # [3] Sync the shallow clone into the target Unity Catalog schema
    print(f'Sync the table hive_metastore.{source_schema}_shallow_clone.{row.Table} ...')
    syncDF = spark.sql(f'sync table {target_catalog}.{target_schema}.{row.Table} from hive_metastore.{source_schema}_shallow_clone.{row.Table};')
    display(syncDF)

Here is the result of running the above script for a couple of source test tables.
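
If you want to double-check the outcome yourself, listing the target schema (using the widget values from above) should show the newly synced tables:

# Confirm that the tables are now registered in the target Unity Catalog schema
display(spark.sql(f"SHOW TABLES IN {target_catalog}.{target_schema}"))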

Once you've wrapped all of the above code into a notebook, you can run it on a schedule that suits you, using a Databricks workflow, to synchronize your tables stored in DBFS.
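
As a rough illustration only (the notebook path, cluster id and cron expression are placeholders, and the exact payload depends on the Jobs API version you use), a scheduled job definition for this notebook could look like:

# Illustrative Jobs API 2.1-style payload; adapt it to your workspace before creating the job
job_definition = {
    "name": "sync-dbfs-delta-tables-to-uc",
    "tasks": [
        {
            "task_key": "sync_dbfs_tables",
            "notebook_task": {
                "notebook_path": "/Repos/your-repo/sync_dbfs_delta_tables",
                "base_parameters": {
                    "SourceSchema": "default",
                    "TargetCatalog": "first_catalog",
                    "TargetSchema": "first_schema",
                    "ShallowCloneLocation": "/mnt/lake/lab/clones",
                },
            },
            "existing_cluster_id": "<your-cluster-id>",
        }
    ],
    "schedule": {
        "quartz_cron_expression": "0 0 2 * * ?",  # every day at 02:00
        "timezone_id": "UTC",
    },
}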

I recommend using this custom pattern as a temporary solution until you stabilize and finalize your workload and resolve the dependencies between newly migrated views and the tables stored in Unity Catalog. Moreover, as a rule of thumb, avoid storing data (sensitive or not) in DBFS, because it is accessible to all users in your workspace.
