Python User-Defined Aggregate Functions: now in Public Preview

We are thrilled to announce the public preview of User-Defined Aggregate Functions (UDAFs) in Snowflake! This new feature empowers developers to extend Snowflake’s powerful analytics capabilities by creating custom aggregate functions tailored to their specific data processing needs.

Click here to get started with Python UDAFs!

Overview

User-Defined Aggregate Functions (UDAFs) let you define custom aggregate functions and run them directly within the Snowflake platform. With Snowpark, you write the aggregation logic as a Python handler class, and Snowflake executes it as part of your query.

UDAFs extend the functionality of standard aggregate functions by allowing you to define complex, domain-specific calculations that are not supported by built-in aggregate functions. Whether you need to calculate a weighted average, perform advanced statistical analysis, or implement a unique aggregation logic, UDAFs offer the flexibility and customization to meet your data processing requirements.
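To make the weighted-average case concrete, here is a minimal sketch of a handler class in plain Python. It follows the handler shape used in the examples later in this post (`aggregate_state`, `accumulate`, `merge`, `finish`). The class name `WeightedAvg`, the two-argument `accumulate`, and the tuple-shaped state are assumptions made for this sketch. It runs locally without a Snowflake connection, so it only illustrates the logic, not the registration step.

```python
class WeightedAvg:
    """Hypothetical handler: weighted average of (value, weight) pairs."""

    def __init__(self):
        # Keep a running weighted sum and total weight; both are easy to merge.
        self._weighted_sum = 0.0
        self._total_weight = 0.0

    @property
    def aggregate_state(self):
        # Partial result that another instance can absorb through merge().
        return (self._weighted_sum, self._total_weight)

    def accumulate(self, value, weight):
        self._weighted_sum += value * weight
        self._total_weight += weight

    def merge(self, other_state):
        other_sum, other_weight = other_state
        self._weighted_sum += other_sum
        self._total_weight += other_weight

    def finish(self):
        # Return None instead of dividing by zero when no weight was seen.
        if self._total_weight == 0:
            return None
        return self._weighted_sum / self._total_weight


# Local usage: (10 * 1 + 20 * 3) / (1 + 3)
agg = WeightedAvg()
for value, weight in [(10.0, 1.0), (20.0, 3.0)]:
    agg.accumulate(value, weight)
weighted_avg = agg.finish()
print(weighted_avg)
```

Returning `None` for an empty input is a design choice of this sketch; you may prefer to raise an error or return a default instead.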

What Our Users Are Saying

“UDAFs made a particular workflow that we use for reconciliation between Snowflake tables and Oracle tables fast, IO efficient, and most importantly simple to develop. We wanted to verify equality between snowflake and oracle tables at the field level without having to use a lot of processing power and IO from either database. We leveraged the fact that we expect the rows to be equal in the vast majority of cases by designing a series of steps which condense a large block of rows into a single value, transferring that, and comparing it. The (python) UDAF was instrumental in being able to do this as we could simply create an aggregate that combines all fields into a single string, hashes it to form a row-hash, and then combining all row-hashes and hashing that to form the block-hash. Without the UDAF this requires some custom JAR or some other more complicated solution, and this one was ready in only a few hours.”
— Gavin Sherman, PershingX
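The row-hash and block-hash idea described in the quote can be sketched in plain Python as follows. This is not PershingX's implementation: the class name `BlockHash`, the use of SHA-256, the `|` field delimiter, and sorting the row hashes before the final hash (so the result does not depend on row order) are all assumptions made for illustration. The handler shape mirrors the examples later in this post.

```python
import hashlib


class BlockHash:
    """Hypothetical handler: hash each row, then hash all row hashes."""

    def __init__(self):
        # The state is the list of row hashes collected so far.
        self._row_hashes = []

    @property
    def aggregate_state(self):
        return self._row_hashes

    def accumulate(self, row_text):
        # row_text is assumed to be all fields of one row joined into a string.
        digest = hashlib.sha256(row_text.encode("utf-8")).hexdigest()
        self._row_hashes.append(digest)

    def merge(self, other_row_hashes):
        self._row_hashes.extend(other_row_hashes)

    def finish(self):
        # Sort so that the block hash is independent of row order.
        combined = "".join(sorted(self._row_hashes))
        return hashlib.sha256(combined.encode("utf-8")).hexdigest()


def block_hash(rows):
    """Local helper that feeds rows through the handler."""
    agg = BlockHash()
    for fields in rows:
        agg.accumulate("|".join(str(f) for f in fields))
    return agg.finish()


rows_a = [("car", 10000), ("motorcycle", 5000)]
rows_b = [("motorcycle", 5000), ("car", 10000)]  # same rows, different order
rows_c = [("car", 10001), ("motorcycle", 5000)]  # one field differs

same = block_hash(rows_a) == block_hash(rows_b)
different = block_hash(rows_a) != block_hash(rows_c)
print(same, different)
```

Note that this sketch keeps every row hash in the aggregate state, so the state grows with the number of rows in a block. A production version would likely bound the block size or use a combining step that keeps the state constant in size.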

Flexibility and Customization

UDAFs allow you to define custom aggregate functions tailored to your specific business logic and data processing needs. This flexibility enables you to perform complex calculations and analyses that are not possible with standard aggregate functions, empowering you to derive deeper insights from your data.

Performance and Scalability

UDAFs leverage Snowflake’s highly scalable and optimized query execution engine, ensuring high performance and efficient processing of large datasets. By executing custom aggregate functions directly within Snowflake, you can take advantage of Snowflake’s distributed architecture and parallel processing capabilities to achieve faster and more efficient data processing.
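The reason an aggregate handler can be spread across parallel workers is that partial states can be combined with `merge`. The sketch below simulates that locally in plain Python: it splits the sales prices from the example later in this post into three arbitrary partitions, accumulates each one separately, and merges the partial states. The class name `AvgHandler` and the partitioning are assumptions for illustration; this post does not describe how Snowflake actually distributes the work.

```python
class AvgHandler:
    """Local stand-in for an average handler with mergeable state."""

    def __init__(self):
        self._sum = 0
        self._count = 0

    @property
    def aggregate_state(self):
        return (self._sum, self._count)

    def accumulate(self, value):
        self._sum += value
        self._count += 1

    def merge(self, other_state):
        other_sum, other_count = other_state
        self._sum += other_sum
        self._count += other_count

    def finish(self):
        return self._sum / self._count


prices = [10000, 5000, 7500, 3500, 1500, 20000]
partitions = [prices[:2], prices[2:5], prices[5:]]  # illustrative split only

# Each "worker" accumulates its own partition and exposes a partial state.
partial_states = []
for partition in partitions:
    worker = AvgHandler()
    for price in partition:
        worker.accumulate(price)
    partial_states.append(worker.aggregate_state)

# A final instance merges the partial states and produces the result.
final = AvgHandler()
for state in partial_states:
    final.merge(state)
parallel_result = final.finish()

# Single-pass result for comparison.
single = AvgHandler()
for price in prices:
    single.accumulate(price)
single_result = single.finish()

print(parallel_result == single_result)
```

The comparison holds because the state here is just a sum and a count. Any handler whose `merge` is insensitive to how rows were grouped will behave the same way.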

Seamless Integration with Snowpark

UDAFs are seamlessly integrated with Snowpark, our developer experience, providing a familiar and streamlined development environment for creating and deploying custom aggregate functions. You can use the Python API to implement and deploy your UDAFs, or you can always use inline SQL to create them.

Examples

Here’s a simple example that uses inline SQL to create a custom average aggregate with a Python handler, and then calls it:

CREATE OR REPLACE AGGREGATE FUNCTION python_avg(a INT)
RETURNS FLOAT
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
HANDLER = 'PythonAvg'
AS $$
from dataclasses import dataclass

@dataclass
class AvgAggState:
    count: int
    sum: int

class PythonAvg:
    def __init__(self):
        # This aggregate state is an object data type.
        self._agg_state = AvgAggState(0, 0)

    @property
    def aggregate_state(self):
        return self._agg_state

    def accumulate(self, input_value):
        sum = self._agg_state.sum
        count = self._agg_state.count

        self._agg_state.sum = sum + input_value
        self._agg_state.count = count + 1

    def merge(self, other_agg_state):
        sum = self._agg_state.sum
        count = self._agg_state.count

        other_sum = other_agg_state.sum
        other_count = other_agg_state.count

        self._agg_state.sum = sum + other_sum
        self._agg_state.count = count + other_count

    def finish(self):
        sum = self._agg_state.sum
        count = self._agg_state.count
        return sum / count
$$;

-- Example data
CREATE OR REPLACE TABLE sales(item STRING, price INT);
INSERT INTO sales VALUES ('car', 10000), ('motorcycle', 5000), ('car', 7500), ('motorcycle', 3500), ('motorcycle', 1500), ('car', 20000);

-- Call UDAF
SELECT python_avg(price) FROM sales;

Here is a similar example, this time a sum aggregate, implemented with the Snowpark Python API:

from snowflake.snowpark.session import Session
from snowflake.snowpark.functions import udaf, col, call_function
from snowflake.snowpark.types import IntegerType

session = Session.builder.configs({...}).create()

@udaf(name="sum_int", replace=True, return_type=IntegerType(), input_types=[IntegerType()])
class PythonSumUDAF:
    def __init__(self) -> None:
        self._sum = 0

    @property
    def aggregate_state(self):
        return self._sum

    def accumulate(self, input_value):
        self._sum += input_value

    def merge(self, other_sum):
        self._sum += other_sum

    def finish(self):
        return self._sum

# Example data
df = session.create_dataframe([[1, 3], [1, 4], [2, 5], [2, 6]]).to_df("a", "b")

# Call UDAF
df.agg(PythonSumUDAF("a")).collect()

# Alternate syntax to call the UDAF by name
df.select(call_function(PythonSumUDAF.name, col("a")).alias("sum_a")).collect()

Try It Out

We invite you to try out User-Defined Aggregate Functions in Snowflake.