Connecting Snowflake to CKAN for publishing to the Open Data Portal

Automated API calls using External Functions

Get the source at GitHub!

Objective

The State of California leverages the CKAN data curation product to supply the citizens of California with a myriad of datasets from government departments on the California Open Data Portal. The datasets are publicly available and downloadable by anyone. Today, the State extracts and loads that data with a small Python application running on an Azure Virtual Machine that must either stay online regularly or be turned on manually in order to extract and load. That is maintenance overhead for a small application that the team would like to avoid. In this article I detail how we can push datasets from Snowflake directly to CKAN with an External Function.

Components involved

Current State

The current state requires a managed server that must be monitored by the infrastructure team. To avoid some of the work associated with this small server, the team was interested in a more native Snowflake connection to the CKAN server managed by OpenGov. The Python code that has been deployed runs on a regular schedule and is triggered by a Windows OS task.

Once a day the Python code checks for updates and uploads the data to CKAN. To do this, it extracts the data from Snowflake and drops it to the local file system as a CSV, then pushes the data to CKAN as a byte stream. This means we have a copy of the data in Snowflake, a temporary copy on the Python server, and a copy in CKAN, and each step in the process physically moves the data around.

Future State

We want to build a pipeline that removes as much infrastructure from the equation as possible. As such, we will leverage Snowflake as our source of truth and connect as directly as possible to OpenGov. Because the External Function requires a proxy, we will need the Azure API Management service to broker the API calls out of Snowflake.

Components involved

Digging into the details is now a Snowflake-native exercise, and we're going to use several Snowflake components to automate publishing datasets to CKAN. In this next diagram, I've included some additional components.

Continuous Data Pipeline for CKAN publish

On the left-hand side we have the Source Systems which, outside of the scope of this article, will be ELT'd into Snowflake. The source systems are diverse, so each one may require a different load process. Fivetran is in the mix for delivering that data, but once that is complete the source of truth is Snowflake. To publish data into CKAN there are required metadata fields that need to be populated, so while the raw dataset now lives in Snowflake, we need to provide an interface to map that metadata to the raw dataset. This is recorded in a Control List table which has a STREAM on it. The STREAM acts as the mechanism to start the publish process.

Code snippets can be found in a GitHub repository.

Front End

I’m going to use Retool to build a front end for adding the metadata to the control list table. The general flow of the process is straightforward: the UI will register only new datasets at this time, and it will identify which tables are available to publish within a particular database and schema.

General workflow for what is planned

The Retool SaaS application allows us to easily stand up a new user interface that we can use to push data into Snowflake. I’ve mocked up a small app that will mirror the required attributes for publishing a new CKAN Package and Resource. Upon collecting the values, an INSERT pushes the data into the CONTROL_TABLE where our continuous data pipeline begins.
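To make that concrete, the query wired to the Retool form's submit button might look roughly like the following; the column names and the {{ }} component references are illustrative assumptions rather than the exact schema from the repository.

```sql
-- Hypothetical Retool query: {{ ... }} references are Retool form components,
-- and the column list is an assumed shape for the control table.
INSERT INTO CKAN_DB.PUBLISHED_DATASETS.CONTROL_TABLE
  (TABLE_NAME, PACKAGE_NAME, PACKAGE_TITLE, PACKAGE_NOTES, ORG_ID)
VALUES
  ({{ tableSelect.value }}, {{ nameInput.value }}, {{ titleInput.value }},
   {{ notesInput.value }}, {{ orgInput.value }});
```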

Front end courtesy of Retool. Easy form created to push data to Snowflake

The “Table to Publish” drop-down will be populated with only tables or views that have been configured in the PUBLISHED_DATASETS schema. An ELT tool will be required to move data from the source transactional systems into Snowflake to consolidate access across disparate systems. When replicating or transferring data to Snowflake, the recommendation is to deliver that data to one database per source system. Once the data is delivered to its respective database, we can create a view in the CKAN_DB that points back to where the data lives.
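As a sketch of that pattern, assuming a replicated source database named BIKESHARE_DB (a hypothetical name) and the trips dataset used later in this article, the published view might be created like this:

```sql
-- BIKESHARE_DB is a stand-in for one of the 1:1 replicated source databases;
-- CKAN_DB.PUBLISHED_DATASETS is the zone of datasets we intend to publish.
CREATE OR REPLACE VIEW CKAN_DB.PUBLISHED_DATASETS.TRIPS AS
SELECT * FROM BIKESHARE_DB.PUBLIC.TRIPS;

-- Only the role that runs the publish pipeline can see the publishable zone.
GRANT SELECT ON VIEW CKAN_DB.PUBLISHED_DATASETS.TRIPS TO ROLE CKAN_ROLE;
```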

Replicate source systems 1:1 to databases in Snowflake

Proxy Service — Azure API Management

The API Management proxy service is a requirement of the Snowflake External Function. There isn't much to it, and I performed all this work through the Azure Portal. I really wanted to avoid as many components as possible and decrease integration points; however, the proxy service is required. So we create a new API in the API Management service as documented in Creating the Proxy Service in the Portal, then add the actions or endpoints that we want to call (e.g. package_create, resource_create). I did not need to leverage an Azure Function to accomplish my goal.

Example of the package_create endpoint. Not much here!
Example of the Inbound Processing policy for ALL requests

The only change I make to the request as it flows through the proxy service is to add my CKAN API key for authentication. I set the header in the Inbound processing stage for All operations, so we set it once for all requests.

Screenshot of the inbound policy for All Operations
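In policy terms, that inbound section could look roughly like the snippet below; {{ckan-api-key}} is an assumed APIM named value holding the CKAN API key, which CKAN reads from the Authorization header.

```xml
<!-- Sketch of the All operations inbound policy; {{ckan-api-key}} is an
     assumed APIM named value that stores the CKAN API key. -->
<inbound>
    <base />
    <set-header name="Authorization" exists-action="override">
        <value>{{ckan-api-key}}</value>
    </set-header>
</inbound>
```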

External Function

API Integration

The API integration creates a trust channel from Snowflake to the Azure API Management service. Steps 1 and 2 of Creating an External Function for Azure give us the values that we need to populate the integration options.
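A sketch of the integration, with placeholder values where the Azure portal outputs would go (the integration and service names are assumptions):

```sql
-- Tenant ID, app registration ID, and APIM URL are placeholders to be filled
-- in from the Azure portal per the Snowflake documentation steps above.
CREATE OR REPLACE API INTEGRATION CKAN_APIM_INTEGRATION
  API_PROVIDER = azure_api_management
  AZURE_TENANT_ID = '<tenant-id>'
  AZURE_AD_APPLICATION_ID = '<app-registration-client-id>'
  API_ALLOWED_PREFIXES = ('https://<apim-instance>.azure-api.net/ckan')
  ENABLED = TRUE;
```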

Request Translator

The request translator allows us to take the default generated request format and translate it into a format that CKAN can understand. CKAN expects a flat object of key-value pairs, so the translator grabs the parameters from the row and explicitly builds the request object to match the API interface.
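A minimal sketch of such a translator for package_create, assuming the external function takes the package name, title, notes, and owner organization as positional arguments:

```sql
CREATE OR REPLACE FUNCTION CKAN_PACKAGE_CREATE_REQUEST("EVENT" OBJECT)
RETURNS OBJECT
LANGUAGE JAVASCRIPT
AS
$$
  // EVENT.body.data holds the standard Snowflake batch: [[rowNumber, col1, ...]].
  // With MAX_BATCH_ROWS = 1 there is exactly one row to unpack.
  var row = EVENT.body.data[0];
  return {
    "body": {
      "name":      row[1],
      "title":     row[2],
      "notes":     row[3],
      "owner_org": row[4]
    }
  };
$$;
```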

Response Translator

Similarly, the response from CKAN must be parsed since Snowflake will be expecting a batch “data” object with a line item per row sent. Because the external function will be configured for a single request, one row is parsed and returned to the batch object as the 0th row.
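A sketch of the response side, assuming CKAN's usual {"success": ..., "result": {...}} envelope:

```sql
CREATE OR REPLACE FUNCTION CKAN_RESPONSE("EVENT" OBJECT)
RETURNS OBJECT
LANGUAGE JAVASCRIPT
AS
$$
  // Assumes the JSON response body arrives parsed; hand CKAN's "result"
  // object back to Snowflake as row 0 of the batch.
  return { "body": { "data": [[0, EVENT.body.result]] } };
$$;
```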

External Function

Snowflake sends multiple rows as part of a batch “data” object, but CKAN can only accept one request at a time. So the external function is configured to send one row at a time with the MAX_BATCH_ROWS setting. The function is also configured with the request and response translators.
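Putting those pieces together, the external function definition could look like this (the function, integration, and translator names follow the assumed examples above):

```sql
CREATE OR REPLACE EXTERNAL FUNCTION CKAN_PACKAGE_CREATE(
    PACKAGE_NAME VARCHAR, PACKAGE_TITLE VARCHAR,
    PACKAGE_NOTES VARCHAR, OWNER_ORG VARCHAR)
  RETURNS VARIANT
  API_INTEGRATION = CKAN_APIM_INTEGRATION
  MAX_BATCH_ROWS = 1                      -- one CKAN call per row
  REQUEST_TRANSLATOR = CKAN_PACKAGE_CREATE_REQUEST
  RESPONSE_TRANSLATOR = CKAN_RESPONSE
  AS 'https://<apim-instance>.azure-api.net/ckan/package_create';
```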

Data Pipeline

With the API Proxy service configured we can start working on the pipeline. First let’s review the detailed pipeline that was illustrated previously.

Data Flow

As a first step to building out the pipeline, we will create the control table and schema where we'll manage the datasets we want to share. We will leverage RBAC to control access to the views and tables that are defined in the PUBLISHED_DATASETS schema. This allows us to delineate access to data that we are publishing publicly. We want to be explicit about those datasets, so carving them off into a zone controlled by the CKAN_ROLE ensures that anything there is intentionally public facing. It also gives quick visibility into all the datasets in Snowflake that are publicly available.

The Control Table is defined as follows, and we create a STREAM on the table so that, as new rows are added, the process is automated and works on only that subset of data.
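A minimal sketch of that table and its stream, with column names assumed from the CKAN metadata discussed above:

```sql
CREATE TABLE IF NOT EXISTS CKAN_DB.PUBLISHED_DATASETS.CONTROL_TABLE (
    TABLE_NAME    VARCHAR,   -- view in PUBLISHED_DATASETS to publish
    PACKAGE_NAME  VARCHAR,
    PACKAGE_TITLE VARCHAR,
    PACKAGE_NOTES VARCHAR,
    ORG_ID        VARCHAR,
    PACKAGE_ID    VARCHAR,   -- filled in from the package_create response
    RESOURCE_ID   VARCHAR,   -- filled in from the resource_create response
    CREATED_AT    TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);

CREATE STREAM IF NOT EXISTS CKAN_DB.PUBLISHED_DATASETS.CONTROL_STREAM
  ON TABLE CKAN_DB.PUBLISHED_DATASETS.CONTROL_TABLE;
```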

Once the stream captures data, we want to extract that data into an internal stage. This is necessary because CKAN does not have a direct data connection to Snowflake; Snowflake will push the URL of the extracted file rather than streaming the bytes over the wire. So when a new dataset is registered through the UI, the table is dumped to a CSV file in a stage that uses Snowflake Server-Side Encryption (SSE), which will allow us to read the file via a presigned URL later.
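The stage itself is a one-time setup; a sketch, assuming it lives alongside the published views:

```sql
-- SNOWFLAKE_SSE (server-side encryption) is what allows the file to be read
-- later through a presigned URL.
CREATE STAGE IF NOT EXISTS CKAN_DB.PUBLISHED_DATASETS.PUBLISHED_DATASETS
  ENCRYPTION  = (TYPE = 'SNOWFLAKE_SSE')
  FILE_FORMAT = (TYPE = CSV FIELD_OPTIONALLY_ENCLOSED_BY = '"');
```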

Later, to bring together all the components, we will batch up the following commands in a Stored Procedure, but for now let's look at each of the individual commands. The first step is to extract the data to a CSV. We grab the view name (e.g. trips) that was published via the UI and execute a COPY INTO <location> command. This command needs to be dynamically generated in order to inject the filename and view name; the name of the view is obtained from the STREAM. The result is a CSV file in an internal stage called PUBLISHED_DATASETS.
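With the trips example, the generated statement would be along these lines (the header and overwrite options are assumptions about how the file should be laid out):

```sql
COPY INTO @CKAN_DB.PUBLISHED_DATASETS.PUBLISHED_DATASETS/trips.csv
FROM (SELECT * FROM CKAN_DB.PUBLISHED_DATASETS.TRIPS)
FILE_FORMAT = (TYPE = CSV FIELD_OPTIONALLY_ENCLOSED_BY = '"')
HEADER    = TRUE   -- include column names for CKAN's data preview
OVERWRITE = TRUE   -- replace the previous extract for this dataset
SINGLE    = TRUE;  -- produce one file so a single presigned URL covers it
```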

For new datasets, we need to make two API calls. The first creates a new Package. A package can have many resources. The resource will be our dataset. So the first command calls the External Function to create a new package with the metadata from the STREAM.
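Against the assumed stream and function definitions above, that first call looks roughly like:

```sql
SELECT TABLE_NAME,
       CKAN_PACKAGE_CREATE(PACKAGE_NAME, PACKAGE_TITLE, PACKAGE_NOTES, ORG_ID)
         AS PACKAGE_RESULT
FROM CKAN_DB.PUBLISHED_DATASETS.CONTROL_STREAM
WHERE METADATA$ACTION = 'INSERT';
```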

The response that we receive is a GUID of the new packageId. We will automate this process, but for now we manually update the control table.

Then we combine the last two commands into a single statement that flushes the stream. Because updating the original table adds change records back to the stream, we filter so that only records we INSERTED are pushed.
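A sketch of that combined statement: consuming the stream as the source of a DML statement advances its offset, and the METADATA$ filters keep the update itself from re-triggering the pipeline. Names follow the assumed definitions above.

```sql
UPDATE CKAN_DB.PUBLISHED_DATASETS.CONTROL_TABLE
SET PACKAGE_ID = s.PACKAGE_RESULT:id::STRING  -- CKAN returns the new package GUID as "id"
FROM (
    SELECT TABLE_NAME,
           CKAN_PACKAGE_CREATE(PACKAGE_NAME, PACKAGE_TITLE,
                               PACKAGE_NOTES, ORG_ID) AS PACKAGE_RESULT
    FROM CKAN_DB.PUBLISHED_DATASETS.CONTROL_STREAM
    WHERE METADATA$ACTION = 'INSERT'
      AND METADATA$ISUPDATE = FALSE
) s
WHERE CONTROL_TABLE.TABLE_NAME = s.TABLE_NAME;
```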

Then we make the final API call to create the resource in the package. This will leverage the presigned URL to register the resource.
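Assuming a second external function, CKAN_RESOURCE_CREATE, defined the same way as the package one but pointed at the resource_create endpoint, the call could look like:

```sql
SELECT CKAN_RESOURCE_CREATE(
         PACKAGE_ID,
         -- presigned URL for the extracted CSV; 604800 seconds = 7 days
         GET_PRESIGNED_URL(@CKAN_DB.PUBLISHED_DATASETS.PUBLISHED_DATASETS,
                           LOWER(TABLE_NAME) || '.csv', 604800),
         TABLE_NAME,
         'CSV') AS RESOURCE_RESULT
FROM CKAN_DB.PUBLISHED_DATASETS.CONTROL_TABLE
WHERE PACKAGE_ID IS NOT NULL
  AND RESOURCE_ID IS NULL;
```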

The response returns the resource id, which we capture and use to update the table. This completes the end-to-end process. The data remains in Snowflake; CKAN caches the data and performs a hash check on the file to determine whether a new version is necessary, which keeps the data in sync on both sides.

Final published dataset

Stored Procedure

The final step is to wrap this up in a stored procedure and a TASK to execute as data is delivered.
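A skeleton of that wrapper, assuming the object names used in the sketches above (including a hypothetical CKAN_WH warehouse); the full procedure body would run the COPY INTO, package_create, stream-flushing UPDATE, and resource_create steps in order.

```sql
CREATE OR REPLACE PROCEDURE CKAN_DB.PUBLISHED_DATASETS.PUBLISH_TO_CKAN()
RETURNS STRING
LANGUAGE JAVASCRIPT
AS
$$
  // Skeleton only: the full version would run the COPY INTO, the
  // package_create call, the stream-flushing UPDATE, and resource_create.
  var rs = snowflake.execute({
    sqlText: "SELECT COUNT(*) FROM CKAN_DB.PUBLISHED_DATASETS.CONTROL_STREAM " +
             "WHERE METADATA$ACTION = 'INSERT'"
  });
  rs.next();
  return 'new datasets pending: ' + rs.getColumnValue(1);
$$;

-- Poll on a schedule, but only run when the stream has captured new rows.
CREATE OR REPLACE TASK CKAN_DB.PUBLISHED_DATASETS.PUBLISH_TASK
  WAREHOUSE = CKAN_WH
  SCHEDULE  = '15 MINUTE'
  WHEN SYSTEM$STREAM_HAS_DATA('CKAN_DB.PUBLISHED_DATASETS.CONTROL_STREAM')
AS
  CALL CKAN_DB.PUBLISHED_DATASETS.PUBLISH_TO_CKAN();

ALTER TASK CKAN_DB.PUBLISHED_DATASETS.PUBLISH_TASK RESUME;
```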

Thanks!

Special thanks to Brock Cooper for helping with development of the components and thought leadership on the design.
