How to save costs by ELT from web APIs into a Snowflake data lake with Python

In my previous blog post, I showed how you can use Snowflake as the ETL engine for pulling JSON data from an API and loading it into an open data lakehouse with Apache Iceberg. In this blog post, you’ll see an example of how you can ELT raw JSON from an API directly into Snowflake without having to define a schema upfront, saving on duplicative storage and compute costs while preserving flexibility for “schema-on-read.”

Snowflake as the data lake?

Like many data warehouses, Snowflake supports structured SQL data types, but it also supports semi-structured types and unstructured formats like a data lake does. In the past, a very common ingestion pattern has been to first land data as files (e.g. JSON, Parquet) in cloud object storage, then ingest the files in batch with COPY INTO or in micro-batches with Snowpipe.

File-based ingest with Snowpipe

As technologies have advanced to enable ACID transactions on a data lake, you might have asked yourself, “Should I keep paying to store data both in object storage and in Snowflake?” No, you don’t have to. Snowflake has added features over the past few years that reduce the need to land data as files in the first place, allowing you to pay for loading and storage only once.

Store once in Snowflake with schema-on-read

In comparison to writing JSON files in object storage, writing directly to Snowflake tables can be beneficial for:

  • ACID transactions, which make write operations and query results more reliable
  • Removing duplicative storage and ingest costs
  • More granular access control at the row, column, and cell level rather than bucket, prefix, or file level

If you’re trying to decide whether to store data in Iceberg tables or Snowflake’s table format, this blog post by James Malone, Director of Product Management at Snowflake, elaborates on this very topic.

Snowflake’s VARIANT data type

The VARIANT data type is unique to Snowflake, and it makes storing raw JSON very easy by not requiring an explicit schema to be defined upfront, i.e. “schema-on-read.” In my last blog post, you’ll notice we had to specify data types upfront in order to write to Iceberg tables.

For use cases where you’d rather not define a schema upfront, writing to Snowflake’s table format with the VARIANT data type is a good option. And for use cases where you don’t mind defining a schema upfront and prefer greater compute interoperability, Iceberg tables are a good option. Both table types are usable, governed, and shareable in Snowflake!
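
To make that concrete, here’s a minimal sketch (the table and column names are purely for illustration) of landing arbitrary JSON in a VARIANT column and applying structure only at query time:

-- Land raw JSON without declaring its shape upfront
CREATE OR REPLACE TABLE demo_events (payload VARIANT);

INSERT INTO demo_events
SELECT PARSE_JSON('{"user": {"login": "octocat"}, "contributions": 42}');

-- Schema-on-read: apply structure with path notation and casts at query time
SELECT
payload:user:login::STRING AS login,
payload:contributions::NUMBER AS contributions
FROM demo_events;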

Let’s dig in

Just like in my last blog post with Iceberg tables, we’ll use Python to get data from the GitHub API. This time, we’ll store the data in VARIANT columns to allow more flexibility for schema-on-read in view definitions.

Create GitHub personal access token

This demo will make a few thousand requests to the GitHub API, which exceeds the rate limit of 60 requests per hour for unauthenticated users. If you don’t already have one, create a GitHub personal access token with the following scopes:

  • public_repo
  • read:org
  • read:user
  • read:project

Create Snowflake objects

Create database, schemas, and warehouse for this demo.

CREATE DATABASE IF NOT EXISTS github;
USE DATABASE github;
CREATE SCHEMA IF NOT EXISTS raw;
CREATE SCHEMA IF NOT EXISTS curated;
CREATE SCHEMA IF NOT EXISTS analytics;
USE SCHEMA public;
CREATE WAREHOUSE IF NOT EXISTS github
STATEMENT_TIMEOUT_IN_SECONDS = 3600
STATEMENT_QUEUED_TIMEOUT_IN_SECONDS = 3600;

To allow Snowflake to call the GitHub API endpoints directly, create a network rule, secret, and external access integration.

CREATE OR REPLACE NETWORK RULE github.public.gh_nw_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('api.github.com');

-- Store GitHub personal access tokens for API calls
CREATE OR REPLACE SECRET github.public.gh_token
TYPE = GENERIC_STRING
SECRET_STRING = '<your github token>';

-- Create integration using previous objects
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION gh_integration
ALLOWED_NETWORK_RULES = (github.public.gh_nw_rule)
ALLOWED_AUTHENTICATION_SECRETS = (github.public.gh_token)
ENABLED = true;
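
Before building the full pipeline, it can help to confirm the integration and secret work end to end. Here’s an optional sketch (check_gh_rate_limit is a hypothetical name, not part of the pipeline) that calls GitHub’s rate limit endpoint on the allowed host:

-- Optional: throwaway UDF to verify external access and the stored token
CREATE OR REPLACE FUNCTION github.public.check_gh_rate_limit()
RETURNS VARIANT
LANGUAGE PYTHON
RUNTIME_VERSION = 3.11
HANDLER = 'check_gh_rate_limit'
EXTERNAL_ACCESS_INTEGRATIONS = (gh_integration)
PACKAGES = ('requests')
SECRETS = ('cred' = github.public.gh_token)
AS
$$
import _snowflake
import requests

def check_gh_rate_limit():
    # Authenticated call to the rate limit endpoint on api.github.com
    mytoken = _snowflake.get_generic_secret_string('cred')
    response = requests.get('https://api.github.com/rate_limit',
                            headers={"Authorization": 'token ' + mytoken})
    return response.json()
$$;

SELECT github.public.check_gh_rate_limit();

If the call succeeds, the response shows your remaining authenticated request quota; if it fails, revisit the network rule and secret before continuing.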

Create Python functions

Create a Python UDF for getting all of the latest repositories (“repos”), and a table to store the results.

CREATE OR REPLACE FUNCTION github.public.load_projects ()
RETURNS VARIANT
LANGUAGE PYTHON
RUNTIME_VERSION = 3.11
HANDLER = 'load_projects'
EXTERNAL_ACCESS_INTEGRATIONS = (gh_integration)
PACKAGES = ('requests','simplejson')
AS
$$
import _snowflake
import requests
import simplejson as json

def load_projects():
    response = requests.get('https://api.github.com/orgs/delta-io/repos')
    delta_json_obj = json.loads(response.text)
    delta_proj_repos = []
    for i in delta_json_obj:
        delta_proj_repos.append(i['full_name'])

    all_proj_repos = ['apache/iceberg','apache/hudi']
    all_proj_repos.extend(delta_proj_repos)

    return all_proj_repos
$$;

CREATE OR REPLACE TABLE github.raw.repos AS
SELECT *
FROM (SELECT load_projects() AS response);
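
The response column now holds the full repo list as a single VARIANT array. As a quick check, one way to expand it into rows (a sketch using LATERAL FLATTEN) is:

-- Expand the VARIANT array of repo names into one row per repo
SELECT f.value::STRING AS repo
FROM github.raw.repos r,
LATERAL FLATTEN(INPUT => r.response) f;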

Create a Python UDTF for looping through each repo and getting all of the contributors, and a table to store the results.

CREATE OR REPLACE FUNCTION github.public.load_project_contributors(repo STRING)
RETURNS TABLE (RESPONSE VARIANT)
LANGUAGE PYTHON
RUNTIME_VERSION = 3.11
HANDLER = 'load_project_contributors'
EXTERNAL_ACCESS_INTEGRATIONS = (gh_integration)
PACKAGES = ('requests','simplejson')
SECRETS = ('cred' = github.public.gh_token)
AS
$$
import _snowflake
import requests
import simplejson as json

class load_project_contributors:
    def process(self, repo):
        url = 'https://api.github.com/repos/'+str(repo)+'/contributors?per_page=100&page=1'
        mytoken = _snowflake.get_generic_secret_string('cred')
        response = requests.get(url,headers={"Authorization": 'token '+ mytoken})
        contributors_list = json.loads(response.text)

        while 'next' in response.links.keys():
            response = requests.get(response.links['next']['url'],headers={"Authorization": 'token '+ mytoken})
            contributors_list.extend(response.json())

        for data in contributors_list:
            yield (data,)
$$;

CREATE OR REPLACE TABLE github.raw.all_project_contributors AS
SELECT
r.repo,
p.response
FROM (SELECT VALUE::STRING AS repo
FROM TABLE(FLATTEN(SELECT * FROM github.raw.repos))) r,
TABLE(github.public.load_project_contributors(repo)) p;
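
As an optional sanity check, a quick aggregate confirms that each repo returned contributor records:

-- Contributor records loaded per repo
SELECT repo, COUNT(*) AS num_contributors
FROM github.raw.all_project_contributors
GROUP BY repo
ORDER BY num_contributors DESC;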

Create a Python UDF that gets profile metadata for each contributor, and a table to store the results.

CREATE OR REPLACE FUNCTION github.public.load_profile(url STRING)
RETURNS VARIANT
LANGUAGE PYTHON
RUNTIME_VERSION = 3.11
HANDLER = 'load_profile'
EXTERNAL_ACCESS_INTEGRATIONS = (gh_integration)
PACKAGES = ('requests')
SECRETS = ('cred' = github.public.gh_token)
AS
$$
import _snowflake
import requests

def load_profile(url):
    mytoken = _snowflake.get_generic_secret_string('cred')
    response = requests.get(url,headers={"Authorization": 'token '+ mytoken})
    return response.json()
$$;

CREATE OR REPLACE TABLE github.raw.unique_profiles AS
SELECT
github.public.load_profile(url) AS response
FROM (SELECT DISTINCT response:url::STRING AS url
FROM github.raw.all_project_contributors) u;

Create a Python UDTF for looping through each repo and getting all of the pull requests, and a table to store the results. Because the full pull request payload per repo exceeds the 16MB limit for a VARIANT value in Snowflake, I pull out only the fields that are needed.

CREATE OR REPLACE FUNCTION github.public.load_project_pulls(repo STRING)
RETURNS TABLE (RESPONSE VARIANT)
LANGUAGE PYTHON
RUNTIME_VERSION = 3.11
HANDLER = 'load_project_pulls'
EXTERNAL_ACCESS_INTEGRATIONS = (gh_integration)
PACKAGES = ('requests','simplejson')
SECRETS = ('cred' = github.public.gh_token)
AS
$$
import _snowflake
import requests
import simplejson as json

class load_project_pulls:
    def process(self, repo):
        url = 'https://api.github.com/repos/'+str(repo)+'/pulls?state=all&per_page=100&page=1'
        mytoken = _snowflake.get_generic_secret_string('cred')
        response = requests.get(url,headers={"Authorization": 'token '+ mytoken})
        pulls = response.json()
        pulls_list = []

        for pull in pulls:
            pulls_list.append({
                "id": pull["id"],
                "number": pull["number"],
                "state": pull["state"],
                "created_at": pull["created_at"],
                "updated_at": pull["updated_at"],
                "merged_at": pull["merged_at"],
                "login": pull["user"]["login"]
            })

        while 'next' in response.links.keys():
            response = requests.get(response.links['next']['url'],headers={"Authorization": 'token '+ mytoken})
            pulls = response.json()

            for pull in pulls:
                pulls_list.append({
                    "id": pull["id"],
                    "number": pull["number"],
                    "state": pull["state"],
                    "created_at": pull["created_at"],
                    "updated_at": pull["updated_at"],
                    "merged_at": pull["merged_at"],
                    "login": pull["user"]["login"]
                })

        for data in pulls_list:
            yield (data,)
$$;

CREATE OR REPLACE TABLE github.raw.all_project_pulls AS
SELECT
r.repo,
l.response
FROM (SELECT VALUE::STRING AS repo
FROM TABLE(FLATTEN(SELECT * FROM github.raw.repos))) r,
TABLE(github.public.load_project_pulls(repo)) l;

Create a Python UDTF for looping through each repo and getting all of the commits, and a table to store the results. Again, because the full commits payload per repo exceeds the 16MB limit for a VARIANT value in Snowflake, I pull out only the fields that are needed.

CREATE OR REPLACE FUNCTION github.public.load_project_commits(repo STRING)
RETURNS TABLE (RESPONSE VARIANT)
LANGUAGE PYTHON
RUNTIME_VERSION = 3.11
HANDLER = 'load_project_commits'
EXTERNAL_ACCESS_INTEGRATIONS = (gh_integration)
PACKAGES = ('requests','simplejson')
SECRETS = ('cred' = github.public.gh_token)
AS
$$
import _snowflake
import requests
import simplejson as json

class load_project_commits:
    def process(self, repo):
        url = 'https://api.github.com/repos/'+str(repo)+'/commits?per_page=100&page=1'
        mytoken = _snowflake.get_generic_secret_string('cred')
        response = requests.get(url,headers={"Authorization": 'token '+ mytoken})
        commits = response.json()
        commits_list = []

        for commit in commits:
            author_login = commit["author"]["login"] if commit["author"] else None
            committer_login = commit["committer"]["login"] if commit["committer"] else None

            commits_list.append({
                "id": commit["sha"],
                "author_login": author_login,
                "committer_login": committer_login,
                "committed_at": commit["commit"]["author"]["date"]
            })

        while 'next' in response.links.keys():
            response = requests.get(response.links['next']['url'],headers={"Authorization": 'token '+ mytoken})
            commits = response.json()

            for commit in commits:
                author_login = commit["author"]["login"] if commit["author"] else None
                committer_login = commit["committer"]["login"] if commit["committer"] else None

                commits_list.append({
                    "id": commit["sha"],
                    "author_login": author_login,
                    "committer_login": committer_login,
                    "committed_at": commit["commit"]["author"]["date"]
                })

        for data in commits_list:
            yield (data,)
$$;

CREATE OR REPLACE TABLE github.raw.all_project_commits AS
SELECT
r.repo,
l.response,
FROM (SELECT VALUE::STRING AS repo
FROM TABLE(FLATTEN(SELECT * FROM github.raw.repos))) r,
TABLE(github.public.load_project_commits(repo)) l;

Look at a record in any of the tables you’ve created, and you’ll see the semi-structured JSON payloads. Below is an example from the all_project_contributors table.

JSON payload in VARIANT column
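
If you’d like to see the same thing in a worksheet, a simple preview query does the job:

-- Preview raw JSON payloads stored in the VARIANT column
SELECT repo, response
FROM github.raw.all_project_contributors
LIMIT 5;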

Schedule and orchestrate Python functions with Tasks

How can you automate the process of getting new data from the API? Snowflake has a number of options for different use cases, but for this demo we’ll use tasks.

The pipeline schedule specified in the task definitions below is the first day of every month, but you can change it to whatever CRON schedule you’d like. The downstream tasks are set up to run only after the upstream task completes.

-- Task to load updated list of repos for projects
CREATE OR REPLACE TASK github.public.repos
WAREHOUSE = github
SCHEDULE = 'USING CRON 0 0 1 * * America/Los_Angeles'
AS
INSERT OVERWRITE INTO github.raw.repos
SELECT *
FROM (SELECT load_projects() AS response);

-- Task to load contributors data when the github.public.repos task completes getting updated list of repos
CREATE OR REPLACE TASK github.public.all_project_contributors
WAREHOUSE = github
AFTER github.public.repos
AS
INSERT OVERWRITE INTO github.raw.all_project_contributors
SELECT
r.repo,
p.response
FROM (SELECT VALUE::STRING AS repo
FROM TABLE(FLATTEN(SELECT * FROM github.raw.repos))) r,
TABLE(github.public.load_project_contributors(repo)) p;

-- Task to load profile data when the github.public.all_proj_contributors task completes getting updated list of contributors
CREATE OR REPLACE TASK github.public.unique_profiles
WAREHOUSE = github
AFTER github.public.all_project_contributors
AS
INSERT OVERWRITE INTO github.raw.unique_profiles
SELECT
github.public.load_profile(url) AS response
FROM (SELECT DISTINCT response:url::STRING AS url
FROM github.raw.all_project_contributors) u;

-- Task to load pull request data when the github.public.repos task completes getting updated list of repos
CREATE OR REPLACE TASK github.public.all_project_pulls
WAREHOUSE = github
AFTER github.public.repos
AS
INSERT OVERWRITE INTO github.raw.all_project_pulls
SELECT
r.repo,
l.response
FROM (SELECT VALUE::STRING AS repo
FROM TABLE(FLATTEN(SELECT * FROM github.raw.repos))) r,
TABLE(github.public.load_project_pulls(repo)) l;

-- Task to load commit data when the github.public.repos task completes getting updated list of repos
CREATE OR REPLACE TASK github.public.all_project_commits
WAREHOUSE = github
AFTER github.public.repos
AS
INSERT OVERWRITE INTO github.raw.all_project_commits
SELECT
r.repo,
l.response,
FROM (SELECT VALUE::STRING AS repo
FROM TABLE(FLATTEN(SELECT * FROM github.raw.repos))) r,
TABLE(github.public.load_project_commits(repo)) l;

-- By default, new tasks are in a suspended state. Activate or "resume" the full DAG as follows
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('github.public.repos');

After creating these tasks, you can see them in a directed acyclic graph (“DAG”) along with a run history.

Task DAG and run history
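
You can also check task runs with SQL; here’s a sketch using the INFORMATION_SCHEMA.TASK_HISTORY table function:

-- Recent task runs for this database
SELECT name, state, scheduled_time, completed_time, error_message
FROM TABLE(github.INFORMATION_SCHEMA.TASK_HISTORY())
ORDER BY scheduled_time DESC
LIMIT 20;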

Create structured views from VARIANT columns

Now that the tables are loaded with raw data, you can extract meaningful fields from the JSON payloads into views.

-- Flatten repos table
CREATE OR REPLACE VIEW github.curated.repos AS
SELECT VALUE::STRING AS repo
FROM TABLE(FLATTEN(SELECT * FROM github.raw.repos));

-- Flatten contributors table, add project grouping of repos
CREATE OR REPLACE VIEW github.curated.all_project_contributors AS
SELECT
CASE
WHEN repo = 'apache/iceberg' THEN 'iceberg'
WHEN repo = 'apache/hudi' THEN 'hudi'
ELSE 'delta'
END AS project,
repo,
response:login::STRING AS login,
response:url::STRING AS url,
response:contributions::NUMBER AS contributions,
FROM github.raw.all_project_contributors;

-- Flatten profiles table, parsing out clean company string
CREATE OR REPLACE VIEW github.curated.unique_profiles AS
WITH base AS (
SELECT
response:login::STRING AS login,
RTRIM(REPLACE(LOWER(response:company::STRING), '@', '')) AS company,
response:email::STRING AS email,
SPLIT_PART(email, '@', 2) AS email_domain,
response:created_at::TIMESTAMP AS created_at,
response:updated_at::TIMESTAMP AS updated_at,
FROM github.raw.unique_profiles
)
SELECT
login,
email,
email_domain,
created_at,
updated_at,
CASE
WHEN CONTAINS(base.company,'databricks') = TRUE THEN 'databricks'
WHEN CONTAINS(base.company,'tabular') = TRUE THEN 'tabular'
WHEN CONTAINS(base.company,'netflix') = TRUE THEN 'netflix'
WHEN CONTAINS(base.company,'adobe') = TRUE THEN 'adobe'
WHEN CONTAINS(base.company,'alibaba') = TRUE THEN 'alibaba'
WHEN CONTAINS(base.company,'amazon') = TRUE THEN 'amazon'
WHEN CONTAINS(base.company,'aws') = TRUE THEN 'amazon'
WHEN CONTAINS(base.company,'apple') = TRUE THEN 'apple'
WHEN CONTAINS(base.company,'bytedance') = TRUE THEN 'bytedance'
WHEN CONTAINS(base.company,'cisco') = TRUE THEN 'cisco'
WHEN CONTAINS(base.company,'cloudera') = TRUE THEN 'cloudera'
WHEN CONTAINS(base.company,'cockroach') = TRUE THEN 'cockroach'
WHEN CONTAINS(base.company,'dremio') = TRUE THEN 'dremio'
WHEN CONTAINS(base.company,'ebay') = TRUE THEN 'ebay'
WHEN CONTAINS(base.company,'eventual-inc') = TRUE THEN 'eventual-inc'
WHEN CONTAINS(base.company,'freelance') = TRUE THEN 'freelance'
WHEN CONTAINS(base.company,'huawei') = TRUE THEN 'huawei'
WHEN CONTAINS(base.company,'ibm') = TRUE THEN 'ibm'
WHEN CONTAINS(base.company,'linkedin') = TRUE THEN 'linkedin'
WHEN CONTAINS(base.company,'microsoft') = TRUE THEN 'microsoft'
WHEN CONTAINS(base.company,'netease') = TRUE THEN 'netease'
WHEN CONTAINS(base.company,'salesforce') = TRUE THEN 'salesforce'
WHEN CONTAINS(base.company,'selectdb') = TRUE THEN 'selectdb'
WHEN CONTAINS(base.company,'snowflake') = TRUE THEN 'snowflake'
WHEN CONTAINS(base.company,'starburst') = TRUE THEN 'starburst'
WHEN CONTAINS(base.company,'tencent') = TRUE THEN 'tencent'
WHEN CONTAINS(base.company,'uber') = TRUE THEN 'uber'
WHEN CONTAINS(base.company,'apache') = TRUE THEN 'apache'
ELSE base.company
END AS gh_company,
CASE
WHEN CONTAINS(base.email_domain,'amazon') = TRUE THEN 'amazon'
WHEN CONTAINS(base.email_domain,'apache') = TRUE THEN 'apache'
WHEN CONTAINS(base.email_domain,'apple') = TRUE THEN 'apple'
WHEN CONTAINS(base.email_domain,'bytedance') = TRUE THEN 'bytedance'
WHEN CONTAINS(base.email_domain,'cloudera') = TRUE THEN 'cloudera'
WHEN CONTAINS(base.email_domain,'databricks') = TRUE THEN 'databricks'
WHEN CONTAINS(base.email_domain,'ebay') = TRUE THEN 'ebay'
WHEN CONTAINS(base.email_domain,'intel') = TRUE THEN 'intel'
WHEN CONTAINS(base.email_domain,'linkedin') = TRUE THEN 'linkedin'
WHEN CONTAINS(base.email_domain,'microsoft') = TRUE THEN 'microsoft'
WHEN CONTAINS(base.email_domain,'robinhood') = TRUE THEN 'robinhood'
WHEN CONTAINS(base.email_domain,'tabular') = TRUE THEN 'tabular'
WHEN CONTAINS(base.email_domain,'tencent') = TRUE THEN 'tencent'
WHEN CONTAINS(base.email_domain,'uber') = TRUE THEN 'uber'
END AS email_company,
COALESCE(gh_company, email_company) AS company
FROM base;

-- Flatten pull requests table, add project-level grouping of repos
CREATE OR REPLACE VIEW github.curated.all_project_pulls AS
SELECT
CASE
WHEN repo = 'apache/iceberg' THEN 'iceberg'
WHEN repo = 'apache/hudi' THEN 'hudi'
ELSE 'delta'
END AS project,
repo,
response:id::NUMBER AS id,
response:number::NUMBER AS number,
response:created_at::TIMESTAMP AS created_at,
response:merged_at::TIMESTAMP AS merged_at,
response:state::STRING AS state,
response:login::STRING AS login,
FROM github.raw.all_project_pulls;

-- Flatten commits table, add project-level grouping of repos
CREATE OR REPLACE VIEW github.curated.all_project_commits AS
SELECT
CASE
WHEN repo = 'apache/iceberg' THEN 'iceberg'
WHEN repo = 'apache/hudi' THEN 'hudi'
ELSE 'delta'
END AS project,
response:id::STRING AS id,
response:committed_at::TIMESTAMP AS committed_at,
response:author_login::STRING AS author_login,
response:committer_login::STRING AS committer_login,
COALESCE(response:author_login::STRING, response:committer_login::STRING) AS login
FROM github.raw.all_project_commits;

Create analytical views and charts

Now with some structure defined, we can more easily perform aggregations into views for analytics.

-- Total number of pull requests by project
CREATE OR REPLACE VIEW github.analytics.pulls_by_project AS
SELECT
project,
COUNT(*) AS num_pulls
FROM github.curated.all_project_pulls
GROUP BY 1
ORDER BY 2 DESC;

-- Running total number of pull requests by project per day
CREATE OR REPLACE VIEW github.analytics.pulls_by_project_day AS
WITH base AS (
SELECT
project,
DATE_TRUNC('DAY', created_at) AS created_at,
COUNT(*) AS num_pulls,
FROM github.curated.all_project_pulls
GROUP BY 1, 2
ORDER BY 1, 2
)
SELECT
project,
created_at,
num_pulls,
SUM(num_pulls) OVER (PARTITION BY project ORDER BY created_at) AS running_sum_pulls
FROM base
GROUP BY 1, 2, 3
ORDER BY 1, 2;

-- Unique committers by company per project
CREATE OR REPLACE VIEW github.analytics.project_company_committers AS
WITH project_company_committers AS (
SELECT
c.project,
p.company,
COUNT(DISTINCT c.login) AS company_committers
FROM github.curated.all_project_commits c
JOIN github.curated.unique_profiles p
ON c.login = p.login
GROUP BY 1, 2
ORDER BY 1, 3 DESC
), project_committers AS (
SELECT
project,
SUM(company_committers) AS project_committers
FROM project_company_committers
GROUP BY 1
)
SELECT
c.project,
c.company,
c.company_committers,
p.project_committers,
c.company_committers / p.project_committers * 100 AS pct_committers
FROM project_company_committers c
JOIN project_committers p
ON c.project = p.project
ORDER BY 1, 5 DESC;

You can explore the results of queries as charts directly in Snowflake’s web UI (“Snowsight”). Let’s look at the running total pull requests over time by project.

SELECT * FROM github.analytics.pulls_by_project_day;
Running total pull requests by project

Apache Iceberg and Apache Hudi have both crossed 7k pull requests.

Now let’s look at what percentage of each project’s committers work for which companies.

SELECT
company,
pct_committers
FROM github.analytics.project_company_committers
WHERE project = 'delta'
ORDER BY 2 DESC;

SELECT
company,
pct_committers
FROM github.analytics.project_company_committers
WHERE project = 'iceberg'
ORDER BY 2 DESC;

SELECT
company,
pct_committers
FROM github.analytics.project_company_committers
WHERE project = 'hudi'
ORDER BY 2 DESC;
Percent of unique committers for Delta Lake by company
Percent of unique committers for Apache Iceberg by company
Percent of unique committers for Apache Hudi by company

As I described in my last blog post, you can see how Apache Iceberg and Apache Hudi have a flatter, more even distribution of companies committing, while Databricks makes up a noticeably higher percentage of the Delta Lake committers. This could be interpreted as a good or bad thing depending on who you ask and their motives and priorities. But biases and interpretation aside, the difference is noticeable.

Wrapping up

To summarize, this example demonstrates two points:

  1. Raw, semi-structured JSON data can be stored in Snowflake as the data lake using the VARIANT data type with schema-on-read.
  2. Features like External Network Access and Snowpipe Streaming allow you to cut out the costs of writing and storing raw files in object storage and then copying them into tables.

