Mesh Implementation — API — Data Product Services

Paul Cavacas
9 min read · Apr 30, 2023


This is Part 3 of Technical Data Mesh Implementation. In the first part we covered the JSON Schema for a Data Product. In the second part we covered the overall API design. In this part we will start building out the underlying API Services.

To start the API, we will develop the services around Data Product maintenance. This is the backbone of almost everything the Mesh works against: we need to be able to register Data Products into the Mesh and return information about them.

Before we get into the details around the Data Product Services portion of the API, we will spend a few minutes discussing the overall API repo setup.

API Setup

We are using FastAPI in Python to build the API services, but the API layer can be implemented with whatever technology makes the most sense. The first thing to do is to set up the routers that will be responsible for handling the API endpoints. We will have 3 routers, which may be extended as we progress, plus a couple of base routes. The 3 routers being created are the Experience Router, the Data Product Router, and the Admin Router.

Base Routes

The base routes are a couple of simple routes that allow health checks to be performed against the API. We will create 2 of them: root and version.

Root

import logging

from colorama import Fore
from fastapi import FastAPI

# Project modules holding the routers (import paths are illustrative of the repo layout)
from routes import admin_routes, data_product_routes, experience_routes

logger = logging.getLogger("mesh_api")

app = FastAPI()
app.include_router(experience_routes.router)
app.include_router(data_product_routes.router)
app.include_router(admin_routes.router)

@app.get("/")
def root():
    try:
        return {
            "status": "success"
        }
    except Exception:
        logger.error(f'{Fore.GREEN}Mesh API{Fore.RESET}: Critical Error In root')
        return {
            "status": "failed"
        }

You can see that we are setting up the FastAPI app and including the 3 routers mentioned above, which will be described below. Then we are creating a base route (/). This route can be used by monitoring software to ensure that the API is up.

Version

@app.get("/version")
def version():
try:
with open('VERSION', 'r') as f:
version = f.read().replace("\n","")
logger.info(f'{Fore.GREEN}Mesh API{Fore.RESET}: Version {version}')
return {
"version":version
}
except:
logger.error(f'{Fore.GREEN}Mesh API{Fore.RESET}: Critical Error In Version')
return {
"version":"unknown"
}

Like the root route, the version route reads a VERSION file, maintained by the CI/CD pipeline, and returns the current version of the API.
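As a quick, purely illustrative check (not part of the repo), both base routes can be exercised with FastAPI's TestClient; the main module name is an assumption about where the app above lives.

# Illustrative only: exercising the base routes with FastAPI's TestClient.
# "main" is an assumed module name for where the FastAPI app is defined.
from fastapi.testclient import TestClient

from main import app

client = TestClient(app)

assert client.get("/").json() == {"status": "success"}
print(client.get("/version").json())  # e.g. {"version": "1.0.0"}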

Experience Router

The Experience Router corresponds to the items that we want to put in the Experience Plane of the Data Mesh: searching and other high-level functionality.

Search

router = APIRouter(
    prefix=""
)

@router.get("/search")
def search(q: str) -> list[dict]:
    try:
        logger.info(f'{Fore.GREEN}Mesh API{Fore.RESET}: Searching Data Products for {q}')
        return experience_plane.search_data_products(q)
    except Exception as e:
        logger.error(f'{Fore.GREEN}Mesh API{Fore.RESET}: search for {q} failed with {Fore.RED}{e}{Fore.RESET}')
        raise HTTPException(status_code=500, detail="error occurred")

This route will accept a search parameter and return all Data Products that match the specified search criteria.
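For example, with the API running, the search endpoint could be called like this (a sketch using the requests library; the host and port are assumptions):

# Illustrative only: calling the search endpoint, assuming the API runs on localhost:8000.
import requests

response = requests.get("http://localhost:8000/search", params={"q": "sales"})
response.raise_for_status()
for data_product in response.json():
    print(data_product["name"], "-", data_product["description"])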

Data Product Router

The responsibility of the Data Product Router is to provide all the services needed for dealing with Data Products. In the code snippet below, you can see there are routes for returning all Data Products, registering (updating) Data Products, and returning full information for a single Data Product.

router = APIRouter(
    prefix="/data-products"
)

@router.get('/')
def data_products_load() -> list[dict]:
    try:
        logger.info(f'{Fore.GREEN}Mesh API{Fore.RESET}: Listing Data Products')
        return data_plane.get_all_data_products(True)
    except Exception as e:
        logger.error(f'{Fore.GREEN}Mesh API{Fore.RESET}: data_products_load failed with {Fore.RED}{e}{Fore.RESET}')
        raise HTTPException(status_code=500, detail="error occurred")

@router.post('/')
async def update_data_product(info: Request) -> str:
    try:
        logger.info(f'{Fore.GREEN}Mesh API{Fore.RESET}: Updating Data Product')
        data = await info.json()
        return data_plane.upload_data_product(data)
    except Exception as e:
        logger.error(f'{Fore.GREEN}Mesh API{Fore.RESET}: update_data_product failed with {Fore.RED}{e}{Fore.RESET}')
        raise HTTPException(status_code=500, detail="error occurred")

@router.get('/{data_product_name}')
def data_product_load(data_product_name: str) -> dict:
    try:
        logger.info(f'{Fore.GREEN}Mesh API{Fore.RESET}: Loading Single Data Product {data_product_name}')
        result = data_plane.get_data_product_by_name(data_product_name.lower())
        if result is None:
            raise HTTPException(status_code=404, detail="Data Product Does Not Exist")
        return result
    except HTTPException:
        # Let the 404 above propagate instead of being turned into a 500
        raise
    except Exception as e:
        logger.error(f'{Fore.GREEN}Mesh API{Fore.RESET}: data_product_load failed with {Fore.RED}{e}{Fore.RESET}')
        raise HTTPException(status_code=500, detail="error occurred")

For those not familiar with FastAPI, the prefix set on the router causes every route defined on it to be served under that prefix, i.e., /data-products/{data_product_name}. Additionally, anything defined in the route path with {} around it is a variable: whatever is supplied there is put into the parameter of the same name in the method signature, i.e., /data-products/sales will result in the data_product_name variable having the value sales.
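To see this behavior in isolation, here is a small, self-contained sketch (not part of the Mesh repo) showing how the prefix and the path variable combine:

# Illustrative only: a standalone mini app showing router prefixes and path variables.
from fastapi import APIRouter, FastAPI
from fastapi.testclient import TestClient

router = APIRouter(prefix="/data-products")

@router.get("/{data_product_name}")
def load(data_product_name: str) -> dict:
    return {"requested": data_product_name}

app = FastAPI()
app.include_router(router)

client = TestClient(app)
print(client.get("/data-products/sales").json())  # {'requested': 'sales'}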

Admin Router

The Admin Router will supply routes used for higher-level, privileged actions against the Mesh. Its layout is like the Data Product Router, with a prefix of admin, and we will cover its functions in more detail in a later post in this series.

Design

Storage

Now that we have covered the basics of the routers and how the repo is set up, let's discuss the overall design of Data Products and how they will be registered into the Mesh. The Mesh will use cloud storage as its data store, in this case Azure. Using plain storage keeps the solution largely agnostic of any particular vendor or product.

We will create one Storage Account in Azure for the entire Mesh, with a separate container for each domain. Under each container we will create a folder for each Data Product, with JSON files underneath that folder for the Data Product definitions themselves. Having the folders, and the ability to have multiple JSON files, allows us to easily version and evolve Data Products over time.

Having a container for each domain keeps the solution organized and easily findable. One thing to keep in mind is that this Storage Account is only for the Mesh, not the data itself. The teams that own the Data Products can keep their data in whatever format and location makes the most sense for the team, whether that is raw storage, a database, or a Kafka topic.

Additionally, we will be creating a single top-level folder called data_products in each container. This allows us to have other folders for different information stored close to each data product, keeping it highly findable by both machines and humans.
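To make the layout concrete, here is a small sketch (the names are illustrative) of how one registered Data Product maps onto the storage hierarchy:

# Illustrative only: how a registered Data Product maps onto the storage hierarchy.
# Container = domain, top-level folder = data_products, then one folder per product
# holding its versioned JSON definition files.
domain = "sales"                    # Azure container (one per domain)
data_product_name = "sales_orders"  # folder under data_products/
version = "1_0_0"                   # version with dots replaced by underscores

blob_name = f"data_products/{data_product_name}/{data_product_name}.v{version}.json"
print(f"{domain}/{blob_name}")
# sales/data_products/sales_orders/sales_orders.v1_0_0.json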

API

We are going to create a handful of APIs to handle the basic Data Product needs: returning all Data Products, returning a single Data Product, registering a new or modified Data Product, and searching for Data Products.

At this point we are only building the APIs that will power the Mesh, but I could see building a front-end on top of these APIs to create something that the people using the Mesh could take advantage of. It could include things like a Data Catalog and the ability to work directly with the data and the Data Products.

Implementation

In the router section above we saw all of the top-level API endpoints; here we will dig into how each of these paths works. Each of the routes referenced a data_plane variable. This variable points at one of the top-level planes that we discussed previously, the Mesh_Data_Product_Plane.

Mesh_Data_Product_Plane.upload_data_product

This method is responsible for taking the JSON supplied to the API and uploading it into the correct folder in storage. The implementation is below.

    def upload_data_product(self, data: dict) -> str:
        # Get the domain, name, and version of the data product
        domain = data["info"]["domain"]
        data_product_name = data["info"]["name"]
        version = data["info"]["version"].replace('.', '_')

        # Logging
        self.logger.info(f"{Fore.GREEN}Mesh API{Fore.RESET}: \t\tuploading {domain}.{data_product_name}.{version}")

        # Call infrastructure for update
        return self.infrastructure.update_data_product(domain=domain, data_product_name=data_product_name, version=version, data=data)

The first couple of lines extract information out of the JSON to get the correct domain, which will correspond to the container. It then extracts the name and version of the data product, which will correspond to the folder and filename in storage. Then you can see that the bulk of the work is being done in the infrastructure object, which is the Mesh_Infrastructure_Plane.
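For reference, here is a trimmed-down, purely illustrative payload containing just the info fields this method reads; a real definition would follow the full JSON Schema from Part 1.

# Illustrative only: a minimal payload containing just the fields read above.
# A real Data Product definition follows the full JSON Schema from Part 1.
example_data_product = {
    "info": {
        "domain": "sales",        # becomes the storage container
        "name": "sales_orders",   # becomes the folder and file name
        "version": "1.0.0",       # dots become underscores in the file name
        "description": "Customer orders placed through all sales channels"
    }
}

# Registering it through the API (assuming it is running locally):
# requests.post("http://localhost:8000/data-products/", json=example_data_product)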

Mesh_Infrastructure_Plane.update_data_product

    def update_data_product(self, domain: str, data_product_name: str, version: str, data: dict):
        return self.upload_content(domain, 'data_products\\' + data_product_name, data_product_name, version, data)

This is a very simple method that is just a passthrough to the upload_content method.

Mesh_Infrastructure_Plane.upload_content

    def upload_content(self, container_name: str, path: str, content_name: str, version: str, data: dict) -> str:
        # create connection to the container in Azure
        container_name = container_name.lower()
        blob_service_client = Mesh_Infrastructure_Plane.create_blob_service_client(self.key_vault_name, self.key_vault_secret)
        container = blob_service_client.get_container_client(container_name)

        # if the container doesn't exist create it
        if (not container.exists()):
            container_created = azure_platform.create_container(blob_service_client, container_name)
        else:
            container_created = True

        if container_created != True:
            return f"{container_name}.{content_name} failed"

        # upload the content into Azure
        blob_storage_cli = AzureBlobStorageProcessing(container_name=container_name,
                                                      kv_name=self.key_vault_name,
                                                      kv_secret=self.key_vault_secret)

        blob_storage_cli.upload_blob(files=[{'file_name': f"{content_name}.v{version}.json",
                                             'content': json.dumps(data)}],
                                     path=f'{path}',
                                     overwrite_file=True, create_full_path_on_storage=False)
        return f"{container_name}.{content_name} successfully uploaded"

This method, while a little longer, is all standard code for interacting with Azure Storage. It creates the correct container (which, remember, is the domain) if it needs to, then creates the blob and uploads the JSON. In the end we have a file in the form sales/data_products/sales_orders/sales_orders.v1_0_0.json, where sales is the domain, sales_orders is the data product name, and v1_0_0 is the version.

Mesh_Data_Product_Plane.get_all_data_products

This method is used to load and return all the data products that have been registered with the Mesh. It can be run in 2 ways: returning just summary information, or returning full information about each data product. I envision this being used by the front-end, which would pull only the summary information for display to end users.

    def get_all_data_products(self, summary_info: bool = True) -> list[dict]:
        data_products = self.infrastructure.get_all_data_products()

        if summary_info:
            # Results will be a summarized version of each data product
            return [
                {
                    "name": d["info"]["name"],
                    "description": d["info"]["description"],
                    "domain": d["info"]["domain"]
                } for d in data_products]
        else:
            return data_products

You can see that the bulk of the work is done in the get_all_data_products method of the infrastructure plane. Once the data is returned, if summary information is being requested it pulls out just the key fields and returns those; otherwise it returns the full details.
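Purely as an illustration (the products shown are made up), the summary form returns a flat list of the key fields:

# Illustrative only: the shape of the summary output for two made-up registered products.
summary = data_plane.get_all_data_products(summary_info=True)
# [
#   {"name": "sales_orders", "description": "Customer orders...", "domain": "sales"},
#   {"name": "shipments", "description": "Outbound shipments...", "domain": "logistics"}
# ]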

Mesh_Infrastructure_Plane.get_all_data_products

    def get_all_data_products(self) -> list[dict]:
        # create a connection to Azure
        blob_service_client = Mesh_Infrastructure_Plane.create_blob_service_client(self.key_vault_name, self.key_vault_secret)

        results = []
        # loop over all of the containers in storage
        for container in self.get_containers(self.key_vault_name, self.key_vault_secret):
            container_client = blob_service_client.get_container_client(container)
            generator = container_client.list_blobs()
            # loop over all of the blobs in each container to find the data products
            # and load them to the list
            for blob_properties in generator:
                if blob_properties.name.startswith('data_products') and blob_properties.name.endswith('.json'):
                    blob_client = container_client.get_blob_client(blob_properties.name)
                    blob_data = blob_client.download_blob(encoding='utf-8')
                    data_product = json.loads(blob_data.readall())

                    results.append(data_product)

        return results

Again, this is very standard code for working with Azure containers and blobs. You can see that it finds all the containers, searches the data_products folder under each domain/container, and then loads every JSON file it finds, returning those JSON objects.

Mesh_Data_Product_Plane.get_data_product_by_name

This method is used to return a single data product by its name. The design of the Mesh assumes that all data products have unique names; if that weren't the case, the code below could be modified to handle it.

    def get_data_product_by_name(self, data_product_name: str) -> dict:
        # Return the first data product with a matching name, or None so the route can return a 404
        matches = [d for d in self.get_all_data_products(False) if (d["info"]["name"].lower() == data_product_name)]
        if not matches:
            return None
        return self.replace_known_entities(matches[0])

You can see that this method builds on the previous one, calling get_all_data_products and then finding the one data product with a matching name. There is another call at the end, replace_known_entities. We will cover it in another section, but this method helps with the interoperability of the data products: it will be used to take globally recognized concepts, such as customer or product, and ensure that they are handled in a similar fashion across all data products.

Mesh_Experience_Plane.search_data_products

The last method we will talk about here is the search method. This one will again be very simple because it will build upon what we have already created.

    def search_data_products(self, search_term: str) -> list[dict]:
        # Return every data product whose name or description contains the search term
        return [d for d in self.data_plane.get_all_data_products()
                if (d['name'].find(search_term) != -1 or d['description'].find(search_term) != -1)]

In this method we again call get_all_data_products, then search the name and description of each data product to find the ones that match the specified search term.

Improvements

There are several things that could be done to this to improve the overall architecture, performance and usability of the Mesh in general and these specific APIs.

The first such improvement would be around caching the Data Product definitions. Currently the APIs read from storage every time somebody interacts with any of them; we could add a caching layer to improve overall performance, something like a Redis cache, or even storing the Data Products in a database. If we chose the database route, something like Snowflake with Snowpipe set up would be very useful here: each time we wrote a file into storage, it would automatically get ingested into tables in Snowflake.
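As a rough sketch of what a minimal caching layer might look like (in-process and time-based; a shared cache such as Redis would follow the same get/set-with-expiry pattern):

# Illustrative only: a minimal in-process, time-based cache around the storage reads.
import time

_CACHE_TTL_SECONDS = 300
_cache = {"expires_at": 0.0, "data_products": None}

def get_all_data_products_cached(infrastructure) -> list[dict]:
    # Serve from the cache while it is fresh; otherwise re-read from storage
    if _cache["data_products"] is None or time.monotonic() > _cache["expires_at"]:
        _cache["data_products"] = infrastructure.get_all_data_products()
        _cache["expires_at"] = time.monotonic() + _CACHE_TTL_SECONDS
    return _cache["data_products"]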

As we mentioned at the beginning of the article, having a front-end to display these results would give people something easy to use and interact with.

I'm sure there are a ton of other improvements or enhancements that we could make. Please let me know your thoughts and what other improvements you come up with.
