Automatic import of metadata assignments from Excel file to WKC catalog using REST API.

Andrzej Wrobel
6 min readJul 11, 2023

--

By Andrzej Wrobel and Kriti Rajput

In the modern business landscape, data plays a pivotal role in decision-making and strategic planning. Many organizations rely on spreadsheets, such as Microsoft Excel, to manage and analyze their data. At the same time, businesses are increasingly leveraging knowledge graphs and ontology frameworks, such as Watson Knowledge Catalog (WKC), to capture and represent their domain-specific knowledge. However, manually bridging the gap between these two worlds can be time-consuming and error-prone.

To address this challenge, we present a script that automates the process of reading data from Excel spreadsheets and establishing associations with business terms stored in WKC using its REST API. This script enables seamless integration between structured data analysis and the rich knowledge representation capabilities of WKC, empowering businesses to gain valuable insights and unlock the full potential of their data.

The script utilizes the RESTful architecture, a widely adopted approach for building web services, to interact with the WKC REST API. By leveraging this API, the script can programmatically access and manipulate the business terms and associated metadata stored in WKC, including their relationships, attributes, and semantic information.

Once the business terms have been identified within the data, the script utilizes the WKC REST API to search for those terms within the business glossary. It retrieves the associated metadata and maps them back to the corresponding data elements in Excel. This linkage between the data and business terms enhances the understanding of the data’s context, allowing for deeper analysis and more informed decision-making.

The automated data analysis script offers several benefits to businesses. It saves time and effort by eliminating the manual process of identifying and associating business terms with data elements. It also reduces the risk of human error, ensuring consistent and accurate mapping between data and knowledge graph. Furthermore, it enables the seamless integration of structured data analysis with domain-specific knowledge captured in WKC, fostering a holistic and insightful approach to data-driven decision-making.

Python script leverages panda library and contains few sections:

  1. Return a Cloud Pak for Data token needed for executing subsequent APIs
  2. Finding catalog ID for the provided catalog Name from WKC environment
  3. Given term name, find the business term in the WKC glossary
  4. Given asset details, find the asset in the WKC catalog
  5. Update Asset column list of terms.
#!/usr/bin/env python
# coding: utf-8

import os
import pandas as pd
import requests
import json
import random
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

globalDebug=True
globalDebug=False


# Provide your Cloud Pak for Data (WKC) credentials
cpd_url='https://cpd-wkc.apps.cpd-host.ibm.com'
cpd_username='admin'
cpd_password='password'
catalog_name='Test Term Assignment'

# Return a Cloud Pak for Data token needed for executing subsequent APIs
def getCPDtoken(cpd_url,cpd_username,cpd_password):
# get token
url = cpd_url + '/v1/preauth/validateAuth'

try:
response = requests.get(url,auth=(cpd_username,cpd_password),verify=False)
response.raise_for_status()
except requests.exceptions.HTTPError as err:
print("Failed to obtain Cloud Pak for Data authentication token. ERROR: ", err)
return -1
except requests.exceptions.RequestException as e: # This is the correct syntax
print("Failed to obtain Cloud Pak for Data authentication token. ERROR: ", e)
return -1
mltoken = response.json()["accessToken"]

return mltoken

# This module returns the catalog ID for the provided catalog Name
def getCatalogID(cpd_url,token,catalogName):
# Endpoint for getting All catalogs defined on the platform
url = cpd_url + '/v2/catalogs?limit=10000'

# token to authenticate to the platform
header = {'Content-Type': 'application/json', 'Authorization': 'Bearer ' + mltoken}

# GET all catalogs
try:
response = requests.get(url,headers=header,verify=False)
response.raise_for_status()
except requests.exceptions.HTTPError as err:
print("Failed to get list of Catalogs defined in WKC. ERROR: ", err)
return -1
except requests.exceptions.RequestException as e: # This is the correct syntax
print("Failed to get list of Catalogs defined in WKC. ERROR: ", e)
return -1

catalogsList = response.json()
for c in catalogsList['catalogs']:
#print("Catalog details: ", c)
if c['entity']['name'] == catalogName:
return c['metadata']['guid']
print("Catalog: ", catalogName, " not found")
return -1

# Given term name, find the business term in the WKC glossary
def findTermID(termName, cpd_url, token):
url = cpd_url + f'/v3/search'

# NOTE: you must construct mltoken based on provided documentation
header = {'Content-Type': 'application/json', 'Authorization': 'Bearer ' + token}

payload = {
"_source": ["entity.artifacts.global_id", "metadata.name", "metadata.artifact_type"],
"query": {
"match": {
"metadata.name": termName
}
}
}

try:
response = requests.post(url, json=payload, headers=header,verify=False)
response.raise_for_status()
except requests.exceptions.HTTPError as err:
print("ERROR: ", err)
return -1
except requests.exceptions.RequestException as e: # This is the correct syntax
print("ERROR: ", e)
return -1
results = json.loads(response.text)
if results and results['size'] < 1:
print("No glossary terms found with specific name: ", termName)
return -1

# Confirm the business term name is an exact match
# Search can return multiple results
termFound = False
for row in results['rows']:
# ignore assets that are NOT data_asset
aType = row['metadata']['artifact_type']
if aType != 'glossary_term':
continue

tName = row['metadata']['name']
termGlobalId = row['entity']['artifacts']['global_id']
tName = tName.lower()
termName = termName.lower()

if tName == termName:
termFound = True

if termFound:
print("term: ", termName)
return termGlobalId
return -1

# Given asset details, find the asset in the WKC catalog
def findAssetID(assetName,cpd_url,token,catalogID):
if globalDebug:
print("finding Asset ID asset Name: ", assetName)

url = cpd_url + f'/v3/search?auth_scope=catalog'

# NOTE: you must construct mltoken based on provided documentation
header = {'Content-Type': 'application/json', 'Authorization': 'Bearer ' + token}

payload = {
"_source": ["entity.assets.table_name", "metadata.name", "metadata.artifact_type", "artifact_id", "entity.assets.resource_key",
"entity.assets.catalog_id"],
"query": {
"match": {
"metadata.name": assetName
}
}
}

try:
response = requests.post(url, json=payload, headers=header,verify=False)
response.raise_for_status()
except requests.exceptions.HTTPError as err:
print("ERROR: ", err)
return -1
except requests.exceptions.RequestException as e: # This is the correct syntax
print("ERROR: ", e)
return -1
results = json.loads(response.text)
if results and results['size'] < 1:
print("No assets found in catalog: ", catalogID, " with specific name: ", assetName)
return -1

# Confirm the data asset name is an exact match
# Search can return multiple results
assetFound = False
colFound = False
#print("number of results: ", len(results['rows']))
for row in results['rows']:
#print("row: ", row)

# ignore assets that are NOT data_asset
aType = row['metadata']['artifact_type']
if aType != 'data_asset':
continue

# In some cases, search return assets from project even though scope is set to catalog
if 'catalog_id' not in row['entity']['assets'].keys():
continue

aName = row['metadata']['name']
aId = row['artifact_id']
aName = aName.lower()
assetName = assetName.lower()

if aName == assetName:
assetFound = True

# Check Catalog
catID = row['entity']['assets']['catalog_id']
if catID != catalogID:
if globalDebug:
print("ERROR Asset: ", assetName, " with ID: ", aId, " found but belongs to catalog: ", catID)
continue
correctCatalog = True

if assetFound:
print("asset: ", assetName, " in catalog: ", catalog_id)
return row['artifact_id']

return
def checkCol(colName,catalogID):

colFound = False
# Add check for column Name
colNames = {row['name'] for row in assetMetadata['entity']['data_asset']['columns']}
if colName not in colNames:
colFound = False
if globalDebug:
print("ERROR column: ", colName, " not found in: ", colNames, " for asset: ", assetMetadata['metadata']['name'], " with id: ", assetID)
colFound = True

return colFound

# Given an asset ID, return all asset details from WKC
def getAssetDetails(asset_id,token,catalog_id):
url = cpd_url + f'/v2/assets/{asset_id}?catalog_id={catalog_id}'

# NOTE: you must construct mltoken based on provided documentation
header = {'Content-Type': 'application/json', 'Authorization': 'Bearer ' + token}

try:
response = requests.get(url,headers=header,verify=False)
response.raise_for_status()
except requests.exceptions.HTTPError as err:
print("ERROR: ", err)
return -1
except requests.exceptions.RequestException as e: # This is the correct syntax
print("ERROR: ", e)
return -1
results = json.loads(response.text)

return results

# Update Asset column list of terms.
def updateColumnTerm(term_global_id,term_name, cpd_url, token, asset_id, catalog_id, col_name):
if asset_id == -1:
return -1

print("Updating asset column term for asset: ", asset_id)
url = cpd_url + f'/v2/assets/{asset_id}/attributes/column_info?catalog_id={catalog_id}'

# NOTE: you must construct mltoken based on provided documentation
header = {'accept': 'application/json', 'Content-Type': 'application/json', 'Authorization': 'Bearer ' + token}

tdn = f'"term_display_name":"{term_name}"'
tid = f'"term_id":"{term_global_id}"'
payload = f'[{{"op": "add", "path": "/{col_name}", "value":{{"column_terms":[{{{tdn},{tid}}}]}}}}]'

try:
response = requests.patch(url, data=payload, headers=header,verify=False)
response.raise_for_status()
except requests.exceptions.HTTPError as err:
print("response: ", response.text)
print("ERROR: ", err)
return -1
except requests.exceptions.RequestException as e: # This is the correct syntax
print("ERROR: ", e)
return -1
results = json.loads(response.text)

return results

# Get a token for running APIs against CP4D
mltoken = getCPDtoken(cpd_url,cpd_username,cpd_password)

# Get Catalog ID
catalog_id = getCatalogID(cpd_url,mltoken,catalog_name)
print("Catalog: ", catalog_name, " has id: ", catalog_id)

# Input csv filename that include lineage information defined
excel_data_df = pd.read_excel('col_term_map.xlsx', header=None)

for index, row in excel_data_df.iterrows():
asset_name = row[0]
col_name = row[1]
term_name = row[2]

print("Searching for entry defined in row: ", index, " and table name: ", asset_name, " ,column name: ", col_name, " ,term name: ", term_name)

termGlobalID = findTermID(term_name, cpd_url, mltoken)
print("Term Global ID: ", termGlobalID)
if termGlobalID == -1:
continue

src_assetID = findAssetID(asset_name, cpd_url, mltoken, catalog_id)
if src_assetID == -1:
continue

assetMetadata = getAssetDetails(src_assetID,mltoken,catalog_id)
if assetMetadata == -1:
continue

colCheck = checkCol(col_name,catalog_id)
print("Column ", col_name, " found")

updateColumnTerm(termGlobalID, term_name, cpd_url, mltoken, src_assetID, catalog_id, col_name)

Required headers of the Excel file is:

  • Asset Name
  • Column Name
  • Term Name

Association would be presented in Data Asset tab

Summary

In this article we covered how easy Excel data can be processed and used for associating data asset columns with business terms.

--

--