A Technical Implementation of Data Mesh – SLOs & SLAs

Paul Cavacas
9 min read · Aug 20, 2023


In this installment of the Technical Implementation of Data Mesh series, we are going to address SLOs & SLAs. To see the other articles in this series, visit the Table of Contents.

SLAs are an important part of a Data Quantum’s properties, helping to establish and verify the trustworthiness of that Data Quantum. They provide a set of metrics that are tracked over time, so that consumers not only know the information being provided is correct, but also have a mechanism to see that it has been verified over time.

Primer

Let's start with a brief description of what an SLO and an SLA are and the difference between them. An SLO, or Service Level Objective, is a specific goal that the Data Product is trying to meet. For example, a 5-minute freshness SLO states that the data provided by the Quantum will never be more than 5 minutes old: as changes happen in the source system, those changes will be propagated into the Data Quantum within 5 minutes.

An SLA, or Service Level Agreement, is a measurement of the SLO over time. For example, the SLA might state that over a 30-day period the freshness will remain within 5 minutes 99.9% of the time.
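To make that concrete, a 99.9% target over 30 days allows roughly 43 minutes of non-compliance: 30 days × 24 hours × 60 minutes × 0.001 ≈ 43.2 minutes during which the data may be staler than 5 minutes before the SLA is breached.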

Design

Inside the Output Port of the Data Quantum, we need to be able to define as many SLOs as needed, and inside each SLO there should be a corresponding SLA. The SLO is defined inside the Output Port because different Output Ports in the same Data Quantum can have different SLOs. For example, a Data Warehouse output port might have a freshness SLO of 5 minutes, while a storage output port might have a looser SLO because it is only updated hourly.

We also need the Mesh to check the SLOs on a regular basis and store the results of those checks, and then a way to calculate the SLA from the results of the various SLO checks that were performed. Both will be exposed as API endpoints inside the Data Product Plane of the Mesh.

Data Quantum Definition

The first thing we need to do is to determine where in the definition file we are going to define the SLO/SLA information. As mentioned in the design section we want to put the SLO/SLA information inside of each Output Port in the Data Quantum definition file.

Below is a sample of the definition file with the SLO and SLA information defined.

{
  "dataProductDescriptor": "1.0.0",
  "info": {
  },
  "interfaceComponents": {
    "inputPorts": [],
    "outputPorts": [
      {
        "promises": [
          {
            "platform": "snowflake_platform",
            "servicesType": "sqlTable",
            "api": {
              "services": [],
              "schema": {
                "databaseName": "Customer",
                "databaseSchemaName": "PUBLIC",
                "tables": [
                  {
                    "name": "CUSTOMER",
                    "displayName": "Customer",
                    "tableType": "Table",
                    "columns": []
                  }
                ]
              }
            },
            "slo": [
              {
                "type": "duplicationRate",
                "max": 0,
                "unit": "count",
                "column": "SHIP_TO_NUMBER",
                "tableName": "CUSTOMER",
                "sla": {
                  "target": 99.9,
                  "overXDays": "30"
                }
              }
            ]
          }
        ]
      }
    ],
    "discoveryPorts": [],
    "observabilityPorts": [],
    "controlPorts": []
  },
  "internalComponents": {
    "applicationComponents": [],
    "infrastructuralComponents": []
  },
  "tags": []
}

You can see in the above definition that there is a table called CUSTOMER with a column called SHIP_TO_NUMBER, and that we are defining an SLO to check the duplicationRate of this column. We expect the duplication rate to be 0, meaning no duplicates. Additionally, an SLA is defined under the SLO that says we agree there will be 0 duplicates 99.9% of the time over any given 30-day period.

As you can see, this SLO/SLA information is straightforward to read. It also means that the Mesh API endpoints need to handle whatever types are defined, so as you add additional SLO types, additional implementation logic will be added into the API endpoints, as shown in the example below.
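For instance, the 5-minute freshness goal from the primer could be expressed as a second entry in the slo array. The snippet below is a hypothetical sketch: the loadDate type matches one of the types the platform code handles later, but the column name, max, and unit values are illustrative assumptions rather than a defined schema.

{
  "type": "loadDate",
  "max": 5,
  "unit": "minutes",
  "column": "LOAD_DATE",
  "tableName": "CUSTOMER",
  "sla": {
    "target": 99.9,
    "overXDays": "30"
  }
}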

Implementation

SLO API endpoint

Now that we know what we are trying to do and what the definition looks like, we will expand the API to check the SLOs for a given data product. With this API endpoint in place, something can call the endpoint on a regular schedule, and each call will record the current state of SLO compliance.

Router Endpoint

First, we need an endpoint in the data_product_routes file as the starting point, which can look something like the code below. It simply creates an endpoint under each data product and calls into the Data Product Plane to do the work.

@router.post('/{data_product_name}/slo/check')
def check_slos(data_product_name: str, api_key: APIKey = Depends(auth.get_api_key)) -> list[dict]:
    try:
        logger.info(f'{Fore.GREEN}Mesh API{Fore.RESET}: Check SLO Compliance for {data_product_name}')
        return data_plane.check_slos(data_product_name)
    except Exception as e:
        logger.error(f'{Fore.GREEN}Mesh API{Fore.RESET}: check_slos failed with {Fore.RED}{e}{Fore.RESET}')
        raise HTTPException(status_code=500, detail="error occurred")
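Anything that can make an HTTP request can drive this check on a schedule. Below is a minimal sketch of a caller that could be run from cron or an orchestrator; the base URL, the router prefix (none is assumed here), the API key header name, and the data product name are all illustrative assumptions that depend on how the Mesh API is hosted and secured.

import requests

# Hypothetical values - adjust to your Mesh API deployment
MESH_API_URL = "http://localhost:8000"
API_KEY = "my-mesh-api-key"
DATA_PRODUCT = "customer"

def run_slo_check(data_product_name: str) -> list[dict]:
    # Call the SLO check endpoint; each call records a compliance snapshot
    response = requests.post(
        f"{MESH_API_URL}/{data_product_name}/slo/check",
        headers={"access_token": API_KEY},  # header name is an assumption
        timeout=60,
    )
    response.raise_for_status()
    return response.json()

if __name__ == "__main__":
    for check in run_slo_check(DATA_PRODUCT):
        print(check["Type"], check["Status"], check["Message"])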

Data Product Plane

In the Data Product Plane, we will again delegate the work of checking the SLOs to the slo_platform class. We are creating separate platform classes for different pieces of functionality so that we remain vendor agnostic. Inside the SLO platform we will build the checks against a Snowflake data warehouse, but this class can easily be extended to support other output port types.

    def check_slos(self, data_product_name: str) -> list[dict]:
        # Get the data product from its name
        data_product = self.get_data_product_by_name(data_product_name)
        self.logger.info(f"{Fore.GREEN}Mesh API{Fore.RESET}: Check SLO - Retrieved Data Product {data_product['info']['name']}")
        # Use SLO platform to check the data product
        return self.slo_platform.check_slo(data_product)

Checking the SLO in the SLO_Platform

Now we get into the meat of the work: the check_slo function in the SLO_Platform class. The first section of the function reads through the JSON definition to find the output port information that we saw above.

We then define a switch (a dictionary mapping each SLO type to the function that checks it). Next we loop over all the SLOs defined in the Data Quantum and call the appropriate function through that mapping. After each SLO is checked, the result is uploaded into storage so that it can be used later to calculate SLA compliance.

    def check_slo(self, data_product: dict) -> list[dict]:
        # find the Snowflake output port
        domain_name, data_product_name, promise, platform = Mesh_Utilities.get_output_port_info(data_product, 'snowflake')

        results = []

        if promise is None or "api" not in promise or "schema" not in promise["api"] or "databaseName" not in promise["api"]["schema"]:
            return results

        database = promise["api"]["schema"]["databaseName"]
        schema = promise["api"]["schema"]["databaseSchemaName"]
        # find all of the tables defined in the output port
        tables = list(promise["api"]["schema"]["tables"])

        # define each SLO that is available and the method that is used to check it
        switch = {
            'loadDate': self.check_load_date,
            'factDate': self.check_fact_date,
            'completenessPercent': self.not_implemented_check,
            'uptimePercent': self.not_implemented_check,
            'duplicationRate': self.check_duplication_rate
        }

        # find the defined SLOs and check each one for compliance
        for slo in promise["slo"]:
            self.logger.info(f"{Fore.GREEN}Mesh API{Fore.RESET}: Checking SLOs - Checking {slo['type']}")

            # the data product's domain doubles as the storage container name
            container_name = ""
            if "info" in data_product and "domain" in data_product["info"]:
                container_name = data_product["info"]["domain"].lower()

            if container_name != "":
                # look up the check function for this SLO type, falling back to not_implemented_check
                check_function = switch.get(slo["type"], self.not_implemented_check)
                result = check_function(slo, slo["type"], slo["max"], slo["unit"], platform, database, schema, tables)

                results.append(result)

                # upload the individual check result so the SLA calculation can use it later
                # (the date is included in the filename so checks from different days do not collide)
                filename = f"slo_{datetime.datetime.utcnow().strftime('%Y%m%d%H%M%S')}.json"
                self.logger.info(f'{Fore.GREEN}Mesh API{Fore.RESET}: Checking SLOs - Uploading results to {container_name}\\slos\\{slo["type"]}\\{filename}')
                azure_platform.upload_json_file_to_container(result, container_name, f"slos\\{slo['type']}", filename)

        return results

Duplication Rate SLO Check

Below is an example of one of the SLO check implementations, i.e., one of the functions used in the switch statement from the previous section. The implementations will vary greatly based on what the SLO is measuring and which type of output port is used. In this example we check for duplicate values in a column using a SQL statement and return a JSON result describing whether the SLO was met.

    def check_duplication_rate(self, slo: dict, type: str, max: int, unit: str,
                               platform: str, database: str, schema: str, tables: list[dict]) -> dict:
        if "column" not in slo:
            return {
                "Type": type,
                "Status": "Failed",
                "Message": "No column found, unable to check SLO"
            }

        column_name = slo["column"]
        table_name = slo["tableName"]

        # count how many values of the column appear more than once
        query = f'Select Count(*) as Count_Of_Duplicates From (Select {column_name}, Count(*) From {database}.{schema}.{table_name} Group By {column_name} Having Count(*) > 1) data'
        result = self.query_platform.run_query(platform, database, schema, query)

        # pull the duplicate count out of the query result
        count = result["COUNT_OF_DUPLICATES"].iloc[0]

        return {
            "Type": type,
            "Status": "Success" if (count <= max) else "Failed",
            "Message": f"{max} allowed, actual count {count}"
        }
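For comparison, a freshness check such as the check_load_date function referenced in the switch could follow the same pattern. The sketch below is an assumption about how such a check might look, not the series' actual implementation: it assumes the SLO's column holds a load timestamp and that max and unit describe the allowed age in minutes.

    def check_load_date(self, slo: dict, type: str, max: int, unit: str,
                        platform: str, database: str, schema: str, tables: list[dict]) -> dict:
        # hypothetical sketch: assumes slo["column"] is a timestamp column holding the load date
        if "column" not in slo:
            return {"Type": type, "Status": "Failed", "Message": "No column found, unable to check SLO"}

        column_name = slo["column"]
        table_name = slo["tableName"]

        # how many minutes old is the most recent load?
        query = f'Select DateDiff(minute, Max({column_name}), Current_Timestamp()) as Minutes_Since_Load From {database}.{schema}.{table_name}'
        result = self.query_platform.run_query(platform, database, schema, query)
        minutes_since_load = result["MINUTES_SINCE_LOAD"].iloc[0]

        # assumes max and unit express the allowed age in minutes
        return {
            "Type": type,
            "Status": "Success" if (minutes_since_load <= max) else "Failed",
            "Message": f"{max} {unit} allowed, actual age {minutes_since_load} {unit}"
        }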

SLAs

Now that we have a method to check SLO compliance and store those results, the next step is to calculate SLA compliance; remember, an SLA is a measurement of the SLOs over time against some target. We will create a separate endpoint to calculate the SLAs, so that the SLO checks remain a separate activity from the SLA calculation.

Router

Similarly, the SLA calculation code in the router simply passes through to the Data Product Plane.

@router.post('/{data_product_name}/sla/calculate')
def calculate_slas(data_product_name: str, api_key: APIKey = Depends(auth.get_api_key)) -> list[dict]:
    try:
        logger.info(f'{Fore.GREEN}Mesh API{Fore.RESET}: Calculate SLA Compliance for {data_product_name}')
        return data_plane.calculate_slas(data_product_name)
    except Exception as e:
        logger.error(f'{Fore.GREEN}Mesh API{Fore.RESET}: calculate_slas failed with {Fore.RED}{e}{Fore.RESET}')
        raise HTTPException(status_code=500, detail="error occurred")

Data Product Plane

Again, in the Data Product Plane we will call out to the SLO Platform, so that the specific work to check various types of SLOs against different output port types can be encapsulated.

    def calculate_slas(self, data_product_name: str) -> list[dict]:
        # Get the data product from its name
        data_product = self.get_data_product_by_name(data_product_name)
        self.logger.info(f"{Fore.GREEN}Mesh API{Fore.RESET}: Calculate SLA - Retrieved Data Product {data_product['info']['name']}")
        # Use SLO platform to calculate the SLAs for the data product
        return self.slo_platform.calculate_slas(data_product)

Calculate SLAs in SLO Platform

At the top of the code we parse the JSON to get to the appropriate SLA section in the definition file. We then loop over the various SLOs, work out the time frame we want to cover, and load all the JSON files that were saved during the SLO check process for that window. Next we call a separate method to perform the actual calculation, which is explained below. Finally, we package up the results into a JSON object and upload them into storage, so we can track SLA compliance over time.

    def calculate_slas(self, data_product: dict):
        # find the Snowflake output port
        domain_name, data_product_name, promise, platform = Mesh_Utilities.get_output_port_info(data_product, 'snowflake')

        results = []

        if promise is None or "api" not in promise or "schema" not in promise["api"] or "databaseName" not in promise["api"]["schema"]:
            return results

        for slo in promise["slo"]:
            self.logger.info(f"{Fore.GREEN}Mesh API{Fore.RESET}: Calculating SLAs - Checking {slo['type']}")

            # the data product's domain doubles as the storage container name
            container_name = ""
            if "info" in data_product and "domain" in data_product["info"]:
                container_name = data_product["info"]["domain"].lower()

            if container_name != "" and "sla" in slo and "overXDays" in slo["sla"] and "target" in slo["sla"]:
                # work out the window of SLO checks to load (overXDays is stored as a string in the definition)
                days = int(slo["sla"]["overXDays"])
                min_date = datetime.datetime.now() - datetime.timedelta(days=days)
                max_date = datetime.datetime.now()
                slo_recordings = self.infrastructure.get_files_in_folder_between_dates(container_name, f"slos\\{slo['type']}", min_date, max_date)
                sla_calculation = self.calculate_sla(days, slo_recordings, min_date)
                results.append({
                    "SLO": slo['type'],
                    "SLA_Value": sla_calculation,
                    "SLA_Target": slo['sla']['target'],
                    "Message": f"{slo['sla']['target']} over {slo['sla']['overXDays']} days"
                })

                # store the SLA result separately from the SLO check recordings
                filename = f"sla_{datetime.datetime.utcnow().strftime('%Y%m%d%H%M%S')}.json"
                self.logger.info(f'{Fore.GREEN}Mesh API{Fore.RESET}: Calculating SLA - Uploading results to {container_name}\\slas\\{slo["type"]}\\{filename}')
                azure_platform.upload_json_file_to_container(results[-1], container_name, f"slas\\{slo['type']}", filename)

        return results

Calculate SLA calculation

This function takes all the saved JSON information from the SLO checks, keeps only the records that fall within the time frame, and sorts the list by date. Then it loops through the results and checks whether each one failed or succeeded. If the current check failed, it adds the time between that check and the next one to a running total of the amount of time the SLO was in a failed state.

After looping through all the results, we check the status of the last SLO check; if it failed, we add the additional time between that last check and now to the running total.

Once we have the total failed time, we calculate the percentage of time that was not failed and return that as a value like 99.9, representing 99.9%.

    def calculate_sla(self, days: int, slo_values: list, min_date: datetime.datetime):
        if slo_values is None or len(slo_values) == 0:
            return 0

        # keep only valid recordings inside the window and sort them by date
        sorted_list = sorted([d for d in slo_values if "Status" in d and "Date" in d and d["Date"] >= min_date], key=lambda x: x["Date"])

        if len(sorted_list) == 0:
            return 0

        total_failed_time = datetime.timedelta()

        # accumulate the time spent in a failed state between consecutive checks
        for i in range(len(sorted_list) - 1):
            current_item = sorted_list[i]
            next_item = sorted_list[i + 1]

            if current_item["Status"] == "Failed":
                time_difference = next_item["Date"] - current_item["Date"]
                total_failed_time += time_difference

        # if the most recent check failed, count the time from that check until now
        if sorted_list[-1]["Status"] == "Failed":
            time_difference = datetime.datetime.now() - sorted_list[-1]["Date"]
            total_failed_time += time_difference

        # return the percentage of the window that was not in a failed state, e.g. 99.9
        return 100 - ((total_failed_time.total_seconds() / datetime.timedelta(days=days).total_seconds()) * 100)
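As a quick sanity check of the formula: if the duplication-rate check was in a failed state for a total of 2 hours during a 30-day window, the result is 100 - (7,200 / 2,592,000 × 100) ≈ 99.72%, which would miss the 99.9% target defined above (99.9% over 30 days allows only about 43 minutes of failure).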

At this point we have the code necessary to handle many of the base cases around defining and checking SLOs and calculating the resulting SLAs.

Improvements

As with all the code and logic in this series, this is not production grade; it is an initial pass at the functionality. You can, and will need to, extend both the different types of SLOs that are provided and the different output ports that are supported. Additionally, the SLAs can currently only be defined over a number of days, so supporting additional time frames could be useful.

Final Thoughts

This article covered an important topic for building trustworthiness in your Data Mesh and, specifically, your Data Products. Having all the SLOs & SLAs explicitly defined as part of the Data Contract gives the consumers of a Data Product a clear understanding of what is being provided.

Please feel free to comment and add your ideas and thoughts.
