Uncovering the new Snowflake UDAFs with Apache DataSketches

Snowflake now supports creating your own user-defined aggregate functions (UDAFs) in Python. Let’s discover them by implementing Apache DataSketches HLL approximate counts within the new Snowflake UDAFs.


What’s Apache DataSketches?

From the Apache DataSketches homepage:

In the analysis of big data there are often problem queries that don’t scale because they require huge compute resources and time to generate exact results. Examples include count distinct, quantiles, most-frequent items, joins, matrix computations, and graph analysis.

If approximate results are acceptable, there is a class of specialized algorithms, called streaming algorithms, or sketches that can produce results orders-of magnitude faster and with mathematically proven error bounds. For interactive queries there may not be other viable alternatives, and in the case of real-time analysis, sketches are the only known solution.

Snowflake has its own approximate aggregation functions (HLL, APPROXIMATE_JACCARD_INDEX, APPROX_TOP_K, APPROX_PERCENTILE). These functions perform much faster than the UDAFs we are going to implement in this post; however, this exercise will be interesting for cases where we need compatibility with systems outside Snowflake. It's also a good way to discover the implementation and design decisions behind the Snowflake UDAFs, so let's get started.

How much faster are the native Snowflake approximate functions?

I started with an exact distinct count of the customers on TPCH_SF1000. This took 6.6s on a Small-wh:

select count(distinct c_name)
from snowflake_sample_data.tpch_sf1000.customer
group by 'x'
-- 150000000
-- 6.6s S

Meanwhile Snowflake’s native HLL implementation can get a similar result in 0.9s:

select approx_count_distinct(c_name)
from snowflake_sample_data.tpch_sf1000.customer
group by 'x'
-- 148133819
-- 0.9s S
;

That’s pretty good. Then my implementation of Apache DataSketches HLL inside a Snowflake UDAF takes 36s to do something similar:

select apache_sketches_hll(c_name)
from snowflake_sample_data.tpch_sf1000.customer
group by 'x'
-- 152248026.5622
-- 36s S-wh
;

That’s not spectacular, but still gets us the compatibility we might need with other systems. I also checked the previously available Snowflake Python UDTFs (table functions), which took 51s:

select apache_sketches_hll_udtf_sketch_union(
array_agg(b.sketch)
)
from snowflake_sample_data.tpch_sf1000.customer a
, table(apache_sketches_hll_udtf_sketch(c_name) )b
-- 152248026.5622
-- 51s S
;

That shows that the Python UDAFs are not only easier to use due to their more traditional syntax in SQL, they are also faster thanks to better parallelization.

Measuring the performance of approx count distinct over 150 million rows on a Snowflake Small-wh

Let’s now go deeper into how these were implemented.

Apache DataSketches HLL in a Python UDAF

This is how to implement Apache DataSketches’ HLL approximate count distinct in a Python UDAF:

create or replace aggregate function apache_sketches_hll(a string)
returns float
language python
packages = ('datasketches')
runtime_version=3.11
handler = 'X'
as $$
from datasketches import hll_sketch, hll_union, tgt_hll_type

class X:
    def __init__(self):
        self._sketch = hll_sketch(12)

    @property
    def aggregate_state(self):
        return self._sketch.serialize_compact()

    def accumulate(self, input_value):
        self._sketch.update(input_value)

    def merge(self, other_partial_sum):
        union = hll_union(12)
        union.update(self._sketch)
        union.update(hll_sketch.deserialize(other_partial_sum))
        self._sketch = union.get_result()

    def finish(self):
        return self._sketch.get_estimate()
$$;

As seen above, using it is trivial:

select apache_sketches_hll(c_name)
from snowflake_sample_data.tpch_sf1000.customer
group by 'x'
-- 152248026.5622
-- 36s S-wh
;

Interesting things to notice:

  • Anaconda already provides datasketches in Snowflake, so getting the required libraries only took requesting them in the UDAF definition.
  • To create a UDAF we need to provide a handler class with 5 members: __init__(), accumulate(), merge(), finish(), and an aggregate_state property.
  • __init__() takes care of initializing an empty sketch.
  • accumulate() looks at each new row of data and adds it to our existing sketch.
  • aggregate_state() returns the current state of our sketch.
  • merge() is the key that allows Snowflake to parallelize the aggregation through multiple threads — each thread returns partial results and merge() takes care of combining them.
  • finish() returns the final result.
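To make the lifecycle concrete, here's a hypothetical driver that mimics how Snowflake exercises a UDAF handler: one handler instance per thread, accumulate() once per row, then the partial states flow into merge(), and finish() produces the result. A simple SUM handler stands in for the HLL sketch so the sketch stays dependency-free:

```python
# A stand-in handler: same shape as the HLL UDAF handler, but summing integers.
class SumHandler:
    def __init__(self):
        self._total = 0

    @property
    def aggregate_state(self):
        # The partial state Snowflake ships between threads.
        return self._total

    def accumulate(self, input_value):
        self._total += input_value

    def merge(self, other_partial_sum):
        self._total += other_partial_sum

    def finish(self):
        return self._total

def run_parallel_aggregate(rows, n_threads=4):
    # Each "thread" gets its own handler instance and a slice of the rows.
    handlers = [SumHandler() for _ in range(n_threads)]
    for i, row in enumerate(rows):
        handlers[i % n_threads].accumulate(row)
    # One handler absorbs the others' partial states via merge().
    final = handlers[0]
    for h in handlers[1:]:
        final.merge(h.aggregate_state)
    return final.finish()

print(run_parallel_aggregate(range(1, 101)))  # 5050
```

The names here (SumHandler, run_parallel_aggregate) are illustrative, not part of Snowflake's API; the point is that any handler whose merge() is associative can be parallelized this way.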

I’m happy to see that the UDAFs are faster than the previously available UDTFs. We’ll implement HLL in a UDTF further down, to confirm this performance gain.

From the Snowflake docs

Performance review

This statistics screenshot comes from a UDAF run that took 46s in a S-wh:

Statistics of running the UDAF over 150,000,000 rows on a S-wh

In the above screenshot we can see:

  • 667 partitions were scanned.
  • __init__() was called 17 times, which indicates the parallelism of processing those 667 partitions.
  • accumulate() was called 150,000,000 times (once per row), taking 406 seconds in total. The whole query ran in less time thanks to parallelization.
  • aggregate_state() was called 16 times, each time to recover the results of one thread started by __init__() (except the last thread).
  • merge() was called 16 times, bringing the results of aggregate_state() into another thread.
  • finish() was called once, to return the final result.
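A quick back-of-envelope check on those numbers (taking the 406s total accumulate() time and the 17 threads from the screenshot):

```python
# Rough arithmetic on the profiler numbers above.
total_accumulate_seconds = 406       # total CPU time spent inside accumulate()
rows = 150_000_000                   # one accumulate() call per row
threads = 17                         # number of __init__() calls

per_call_us = total_accumulate_seconds / rows * 1e6
ideal_wall_seconds = total_accumulate_seconds / threads

print(f"{per_call_us:.1f} microseconds per accumulate() call")
print(f"~{ideal_wall_seconds:.0f}s ideal wall time across {threads} threads")
```

That works out to roughly 2.7µs of Python overhead per row, and an ideal parallel time of about 24s; the observed 46s suggests the threads weren't perfectly balanced, which is plausible given the uneven partition sizes.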

The same, but on an XL-wh:

Statistics of running the UDAF over 150,000,000 rows on a XL-wh

In this screenshot we can see that the 667 partitions were processed by 129 threads in parallel, bringing the processing time down to 9.4s.


The UDTF alternative

When we didn’t have UDAFs, the way to implement this was with a UDTF. I already explored a similar case with Java and BigQuery’s HLL++ — but now let’s check the implementation in a Python UDTF of Apache DataSketches HLL:

create or replace function apache_sketches_hll_udtf(input_value string)
returns table (total float)
language python
packages = ('datasketches')
runtime_version=3.11
handler='X'
as $$
from datasketches import hll_sketch, hll_union, tgt_hll_type

class X:
    def __init__(self):
        self._sketch = hll_sketch(12)

    def process(self, input_value):
        self._sketch.update(input_value)

    def end_partition(self):
        yield (self._sketch.get_estimate(), )
$$;

You can see that the code above is more concise than the UDAF definition. However the SQL query using it is harder to write and understand:

select b.*
from snowflake_sample_data.tpch_sf1000.customer a
, table(apache_sketches_hll_udtf(c_name))b
-- 50s S
;

Not only is the query harder to understand, the results don't match what we need:

We get 16 different results, instead of the total aggregate

This is because the UDTF does its own partitioning (when we don't define it) to parallelize the query and make it faster. If we un-parallelize it, the query takes 12 times longer on the same Small-wh:

select b.*
from snowflake_sample_data.tpch_sf1000.customer a
, table(apache_sketches_hll_udtf(c_name) over(partition by 1))b
-- 617s S-wh
;

That’s not good. What we really need is to get sketches for each partition, and then aggregate them:

create or replace function apache_sketches_hll_udtf_sketch(input_value string)
returns table (sketch string)
language python
packages = ('datasketches')
runtime_version=3.11
handler='X'
as $$
from datasketches import hll_sketch, hll_union, tgt_hll_type
import base64

class X:
    def __init__(self):
        self._sketch = hll_sketch(12)

    def process(self, input_value):
        self._sketch.update(input_value)

    def end_partition(self):
        yield (base64.b64encode(self._sketch.serialize_compact()).decode('utf-8'), )
$$;

create or replace function apache_sketches_hll_udtf_sketch_union(input_value array)
returns float
language python
packages = ('datasketches')
runtime_version=3.11
handler='x'
as $$
from datasketches import hll_sketch, hll_union, tgt_hll_type
import base64

def x(arr):
    union = hll_union(12)
    for sketch in arr:
        union.update(hll_sketch.deserialize(base64.b64decode(sketch)))
    return union.get_estimate()
$$;

That’s how we get to a query that looks like this:

select apache_sketches_hll_udtf_sketch_union(
array_agg(b.sketch)
)
from snowflake_sample_data.tpch_sf1000.customer a
, table(apache_sketches_hll_udtf_sketch(c_name) )b
-- 152248026.5622
-- 51s S
;

The lesson here is that it’s good to migrate our UDTFs to UDAFs (as long as they are written in Python — Java UDAFs are not yet available). However there’s an important step to consider in this example: What if we want to merge sketch results within UDAFs?


Storing and merging sketches with UDAFs

Why would anyone use Apache DataSketches within a Snowflake Python UDAF if they are slower than the native HLL approximate counts?

Well, the beauty of these sketches is the compatibility they bring across multiple systems. Currently Apache DataSketches is available in Java, C++, and Python (with the community creating bindings in other languages, like Rust).

It’s good to store partial state sketches in Snowflake, especially if they are coming from other systems. For this to work, we need to split the original HLL UDAF in 3 parts:

  • apache_sketches_hll_accumulate(): Produces a sketch representing the probabilistic count of elements seen.
  • apache_sketches_hll_combine(): Combines multiple sketches (which could be coming from apache_sketches_hll_accumulate() in Snowflake, or from an external system).
  • apache_sketches_hll_estimate(): Transforms a sketch into an estimate.
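This three-function contract can be sketched in plain Python, using a set as a stand-in for a real HLL sketch (a set gives exact counts where a sketch would give an estimate, but the shape of the API is the same): accumulate produces a serialized partial state, combine unions serialized states, and estimate turns a state into a number. The function names mirror the UDAFs above but are otherwise illustrative:

```python
import pickle

def hll_accumulate(values):
    # Produce one serialized "sketch" (here: a pickled set) per group/system.
    return pickle.dumps(set(values))

def hll_combine(states):
    # Union several serialized states into one serialized state.
    union = set()
    for s in states:
        union |= pickle.loads(s)
    return pickle.dumps(union)

def hll_estimate(state):
    # Turn a serialized state into a cardinality estimate.
    return len(pickle.loads(state))

s1 = hll_accumulate(["a", "b", "c"])
s2 = hll_accumulate(["b", "c", "d"])
print(hll_estimate(hll_combine([s1, s2])))  # 4
```

The key property, for sets and HLL sketches alike, is that combining states then estimating gives the same answer regardless of how the input rows were split across systems.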

My code to define these 3:

create or replace aggregate function apache_sketches_hll_accumulate(a string)
returns binary
language python
packages = ('datasketches')
runtime_version=3.11
handler = 'X'
as $$
from datasketches import hll_sketch, hll_union, tgt_hll_type

class X:
    def __init__(self):
        self._sketch = hll_sketch(12)

    @property
    def aggregate_state(self):
        return self._sketch.serialize_compact()

    def accumulate(self, input_value):
        self._sketch.update(input_value)

    def merge(self, other_partial_sum):
        union = hll_union(12)
        union.update(self._sketch)
        union.update(hll_sketch.deserialize(other_partial_sum))
        self._sketch = union.get_result()

    def finish(self):
        return self._sketch.serialize_compact()
$$;

create or replace aggregate function apache_sketches_hll_combine(a binary)
returns binary
language python
packages = ('datasketches')
runtime_version=3.11
handler = 'X'
as $$
from datasketches import hll_sketch, hll_union, tgt_hll_type

class X:
    def __init__(self):
        self._union = hll_union(12)

    @property
    def aggregate_state(self):
        return self._union.get_result().serialize_compact()

    def accumulate(self, input_value):
        self._union.update(hll_sketch.deserialize(input_value))

    def merge(self, other_partial_sum):
        self._union.update(hll_sketch.deserialize(other_partial_sum))

    def finish(self):
        return self._union.get_result().serialize_compact()
$$;


create or replace function apache_sketches_hll_estimate(sketch binary)
returns float
language python
packages = ('datasketches')
runtime_version=3.11
handler = 'x'
as $$
from datasketches import hll_sketch, hll_union, tgt_hll_type

def x(sketch):
    return hll_sketch.deserialize(sketch).get_estimate()
$$;

Note that when moving sketches between external systems and Snowflake (in either direction), special care needs to be taken with the binary sketches, transforming them to/from base64 or a similar text-safe encoding.
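The reason is that raw sketch bytes don't survive text-based channels (JSON payloads, CSV exports, SQL string literals), while base64 text does. On the Python side the round-trip is just the stdlib; on the SQL side Snowflake's BASE64_ENCODE and BASE64_DECODE_BINARY play the same role. The byte values below are a made-up stand-in for a real serialized sketch:

```python
import base64

# Hypothetical payload standing in for hll_sketch.serialize_compact() output.
sketch_bytes = bytes([0x02, 0x01, 0x07, 0x0C, 0x03, 0x00, 0xFF])

encoded = base64.b64encode(sketch_bytes).decode("utf-8")  # text-safe string
decoded = base64.b64decode(encoded)                       # back to raw bytes

assert decoded == sketch_bytes  # lossless round-trip
print(encoded)
```

This is exactly what apache_sketches_hll_udtf_sketch() above does before yielding its sketch as a string.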

Using these then becomes easy within SQL:

select apache_sketches_hll_accumulate(c_name) sketch
from snowflake_sample_data.tpch_sf1000.customer
group by left(c_name, 12)
-- (151 sketches)
-- 41s S
;

select apache_sketches_hll_estimate(sketch)
from (
select apache_sketches_hll_combine(sketch) sketch
from table(result_scan(last_query_id(-1)))
)
-- 152248026.56219986
-- 0.9s S-wh
;

Estimating cardinality from multiple stored sketches

When to use Snowflake HLL, vs Apache DataSketches, vs Google ZetaSketch

  • Use Snowflake’s HLL implementation for the fastest results, but only when the whole life-cycle happens within Snowflake. This is because Snowflake has not open-sourced its implementation for use in other systems.
  • Use Apache DataSketches with Snowflake Python UDAFs to achieve compatibility with multiple systems. UDAFs are easy to use within SQL, and have decent performance — while using an Apache project ensures wide industry adoption.
  • Use Google zetasketch (HyperLogLog) to achieve compatibility with BigQuery sketches. Note that they have only open-sourced a Java implementation, and Snowflake only has Python UDAFs — so you’ll need to implement Snowflake Java UDTFs instead — as described in my previous post.

Next steps

Want more?

Try this out with a Snowflake free trial account — you only need an email address to get started.

I’m Felipe Hoffa, Data Cloud Advocate for Snowflake. Thanks for joining me on this adventure. You can follow me on Threads and LinkedIn (while increasingly less on Twitter). And subscribe to reddit.com/r/snowflake for the most interesting Snowflake news.


Felipe Hoffa
Snowflake Builders Blog: Data Engineers, App Developers, AI/ML, & Data Science

Data Cloud Advocate at Snowflake ❄️. Originally from Chile, now in San Francisco and around the world. Previously at Google. Let’s talk data.