Connecting Snowflake to CKAN for publishing to the Open Data Portal
Automated API calls using External Functions
Objective
The State of California leverages the CKAN data curation product to provide the citizens of California with a wide range of datasets from government departments on the California Open Data Portal. The datasets are publicly available and downloadable by anyone. Today, the State extracts and loads data via a small Python application. This application requires an Azure Virtual Machine instance that must either stay online regularly or be turned on manually in order to extract and load, which means ongoing maintenance the team would like to avoid. In this article I detail how we can push datasets from Snowflake directly to CKAN with an External Function.
Current State
The current state requires a managed server that must be monitored by the infrastructure team. To avoid some of the work associated with this small server, the team was interested in a more native Snowflake connection to the CKAN server managed by OpenGov. The deployed Python code runs on a regular basis, triggered by a Windows scheduled task.
Once a day the Python code checks for updates and uploads the data to CKAN. To do this, it extracts the data from Snowflake and drops it to the local file system as a CSV. A byte stream then pushes the data to CKAN. This means we have a copy of the file in Snowflake, a temporary copy of the data on the Python server, and a copy of the data in CKAN. Each step in the process physically moves the data around.
Future State
We want to build a pipeline that removes as much infrastructure from the equation as possible. As such, we will leverage Snowflake as our source of truth and connect as directly as possible to OpenGov. The External Function requires the Azure API Management proxy service to broker the API calls out of Snowflake.
Digging into the details is now a Snowflake-native exercise, and we’re going to use several Snowflake components to automate publishing datasets to CKAN. In this next diagram, I’ve included some additional components.
On the left-hand side we have the Source Systems which, outside the scope of this article, will be ELT’d into Snowflake. The source systems are diverse, so each one may require a different load process. Fivetran is in the mix for delivering that data, but once that is complete the source of truth for the data is Snowflake. To publish data into CKAN, there are required metadata fields that need to be populated. So while the raw dataset is now in Snowflake, we need to provide an interface to map the metadata to that raw dataset. This mapping is recorded in a Control List table with a STREAM on it. The STREAM acts as the mechanism that starts the publish process.
Code snippets are found in a GitHub repository.
Front End
I’m going to use Retool to build a front end for adding the metadata to the control list table. The general flow of the process is straightforward: at this time the UI registers only new datasets, and it identifies which tables are available to publish within a particular database and schema.
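As a sketch of how that list could be populated (assuming the CKAN_DB database and PUBLISHED_DATASETS schema introduced below), a simple INFORMATION_SCHEMA query does the job:
select table_name, table_type
from ckan_db.information_schema.tables
where table_schema = 'PUBLISHED_DATASETS'
order by table_name;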
The Retool SaaS application allows us to easily stand up a new user interface that we can use to push data into Snowflake. I’ve mocked up a small app that will mirror the required attributes for publishing a new CKAN Package and Resource. Upon collecting the values, an INSERT pushes the data into the CONTROL_TABLE where our continuous data pipeline begins.
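For illustration, the INSERT behind the app might look like the following; the column list matches the CONTROL_TABLE defined later, and all of the values are hypothetical sample metadata:
insert into ckan_db.public.control_table
(package_id, notes, accesslevel, contact_name, contact_email
,rights, accrualperiodicity, tag_string, owner_org, table_name)
values
(NULL, 'Bike share trip history', 'public', 'Jane Doe', 'jane.doe@example.gov'
,'public-domain', 'daily', 'bikes,trips,transportation', 'example-org-id', 'TRIPS');
Note that PACKAGE_ID starts out NULL; it is filled in after the package_create call returns.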
The “Table to Publish” drop-down will be populated with only tables or views that have been configured in the PUBLISHED_DATASETS schema. An ELT tool is required to move data from the source transactional systems into Snowflake to consolidate access across disparate systems. When replicating or transferring data to Snowflake, the recommendation is to deliver that data to one database per source system. Once the data is delivered to Snowflake into its respective database, we can create a view in the CKAN_DB that points back to where the data lives in that database.
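For example, if the ELT tool lands the raw data in a (hypothetical) BIKESHARE_DB database, the view is a thin pointer:
create view ckan_db.published_datasets.trips as
select * from bikeshare_db.public.trips;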
Proxy Service — Azure API Management
The API Management proxy service is a requirement of the Snowflake External Function. There isn’t much to it, and I performed all of this work through the Azure Portal. I really wanted to avoid as many components as possible and decrease integration points; however, the proxy service is required. So we create a new API in the API Management service as documented in Creating the Proxy Service in the Portal, then add the actions or endpoints that we want to call (e.g. package_create, resource_create). I did not need to leverage an Azure Function to accomplish my goal.
The only change I make to the request as it flows through the proxy service is to add my CKAN API key for authentication. I set the header as follows in the Inbound processing stage for All operations; this way we set it once for all requests.
<policies>
    <inbound>
        <base />
        <set-header name="X-CKAN-API-Key" exists-action="override">
            <value>f8----redacted API key---1b</value>
        </set-header>
    </inbound>
    <backend>
        <base />
    </backend>
    <outbound>
        <base />
    </outbound>
    <on-error>
        <base />
    </on-error>
</policies>
External Function
API Integration
The API integration creates a trusted channel from Snowflake to the Azure API Management service. Steps 1 and 2 of Creating an External Function for Azure give us the values that we need to populate the integration options.
create or replace api integration ckan_proxy_int
api_provider=azure_api_management
AZURE_TENANT_ID='***GUID***'
AZURE_AD_APPLICATION_ID='***GUID***'
api_allowed_prefixes=('https://gmullen-api-mgmt.azure-api.net/')
api_key='******'
enabled=true;
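If you are following the same documentation, the next step is to describe the integration; the returned values are used to grant consent and secure the proxy service:
describe api integration ckan_proxy_int;
-- Open the AZURE_CONSENT_URL to grant Snowflake access to the tenant, and use
-- AZURE_MULTI_TENANT_APP_NAME when securing the API Management service.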
Request Translator
The request translator allows us to take the default generated request format and translate it into a format that CKAN can understand. CKAN expects flat key-value pairs, so the translator grabs the parameters from the row and we explicitly build the request body to match the API interface.
create or replace function CKAN_create_request_translator(event object)
returns object
language javascript as
'
// EVENT.body.data holds the batch: each element is a row of the form
// [row_number, param_1, param_2, ...]. With MAX_BATCH_ROWS = 1 we only
// ever see one row, so read index 0.
let row = EVENT.body.data[0];
let package_id = row[1];
let name = row[2];
let description = row[3];
let format = row[4];
let url = row[5];
// Build the flat key-value body that the CKAN action API expects.
return { "body": { "package_id": package_id, "name": name, "description": description, "format": format, "url": url } };
';
Response Translator
Similarly, the response from CKAN must be parsed since Snowflake will be expecting a batch “data” object with a line item per row sent. Because the external function will be configured for a single request, one row is parsed and returned to the batch object as the 0th row.
create or replace function CKAN_create_response_translator(event object)
returns object
language javascript as
'
// Wrap the single CKAN response back into the batch shape Snowflake expects:
// one [row_number, value] entry per row that was sent.
var responses = [];
responses[0] = [0, EVENT.body.result.id];
return { "body": { "data": responses } };
';
External Function
Snowflake sends multiple rows as part of a batch “data” object, but CKAN can only accept one request at a time. So the external function is configured to send one row at a time with the MAX_BATCH_ROWS setting. The function is also configured with the request and response translators.
create or replace external function resource_create(
package_id string
,name string
,description string
,format string
,url string)
returns variant
api_integration = ckan_proxy_int
MAX_BATCH_ROWS = 1
request_translator = CKAN_create_request_translator
response_translator = CKAN_create_response_translator
as 'https://gmullen-api-mgmt.azure-api.net/resource_create';
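A quick way to smoke-test the function before wiring up the pipeline (the package id and URL here are placeholders):
select resource_create(
'00000000-0000-0000-0000-000000000000' -- placeholder package id
,'trips'
,'Bike share trip history'
,'CSV'
,'https://example.com/trips.csv');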
Data Pipeline
With the API Proxy service configured we can start working on the pipeline. First let’s review the detailed pipeline that was illustrated previously.
As a first step to building out the pipeline, we will create the control table and the schema where we’ll manage the datasets we want to share. We will leverage RBAC to control access to the views and tables defined in the PUBLISHED_DATASETS schema. This allows us to delineate access to data that we are publishing publicly. We want to be explicit about those datasets, so carving them off into a zone logically controlled by the CKAN_ROLE ensures that anything placed there is meant to be public facing. It also gives quick visibility into all the datasets in Snowflake that are publicly available.
create or replace database ckan_db;
use database ckan_db;
-- PUBLISHED_DATASETS is where views or tables for publishing are registered
create schema published_datasets;
USE ROLE SECURITYADMIN;
GRANT OWNERSHIP ON DATABASE CKAN_DB TO ROLE CKAN_ROLE;
GRANT OWNERSHIP ON SCHEMA ckan_db.PUBLIC TO ROLE CKAN_ROLE;
GRANT OWNERSHIP ON SCHEMA ckan_db.published_datasets TO ROLE CKAN_ROLE;
GRANT OWNERSHIP ON VIEW CKAN_DB.published_datasets.trips TO ROLE SYSADMIN;
GRANT SELECT ON VIEW CKAN_DB.published_datasets.trips TO ROLE CKAN_ROLE;
GRANT OWNERSHIP ON TABLE CONTROL_TABLE TO ROLE CKAN_ROLE;
GRANT OWNERSHIP ON stream CONTROL_STREAM TO ROLE CKAN_ROLE;
The Control Table is defined as follows, and we create a STREAM on the table so that as new rows are added, the process is automated and works on only the changed subset of data.
create or replace table control_table (
package_id string NULL
,notes string NOT NULL
,accesslevel string NOT NULL
,contact_name string NOT NULL
,contact_email string NOT NULL
,rights string NOT NULL
,accrualperiodicity string NOT NULL
,tag_string string NOT NULL
,owner_org string NOT NULL
,table_name string NOT NULL
);
create or replace stream control_stream on table control_table;
Once the stream captures data, we want to extract that data into an internal stage. This is necessary because CKAN does not have a direct data connection to Snowflake; instead, Snowflake will hand over the URL of the extracted file (as opposed to streaming the bytes over the wire). So when a new dataset is registered through the UI, the table is dumped to a CSV file in a stage configured with Snowflake Server-Side Encryption (SSE). This will allow us to read the file via the presigned URL later.
create or replace stage published_extracts encryption = (type = 'SNOWFLAKE_SSE');
Later, to bring together all the components, we will batch up the following commands in a Stored Procedure. But for now, let’s look at each of the individual commands. The first is to extract the data to a CSV: we grab the view name (i.e. trips) that was published via the UI and execute a COPY INTO <location> command. This command needs to be dynamically generated in order to inject the file name and view name, with the name of the view obtained from the STREAM. The result is a CSV file in the internal stage PUBLISHED_EXTRACTS.
copy into @published_datasets.published_extracts/trips.csv from CKAN_DB.PUBLISHED_DATASETS.TRIPS SINGLE = TRUE file_format = (type = csv compression = none);
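To confirm the extract landed, and to preview the presigned URL mechanism we rely on later (the 3600-second expiration is an arbitrary choice), we can run:
list @published_datasets.published_extracts;
select get_presigned_url(@published_datasets.published_extracts, 'trips.csv', 3600);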
For new datasets, we need to make two API calls. The first creates a new Package. A package can have many resources. The resource will be our dataset. So the first command calls the External Function to create a new package with the metadata from the STREAM.
select package_create(
lower(table_name)
,notes
,accesslevel
,contact_name
,contact_email
,rights
,accrualperiodicity
,tag_string
,owner_org
)
from ckan_db.public.control_stream;
The response that we receive is the GUID of the new package ID. We will automate this step shortly, but for now we manually update the control table.
update control_table set package_id = '9e521179-39ee-48f8-a1c1-d6402145ed79' where table_name = 'TRIPS';
Then we combine the last two commands into a single command that flushes the stream. Updating the original table adds change records back to the stream, so we filter to push only the records that were INSERTed.
UPDATE CONTROL_TABLE
set package_id = EXT_PACKAGE_ID
FROM (select package_create(
lower(table_name)
,notes
,accesslevel
,contact_name
,contact_email
,rights
,accrualperiodicity
,tag_string
,owner_org
) EXT_PACKAGE_ID, TABLE_NAME, METADATA$ISUPDATE ISUPDATE
from ckan_db.public.control_stream) STRM
WHERE CONTROL_TABLE.TABLE_NAME = STRM.TABLE_NAME
AND ISUPDATE = FALSE;
Then we make the final API call to create the resource in the package. This call leverages the presigned URL to register the resource.
select resource_create(
package_id
,lower(table_name)
,notes
,'CSV'
,(select get_presigned_url(@published_datasets.published_extracts, 'trips.csv')) )
from ckan_db.public.control_table;
The response returns the resource id, which we can capture to update the table. This completes the end-to-end process. The data remains in Snowflake; CKAN caches the data and performs a hash check on the file to determine whether a new version is necessary, which keeps the data in sync on both sides.
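As a sketch of capturing it (this assumes a RESOURCE_ID column that the CONTROL_TABLE DDL above does not include, and a placeholder GUID):
alter table control_table add column resource_id string;
update control_table
set resource_id = '<resource-guid-from-response>'
where table_name = 'TRIPS';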
Stored Procedure
The final step is to wrap this up in a stored procedure and a TASK to execute as data is delivered.
--create task
USE ROLE accountadmin;
grant EXECUTE MANAGED TASK on account to role ckan_role;

USE ROLE CKAN_ROLE;
use database ckan_db;

CREATE OR REPLACE TASK PUBLISH_CKAN_TASK
USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE ='XSMALL'
SCHEDULE = '1 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('CONTROL_STREAM')
AS
CALL SP_PUBLISH_DATASET();

--package create
CREATE OR REPLACE PROCEDURE SP_PUBLISH_DATASET()
RETURNS VARIANT
LANGUAGE JAVASCRIPT
AS
$$
var return_results = [];

// Get the tables to publish from the control stream
var rs_tables = snowflake.createStatement({
    sqlText: "select table_name from control_stream;"
}).execute();

while (rs_tables.next()) {
    // For each table, drop the published dataset to the internal stage as a CSV
    var table_name = rs_tables.getColumnValue('TABLE_NAME');
    snowflake.createStatement({
        sqlText: "copy into @published_datasets.published_extracts/" +
            table_name + ".csv from CKAN_DB.PUBLISHED_DATASETS." +
            table_name + " SINGLE = TRUE MAX_FILE_SIZE=5368709120 OVERWRITE=TRUE file_format = (type = csv compression = none);"
    }).execute();
}

// Make the package_create API call and record the package id on the control table
var package_res = snowflake.createStatement({
    sqlText: "UPDATE CONTROL_TABLE set package_id = EXT_PACKAGE_ID FROM (select package_create(lower(table_name),notes,accesslevel,contact_name,contact_email,rights,accrualperiodicity,tag_string,owner_org) EXT_PACKAGE_ID, TABLE_NAME, METADATA$ISUPDATE ISUPDATE from ckan_db.public.control_stream) STRM WHERE CONTROL_TABLE.TABLE_NAME = STRM.TABLE_NAME AND ISUPDATE = FALSE;"
}).execute();

// The update above writes new change records to the stream; create the
// resource for those updated rows using a presigned URL to the staged file
var resource_res = snowflake.createStatement({
    sqlText: "select resource_create(package_id,lower(table_name),notes,'CSV',(select get_presigned_url(@published_datasets.published_extracts, table_name || '.csv'))) from ckan_db.public.control_stream WHERE METADATA$ACTION = 'INSERT' AND METADATA$ISUPDATE = TRUE;"
}).execute();

resource_res.next();
return_results.push(resource_res.getColumnValue(1));
package_res.next();
return_results.push(package_res.getColumnValue(1));

// Log what was published (this consumes the stream)
snowflake.createStatement({
    sqlText: "insert into ckan_log select sysdate(),package_id,table_name from control_stream"
}).execute();

return return_results;
$$;
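One last note: a newly created task starts out suspended, so it must be resumed before the pipeline will run:
alter task publish_ckan_task resume;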
Thanks!
Special thanks to Brock Cooper for helping with development of the components and thought leadership on the design.