Simple Tags in Snowflake Snowpark for Python

In In the old days, when I was building Hadoop-based legacy Spark pipelines for my clients, one of the biggest challenges was understanding, debugging, and optimisation of the executed code. I still remember one holiday in Dresden, when after sightseeing, my family had retired to bed just after a delicious Eierscheke, and I was burning the midnight oil debugging complex transformations in legacy Spark. Of course, these transformations were working perfectly fine on Pre-Production, but for some reason the legacy Spark was creating wrong results on the Production environment. The overall code was extremely complex with multiple transformations implemented in multiple inter-dependent PySpark classes, deployed in Yarn, and orchestrated using Oozie. Who still remembers those tools, eh?

Photo by Angèle Kamp on Unsplash

So you can imagine the task that I was facing: my family waiting for me to finish the work, and me digging through millions of lines of java and python logs — the log files were 80–90 GB big (!) — trying to understand which component was misbehaving. I actually needed an additional tool, like Apache Solr, to effectively analyse that massive log. But I didn’t have it available on the platform. I could have also written another legacy Spark Log Analyser running on Hadoop and Yarn, but it would be legacy-code-on-legacy-code kind of an inception that I really wanted to avoid. Not to mention the lack of time (I was on vacation, remember?) and lack of motivation to write another line of legacy Spark code.

Photo by Szabo Viktor on Unsplash

So, what did I do? I used the good ol’ *nix toolset that I had handy. And I was diggin’, diggin’, and diggin’, to find the gem of wisdom. And I did manage to find it after investing a few evenings of my vacation time, facing a multitude of looks of disappointment from my family, and a sea of tears of frustration. The details of which, are a topic for another day.

Enter Snowflake

Since Snowflake appeared, such problems are now a part of history. More and more clients are abandoning legacy Spark-based tool stacks and migrating to Snowpark, based on the most modern architecture — Snowflake.

Snowpark, mind you, not only improves performance and reduces time to pipeline results but also, and more importantly, improves cost effectiveness of those pipelines.

I have seen pipelines migrated from legacy Spark-based systems to modern Snowpark that delivers the same business results, but at 1/10th of the costs and 8 times faster. Even if you raise concerns on migration times, one can easily make use of accelerators such as SnowConvert to significantly reduce the same.

Additionally, there is one special feature of Snowflake and Snowpark that — when used — would allow me to avoid the total loss of time when debugging gigabytes of log files generated by legacy Spark to find the broken query. This feature is Snowflake Query Tags.

I am not going to explain what the Query Tags are in detail and how to set them up for SQL queries. For that you can refer to documentation. I am going to show you how easy it is to implement a simple but effective strategy of tagging when using Snowpark for python API.

Let us dig into some code.

Spherical cow in vacuum

To present the approach, let us use a simplified situation, where code is relatively small and easy to understand.

Spherical cow in vacuum, source: Wikipedia

Imagine, that you have a simple procedure that copies given amount of rows from one table to another table, dropping the target table first, like below:

CREATE OR REPLACE PROCEDURE COPY_TO_ANOTHER_TABLE(
from_table STRING,
to_table STRING,
count INT)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'run'
EXECUTE AS CALLER
AS
$$

# entry function
def run(session, from_table, to_table, count):

def do_it(from_table, to_table, count):
session.sql(f'DROP TABLE IF EXISTS {to_table}').collect()
session.table(from_table).limit(count).write.save_as_table(to_table)

return "SUCCESS"

return do_it(from_table, to_table, count)

$$;

There are 2 queries in that function:

  • the first one uses SQL through the Snowpark API to drop the target table to_table, if it exists.
  • the second one is again using Snowpark API (DSL if you will) to count the number of rows in table from_table and persist the results in another table called to_table.

Note, that from_table, to_table and count are parameters passed by the COPY_TO_ANOTHER_TABLE procedure.

It is easy to run this procedure (I have skipped the connectivity steps. Please refer to great Snowflake’s documentation on how to connect) to get 100 rows from table_a in public schema of geo database to table_b in the same database and schema:

call MYPROC_FOUR('geo.public.table_a', 'geo.public.table_b', 100);

If everything goes well you will have a new table table_b with 100 rows created in geo.public (you can use any existing table, and if you don’t have one handy you can use a table from SNOWFLAKE database).

Now, let us check history of the queries by running the following code in Snowsight UI:

select user_name,
role_name,
warehouse_name,
query_tag,
query_text,
start_time,
credits_used_cloud_services
from table(information_schema.query_history())
where query_type in ('SELECT', 'DROP', 'CREATE_TABLE_AS_SELECT')
order by start_time DESC;

Results of the query are below:

Results of the query presenting in the first two lines logic executed by the stored procedure. Image by Author.

Note, that there is a tag showing position in code where the DROP statement was executed. It has no relation to our workbook in Snowsight, however, as the line number does not correspond to what Snowflake executed from a temporal location. There is no tag for create table as select.

Since it is just 2 queries (drop table and create table as select), it is relatively easy to track them. Now, imagine you have 42 statements in your stored procedure. How will you identify statements executed from your Stored Proc among the hundred/thousand other Stored Procs running at any given time in a Production scenario?

Simple tagging in Snowpark

Now, let us move from a simplified (spherical-cow-like) procedure to an example that can be implemented in your project right away.

Photo by Noah Näf on Unsplash

What would you say if queries were tagged by the name of the function that called them?

# tag_name can be parametrized. We will see it later.  
@Tag(session=session, tag_name=f"do_it")
def do_it(from_table, to_table, count):
session.sql(f'''DROP TABLE IF EXISTS {to_table}''').collect()
session.table(from_table).limit(count).write.save_as_table(to_table)

return f"SUCCESS {session.query_tag}"

Or if tagging separate queries within each function was as simple as below?

def do_it_too(from_table, to_table, count):

# only queries from this context will be tagged
with Tag(session, f"drop_in_do_it_too"):
session.sql(f'''DROP TABLE IF EXISTS {to_table}''').collect()

# queries from this context will use different tag
with Tag(session, f"ctas_in_do_it_too"):
session.table(from_table).limit(count).write.save_as_table(to_table)

return f"SUCCESS {session.query_tag}"

Or even a mixture of the above was possible?

@Tag(session=session, tag_name=f"do_it_again")
def do_it_again(from_table, to_table, count):

# will be logged with default tag, which is do_it_again
session.sql(f'''DROP TABLE IF EXISTS {to_table}''').collect()

# will be logged with context tag
with Tag(session, f"cras_in_do_it_again"):
session.table(from_table).limit(count).write.save_as_table(to_table)

# will be logged with the default tag, which is do_it_again
no_of = session.sql(f'''SELECT COUNT(*) FROM {to_table}''').collect()[0]

return f"SUCCESS {no_of}"

If you are a seasoned pythonista, data engineer or data scientist, you know what I am using here: a decorator and a context manager.

The code is pretty simple, if you use contextlib’s ContextDecorator, like below:

from contextlib import ContextDecorator
class Tag(ContextDecorator):

def __init__(self, session, tag_name, label=None):
self.old_tag = None
self.session = session
self.tag_name = tag_name
self.label = None

def __call__(self, func):
if self.label is None: # Label was not provided
self.label = func.__name__ # Use function's name.
return super().__call__(func)

def __enter__(self):
self.old_tag = self.session.query_tag
if self.label:
self.session.query_tag=f"{self.tag_name}_{self.label}"
else:
self.session.query_tag=self.tag_name
return self

def __exit__(self, *exc):
self.session.query_tag=self.old_tag
return False

The code should be included in all of your stored procedures, and can be either pasted directly in the body of the procedure, or used as a library, i.e. uploaded from stage.

Here’s a full example when using it in the body of a stored procedure:

CREATE OR REPLACE PROCEDURE COPY_TO_ANOTHER_TABLE(
tag_suffix STRING,
from_table STRING,
to_table STRING,
count INT)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'run'
EXECUTE AS CALLER
AS
$$
from contextlib import ContextDecorator

# create this once, can be as external .py file
class Tag(ContextDecorator):

def __init__(self, session, tag_name, label=None):
self.old_tag = None
self.session = session
self.tag_name = tag_name
self.label = None

def __call__(self, func):
if self.label is None: # Label was not provided
self.label = func.__name__ # Use function's name.
return super().__call__(func)

def __enter__(self):
self.old_tag = self.session.query_tag
if self.label:
self.session.query_tag=f"{self.tag_name}_{self.label}"
else:
self.session.query_tag=self.tag_name
return self

def __exit__(self, *exc):
self.session.query_tag=self.old_tag
return False


# entry function
def run(session, tag_suffix, from_table, to_table, count):

# suffix, to easier distinguish between runs. Can be UUID, or anything else
# here it is passed as parameter of the stored proc
SUF=f"_{tag_suffix}"

# tag all the statements in this function with the same tag
@Tag(session=session, tag_name=f"D-ALL{SUF}")
def do_it(from_table, to_table, count):
session.sql(f'DROP TABLE IF EXISTS {to_table}').collect()
session.table(from_table).limit(count).write.save_as_table(to_table)

return f"SUCCESS {session.query_tag}"


# this function will not tag queries automatically
def do_it_too(from_table, to_table, count):

# only queries from this context will be tagged
with Tag(session, f"D-DROP{SUF}"):
session.sql(f'DROP TABLE IF EXISTS {to_table}').collect()

# queries from this context will use different tag
with Tag(session, f"D-CREATE{SUF}"):
session.table(from_table).limit(count).write.save_as_table(to_table)

return f"SUCCESS {session.query_tag}"


# lets tag all queries with the default tag
# and use a context tag if needed
@Tag(session=session, tag_name=f"D-AGAIN{SUF}")
def do_it_again(from_table, to_table, count):

# will be logged with default tag
session.sql(f'DROP TABLE IF EXISTS {to_table}').collect()

# will be logged with context tag
with Tag(session, f"D-CREATE_AGAIN{SUF}"):
session.table(from_table).limit(count).write.save_as_table(to_table)

# will be logged with default tag
no_of = session.sql(f'SELECT COUNT(*) FROM {to_table}').collect()[0]

return f"SUCCESS {no_of}"

x = do_it(from_table, to_table, count)
y = do_it_too(from_table, to_table, count)
z = do_it_again(from_table, to_table, count)
return f"{x}, {y}, {z}"

$$;

And calling it:

call COPY_TO_ANOTHER_TABLE(
'my_suffix',
'geo.public.table_a',
'geo.public.table_b',
100);

Note, that the signature of the stored procedure has changed. I have added a parameter — tag_suffx, which allows to add a custom (dynamic) suffix to your tags.

Extracting information from information_schema.query_history() leads to the following result:

Tags were logged together with the queries. Now it is very simple to track. Note, that cost per query is also logged. Image by Author.

This functionality enables better tracking of the queries, as an orchestrator can set value of the parameter depending on who, where, when etc. is calling it. Such parametrisation allows to even track cost of each query, and enables charge back per use, i.e. an IT department can charge units, divisions or persons that are running those queries. Or at least to better understand the usage patterns of their business clients, if you know what I mean ;)

In conclusion: Why should we tag with a decorator and context manager in Snowpark?

  • Tag in Snowpark for development speed. It is much easier to debug and fine tune your code.
  • Tag in Snowpark for traceability. You see all the queries, their execution order and even generated code. It is easy to identify which query was generated by which part of the code
  • Tag in Snowpark for better cost management. When tags are dynamic, and your workflow orchestrator passes parameters to Snowpark code, you can charge back the cost to users.
  • Last, but not least: Tag in Snowpark for a better vacation. It saves time, ensures peace of mind, and costs nothing.

Helpful links

PS.
My family forgave me. But don’t test yours ;)

Note: These are my personal opinions and not of my current employer (Snowflake).

--

--

Bart Wrobel
Snowflake Builders Blog: Data Engineers, App Developers, AI/ML, & Data Science

Architect, Data Engineer, Evangelist and Data Cloud Partner Enabler in EMEA at Snowflake - The Data Cloud. Presenting personal opinions, nothing else.