Publish Cloud Dataprep Profile Results to BigQuery
Use Case
For data governance purposes, customers often want to store the profile metadata generated by Cloud Dataprep Premium when jobs are run. In this scenario, a customer wants to retain the profiling metadata in BigQuery for reporting purposes.
This article describes how to use webhooks and Cloud Functions to automatically publish Dataprep-generated profile information into BigQuery (after making an intermediate stop in GCS).
We will build the following automated process:
- Run a Cloud Dataprep job with profiling enabled.
- In Cloud Dataprep, invoke a webhook that calls a Cloud Function.
- The Cloud Function calls the GET profile results API.
- The Cloud Function saves the API response to GCS.
- The Cloud Function triggers a separate Cloud Dataprep job to process the JSON API response and publish a BigQuery table.
If you don’t already have access to Cloud Dataprep Premium, and you want to try this yourself, you can sign up here.
Step-by-step instructions
Step 1: Understand the API output containing the profile metadata
Whenever you run a job with profiling enabled, Cloud Dataprep generates metadata about the profiling results. There are three types of profile metadata information that Cloud Dataprep will output:
- profilerRules: Contains information about each DQ rule and the number of passing and failing rows for each rule.
- profilerTypeCheckHistograms: Contains information about the number of missing, mismatched, and valid records for each column in your dataset.
- profilerValidValueHistograms: Contains information about min/max/median values for numeric or date columns, and the top 20 unique values by count for string columns.
These profile results appear in the Cloud Dataprep UI, and can also be retrieved through an API call. In order to publish the profile metadata to BigQuery, you will need to make an API call to return the JSON representation of the profile information.
You can read about the API call at this link: https://api.trifacta.com/dataprep-premium/index.html#operation/getProfilingInformationForJobGroup
Step 2: Create a Cloud Function to invoke the profile results API
In order to invoke the profile results API after a Cloud Dataprep job finishes, we need to create an HTTP Cloud Function. This type of cloud function can be called from a Cloud Dataprep webhook. You can find the full node.js code for the cloud function on Github. For step-by-step instructions about how to create a Cloud Function, refer to the Google Quickstart guide.
Note: There are two main blocks of work included in the Cloud Function. This section applies to the first block of work that saves the profile results API to GCS. We will return to the Cloud Function in step 4 and discuss the second block of work that runs a Cloud Dataprep flow and writes an output to BigQuery.
First, we want the cloud function to invoke the job results profile API. The code block shown below (extracted from the full index.js Cloud Function code on Github) submits a GET request to the job results profile API endpoint and stores the response body in a temporary variable. Note that you will need to populate the “DataprepToken” variable with your own API token. Instructions for generating an API token for Cloud Dataprep can be found at this link.
In preparation for storing the JSON response in GCS, I wanted to give some thought to how to create a fully automated, end-to-end pipeline. Ideally, we could invoke the same cloud function after any Dataprep job run, and programmatically identify the profile corresponding to the specific dataprep job and the correct version of the profile. We can use dynamic name and date variables in our output filename to accomplish this.
The following code block makes two additional Dataprep API calls that allow us to retrieve the name of the output from the job run that created our profile JSON. We will be invoking the GET JobGroup API, and the GET WrangledDataset API. The output from these calls is stored in a variable called “name”.
Next, we need to upload the JSON to GCS. At this point, I recommend giving some thought to the directory structure that you want to use for all of the intermediate JSON outputs. I chose to save each JSON file to the following path on GCS:
gs://my-bucket/my-email/profiles/
When I call the API, each of the new profile JSON files will be stored in this path and named based on the original output name (retrieved in the previous code block) and the date. You will want to adjust the variable named “filename” to include your GCS filepath. Note the addition of the “name” and “date” variables in the filename.
We have now saved the JSON output from the profile results API to GCS. Before proceeding to the next step, I recommend saving a sample JSON output to your designated path on GCS. You can do this by manually invoking the profile results API, renaming the file according to the pattern you configured in your Cloud Function code, and uploading the file to GCS.
Step 3: Create a Cloud Dataprep flow to process the profile JSON
Next, we need to build a Cloud Dataprep flow to transform the profile results JSON file and create a BigQuery table.
You will typically want to structure the profile metadata JSON files so that you retain all of the useful information, but do not constrain the schema of the output table based on the schema of the profiled source.
I’ve created a sample flow using Dataprep and saved it to Github. I recommend downloading this zip file, importing it into Dataprep, and using it as the basis for your own work. The flow that I created produces three separate output tables in BigQuery, one for each type of profile metadata generated by the profile results API. It also generates two dynamic columns for each output: one column that contains the source filename, and one column that contains the current date and time.
Go ahead and replace the source dataset in this flow with your JSON output file from GCS.
You will also need to reconfigure each of the outputs to point to your BigQuery instance. Click on one of the outputs to open the details panel on the right side of the screen. From the details panel, select “Edit” next to “Manual Destinations”:
On the Publishing Settings screen, hover your mouse over the “Append to BigQuery” destination and click the “Edit” button on the far right:
Navigate to your desired BigQuery dataset and click “Create new table” on the right side of the screen. Give your output table a name, and click the “Append to this table every run” option. This will ensure that you retain the history of profile results in your BigQuery table.
Click “Update”, followed by “Save Settings” to configure your BigQuery output.
At this point, we have a flow that identifies a static file on GCS and produces a BigQuery table. We can add parameters to our source to make this flow dynamic.
Right-click on your source dataset and choose “Replace with dataset with parameters” from the pop-up menu:
This will open a dialog box that allows you to configure a parameterized path. We will add two parameters in Dataprep that map to the “name” and “date” variables from our Cloud Function code.
First, we will create the “name” variable. Highlight the portion of the filepath that corresponds to this variable and click the “Add variable” button that appears over your selection:
Give this variable a name and click “Save”. In my example code, I’ve named this variable “original_output_name”.
Next, we will create a date/time variable. Highlight the portion of the filename that contains the date and time, but do not select the seconds. Dataprep does not support date/time variables that include seconds. After highlighting the date and time, click the “Add Datetime Parameter” button that appears over your selection:
From the pop-up box, enter the format for your date/time variable. If you’re using my example code, the format is YYYY-MM-DD_HH:mm. Configure the date range that will be evaluated at job run time. Since this flow will be triggered by our Cloud Function immediately after the output JSON file is saved on GCS, you should select the option that reads “Date is from … ago until present.” The next box allows you to further configure the date range; in the image below, I’ve set 5 minutes as our lower bound.
IMPORTANT tip: Make sure you pay attention to the timezone. If you configure an incorrect timezone in this variable, Dataprep will not be able to identify the correct source file.
Click “Save” to set your date/time variable.
Since Dataprep’s date/time variable doesn’t support seconds, we need to add a third variable to our filepath. Highlight the seconds, including the “:”, and choose the “Add Pattern Parameter” button:
Select the “Wildcard” option and hit “Save”.
After you have configured all of the parameters, you can click “Replace” in the bottom right side of the screen.
You have now built a flow that will dynamically identify the correct profile JSON file on GCS, transform that file, and write a table to BigQuery.
Step 4: Add code that allows your Cloud Function to trigger the profile JSON processing flow
Let’s return to our Cloud Function. The next step in our process is to automatically trigger the flow that we just built. To do this, we need to invoke the run JobGroup API. This API requires you to pass a JSON body that contains a wrangledDataset ID and any parameter overrides.
To find the wrangledDataset ID, click on the recipe immediately before your output object. The wrangledDataset ID will appear in the URL bar after “recipe=”.
For example, if I am using this API to run a job and create the output named “profilerRules”, the wrangledDataset ID is tied to the recipe named “profilerRules”. Looking at the URL bar, you can see that the wrangledDataset ID is “165355”.
Since our profile processing flow includes a parameterized datasource, we will also need to include a variable override in the JSON request body. This override should use the variable name that you configured in the previous step. In my sample code below, I passed in the Cloud Function variable called “name” to dynamically pass the correct value into the API call. This allows my function to be reusable for multiple sources.
In my sample code, I only have one API call that produces the profilerRules output in BigQuery. If you want to produce all three outputs from my sample flow, you will need to duplicate this code block and change the wrangledDataset ID to point to the recipe attached to each of the other outputs.
With this addition, we have now completed our Cloud Function. You can find the full code on Github.
Step 5: Create a webhook on your source flow
Finally, you will need to create a webhook on your source flow that will invoke the HTTP Cloud Function. Webhooks live at the flow level in Dataprep, and allow you to invoke post-processing tasks like Cloud Functions after a Dataprep job has completed. You can also pass information about the job execution into a webhook. We will use this functionality to send the job ID into our cloud function to retrieve the correct profile results.
To create a cloud function, click on the three dots icon at the top of the flow and choose “Configure webhook tasks”:
You will need to populate your webhook with the following information:
- URL: The URL for your HTTP cloud function.
- Headers: Enter “content-type” and “application/json” as your headers.
- Body: Enter {“jobid”:”$jobId”} as your request body. The variable $jobID will pass the most recent job execution ID to your Cloud Function.
- Trigger event: Job success. This will only invoke the Cloud Function when your job successfully runs.
- Trigger object: You can decide to trigger the Cloud Function for any output in your flow, or for only specific outputs in your flow.
An image of my configured webhook is shown below:
Click “Save” to save your webhook.
Step 6: Trigger the full flow
You are now ready to test the end-to-end process by running a job from your source flow. When this job completes, the webhook will call the HTTP Cloud Function. You can monitor the status of the Dataprep jobs in the Dataprep UI, and the status of your Cloud Function in the GCP console.
When your job completes, you will see the status of each of the activities, including the webhook call, in the UI:
You’ve now created an end-to-end flow to consume the profile metadata generated by Dataprep in BigQuery.
For more articles about Dataprep and BigQuery, check out the following links: