Mesh Implementation — API — Data Contracts — Part 1

Paul Cavacas
7 min read · May 8, 2023


In this next installment of the Mesh Implementation series, we are going to discuss data contracts. In the first post we discussed the Data Product definition, in the second post the overall API layout, and in the previous post the base Data Product services in detail. This post covers the beginning of how data contracts will be enforced, starting with schema checking of the Data Product.

Data Contracts are a large, complex topic, and what we will be building here are the basic services around schema checking. When we add testing in a later post, we will start to cover the semantic meaning of the Data Contract. This post covers manually checking the schema of a Snowflake table against the Data Product definition. As with all the code, it can and will be expanded to check other stores, such as SQL Server or Power BI, as cases arise. Additionally, there are many third parties building much more sophisticated Data Contract capabilities, and those services can be integrated into the API as needed.

Design

We want to be able to take a Data Product and ensure that the contract specified in it has not changed, since unintentional changes are one of the primary causes of downstream failures. In a Data Product definition, we define both Input Ports and Output Ports. The Mesh should check both where possible, so that it can respond to changes both proactively and reactively.

Input Ports

Input Ports are used to set up a contract around the source data that feeds into the Data Product. These ports define where the data comes from and the shape it should have. When the source is Snowflake, the basic layout looks something like this:

    "interfaceComponents": {
"inputPorts": [
{
"entityType": "inputPort",
"name": "snowflake",
"version": "1.0.0",
"promises": [
{
"platform": "snowflakeaccount",
"servicesType": "sqlTable",
"api": {
"schema": {
"databaseName": "SALES_DB",
"databaseSchemaName": "PUBLIC",
"tables": [
{
"name": "INVOICES",
"displayName": "Invoices",
"columns": [
{
"name": "ORDER_NUMBER",
"dataType": "NUMBER"
}
{
"name": "PRODUCT_ID",
"dataType": "NUMBER"
}
{
"name": "QUANTITY ",
"dataType": "NUMBER"
}
{
"name": "AMOUNT",
"dataType": "NUMBER"
}
]
}
]
}
}
}
]
}
]
}

You can see that in the Input Port we are defining where the data comes from and which fields are available in the table. Since the Input Port is not critical to the definition of the Data Product, it contains only minimal information about the data. This is enough to check that nothing has changed from the upstream provider without imposing the burden of maintaining information that is not needed.

Output Ports

These ports define what data is provided by the Data Product and where that data lives. When the target is Snowflake, the basic layout looks something like this:

        "outputPorts": [
{
"entityType": "outputPort",
"name": "snowflake",
"description": "",
"version": "1.0.0",
"promises": [
{
"platform": "SnowflakeAccountIdentifier",
"servicesType": "sqlTable",
"api": {
"services": [],
"schema": {
"databaseName": "SALES_DATABASE",
"databaseSchemaName": "PUBLIC",
"tables": [
{
"name": "SALESORDERS",
"displayName": "Sales Orders",
"tableType": "VIEW",
"x-tests": [
{
"test-category": "ML",
"test-type": "RowCountByDay",
"test-severity": "Warning",
"date-field": "Create_Date",
"date-offset": -1
}
]
}
]
}
},
"deprecationPolicy": {
"description": "Currently no deprecation policy exists changes happen automatically and replace the existing table"
},
"slo": [
{
"type": "loadDate",
"max": 47,
"unit": "hours"
},
{
"type": "factDate",
"max": 47,
"unit": "hours"
},
{
"type": "completenessPercent",
"max": 100,
"unit": "percent"
},
{
"type": "uptimePercent",
"max": 100,
"unit": "percent"
}
]
}
]
}

The output port defines detailed information about what is provided by the Data Product and where that data comes from.

The Mesh

The Mesh will use the table and column information from the Input and Output Ports, along with the connection information, to retrieve the actual implementation of the corresponding tables and ensure that, at the schema level, the Data Product is providing what is expected.

It can use the Input Port information to preemptively check that the upstream data has not changed. If it finds a change upstream, it will stop processing the Data Product. This leaves the data in the Data Product in a valid state; although it will become stale and not be updated with newer data, at least the data that is there will be valid. We will use the SLO sections to ensure that stale data also triggers notifications; this will be described in a future article.

The Mesh will use the Output Port to ensure that the data being provided by the underlying Data Store doesn’t change inadvertently. This will prevent further downstream users of this Data Product from experiencing unintended changes.
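
As a hedged illustration of where these checks might sit in an orchestration flow, the sketch below calls the input-port check before loading the Data Product and the output-port check afterwards, aborting if anything fails. The host name, product name, and port names are placeholders for illustration only and are not part of the Mesh code from this series.

import requests

# Placeholder values for illustration only
MESH_API = "http://localhost:8000"
PRODUCT = "sales"

def contract_ok(port_kind: str, port_name: str) -> bool:
    # Call the Mesh contract-check route and treat any non-Passed result as a failure
    response = requests.post(f"{MESH_API}/{PRODUCT}/{port_kind}-ports/{port_name}/contract_check")
    response.raise_for_status()
    return all(result["result"] == "Passed" for result in response.json())

# Proactive: verify the upstream schema before loading the Data Product
if not contract_ok("input", "snowflake"):
    raise RuntimeError("Upstream schema changed; skipping Data Product load")

# ... run the Data Product's load here ...

# Reactive: verify what the Data Product now exposes to downstream consumers
if not contract_ok("output", "snowflake"):
    raise RuntimeError("Output port no longer matches the Data Product definition")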

API Setup

Two routes are being added to the API to support contract checking, one for the Input Ports and one for the Output Ports. Since the code is very similar between them, they will quickly be merged into one code path.

@router.post('/{data_product_name}/output-ports/{output_port}/contract_check')
def contract_check_output_port(data_product_name: str, output_port: str) -> list[dict]:
    try:
        logger.info(f'{Fore.GREEN}Mesh API{Fore.RESET}: Contract Check {data_product_name}:{output_port}')
        return data_plane.data_contract_check(data_product_name, output_port, 'output')
    except Exception as e:
        logger.error(f'{Fore.GREEN}Mesh API{Fore.RESET}: contract_check failed with {Fore.RED}{e}{Fore.RESET}')
        raise HTTPException(status_code=500, detail="error occurred")

@router.post('/{data_product_name}/input-ports/{input_port}/contract_check')
def contract_check_input_port(data_product_name: str, input_port: str) -> list[dict]:
    try:
        logger.info(f'{Fore.GREEN}Mesh API{Fore.RESET}: Contract Check {data_product_name}:{input_port}')
        return data_plane.data_contract_check(data_product_name, input_port, 'input')
    except Exception as e:
        logger.error(f'{Fore.GREEN}Mesh API{Fore.RESET}: contract_check failed with {Fore.RED}{e}{Fore.RESET}')
        raise HTTPException(status_code=500, detail="error occurred")

Both API implementations are almost identical. The main difference in the API definition is the input-ports or output-ports segment in the URL itself; when calling into the main code we pass a parameter denoting whether we are checking an input or an output.
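
Since these two routes will quickly be merged into one code path, here is a hedged sketch of what that merged route might look like, with the port type pulled from the URL and validated before calling the shared implementation. It reuses the same router, logger, data_plane, and HTTPException as the routes above; the route and function names here are illustrative, not the code used in this post.

# Sketch only: a single route handling both input-ports and output-ports URLs
@router.post('/{data_product_name}/{port_type}-ports/{port_name}/contract_check')
def contract_check_port(data_product_name: str, port_type: str, port_name: str) -> list[dict]:
    # Only the two known port types are allowed
    if port_type not in ('input', 'output'):
        raise HTTPException(status_code=404, detail="unknown port type")
    try:
        logger.info(f'{Fore.GREEN}Mesh API{Fore.RESET}: Contract Check {data_product_name}:{port_name}')
        return data_plane.data_contract_check(data_product_name, port_name, port_type)
    except Exception as e:
        logger.error(f'{Fore.GREEN}Mesh API{Fore.RESET}: contract_check failed with {Fore.RED}{e}{Fore.RESET}')
        raise HTTPException(status_code=500, detail="error occurred")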

Implementation

Both API routes call into the same method, data_plane.data_contract_check. This method performs all the checks to ensure compliance with the Data Contract. It checks that the number of columns defined in the definition matches the underlying table, and that each column defined in the definition matches the corresponding column in the table by both name and data type.
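
To make the shape of that return value concrete, a response for a single table might look something like the following; the product, table, and column names here are illustrative, and the exact entries depend on the Data Product definition and the live table.

[
    {
        "data_product": "sales",
        "database": "SALES_DATABASE",
        "table_name": "SALESORDERS",
        "test_type": "ColumnCount",
        "result": "Passed",
        "severity": "Critical"
    },
    {
        "data_product": "sales",
        "database": "SALES_DATABASE",
        "table_name": "SALESORDERS",
        "column_name": "ORDER_NUMBER",
        "test_type": "ColumnExists",
        "result": "Passed",
        "severity": "Critical"
    },
    {
        "data_product": "sales",
        "database": "SALES_DATABASE",
        "table_name": "SALESORDERS",
        "column_name": "ORDER_NUMBER",
        "test_type": "DataType",
        "result": "Failed",
        "severity": "Critical",
        "message": "Expected: NUMBER Actual: VARCHAR"
    }
]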

data_plane.data_contract_check

The Data Contract Check method uses the schema metadata exposed by the provider, in this case Snowflake's INFORMATION_SCHEMA.COLUMNS view, to load all the columns that exist and check each of them against the definition.

def data_contract_check(self, data_product: dict, output_port: str, port_type: str):
    test_results = []
    # retrieve the output port information
    if port_type == 'output':
        domain_name, data_product_name, promise, platform, database, schema = Mesh_Utilities.get_output_port_info(data_product, output_port)
    elif port_type == 'input':
        domain_name, data_product_name, promise, platform, database, schema = Mesh_Utilities.get_input_port_info(data_product, output_port)
    # loop over all of the tables defined in the data product/output port
    for table in promise["api"]["schema"]["tables"]:
        # construct a query to retrieve the columns for the actual implementation
        table_name = table["name"]
        discovery_table = [d for d in promise["api"]["schema"]["tables"] if (d["name"] == table_name)][0]
        sql = f'Select COLUMN_NAME, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH, NUMERIC_PRECISION, NUMERIC_SCALE '\
              f'From {database}.INFORMATION_SCHEMA.COLUMNS '\
              f'Where TABLE_NAME = \'{table_name}\' '\
              f'And TABLE_SCHEMA = \'{schema}\' '\
              f'Order By ORDINAL_POSITION'
        results = self.query_platform.run_query(platform, database, schema, sql)
        self.logger.info(f'{Fore.GREEN}Mesh API{Fore.RESET}: {port_type}')
        # ensure that the actual implementation number of columns matches the defined implementation
        if port_type == 'output':
            self.logger.info(f'{Fore.GREEN}Mesh API{Fore.RESET}: Checking Table {table_name} Column Counts - Found {len(results)} Expected {len(discovery_table["columns"])}')
            test_results.append({
                "data_product": data_product_name,
                "database": database,
                "table_name": table_name,
                "test_type": 'ColumnCount',
                "result": 'Passed' if len(results) == len(discovery_table['columns']) else 'Failed',
                "severity": "Critical"
            })

        # loop over the columns defined in the data product
        for column in discovery_table['columns']:
            # find the corresponding column in the actual implementation
            # and ensure that it exists
            self.logger.info(column)
            column_name = column['name']
            data_type = column['dataType']
            matched_column = results[results["COLUMN_NAME"].str.lower() == column_name.lower()]
            self.logger.info(f'{Fore.GREEN}Mesh API{Fore.RESET}: Checking Table {table_name} Column {column_name} - Found {matched_column.shape[0]} Expected 1')
            test_results.append({
                "data_product": data_product_name,
                "database": database,
                "table_name": table_name,
                "column_name": column_name,
                "test_type": 'ColumnExists',
                "result": 'Passed' if (matched_column.shape[0] == 1) else 'Failed',
                "severity": "Critical"
            })
            # also ensure that the data type matches between the actual implementation
            # and the defined implementation
            if matched_column.shape[0] == 1:
                datatype = matched_column["DATA_TYPE"].iloc[0]
                self.logger.info(f"datatype1 {data_type}, datatype2 {datatype}")
                test_results.append({
                    "data_product": data_product_name,
                    "database": database,
                    "table_name": table_name,
                    "column_name": column_name,
                    "test_type": 'DataType',
                    "result": 'Passed' if (self.datatypes_match(datatype1=data_type, datatype2=datatype)) else 'Failed',
                    "severity": "Critical",
                    "message": '' if (data_type == datatype) else f'Expected: {data_type} Actual: {datatype}'
                })

    return test_results

The first section of this method reads through the definition file to find the section where the appropriate port is defined. Once it has the port information, it loops through the tables defined by the port, because a Data Product may have one table, as is the case here, or it could be a full dataset with multiple tables.
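
The Mesh_Utilities.get_output_port_info helper is not shown in this post. As a rough idea of what it does, here is a minimal standalone sketch that navigates the definition layout shown earlier; the "domain" and "name" fields on the Data Product are assumptions, and the real helper may gather this information differently.

# Sketch of a helper like Mesh_Utilities.get_output_port_info (illustrative only)
def get_output_port_info(data_product: dict, output_port: str):
    for port in data_product["interfaceComponents"]["outputPorts"]:
        if port["name"] != output_port:
            continue
        promise = port["promises"][0]
        schema = promise["api"]["schema"]
        return (
            data_product.get("domain"),       # domain_name (assumed field)
            data_product.get("name"),         # data_product_name (assumed field)
            promise,                          # promise block containing the tables
            promise["platform"],              # platform/connection identifier
            schema["databaseName"],           # database
            schema["databaseSchemaName"],     # schema
        )
    raise ValueError(f"Output port {output_port} not found in definition")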

As it loops over the tables, it queries the INFORMATION_SCHEMA tables to get all the columns defined for the implementation table. Once this information is available, it first checks that the count of columns in the database table matches what is defined in the definition.

As a final check, it loops over all the columns and verifies that each column exists in the underlying table and that the data types of each column match. When comparing data types, the code aliases common types so that they compare as equal, i.e., string, varchar, text, etc. are all considered equivalent.
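
The datatypes_match helper referenced in the code above is not shown in this post. As a minimal sketch of the kind of aliasing it performs, something like the following would do; the alias table here is an assumption, and the real helper may cover more types.

# Illustrative stand-in for datatypes_match: map common aliases to a family name
TYPE_ALIASES = {
    'string': 'text', 'varchar': 'text', 'char': 'text', 'text': 'text',
    'number': 'number', 'numeric': 'number', 'decimal': 'number',
    'int': 'number', 'integer': 'number', 'bigint': 'number', 'float': 'number',
}

def datatypes_match(datatype1: str, datatype2: str) -> bool:
    # strip any length/precision suffix, e.g. VARCHAR(100) -> varchar
    base1 = datatype1.split('(')[0].strip().lower()
    base2 = datatype2.split('(')[0].strip().lower()
    # compare the normalized family names
    return TYPE_ALIASES.get(base1, base1) == TYPE_ALIASES.get(base2, base2)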

Improvements

The design currently only supports performing these checks against Snowflake tables, but it should be enhanced to support other Data Stores and to allow for more robust checking.

As mentioned, integrating a third-party provider with richer contract-checking capabilities could also be beneficial.
