Spark & Tables

Our Spark jobs often need to access tables or data in various formats from different sources. Based on the source type or use case, we choose different approaches. I have described a few of these approaches below with sample code; the detailed end-to-end configurations are not shown.

Spark Table with Selective Source Files from Azure Data Lake Storage

The metadata is stored on the Spark side (Hive metastore), while the actual data is kept in ADLS.

Problem to be solved:

  1. We have existing records in object store or data lake and we want to create a Hive table based on the data.
  2. We want to load files selectively using a wildcard path pattern, e.g. from the following folder we want to load 20180105.csv only, not test.csv.

(Image: files inside date-wise folders at ADLS)

Configuration / Sample Code:

  1. The following code is written in Azure Databricks, with the actual data held in Azure Data Lake Store (ADLS).
  2. '/mnt/My_Facts/' is the mount location. Databricks File System (DBFS) allows mounting an ADLS location so that we can access the data without specifying credentials every time (a sample mount command is sketched below).
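
For reference, the mount is typically created once per workspace. A minimal sketch of mounting an ADLS (Gen1) account with a service principal is shown below; all the <...> values are placeholders, and the exact OAuth configuration keys should be verified against the Databricks documentation.

%scala
// A sketch only: mount an ADLS (Gen1) account at /mnt/My_Facts using a service principal.
// All <...> values are placeholders; verify the configuration keys against the Databricks docs.
val configs = Map(
 "dfs.adls.oauth2.access.token.provider.type" -> "ClientCredential",
 "dfs.adls.oauth2.client.id" -> "<application_id>",
 "dfs.adls.oauth2.credential" -> "<client_secret>",
 "dfs.adls.oauth2.refresh.url" -> "https://login.microsoftonline.com/<tenant_id>/oauth2/token")

dbutils.fs.mount(
 source = "adl://<data_lake_store_name>.azuredatalakestore.net/My_Facts",
 mountPoint = "/mnt/My_Facts",
 extraConfigs = configs)

With the mount in place, the table can be created as follows.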

%sql
CREATE TABLE MyFactTable1
(
 Date INT,
 DimId1 STRING,
 DimId2 STRING,
 CurrencyCode STRING,
 Country STRING
)
USING CSV
OPTIONS (header 'false', inferSchema 'true', delimiter ',')
LOCATION '/mnt/My_Facts/*/*/20*.csv'
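
Once the table is registered, it can be queried from any notebook in the workspace; a quick sanity check (assuming the mount and the matching files exist) could be:

%scala
// Read the Hive table registered above and inspect a few rows.
val facts = spark.table("MyFactTable1")
facts.show(10)
println(facts.count())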

Cautions:

  1. If the underlying files/records are deleted, Spark will throw an exception; spark.catalog.refreshTable("MyFactTable1") needs to be executed to refresh the Spark catalog. The same refreshTable call is also required to pick up any newly added files in the source location (see the sketch below).
  2. Without partitions, queries can be slow on large record sets.
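
A minimal sketch of the refresh mentioned in the first caution (the table name is the one created above):

%scala
// Refresh Spark's cached metadata/file listing after files are added or removed at the source.
spark.catalog.refreshTable("MyFactTable1")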

Spark Table with Partition based on Source Folder Structure at Azure Data Lake Storage

The metadata is stored on the Spark side (Hive metastore), while the actual data is kept in ADLS. Partition information is defined based on the source data folder structure.

Problem to be solved:

  1. We have existing records in object store or data lake and we want to create a Hive table based on the data.
  2. The files are organized in an appropriate folder structure (e.g. YYYY/MM) and we want to take advantage of it by using these folders as partitions (partitions are very important when accessing terabytes/petabytes of records).

Source Folder Structure (example):

(Image: files inside date-wise folders at ADLS)

Configuration / Sample Code:

%sql
CREATE EXTERNAL TABLE MyFactTable2
(
 Date INT,
 DimId1 STRING,
 DimId2 STRING,
 CurrencyCode STRING,
 Country STRING
)
PARTITIONED BY (Year STRING, Month STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/mnt/My_Facts/'
TBLPROPERTIES ('header' = 'false', 'inferSchema' = 'false');

ALTER TABLE MyFactTable2 ADD PARTITION (Year = '2017', Month = '11') LOCATION '/mnt/My_Facts/2017/11';
ALTER TABLE MyFactTable2 ADD PARTITION (Year = '2017', Month = '12') LOCATION '/mnt/My_Facts/2017/12';

Now we can write queries like:

SELECT * FROM MyFactTable2 WHERE Year = '2017'
SELECT * FROM MyFactTable2 WHERE Month = '12' AND Year = '2017'

Data from any new files added under the registered folders (in our example, under Month = 11 and 12) can be queried.
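
To confirm that a query actually benefits from partition pruning, we can inspect the physical plan; a minimal sketch using the partitions registered above:

%scala
// The physical plan should show a partition filter, i.e. only Year=2017/Month=12 being scanned.
spark.sql("SELECT * FROM MyFactTable2 WHERE Year = '2017' AND Month = '12'").explain()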

Cautions:

  1. If the data folders (in this example, under MM) contain files that we don't want to load (selective data load), this syntax won't help; no file-name wildcard is supported here (check the first example, which supports wildcards in the location path).
  2. If we want to include new folders (e.g. Year = 2018 and Month = 01), we need to add each of them as a partition, e.g. ALTER TABLE MyFactTable2 ADD PARTITION (Year = '2018', Month = '01') LOCATION '/mnt/My_Facts/2018/01'; (a scripted version is sketched below).
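
A minimal sketch of scripting the partition registration for new month folders (the year/month pairs below are illustrative only; in practice they could be derived by listing the mount with dbutils.fs.ls):

%scala
// Register new Year/Month folders as partitions of MyFactTable2.
val newFolders = Seq(("2018", "01"), ("2018", "02")) // illustrative values only
newFolders.foreach { case (year, month) =>
 spark.sql(
  s"ALTER TABLE MyFactTable2 ADD IF NOT EXISTS PARTITION (Year = '$year', Month = '$month') " +
  s"LOCATION '/mnt/My_Facts/$year/$month'")
}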

For further details refer: https://docs.databricks.com/spark/latest/spark-sql/language-manual/create-table.html

If the data source is in a different cloud region than the one hosting the running Spark job, we could incur extra charges for out-of-region data transfer. Apart from that, network latency will be higher than with intra-region communication, so ideally both ends should be co-located (e.g. Azure Data Lake Storage and Azure Databricks both in East US 2).

Spark Table with Azure SQL Database

Problem to be solved:

  1. We don't want to write Azure SQL Database connectivity code in each Spark job / Databricks notebook; instead we can create a Hive table once and refer to that table in our code/notebooks.

Configuration / Sample Code:

%sql
CREATE TABLE flights2
USING org.apache.spark.sql.jdbc
OPTIONS (
 url 'jdbc:sqlserver://<sql_database_server_name>:1433;database=<database_name>;user=<user>;password=<password>;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;',
 database '<database_name>',
 dbtable '<schema_name>.<table_name>'
)
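
Once this table exists, any notebook can reference it without repeating the connection string; a minimal sketch:

%scala
// Reads go through the JDBC connection defined in the table's OPTIONS.
val flights = spark.table("flights2")
flights.show(10)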

Cautions:

  1. Check whether the Azure SQL Database pricing tier (measured in Database Transaction Units, DTUs) is sufficient to support the Spark job; increasing DTUs will increase cost.

Spark Table with Azure SQL Data Warehouse

Problem to be solved:

  1. We don't want to write Azure SQL Data Warehouse connectivity (boilerplate) code in each Spark job / Databricks notebook.

Configuration / Sample Code:

%sql
CREATE TABLE flights
USING com.databricks.spark.sqldw
OPTIONS (
 url 'jdbc:sqlserver://<server-name>:1433;database=<database_name>;user=<user>;password=<password>;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;',
 database '<database_name>',
 dbtable '<schema>.<table_name>',
 forward_spark_azure_storage_credentials 'true',
 tempdir 'wasbs://<container>@<storage_account>.blob.core.windows.net/<container>'
)

We also need to set the following in the Spark configuration once, so that the connector can access the Blob Storage account used for tempdir.

%scala
spark.conf.set("fs.azure.account.key.<storage_account>.blob.core.windows.net", "<storage_account_access_key>")

Cautions:

  1. Check whether the Azure SQL Data Warehouse capacity (Data Warehouse Units, DWUs) is sufficient to support our Spark job; scaling out will increase cost.

Spark Table with Azure Cosmos DB

Problem to be solved:

  1. Visualize Azure Cosmos DB records using Power BI (for details, refer: https://github.com/Azure/azure-cosmosdb-spark/wiki/Configuring-Power-BI-Direct-Query-to-Azure-Cosmos-DB-via-Apache-Spark-(HDI))
  2. We don't want to write Cosmos DB connectivity (boilerplate) code in each Spark job / Databricks notebook.

Configuration / Sample Code:

%sql
CREATE TABLE flights
USING com.microsoft.azure.cosmosdb.spark
OPTIONS (
 endpoint 'https://<endpoint_url>:443/',
 database '<database_name>',
 collection '<collection_name>',
 masterkey '<masterkey>'
)

Cautions:

  1. While using Azure Cosmos DB, we may face a RequestRateTooLargeException on the Spark side. We can check/increase the provisioned Cosmos DB throughput (increasing this will increase cost). Otherwise, we can use the 'query_pagesize' option (refer: https://docs.microsoft.com/en-us/azure/cosmos-db/spark-connector); a sketch is shown below.
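
A minimal sketch of reading the same collection through the DataFrame API with paging enabled; the option names mirror the SQL example above, and the page size value is purely illustrative:

%scala
// query_pagesize limits how many documents are requested per page, which helps avoid
// RequestRateTooLargeException; the value 1000 is an illustration only.
val cosmosDf = spark.read
 .format("com.microsoft.azure.cosmosdb.spark")
 .option("endpoint", "https://<endpoint_url>:443/")
 .option("database", "<database_name>")
 .option("collection", "<collection_name>")
 .option("masterkey", "<masterkey>")
 .option("query_pagesize", "1000")
 .load()
cosmosDf.show(10)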

Spark Table with Azure Blob Storage

Problem to be solved:

  1. We want to create a Spark table backed by Azure Blobs.

Configuration / Sample Code:

  1. Configure the container holding the blobs with 'Private' access (i.e. no anonymous access).
(Image: container access level)

  2. Set the storage account access key in the Spark configuration:

%scala
spark.conf.set("fs.azure.account.key.<storage_account>.blob.core.windows.net", "<storage_account_access_key>")

  3. Create the table in Spark:

%sql
CREATE TABLE MyManagedFactTableBlob
(
 Date INT,
 DimId1 STRING,
 DimId2 STRING,
 CurrencyCode STRING,
 Country STRING
)
USING CSV
OPTIONS (header 'false', inferSchema 'false', delimiter ',')
LOCATION 'wasbs://<container>@<storage_account>.blob.core.windows.net/'

Cautions:

  1. spark.catalog.refreshTable("MyManagedFactTableBlob") needs to be executed to refresh the Spark catalog whenever new blobs are added to the container.

Spark Table with Databricks

DBFS is a distributed file system installed on Databricks Runtime clusters. For Azure, files in DBFS persist to Azure Blob storage, so we will not lose data even after we terminate a cluster. Here, data is managed by Spark only.

Problem to be solved:

  1. We want to create the table / load the records on the Databricks/Spark side. Azure Databricks stores the records in Parquet format by default, with Snappy compression.
  2. Comma/tab-separated source files can be loaded as a managed table in DBFS with approximately 90% compression of the data.
  3. Non-partitioned records can be loaded into DBFS with an appropriate partition key; we should use partitions while working with large record sets.
  4. We want to use the Azure Databricks Standard tier only, which is cheaper than the Premium tier (required for Delta, covered in the next section).

Configuration / Sample Code:

%scala
import org.apache.spark.sql.SaveMode

val df = spark.read
 .option("sep", "\t")
 .option("header", "false")
 .option("inferSchema", "true")
 .csv("/mnt/My_Facts/*/*/20*.tsv")
 // With header=false the inferred column names are _c0, _c1, ...; rename them so that
 // the partition column exists (the column order here is an assumption for illustration).
 .toDF("DateKey", "DimId1", "DimId2", "CurrencyCode", "Country")
df.write.mode(SaveMode.Overwrite).partitionBy("DateKey").saveAsTable("<dbName>.<tableName>")
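
To verify the layout of the managed table, we can list its partitions; a minimal sketch using the same placeholder name:

%scala
// Lists the DateKey partitions created by the write above.
spark.sql("SHOW PARTITIONS <dbName>.<tableName>").show(false)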

Cautions:

  1. A plain Spark table doesn't support UPDATE/DELETE/MERGE operations, so in some cases we may need to perform multiple operations instead of the single command available in Databricks Delta (e.g. MERGE INTO: https://docs.azuredatabricks.net/spark/latest/spark-sql/language-manual/merge-into.html).

Spark with Databricks Delta Table

At the time of writing, Databricks Delta is in preview (refer: https://azure.microsoft.com/en-in/updates/azure-databricks-delta-now-in-preview/).

Problem to be solved:

  1. We want to create the table / load the records on the Databricks/Spark side and use CRUD operations on the table. This is great when updating/correcting a table with a very large record set; with a normal Spark table this is not possible (a sample update is sketched after the code below).
  2. Delta tracks changes to the table.
  3. For further details, please refer: https://docs.azuredatabricks.net/delta/delta-intro.html

Configuration / Sample Code:

%scala
import org.apache.spark.sql.SaveMode

val df = spark.read
 .option("sep", "\t")
 .option("header", "false")
 .option("inferSchema", "true")
 .csv("/mnt/My_Facts/*/*/20*.tsv")
 // As before, rename the inferred _c0, _c1, ... columns so that the partition column
 // DateKey exists (the column order here is an assumption for illustration).
 .toDF("DateKey", "DimId1", "DimId2", "CurrencyCode", "Country")
df.write.format("delta").mode(SaveMode.Overwrite).partitionBy("DateKey").saveAsTable("<dbName>.<tableName>")
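
Once the data is stored as Delta, the table can be modified in place; a minimal sketch (the column name and values are purely illustrative, and UPDATE works on Delta tables only):

%scala
// Hypothetical in-place correction of a column value on the Delta table created above.
spark.sql("UPDATE <dbName>.<tableName> SET Country = 'United States' WHERE Country = 'US'")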

Cautions:

  1. Delta is available in the Azure Databricks Premium tier only, which is costlier than the Standard tier.

The above code was tested with Azure Databricks Runtime version 4.2 (which includes Apache Spark 2.3.1 and Scala 2.11).