DBT Models using Spark on EMR

Durga Gadiraju
Published in itversity
6 min read · Aug 25, 2022

Let us learn how to build DBT Models with Apache Spark on an AWS EMR Cluster, using a denormalized JSON Dataset.

Here is the high-level agenda for this session.

  • DBT for ELT (Extract, Load, and Transform)
  • Overview of DBT CLI and DBT Cloud
  • Setting up EMR Cluster with Thrift Server using Step
  • Overview of the Semi-Structured Data Set used for the Demo
  • Develop required queries using Spark SQL on AWS EMR
  • Develop the Spark Application on AWS EMR using DBT Cloud
  • Run the Spark Application on AWS EMR using DBT Cloud
  • Overview of Orchestration using Tools like Airflow

Here is the YouTube Video on the topic which will be live streamed soon on our YouTube Channel.

Get Started

Let us get started with a long-term cherishable journey.

Special Note to Udemy Family

  • Thank You for being a member of our extended family
  • Make sure to rate us and also provide feedback as demonstrated. Your rating and feedback are very important for our community's success. You can use the below video for your reference to leave a rating as well as a review.

DBT for ELT (Extract, Load, and Transform)

First, let us understand what ELT is and where DBT comes into play.

  • ELT stands for Extract, Load, and Transform.
  • DBT is a tool used purely for the Transform step. It leverages the resources of the target database to process the data.

What is ETL in Data Warehousing?

Keep in mind that ETL stands for Extract, Transform, and Load.

What is ELT and how is it different from ETL?

In ELT, the data is first extracted and loaded into the target Data Warehouse and only then transformed. The main difference from ETL is that the transformations are done primarily using SQL, leveraging the capacity of the target Data Warehouse servers. Most modern Data Warehouse technologies support ANSI SQL and can process a variety of data at volume. Instead of using intermediate tools to transform the data, we can leverage the Data Warehouse itself to support both BI and ETL/ELT workloads.

How does DBT support ELT?

As per the official documentation of DBT, it is primarily built to streamline T in ELT. As enterprises are increasingly leaning towards deploying ETL/ELT Workloads leveraging the capacity of Target Data Warehouse Servers on Cloud with serverless abilities, DBT can provide required features such as modularizing the Transformation logic, unit testing, etc.

Based on the requirements and design, we need to modularize the logic and develop it as DBT models. When the models are run, DBT compiles them into SQL queries and executes them on the target database.

The open-source community of DBT has developed adapters for many leading databases and engines such as Spark, Databricks, Redshift, Snowflake, etc.

Overview of DBT CLI and DBT Cloud

DBT CLI and DBT Cloud can be used to develop DBT Models based on the requirements.

  • DBT CLI is completely open source and can be set up on Windows, Mac, or Linux-based desktops.
  • As part of DBT CLI installation, we can take care of installing dbt-core along with the relevant adapters based on the target database.

Setting up EMR Cluster with Thrift Server using Step

As we are not processing a significantly large amount of data, we will set up a single-node EMR Cluster using the latest version. If you are not familiar with AWS EMR, you can sign up for this course on Udemy.

The DBT Spark adapter connects to Spark through the Spark Thrift Server (Spark's JDBC/ODBC endpoint), and hence we need to ensure the Thrift Server is started when the EMR Cluster comes up with Spark. While configuring the single-node cluster, make sure to add a step that uses command-runner.jar with the argument sudo /usr/lib/spark/sbin/start-thriftserver.sh so that the Spark Thrift Server is started right after the cluster is up.

Here is the screenshot to configure the step.

Overview of the Semi-Structured Data Set Used for the Demo

Here are the details of the Semi-Structured Data Set used for the Demo. The data set has 5 columns.

  1. order_id which is of type integer
  2. order_date which is a string representation of the date
  3. order_customer_id which is of type integer
  4. order_status which is of type string
  5. order_items which is of type string, but the string holds a valid JSON Array.

We can convert a string which contains a JSON Array to a Spark SQL array<struct> using the from_json function. However, we need to specify the schema as the second argument while invoking from_json on the order_items column in our data set.
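To see what from_json does without touching the actual data set, here is a minimal sketch that can be run from the Spark SQL CLI. The JSON literal and the two-field schema are made up for illustration and are not the full order_items schema.

```sql
-- Parse a JSON array held in a string into array<struct<...>>.
-- The literal below is sample data for illustration only.
SELECT from_json(
         '[{"order_item_id": 1, "order_item_subtotal": 299.98}]',
         'array<struct<order_item_id:INT, order_item_subtotal:FLOAT>>'
       ) AS order_items;
```

Attributes in the JSON that are not listed in the schema are ignored, so the schema string decides which fields we get back.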

Develop required queries using Spark SQL on AWS EMR

Here are the queries to process the semi-structured JSON Data using Spark SQL.

Spark SQL lets us query files directly by specifying the file format and the path in the FROM clause of a SELECT query.

SELECT * FROM JSON.`s3://airetail/order_details`

The column order_items is of type string and has a JSON Array stored in it. We can convert it to an array using from_json and then use explode_outer to get one row per order item as below.

SELECT order_id, order_date, order_customer_id, order_status, 
explode_outer(from_json(order_items, 'array<struct<order_item_id:INT, order_item_order_id:INT, order_item_product_id:INT, order_item_quantity:INT, order_item_subtotal:FLOAT, order_item_product_price:FLOAT>>')) AS order_item
FROM JSON.`s3://airetail/order_details`;
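The behavior of explode_outer can be checked on inline sample rows before running it against S3. This is only a sketch: sample_orders and its two rows are hypothetical, and the schema is trimmed to two fields. Unlike explode, explode_outer keeps an order whose order_items is null or empty and returns a null order_item for it.

```sql
-- sample_orders is hypothetical inline data, not the real data set.
WITH sample_orders AS (
  SELECT 1 AS order_id,
         '[{"order_item_id": 1, "order_item_subtotal": 299.98}, {"order_item_id": 2, "order_item_subtotal": 199.99}]' AS order_items
  UNION ALL
  SELECT 2 AS order_id, CAST(NULL AS STRING) AS order_items
), exploded AS (
  -- One output row per array element; a null array still yields one row.
  SELECT order_id,
         explode_outer(from_json(order_items, 'array<struct<order_item_id:INT, order_item_subtotal:FLOAT>>')) AS order_item
  FROM sample_orders
)
-- Struct fields are accessed with dot notation.
SELECT order_id,
       order_item.order_item_id,
       order_item.order_item_subtotal
FROM exploded
ORDER BY order_id, order_item_id;
```

Order 1 should come back as two rows, while order 2 is retained as a single row with null item fields.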

Here is the final query which has the core logic to compute monthly revenue considering COMPLETE or CLOSED orders.

WITH order_details_exploded AS ( 
SELECT order_id, order_date, order_customer_id, order_status,
explode_outer(from_json(order_items, 'array<struct<order_item_id:INT, order_item_order_id:INT, order_item_product_id:INT, order_item_quantity:INT, order_item_subtotal:FLOAT, order_item_product_price:FLOAT>>')) AS order_item
FROM JSON.`s3://airetail/order_details`
)
SELECT date_format(order_date, 'yyyy-MM') AS order_month,
round(sum(order_item.order_item_subtotal), 2) AS revenue
FROM order_details_exploded
WHERE order_status IN ('COMPLETE', 'CLOSED')
GROUP BY 1
ORDER BY 1;

Develop the DBT Models using Spark on AWS EMR

Let us go ahead and set up the project to develop the required DBT Models to compute monthly revenue. We’ll break the overall logic to compute monthly revenue into 2 dependent DBT Models.

Here are the steps that are involved to complete the development process.

  1. Set up the DBT Project using the Spark Adapter
  2. Run the Example Models and confirm that the project is set up successfully
  3. Develop Required DBT Models with core logic
  4. Update Project File (change project name and also make required changes related to the models)

Here is the code for the first model order_details_exploded.sql where we will be preserving the logic for exploded order details in the form of a view.

{{ config(materialized='view') }}

SELECT order_id, order_date, order_customer_id, order_status,
explode_outer(from_json(order_items, 'array<struct<order_item_id:INT, order_item_order_id:INT, order_item_product_id:INT, order_item_quantity:INT, order_item_subtotal:FLOAT, order_item_product_price:FLOAT>>')) AS order_item
FROM JSON.`s3://airetail/order_details`

Here is the code for the second model monthly_revenue.sql where we will be preserving the results in a table in a specified S3 location. The configuration that points the table to a specific S3 location can be specified either in this model or at the project level by updating dbt_project.yml.

{{ config(materialized='table', location_root='s3://airetail/monthly_revenue') }}

SELECT date_format(order_date, 'yyyy-MM') AS order_month,
round(sum(order_item.order_item_subtotal), 2) AS revenue
FROM {{ ref('order_details_exploded') }}
WHERE order_status IN ('COMPLETE', 'CLOSED')
GROUP BY 1
ORDER BY 1
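To make the dependency between the two models concrete, here is roughly what the SELECT of monthly_revenue looks like after DBT resolves the ref call. The schema name dbt_demo is a hypothetical placeholder for whatever schema is configured in the connection profile. The actual compiled output can be checked under the target/compiled folder of the project.

```sql
-- Sketch of the compiled query; dbt_demo is a hypothetical schema name.
SELECT date_format(order_date, 'yyyy-MM') AS order_month,
       round(sum(order_item.order_item_subtotal), 2) AS revenue
FROM dbt_demo.order_details_exploded
WHERE order_status IN ('COMPLETE', 'CLOSED')
GROUP BY 1
ORDER BY 1
```

Because of the ref call, DBT also knows that order_details_exploded has to be built before monthly_revenue.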

Run the DBT Models using Spark on AWS EMR

As the development of the DBT Models using the Spark Adapter is done, let us see how to run and validate them.

  1. Run the DBT Project with 2 models
  2. Log in to the EMR Cluster and launch Spark SQL
  3. Run a query pointing to the target location in which the monthly revenue data is preserved.
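Here is a sketch of the validation queries that could be used in Spark SQL. It assumes that the table was created with the adapter's default file format (parquet) and that the data files landed in a folder named after the model under location_root. Both are assumptions to verify against your own run, for example by checking the Location shown by DESCRIBE FORMATTED. The schema name dbt_demo is a hypothetical placeholder.

```sql
-- Check the table definition, including its S3 location.
DESCRIBE FORMATTED dbt_demo.monthly_revenue;

-- Query the table through the metastore.
SELECT * FROM dbt_demo.monthly_revenue ORDER BY order_month;

-- Or query the files directly (the path is an assumption, see above).
SELECT * FROM PARQUET.`s3://airetail/monthly_revenue/monthly_revenue` ORDER BY order_month;
```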

Overview of Orchestration using Tools like Airflow

DBT Applications are primarily developed to implement the required transformation logic using the ELT pattern. The overall pipeline usually involves more than the transformation logic, for example the extract and load steps that come before it. We need to make sure the entire pipeline is orchestrated.

One of the ways we can orchestrate the pipeline is by using orchestration tools such as AWS Step Functions, Airflow, etc.

Here is one of the common designs when it comes to building an end-to-end pipeline in which DBT plays a critical role.

Originally published at https://www.analytiqs.io on August 25, 2022.
