Snowflake Python APIs — Python Everywhere!

Anurag Gupta, Avani Chandrasekaran, Yijun Xie

Python plays a crucial role in our Snowflake strategy for applications, data engineering, and ML. It is one of the languages most widely used by data engineers, data scientists, and machine learning professionals to build pipelines and applications on Snowflake. Its popularity on Snowflake has been increasing steadily: the Snowflake Python Connector alone is downloaded millions of times each month. To expand our support for the Python ecosystem and provide first-class Python APIs, we are building unified Python support across all Snowflake resources and workloads, with a strong developer experience for Python developers.

The Snowflake Python API is a unified Python API that covers Snowflake resources across Data Engineering, Snowpark, ML, and App workloads.

Today, on behalf of the Developer Platform team, we’re excited to announce that the Snowflake Python APIs are now available in Private Preview for our customers, with support for Tasks, DAGs, and Snowpark Container Services resources.

Features

  • Task/DAG APIs

Using the Snowflake Python APIs, developers can quickly create tasks, resume or suspend them, or create a DAG: a series of tasks composed of a single root task and additional tasks, organized by their dependencies.

Demo of data pipeline automation using Tasks/DAGs.

  • Snowpark Container Management APIs

Developers can quickly create compute pools, manage them, and create services from container images via the new Python APIs.

Demo of the Snowpark Containers Python APIs: creating a new compute pool, creating a new service, starting it, and checking that it is running.

How to use Snowflake Python APIs?

Authentication

To use the Snowflake Python API, you can reuse a connection from the Snowflake Python Connector or a Snowpark Session. The following example uses a connection.

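import os

from snowflake.connector import connect
from snowflake.core import Root

# Placeholders: replace with your own database, warehouse, and schema names.
test_database, test_warehouse, test_schema = "MYDB", "MYWAREHOUSE", "MYSCHEMA"

CONNECTION_PARAMETERS = {
    "account": os.environ["snowflake_account_demo"],
    "user": os.environ["snowflake_user_demo"],
    "password": os.environ["snowflake_password_demo"],
    "database": test_database,
    "warehouse": test_warehouse,
    "schema": test_schema,
}
connection = connect(**CONNECTION_PARAMETERS)
root = Root(connection)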
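
Reading credentials directly from os.environ raises a bare KeyError on the first missing variable. A small helper can collect the parameters and report every missing variable at once. The following is a minimal sketch under stated assumptions: load_connection_parameters is a hypothetical helper, not part of any Snowflake package, and only the environment variable names come from the example above.

```python
import os
from typing import Mapping

# Environment variable names taken from the example above; the helper itself
# is hypothetical and not part of any Snowflake package.
ENV_KEYS = {
    "account": "snowflake_account_demo",
    "user": "snowflake_user_demo",
    "password": "snowflake_password_demo",
}


def load_connection_parameters(
    env: Mapping[str, str] = os.environ, **extra: str
) -> dict:
    """Build a connection-parameter dict, failing loudly on missing variables."""
    missing = [var for var in ENV_KEYS.values() if not env.get(var)]
    if missing:
        raise RuntimeError(
            "Missing environment variables: " + ", ".join(sorted(missing))
        )
    params = {key: env[var] for key, var in ENV_KEYS.items()}
    params.update(extra)  # e.g. database, warehouse, schema
    return params
```

The result could then be passed to the connector, for example connect(**load_connection_parameters(database="MYDB")).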

Manage Tasks

  • Use Task APIs
from datetime import timedelta

from snowflake.core.task import StoredProcedureCall, Task
from snowflake.snowpark import Session
from snowflake.snowpark.functions import sum as sum_

tasks = root.databases["MYDB"].schemas["MYSCHEMA"].tasks

# Create a serverless task that runs a SQL statement every minute.
task1_def = Task(
    "MYTASK1",
    definition="MERGE INTO target USING source_stream ON target.k = source_stream.k WHEN MATCHED THEN UPDATE SET target.v = source_stream.v",
    schedule=timedelta(minutes=1),
)
task1 = tasks.create(task1_def)

def dosomething(session: Session) -> str:
    df = session.table("target")
    # save_as_table lives on the DataFrameWriter, reached through df.write.
    df.group_by("a").agg(sum_("b")).write.save_as_table("agg_table")
    return "Success"

# Create a user-managed task from a Python function.
task2_def = Task(
    "MYTASK2",
    definition=StoredProcedureCall(dosomething, stage_location="@mystage", packages=["snowflake-snowpark-python"]),
    warehouse="test_warehouse",
)
# MYTASK2 runs after MYTASK1 completes.
task2_def.predecessors = ["MYDB.MYSCHEMA.MYTASK1"]
task2 = tasks.create(task2_def)

# Resume the tasks: the child first, then the root.
task2.resume()
task1.resume()
# Suspend them: the root first, then the child.
task1.suspend()
task2.suspend()
# Delete them: the child first, then the root.
task2.delete()
task1.delete()
  • Use the DAG APIs
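from datetime import timedelta

from snowflake.core.task import StoredProcedureCall
from snowflake.core.task.dagv1 import DAG, DAGOperation, DAGTask

# Reuses dosomething from the Task example above.
dag = DAG("my_dag", schedule=timedelta(days=1))
with dag:
    dag_task1 = DAGTask(
        "dagtask1",
        "MERGE INTO target USING source_stream ON target.k = source_stream.k WHEN MATCHED THEN UPDATE SET target.v = source_stream.v",
    )
    dag_task2 = DAGTask(
        "dagtask2",
        StoredProcedureCall(
            dosomething, stage_location="@mystage",
            packages=["snowflake-snowpark-python"],
        ),
        warehouse="test_warehouse",
    )
    dag_task1 >> dag_task2  # dag_task1 is a predecessor of dag_task2
schema = root.databases["MYDB"].schemas["MYSCHEMA"]
dag_op = DAGOperation(schema)
dag_op.deploy(dag)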
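
The >> operator in the DAG example records which task must finish before another one starts. To make that idea concrete, here is a toy model in plain Python that runs without Snowflake. ToyTask and run_order are hypothetical names, and this is only an illustration of the concept, not how the library is implemented. It stores a predecessor whenever a >> b is evaluated, then derives a valid execution order with the standard library's graphlib module.

```python
from graphlib import TopologicalSorter


class ToyTask:
    """Stand-in for a DAG task; it only tracks its name and predecessors."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.predecessors: set[str] = set()

    def __rshift__(self, other: "ToyTask") -> "ToyTask":
        # `a >> b` means: a must finish before b starts.
        other.predecessors.add(self.name)
        return other  # returning `other` allows chaining: a >> b >> c


def run_order(tasks: list[ToyTask]) -> list[str]:
    """Return task names in an order that respects every dependency."""
    graph = {task.name: task.predecessors for task in tasks}
    return list(TopologicalSorter(graph).static_order())


root_task = ToyTask("dagtask1")
child = ToyTask("dagtask2")
grandchild = ToyTask("dagtask3")
root_task >> child >> grandchild

# The input order does not matter; the dependencies decide the result.
print(run_order([grandchild, child, root_task]))
# → ['dagtask1', 'dagtask2', 'dagtask3']
```

The task with no predecessors plays the role of the single root task, and every other task is scheduled only after the tasks it depends on.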

Managing compute pools

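from snowflake.core.compute_pool import ComputePool

# Define and create a single-node compute pool.
new_pool_def = ComputePool(
    name="MYCOMPUTEPOOL",
    instance_family="STANDARD_1",
    min_nodes=1,
    max_nodes=1,
)
new_pool = root.compute_pools.create(new_pool_def)
# Fetch the pool's current properties.
cp_snapshot = new_pool.fetch()
# List compute pools whose names end with COMPUTEPOOL.
cp_data = root.compute_pools.iter(like="%COMPUTEPOOL")
# Suspend and resume the pool.
new_pool.suspend()
new_pool.resume()
# Stop every service running on the pool, then delete the pool.
new_pool.stop_all_services()
new_pool.delete()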
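
The like argument passed to iter in the example uses SQL LIKE wildcards: % matches any run of characters and _ matches exactly one character. As a rough illustration of which names a pattern such as %COMPUTEPOOL would select, the sketch below translates a LIKE pattern into a regular expression. like_matches is a hypothetical helper, and the case-insensitive matching is an assumption modeled on how Snowflake's SHOW ... LIKE filters behave; the real filtering happens on the server.

```python
import re


def like_matches(pattern: str, name: str) -> bool:
    """Approximate SQL LIKE: % = any characters, _ = exactly one character."""
    regex = "".join(
        ".*" if ch == "%" else "." if ch == "_" else re.escape(ch)
        for ch in pattern
    )
    # Assumption: matching ignores case, as in SHOW ... LIKE.
    return re.fullmatch(regex, name, flags=re.IGNORECASE | re.DOTALL) is not None


pools = ["MYCOMPUTEPOOL", "COMPUTEPOOL_DEV", "analytics_computepool"]
print([p for p in pools if like_matches("%COMPUTEPOOL", p)])
# → ['MYCOMPUTEPOOL', 'analytics_computepool']
```

COMPUTEPOOL_DEV is excluded because the pattern has no trailing %, so the name must end with COMPUTEPOOL.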

Managing container services

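from snowflake.core.service import Service

# Define a service that runs on the compute pool, using a spec file on the user stage.
new_service_def = Service(
    name="MYSERVICE",
    compute_pool="MYCOMPUTEPOOL",
    spec="@~/myservice_spec.yml",
    min_instances=1,
    max_instances=1,
)
services = root.databases["MYDB"].schemas["MYSCHEMA"].services
myservice = services.create(new_service_def)
# Fetch the service's current properties.
myservice_snapshot = myservice.fetch()
# List services whose names end with SERVICE.
service_data = services.iter(like="%SERVICE")
# Suspend and resume the service.
myservice.suspend()
myservice.resume()
# Check the service status and read its logs.
service_status = myservice.get_service_status()
logs = myservice.get_service_logs()
# Delete the service.
myservice.delete()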
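
A newly created or resumed service may take some time to come up, so scripts often poll its status before using it. The helper below is a generic polling sketch in plain Python. wait_until is a hypothetical name, and the check callable is where a call such as myservice.get_service_status() would go. This post does not specify the shape of the status that call returns, so the readiness test is left to the caller.

```python
import time
from typing import Callable


def wait_until(
    check: Callable[[], bool],
    timeout_s: float = 300.0,
    interval_s: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> bool:
    """Call `check` repeatedly until it returns True or the timeout expires."""
    deadline = clock() + timeout_s
    while True:
        if check():
            return True
        if clock() >= deadline:
            return False  # gave up: the condition never became true in time
        sleep(interval_s)
```

The sleep and clock parameters exist only so the helper can be exercised in tests without real waiting.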

What’s Next

The Private Preview (PrPr) release includes APIs for Task, DAG, Compute Pool, Image Repository, and Service. We’ll continue to invest in the following areas:

  1. Expand the Python API coverage to all Snowflake objects, such as Warehouse, Database, Schema, Table, Dynamic Table, and Stage. The goal is that Python developers can use a first-class Python API instead of SQL to manage all their Snowflake resources.
  2. Streamline Snowflake’s multiple Python distribution packages. You will be able to install, authenticate, and use these packages with a single command: pip install snowflake
  3. Integrate with the upcoming new Snowflake REST APIs.

Get started today!

Sign up for the Private Preview using our intake form to get access to the new Python APIs. We value your feedback, which plays a vital role in shaping the future of the Snowflake Python APIs. Please share your suggestions and feature requests at developers@snowflake.com.

Happy coding, and thank you for being an integral part of our thriving Snowflake developer community!
