How to integrate Databricks with Snowflake-managed Iceberg Tables

Iceberg from DALL-E-2

Disclaimer: I am a Senior Solution Architect at Snowflake with 16 years of data strategy, architecture, and engineering experience. The views expressed here are mine alone and do not necessarily reflect the views of my current, former, or future employers.

You can also watch this in a demo on YouTube: https://www.youtube.com/watch?v=lIavIiBBvV8

There is a lot of discussion about table formats within the Big Data/Analytics ecosystem. Instead of focusing on the "why" of Open vs. Closed or Delta vs. Iceberg, a debate that will no doubt continue, this post focuses on the "how" of using Snowflake-managed Iceberg tables within Databricks. If you want to read more about why Iceberg, here is a blog comparing the differences among the Delta, Iceberg, and Hudi formats.

There have also been multiple Iceberg product announcements from Snowflake and Databricks this year: Snowflake's public preview of Iceberg Tables and Databricks' UniForm, which adds a layer of Iceberg metadata on top of Delta tables. The UniForm pattern lets other engines read the Iceberg metadata generated on top of Delta, but it doesn't let Databricks itself read Iceberg tables. I then came across an excellent blog from Steve Baldwin on how to use Iceberg inside Databricks. Why not extend this to Snowflake-managed Iceberg?

If you are interested in reading more about Automating Metadata with Externally Managed Iceberg Tables in Snowflake, check out my other blog:

Snowflake Public Preview — December 2023

As of this publication, Snowflake has made Iceberg Tables publicly available (Public Preview). This means you can use your CSP's cloud storage (AWS, Azure, or GCP) to store Iceberg data while gaining the performance and other benefits of Snowflake (e.g., security, governance, sharing). You can read more about Snowflake's strategy of unifying Iceberg Tables, but in summary, there are two catalog configurations for an Iceberg Table:

  • Snowflake-managed — Snowflake can read/write with performance on par with Snowflake's native table format. The Iceberg catalog is managed by Snowflake and is made available to other compute engines via an open-source SDK.
  • Externally-managed — Snowflake can read Iceberg tables created by other engines (e.g., Spark). This approach is similar to external tables but is much more performant (>2x, as stated in the blog above). The catalog is managed externally with Glue or object storage, with which Snowflake can integrate. A brief sketch contrasting the two forms follows this list.
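To make the two configurations concrete, here is a minimal, illustrative sketch. The table names, the Glue catalog integration, and the Glue table name are hypothetical, and the external volume referenced here is the one created later in the Implementation section; the Snowflake-managed form is the one used in the rest of this post.

--Snowflake-managed: Snowflake is the Iceberg catalog and can read and write
CREATE ICEBERG TABLE MY_MANAGED_TABLE (ID INT, VAL STRING)
CATALOG = 'SNOWFLAKE'
EXTERNAL_VOLUME = 'EXT_VOLUME_ICEBERG'
BASE_LOCATION = 'my_managed_table';

--Externally-managed (illustrative): the catalog lives outside Snowflake (e.g., Glue),
--and Snowflake reads the table through a catalog integration
CREATE ICEBERG TABLE MY_EXTERNAL_TABLE
EXTERNAL_VOLUME = 'EXT_VOLUME_ICEBERG'
CATALOG = 'MY_GLUE_CATALOG_INTEGRATION'
CATALOG_TABLE_NAME = 'my_glue_table';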

For this blog, I will focus on creating Snowflake-managed Iceberg Tables and using the Catalog SDK within Databricks.

Implementation

I will walk through the steps of creating Snowflake-managed Iceberg Tables on AWS, accessing the Catalog SDK, and using the Azure Databricks compute engine to read the files. This example uses files with the following statistics:

  • Customer file with 5M records, ~0.25 GB, and a unique ID field.
  • Transaction file with 100M records, ~3.5 GB, and a CUSTOMER_ID foreign key that relates back to the customer ID.

Update 5/1/2024 — I have also included instructions for reading data in Databricks from Azure Storage, not just AWS. Please see “Use Databricks Spark” below.

Iceberg on Snowflake

Iceberg tables utilize a new construct called an External Volume. External Volumes are account-level objects required to read from and write to external cloud storage in AWS, Azure, or GCP. The cloud storage location and IAM information you designate for your Iceberg tables are stored in this external volume object. Note: Data storage usage is billed directly by your cloud storage provider, while your Snowflake account is billed for compute and cloud services usage.

  • Step 1: Create the external volume for your Iceberg data. This is similar to your Storage Integration but is specific to Iceberg tables.
USE ROLE ACCOUNTADMIN;
CREATE OR REPLACE EXTERNAL VOLUME EXT_VOLUME_ICEBERG
STORAGE_LOCATIONS =
(
(NAME = 'ice_us-east-1'
STORAGE_PROVIDER = 'S3'
STORAGE_BASE_URL = 's3://path_to_bucket/iceberg'
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::0123456789:role/Snowflake_Role_for_Iceberg')
);
  • Step 2: Similar to a storage integration, you must retrieve the external volume's IAM details and update the trust policy of your IAM role in AWS (or the equivalent in your cloud). Run DESC EXTERNAL VOLUME <external_volume_name>; the output includes STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID, which you use in AWS to update the IAM role's trust policy, as shown below.
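For example, using the external volume created in Step 1:
DESC EXTERNAL VOLUME EXT_VOLUME_ICEBERG;
--note the STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID values in the output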
  • Step 3 (optional): Create an external stage pointing to the same location as the external volume. Although not required, it's helpful to have a window into your bucket from within Snowflake. Note that this path must also be allowed in your storage integration if you want to use this approach. On creation, your bucket will be empty.
--create a stage pointed to your iceberg files (optional)
USE ROLE SYSADMIN;
--assumes an ICE_DB database for this walkthrough; create it if it doesn't exist yet
CREATE DATABASE IF NOT EXISTS ICE_DB;
USE ICE_DB.PUBLIC;
--S3_INT is an existing storage integration that allows this path
CREATE OR REPLACE STAGE ICEBERG_STAGE
url = 's3://path_to_bucket/iceberg'
storage_integration = S3_INT;
--verify the stage is created and empty
LS @ICEBERG_STAGE;
  • Step 4: Create the Iceberg tables from randomly generated data. We use the Faker Python library, wrapped in a Python UDF, to help generate the data. Because the tables are large, I recommend using an XL warehouse to speed up generation. For Iceberg table creation, we specify the external volume created in Step 1, the catalog, and a base location. We specify CATALOG='SNOWFLAKE' for Snowflake-managed tables.
USE ROLE SYSADMIN;
USE ICE_DB.PUBLIC;
--create the XL warehouse and use it
CREATE WAREHOUSE COMPUTE_XL_WH WAREHOUSE_SIZE ='X-Large';
USE WAREHOUSE COMPUTE_XL_WH;
--customer data which leverages the Snowflake-managed Iceberg Catalog 
--and External Volume
CREATE OR REPLACE ICEBERG TABLE ICEBERG_CUSTOMER
CATALOG='SNOWFLAKE'
EXTERNAL_VOLUME='EXT_VOLUME_ICEBERG'
BASE_LOCATION='customer' AS (
select
--customer info
ROW_NUMBER() OVER (ORDER BY seq4()) ID,
UPPER(faker('en_US', 'bothify', '##########' )::string) CUSTOMER_NUM,
faker('en_US', 'first_name', '')::string first_name,
faker('en_US', 'last_name', '')::string last_name,
faker('en_US', 'email', '')::string email,
faker('en_US', 'city', '')::string city,
faker('en_US', 'state', '')::string state,
faker('en_US', 'current_country_code', '')::string country,
faker('en_US', 'url', '')::string website,
faker('en_US','phone_number', '')::string phone_1,
faker('en_US','date_of_birth', '18')::date DOB,
faker('en_US','company', '')::string company
--profile
from table(generator(rowcount => 5000000)));
SELECT * FROM ICEBERG_CUSTOMER LIMIT 10;

--20 transactions per customer; 100M record Iceberg format table
CREATE OR REPLACE ICEBERG TABLE ICEBERG_TRANSACTION
CATALOG='SNOWFLAKE'
EXTERNAL_VOLUME='EXT_VOLUME_ICEBERG'
BASE_LOCATION='txn' AS (
select
--customer info
ROW_NUMBER() OVER (ORDER BY seq4()) ID,
ICEBERG_CUSTOMER.ID AS CUSTOMER_ID,
UPPER(faker('en_US', 'bothify', '?###########' )::string) TRANSACTION_NUM,
faker('en_US', 'date_time_this_year', '')::datetime(6) TRANSACTION_DATE,
uniform(0,30 , random(120)) QUANTITY,
REPLACE(LTRIM(faker('en_US', 'pricetag', '1'), '$'), ',', '')::float UNIT_PRICE,
faker('en_US', 'sentence', '4')::string TRANSACTION_DESC
--profile
from table(generator(rowcount => 20)), ICEBERG_CUSTOMER);

-- suspend the WH after use
ALTER WAREHOUSE COMPUTE_XL_WH SUSPEND;

--verify the tables are created
SHOW TABLES LIKE 'ICEBERG%';

--view the newly generated files were created in your S3 bucket
LS @ICEBERG_STAGE;
  • You can run a query on these new tables and verify that the files were created in your storage provider with LS @ICEBERG_STAGE;. You can also see the files directly in your cloud provider's account, and a couple of quick row-count checks follow the screenshot below.
Iceberg Data in S3 — generated from Snowflake.
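Before moving on, these row-count checks in Snowflake confirm the generated data (the expected counts reflect the generator settings above):
--verify the generated row counts
SELECT COUNT(*) FROM ICEBERG_CUSTOMER;    --5,000,000 rows
SELECT COUNT(*) FROM ICEBERG_TRANSACTION; --100,000,000 rows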

Provide Access for an External User

It is a best practice to expose the Iceberg tables only to a specific user and role. With a few lines of code, we can set this up for use within Spark. Note: no compute warehouse is granted to ICEBERG_ROLE.

Spark will also require connectivity to your S3 bucket via IAM Access Keys.

  • Step 1: Create a new user and role for Spark access to the Iceberg tables.
USE ROLE SECURITYADMIN;
--create a new user and role.
CREATE OR REPLACE USER ICEBERG_USER PASSWORD = '*******';
CREATE ROLE ICEBERG_ROLE;
GRANT ROLE ICEBERG_ROLE TO USER ICEBERG_USER;

--Grant external volume, DB, and tables to the new role
GRANT USAGE ON EXTERNAL VOLUME EXT_VOLUME_ICEBERG TO ROLE ICEBERG_ROLE;
GRANT USAGE ON DATABASE ICE_DB TO ROLE ICEBERG_ROLE;
GRANT USAGE ON SCHEMA ICE_DB.PUBLIC TO ROLE ICEBERG_ROLE;
GRANT SELECT ON TABLE ICE_DB.PUBLIC.ICEBERG_CUSTOMER TO ROLE ICEBERG_ROLE;
GRANT SELECT ON TABLE ICE_DB.PUBLIC.ICEBERG_TRANSACTION TO ROLE ICEBERG_ROLE;
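Optionally, verify that the role has only the intended privileges before handing out the credentials:
--confirm the grants on the new role
SHOW GRANTS TO ROLE ICEBERG_ROLE;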
  • Step 2: You will also need to create an IAM user (or equivalent) with read-only access to the S3 bucket where the Iceberg data is located. Once the user is created, generate an access key and secret that Spark will use to access the data.

Use Databricks Spark

When you log in to Databricks, you will need to create or modify a cluster. For this demo, I used the 10.4 LTS runtime (Spark 3.2.1 and Scala 2.12). The Catalog SDK config options are listed in the docs below, but I've added a few lines specific to AWS and Azure. You will also need a few libraries, either uploaded or pulled from Maven. Ensure the Iceberg Spark runtime version matches the Spark and Scala versions of your Databricks runtime.

In this setup, I used Azure Databricks connecting to files in Amazon S3 or Azure Blob Storage, which shows how easy it is to use the Snowflake Catalog SDK in a cross-cloud scenario.

More information on the Snowflake Catalog SDK is in the docs below:

  • Step 1: Modify your cluster's Spark config and environment variables.
  • Step 2: Add the Iceberg Spark runtime, Iceberg AWS bundle, and Snowflake JDBC driver libraries (if using Azure, there is no need to include the AWS bundle).
#spark config for Snowflake-managed catalog 
spark.sql.catalog.snowflake_catalog.catalog-impl org.apache.iceberg.snowflake.SnowflakeCatalog
spark.sql.catalog.snowflake_catalog org.apache.iceberg.spark.SparkCatalog
spark.sql.iceberg.vectorization.enabled false
#Snowflake Connection info
spark.sql.catalog.snowflake_catalog.uri jdbc:snowflake://myorg-myaccount.snowflakecomputing.com
spark.sql.catalog.snowflake_catalog.jdbc.user ICEBERG_USER
spark.sql.catalog.snowflake_catalog.jdbc.password ***

### AWS Config options ###
spark.sql.catalog.snowflake_catalog.io-impl org.apache.iceberg.aws.s3.S3FileIO
#environment variables. this is your AWS info to connect to your bucket
AWS_REGION=us-east-1
AWS_ACCESS_KEY_ID=****
AWS_SECRET_ACCESS_KEY=*****
## End AWS Config Options ####

### Azure Config Options ###
spark.sql.catalog.snowflake_catalog.io-impl org.apache.iceberg.hadoop.HadoopFileIO
spark.hadoop.fs.azure.account.key.<storage_account>.blob.core.windows.net <storage account access key>
### End Azure Config Options ###

#libraries
net.snowflake:snowflake-jdbc:3.14.3
org.apache.iceberg:iceberg-aws-bundle:1.4.2 #AWS only
org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.3.0
  • Step 3: Start your cluster, and let's query Snowflake-managed Iceberg data in Databricks! Remember that we didn't provide the Snowflake user with a warehouse. The compute will take place on Azure Databricks, directly accessing the Iceberg metadata and data, leveraging the Snowflake Iceberg Catalog.
%sql
USE CATALOG snowflake_catalog;
SHOW NAMESPACES;
SHOW NAMESPACES IN ICE_DB;

USE ICE_DB.PUBLIC;
SHOW TABLES;
%sql
SELECT STATE, SUM(UNIT_PRICE * QUANTITY) AS TOTAL_PRICE
FROM ICEBERG_CUSTOMER C, ICEBERG_TRANSACTION T
WHERE C.ID = T.CUSTOMER_ID
GROUP BY STATE ORDER BY 1;

You can also leverage pySpark to query Iceberg tables.

df_cust = spark.table("ICEBERG_CUSTOMER")
df_cust.show()
  • Step 4: Validate that data changed in Snowflake is reflected in Databricks. Use Snowflake to insert 100,000 customer rows, and the new data is immediately visible in Databricks. In Snowflake, run:
INSERT INTO ICEBERG_CUSTOMER
select
--customer info
ROW_NUMBER() OVER (ORDER BY seq4()) + 5000000 ID, --offset past the existing 5M customer IDs
UPPER(faker('en_US', 'bothify', '##########' )::string) CUSTOMER_NUM,
faker('en_US', 'first_name', '')::string first_name,
faker('en_US', 'last_name', '')::string last_name,
faker('en_US', 'email', '')::string email,
faker('en_US', 'city', '')::string city,
faker('en_US', 'state', '')::string state,
faker('en_US', 'current_country_code', '')::string country,
faker('en_US', 'url', '')::string website,
faker('en_US','phone_number', '')::string phone_1,
faker('en_US','date_of_birth', '18')::date DOB,
faker('en_US','company', '')::string company
--profile
from table(generator(rowcount => 100000));
  • Then switch back to Databricks and get the new row count to validate that the 100,000 newly generated rows are automatically reflected.
df_cust = spark.table("ICEBERG_CUSTOMER")
df_cust.count()
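The same check works from SQL as well; the expected total assumes the 100,000-row insert above completed:
%sql
SELECT COUNT(*) FROM ICEBERG_CUSTOMER; --expect 5,100,000 rows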

Conclusion

Snowflake is fully committed to Iceberg and open table formats. In this blog, I shared how you can use Iceberg tables created and managed in Snowflake with a third-party engine: Databricks-managed Spark. Snowflake is just getting started with Iceberg, and I'm excited to see additional features that will further support open-format data lake workloads.

Demo Video: https://www.youtube.com/watch?v=lIavIiBBvV8
