Snowflake Python API: The Key to Modern Data Pipeline Creation

Maeda Kentaro
Snowflake Engineering
4 min read · Mar 14, 2024
[Cover image: Polar bears working together to assemble a database in a modern factory]

Use the Snowflake Python API to Manage Tasks

What is the Snowflake Python API?

The Snowflake Python API is a Python object management library called “snowflake.core” (currently in Public Beta). It is entirely different from the Snowflake Python Connector.
With the Snowflake Python API, you can use Python code to manage Snowflake resources such as Tables, Warehouses, Tasks, and Snowpark Container Service Compute Pools.
In this article, we will take an in-depth look at how to use the Snowflake Python API to manage Snowflake tasks and DAGs (Directed Acyclic Graphs). This will allow you to manage tasks while benefiting from Python's type hints and autocompletion, instead of directly executing SQL.

Installation and Connection

Install the Python package using the following command:

pip install snowflake -U

Next, create a Root object from either a Snowpark session or a Snowflake Python Connector connection. Here, we'll use a Snowpark session.

from snowflake.core import Root
from snowflake.snowpark.context import get_active_session

session = get_active_session()
root = Root(session)
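
get_active_session() works when the code runs inside Snowflake (for example, in a Snowflake worksheet or notebook). When running from a local script, you can build a Snowpark session from connection parameters instead. The following is a minimal sketch; the account, user, and password values are placeholders to replace with your own.

from snowflake.core import Root
from snowflake.snowpark import Session

# Placeholder connection parameters for a locally created Snowpark session
connection_parameters = {
    "account": "<your_account_identifier>",
    "user": "<your_user>",
    "password": "<your_password>",
    "role": "MY_ROLE",
    "warehouse": "MY_WH",
    "database": "my_db",
    "schema": "my_schema",
}

session = Session.builder.configs(connection_parameters).create()
root = Root(session)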

Creating and Managing Simple Tasks

To create a simple task, use the Task object and the TaskCollection object. The following example creates a task named “my_task” that executes select 1 every hour. The schedule argument accepts either a datetime.timedelta or a snowflake.core.task.Cron object; for example, schedule=Cron("*/10 * * * *", "Asia/Tokyo") would run the task every 10 minutes. Although this task does nothing more than run select 1, it shows how easily a task can be created.

from datetime import timedelta
from snowflake.core.task import Task

schema = root.databases["my_db"].schemas["my_schema"]
tasks = schema.tasks
my_task = Task(name="my_task", definition="select 1", schedule=timedelta(hours=1))
tasks.create(my_task)
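
As a variation on the snippet above, the same kind of task can be scheduled with a Cron object instead of a timedelta. This is a minimal sketch; the task name my_cron_task is just an illustrative placeholder.

from snowflake.core.task import Cron, Task

# Same idea, but scheduled via a cron expression (every 10 minutes, Asia/Tokyo)
my_cron_task = Task(
    name="my_cron_task",
    definition="select 1",
    schedule=Cron("*/10 * * * *", "Asia/Tokyo"),
)
tasks.create(my_cron_task)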

Use the TaskResource object for operations like executing, suspending, resuming, and deleting tasks.

The following code shows an example of immediately executing, suspending, resuming, and deleting an existing task named “my_task”.

tasks = root.databases["my_db"].schemas["my_schema"].tasks
task_res = tasks["my_task"]

task_res.execute()  # run the task immediately
task_res.suspend()  # suspend the task
task_res.resume()   # resume a suspended task
task_res.delete()   # delete the task
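
You can also inspect the current definition of a task through the same resource object. The following sketch assumes the fetch() method exposed by the API's resource objects, which returns a Task model.

# Fetch the task's current definition as a Task model object
task_model = tasks["my_task"].fetch()
print(task_model.name, task_model.schedule)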

Creating a Task to Periodically Execute a Local Python Function

This is where the Snowflake Python API really shines. You can create a task that executes a Python function as a stored procedure.
The following code shows an example of creating a task named “my_task2” that periodically executes a local function (automatically uploaded as a new stored procedure by Snowpark) represented by a StoredProcedureCall object. Note that if you don’t specify is_permanent=True, the stored procedure is registered as a temporary, session-scoped object and will no longer exist by the time the task runs.

from snowflake.core.task import Cron, StoredProcedureCall, Task
from snowflake.snowpark import Session
from snowflake.snowpark.functions import sproc


def hello_world(session: Session) -> str:
    return "Hello World"


# Register the local function as a permanent stored procedure on a stage
stored_procedure_object = sproc(
    func=hello_world,
    is_permanent=True,
    stage_location="@EXAMPLE_STAGE",
    replace=True,
    packages=["snowflake-snowpark-python"],
    external_access_integrations=["YOUR_EAI_NAME"],
)

my_task2 = Task(
    name="my_task2",
    definition=StoredProcedureCall(stored_procedure_object),
    warehouse="test_warehouse",
    schedule=Cron("10 * * * *", "Asia/Tokyo"),
)

tasks = root.databases["my_db"].schemas["my_schema"].tasks
tasks.create(my_task2)
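
Note that a newly created task starts out suspended, so to activate its schedule you need to resume it, using the same resource object pattern introduced earlier:

# Newly created tasks are suspended; resume to start the schedule
tasks["my_task2"].resume()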

Creating a Task Graph

Task graphs allow you to combine multiple tasks and define their dependencies. A task graph consists of a root task, which runs first, and additional tasks that run after it according to their dependencies.
The following code creates a DAG named “dag_task” in which task1 executes select 1 every 10 minutes and, when task1 finishes, task2 executes select 2.

from snowflake.core.task import Cron
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation, CreateMode

schema = root.databases["my_db"].schemas["my_schema"]
dag = DAG(
    name="dag_task",
    schedule=Cron("*/10 * * * *", "Asia/Tokyo"),
)
with dag:
    task1 = DAGTask(
        name="task1",
        definition="select 1",
        warehouse="EXAMPLE_WH",
    )
    task2 = DAGTask(
        name="task2",
        definition="select 2",
        warehouse="EXAMPLE_WH",
    )
    # task2 runs after task1 completes
    task1 >> task2
op = DAGOperation(schema)
op.deploy(dag, mode=CreateMode.or_replace)
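
Once deployed, the graph can also be triggered outside its schedule. The one-liner below assumes the run() method on DAGOperation, which kicks off an immediate run of the root task.

# Trigger a manual run of the deployed task graph (assumes DAGOperation.run())
op.run(dag)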

Registering Python Functions as DAGs

The tasks in a task graph can themselves be Python functions. In the following code, task1 uses the External Access Integration bound to its stored procedure via the sproc function to send an HTTP request and sets the response’s JSON data as its return value; task2 receives task1’s return value and parses the JSON.


from snowflake.core.task import Cron, StoredProcedureCall
from snowflake.core.task.context import TaskContext
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation, CreateMode
from snowflake.snowpark import Session
from snowflake.snowpark.functions import sproc

schema = root.databases["my_db"].schemas["my_schema"]


def task1_func(session: Session) -> None:
    import requests

    context = TaskContext(session)
    response = requests.get("https://httpbin.org/get")
    json_data = response.json()
    # Pass the response on to the successor task as the return value
    context.set_return_value(str(json_data))


def task2_func(session: Session) -> None:
    import json

    context = TaskContext(session)
    # Receive the return value of the predecessor task "task1"
    task1_value: str = context.get_predecessor_return_value("task1")
    data = json.loads(task1_value)
    # data = {
    #     "args": {},
    #     "headers": {
    #         "Accept": "*/*",
    #         "Host": "httpbin.org"
    #         …
    #     }
    # }


stored_procedure_object_1 = sproc(
    func=task1_func,
    is_permanent=True,
    stage_location="@EXAMPLE_STAGE",
    replace=True,
    packages=["snowflake-snowpark-python", "requests"],
    external_access_integrations=["YOUR_EAI_NAME"],
)
stored_procedure_object_2 = sproc(
    func=task2_func,
    is_permanent=True,
    stage_location="@EXAMPLE_STAGE",
    replace=True,
    packages=["snowflake-snowpark-python"],
)
dag = DAG(
    name="dag_task",
    schedule=Cron("*/10 * * * *", "Asia/Tokyo"),
    use_func_return_value=True,
)
with dag:
    task1 = DAGTask(
        name="task1",
        definition=StoredProcedureCall(stored_procedure_object_1),
        warehouse="EXAMPLE_WH",
    )
    task2 = DAGTask(
        name="task2",
        definition=StoredProcedureCall(stored_procedure_object_2),
        warehouse="EXAMPLE_WH",
    )
    task1 >> task2
op = DAGOperation(schema)
op.deploy(dag, mode=CreateMode.or_replace)

Use a Task Config

A task graph config is useful when you want to change the behavior of a stored procedure without recreating it. For example, if you only want to change the query parameters a DAG uses, you can pass a dictionary to the DAG’s config parameter.
You could also implement this as arguments to the stored procedure, but when multiple tasks reference the same parameters, passing the arguments to each of them becomes cumbersome. Moreover, sensitive argument values may remain in the stored procedure’s execution history, so it is preferable to use the config parameter.
The following example changes the query parameter of the requested URL based on the config passed to the DAG.

def task1_func(session: Session) -> None:
    import requests

    context = TaskContext(session)
    # Receive the task graph config
    config = context.get_task_graph_config()
    response = requests.get(f"https://httpbin.org/get?query={config['query_params']}")
    json_data = response.json()
    context.set_return_value(str(json_data))


dag = DAG(
    name="dag_task",
    schedule=Cron("10 * * * *", "Asia/Tokyo"),
    # Added: the config dictionary is available to every task in the graph
    config={"query_params": "value"},
)
with dag:
    task1 = DAGTask(
        name="task1",
        definition=StoredProcedureCall(stored_procedure_object_1),
        warehouse="EXAMPLE_WH",
    )
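
The snippet above only shows the parts that change. As in the earlier examples, the modified task1_func still has to be re-registered with sproc and the graph redeployed before the new config takes effect; a minimal sketch of the deployment step, reusing the schema object from the previous snippets:

# Redeploy the task graph so the updated function and config take effect
op = DAGOperation(schema)
op.deploy(dag, mode=CreateMode.or_replace)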

Conclusion

With the Snowflake Python API, you can define task dependencies as task graphs and register Python functions as tasks, which makes it easy to build complex workflows.
Previously, managing Snowflake tasks from Python meant repeatedly calling session.sql("CREATE TASK …") from Snowpark. With the introduction of this API, task definitions written in Python for external orchestrators such as Airflow should be straightforward to migrate to Snowflake tasks.

References

Snowflake Python API Docs: https://docs.snowflake.com/en/developer-guide/snowflake-python-api/snowflake-python-overview

Maeda Kentaro
RAKUDEJI inc. CEO | SnowPro Advanced : Data Engineer❄️