Dataplex — Data Processing Using Custom PySpark/Spark

Shashank Tripathi
Google Cloud - Community
6 min read · Dec 15, 2022

Introduction:

Dataplex is an intelligent data fabric that helps you break free from data silos and provides unified search and data discovery, based on business context, across distributed data. It also centralizes security and governance, enabling distributed ownership of data with global control. Dataplex provides automatic metadata discovery for data stored in Cloud Storage and BigQuery. Apart from this, users can access the data securely and query it via BigQuery as external tables. Users can run data quality checks and custom PySpark jobs as serverless batch tasks. Dataplex also provides fully managed, serverless Spark environments with simple access to notebooks and Spark SQL queries.

In this post, we will focus on two major areas:

  • Set up data management — Lake, Zone, Asset.
  • Set up and run a transformation using a custom PySpark task.

We will cover each area in detail, along with the steps needed to run it.

Prerequisites:

To get access to the Dataplex UI and run the custom PySpark task, the user or service account must be granted the appropriate roles in Cloud IAM. If the data transformation task needs to read or update the Dataproc Metastore instance attached to the lake, the service account also needs the Dataproc Metastore Viewer or Editor role.

Setup the Data Management — Lake, Zone, Asset:

Step 1: Create a Data Lake:

From the Navigation menu > Analytics > Dataplex > under Manage Lakes, select Manage > Create.

Step 2: Define the Data Zone:

This is an important step when setting up the data lake, because a zone restricts the type of data you can store. There are two types of zones in a data lake:

  • Raw zone: data that is in its raw format and not subject to strict type-checking.
  • Curated zone: data that is cleaned, formatted, and ready for analytics. The data is columnar, Hive-partitioned, in Parquet, Avro, or ORC files, or in BigQuery tables. Data undergoes type-checking, for example to prohibit the use of CSV files because they do not perform as well for SQL access.

We will create a Raw zone (RAW_ZONE) and a Curated zone (CURATED_ZONE) under the lake (Dataplex-demo). For each zone created, Dataplex automatically creates a BigQuery dataset.

Click on the newly created Data Lake > Click Add Zone under the Zones tab.

Follow the same steps to create the required zone (Raw/Curated).

Step 3: Add assets:

In this step, we will add an asset under the required zone, i.e., map it to data stored in either Cloud Storage or BigQuery. You can also map data stored in separate Google Cloud projects as assets into a single zone. Once this is done and the data discovery jobs have harvested the metadata, we can see the entities that represent the metadata for structured and semi-structured data (tables) and unstructured data (files).

Click the created Raw zone > Add Asset > under Type, select Storage bucket > select the desired bucket for raw data > provide Discovery settings (Inherit/Override) > Review assets > Submit.

For adding an asset to a zone, we have two options:

  • Storage Bucket
  • BigQuery Dataset

Note: When setting up a storage bucket as an asset, take care to create object folders in the bucket, since each folder becomes one table in the BigQuery dataset. Dataplex automatically creates the dataset with the name of the zone (Raw/Curated) created inside the lake. The reason for creating object folders (prefixes) within a bucket is that assets follow Hive-style conventions: a folder represents an entity, or a group of entities with a similar schema.

Dataplex discovers these folders as separate tables. Each prefix/folder can host many Hive-style partitioned files, and Dataplex will discover those files and create a single table, as long as the files share the same schema.
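To make the layout concrete, here is a minimal, hypothetical sketch (the bucket, prefix, and column names are invented for illustration) of writing Hive-style partitioned Parquet under a single prefix, which is the shape discovery expects: every dt=... folder under orders/ is picked up as a partition of one discovered orders table.

```python
from pyspark.sql import SparkSession

# Hypothetical example: bucket, prefix, and column names are illustrative only.
spark = SparkSession.builder.appName("hive-style-layout-example").getOrCreate()

orders = spark.createDataFrame(
    [(1, 120.50, "2022-12-01"), (2, 80.00, "2022-12-02")],
    ["order_id", "amount", "dt"],
)

# One prefix ("orders/") with Hive-style partition folders underneath
# (dt=2022-12-01/, dt=2022-12-02/, ...). Discovery treats the whole prefix
# as one entity/table, as long as every file shares the same schema.
(orders.write
    .mode("append")
    .partitionBy("dt")
    .parquet("gs://my-raw-bucket/orders/"))
```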

Setup and run a transformation using a Custom PySpark Task:

Now that we have an asset present in the required zone, we create a custom PySpark task and run the transformation job: it takes arguments, reads the data from the Raw zone asset, transforms it, and writes it to the Cloud Storage bucket that is mapped to the Curated zone asset. Dataplex supports scheduling custom code execution either as a one-time run or on a cron schedule. We can write the custom task using Spark (Java), PySpark (<= 3.2), or Spark SQL. These scripts are executed on serverless Spark processing with a built-in serverless scheduler.

Before moving further, you may be wondering what the benefit of running a custom PySpark task on Dataplex is. It provides more flexibility for writing transformation jobs and storing the output in a curated zone, where the tables are created automatically in BigQuery. Apart from this, developers who are familiar with Spark can easily write the transformation code in Java, PySpark, or Spark SQL.

Step 1: Prepare the PySpark Script and place it in the GCS Bucket.

Dataplex_custom_spark_template.py

The above script, placed in a GCS bucket, reads the arguments specified in the Add Argument section of the Dataplex Create Task UI. It takes the source and target paths and the source and target formats, runs the transformation, and writes the result to the target path, which is registered as an asset in the Curated zone.
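For readers who cannot see the embedded gist, here is a minimal sketch of a script along these lines. It is not the author's exact template: the argument names (--source_path, --target_path, --source_format, --target_format) and the placeholder transformation are assumptions, and they must match whatever you enter in the task's Add Argument section.

```python
# dataplex_custom_spark_template.py (illustrative sketch, not the original gist)
import argparse

from pyspark.sql import SparkSession


def parse_args():
    parser = argparse.ArgumentParser(description="Dataplex custom PySpark task")
    parser.add_argument("--source_path", required=True)    # e.g. gs://raw-bucket/orders/
    parser.add_argument("--target_path", required=True)    # e.g. gs://curated-bucket/orders/
    parser.add_argument("--source_format", default="csv")  # csv / json / parquet ...
    parser.add_argument("--target_format", default="parquet")
    return parser.parse_args()


def main():
    args = parse_args()
    spark = SparkSession.builder.appName("dataplex-custom-spark-task").getOrCreate()

    # Read the raw data from the asset mapped to the Raw zone.
    df = (spark.read
          .format(args.source_format)
          .option("header", "true")       # relevant for CSV sources
          .option("inferSchema", "true")
          .load(args.source_path))

    # Placeholder transformation: replace with your own business logic.
    transformed = df.dropDuplicates().na.drop("all")

    # Write the result to the bucket registered as the Curated zone asset;
    # discovery will then surface it as a BigQuery table.
    (transformed.write
        .mode("overwrite")
        .format(args.target_format)
        .save(args.target_path))

    spark.stop()


if __name__ == "__main__":
    main()
```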

Step 2: Enable Private Google Access:

Enable Private Google Access for your network and/or subnetwork. If nothing is specified, the task uses the default subnet, so we will enable Private Google Access on the default subnet.

From the Navigation menu > Networking > VPC network > click default > select the subnet region in which the Dataplex task will run > click Edit > enable Private Google Access.

Step 3: Create a Custom Spark Task:

Create the Spark task as described in the steps below:

Under Manage Lakes, select Process > Create Task > Create Custom Spark Task.

Step 3a: Select the Dataplex lake and enter the details for the task configuration:

  • Type: Spark / PySpark.
  • Main class or JAR file: for PySpark, the fully qualified Cloud Storage URI of the script; for Spark, the fully qualified name of a class.
  • File URIs: GCS URIs of files to be placed in the working directory of each executor. Example: a log4j-spark-driver-template.properties file can be placed on each executor for logging.
  • Archive files: archive files are extracted into the Spark working directory.
  • Add Argument: enter the arguments to be passed to the Python script in GCS. Example: the source and target paths and the source and target formats used.

Step 3b: Set the schedule. We can select between two options:

  • Run once: select the Immediately or Cron option.
  • Daily: select an option such as Daily, Weekly, Monthly, or Custom.

Step 3c: Under Customize Resources > Network Configuration, check that the default VPC is selected, since we enabled Private Google Access on the default VPC.
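If you prefer to script this setup rather than click through the UI, the equivalent task can also be created programmatically. The sketch below assumes the google-cloud-dataplex Python client library; the field names should be verified against the current API reference, and the project, location, lake, bucket, service account, and argument values are all placeholders.

```python
# Illustrative sketch only: verify field names against the google-cloud-dataplex
# API reference; all resource names and values below are placeholders.
from google.cloud import dataplex_v1


def create_custom_spark_task():
    client = dataplex_v1.DataplexServiceClient()

    task = dataplex_v1.Task(
        description="Raw to Curated transformation",
        trigger_spec=dataplex_v1.Task.TriggerSpec(
            # ON_DEMAND corresponds to "Run once"; use RECURRING plus a cron
            # `schedule` for the repeating options.
            type_=dataplex_v1.Task.TriggerSpec.Type.ON_DEMAND,
        ),
        execution_spec=dataplex_v1.Task.ExecutionSpec(
            service_account="dataplex-task-sa@my-project.iam.gserviceaccount.com",
            # How these map entries are surfaced to the script depends on
            # Dataplex's task-argument conventions (some versions expect a
            # single TASK_ARGS key); check the scheduling docs.
            args={
                "source_path": "gs://raw-bucket/orders/",
                "target_path": "gs://curated-bucket/orders/",
                "source_format": "csv",
                "target_format": "parquet",
            },
        ),
        spark=dataplex_v1.Task.SparkTaskConfig(
            python_script_file="gs://scripts-bucket/dataplex_custom_spark_template.py",
        ),
    )

    operation = client.create_task(
        parent="projects/my-project/locations/us-central1/lakes/dataplex-demo",
        task_id="raw-to-curated-task",
        task=task,
    )
    return operation.result()  # wait for the long-running operation to finish
```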

Step 3d: Verify the custom Spark job you ran.

Conclusion

In this article, we saw how to configure and run a PySpark task in Dataplex for data transformation and processing. We also covered how to pass multiple arguments and use them in your PySpark script to perform the transformation, along with various points to take care of when working with data management in Dataplex.


Hope you enjoyed this article and found it useful. Thanks, KARTIK MALIK, for your inputs. You can reach me on LinkedIn.
