# Tech in 5 — Snowflake & Dask
Why Snowflake and Dask could revolutionize data discovery for data engineers and data scientists alike by providing a fast, scalable, purely Python-based stack.
Introduction
This week we are delving into the next item on my tech list: Dask. As a religious pandas user, I ❤ DataFrames; they make the whole data-munging experience quite enjoyable. However, I wanted to use scalable tech to perform some rudimentary analysis on our AWS spend in order to better forecast costs.
How could I do this effectively and efficiently? I could simply use SQL on Snowflake, since that is where the data is stored; however, I wanted to fold my findings into a reusable, reproducible bot that shares this information once per month and tells us how close we are to our budget.
I needed a package that is fast, plays nicely with the standard data science libraries, and connects easily to Snowflake. I have used Spark for this kind of work before, and while useful, it isn't as data-science friendly as I need it to be. Besides, Spark has been around for a while, and there must be something just around the corner waiting to take its place.
That is when I landed on Dask: A highly distributed, performant data analysis library that mimics the best parts of pandas without the memory overhead limitations.
High-Level Technical Overview
Dask is here and ready to disrupt the big data world. In a nutshell, Dask lets users parallelize their dataframes to analyze massive data sets with the pandas-like API most data scientists have come to expect and love. Not only does Dask provide a way for data scientists to scale code built with packages they are familiar with (scikit-learn, XGBoost, NumPy, etc.), but it is being co-developed with these standard data science packages to provide a seamless transition from your local machine to a multi-node cluster.
One of the biggest advantages Dask has over Spark for data scientists is its purely Python-based approach — Spark, Scala, and Java are not needed.
There is no need to convert Delta/Spark DataFrames into PyArrow dataframes just to run Python data science code. Dask provides a full data science and data engineering stack without the hassle of learning or struggling with other programming languages.
Dask enables data scientists to stick to Python and doesn't require them to learn the nuances of Spark, Scala, or Java just to perform an initial analysis.
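To make that concrete, here is a minimal sketch of what the pandas-to-Dask transition looks like; the file pattern and column names are made up purely for illustration:

import pandas as pd
import dask.dataframe as dd

# pandas: the whole file is loaded into memory at once (hypothetical file/columns)
costs_pd = pd.read_csv("costs.csv")
monthly = costs_pd.groupby("month")["cost"].sum()

# dask: the same API, but lazy and partitioned; nothing runs until .compute()
costs_dd = dd.read_csv("costs-*.csv")
monthly = costs_dd.groupby("month")["cost"].sum().compute()

The only visible difference is the .compute() call, which is where the task graph described in the next section actually executes.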
DAGs — Directed Acyclic Graphs
Dask takes advantage of DAGs (Directed Acyclic Graphs) for its task scheduler. If you are familiar with Airflow, you already know how this works. Without getting too deep into it, a DAG is how Dask orders its tasks to find the most efficient compute path: it takes your code and transforms it into an ordered series of instructions that can be sequenced optimally at run time. Dask uses this to optimize code running in parallel on clusters or locally.
This matters because it is, in part, what allows Dask to run more efficiently than other distributed big data tools: Dask builds a DAG that sequences your code and lets repeatable tasks run in parallel. Dask also lets you develop code faster as a data scientist, since there is significantly less to learn; you can BYOP (Bring Your Own Package: scikit-learn, TensorFlow, XGBoost, insert your favorite data science package here). This is such a cool feature that I personally love.
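As a rough illustration of what that task graph looks like, dask.delayed wraps ordinary Python functions into lazy tasks; Dask only builds the DAG until compute() is called, and then walks it in parallel where it can. The functions below are toy placeholders:

import dask

@dask.delayed
def load(partition):
    return partition * 2      # stand-in for real work, e.g. reading a chunk

@dask.delayed
def combine(results):
    return sum(results)

tasks = [load(p) for p in range(4)]
total = combine(tasks)        # nothing has executed yet -- only a DAG exists

# total.visualize() would render the graph; compute() walks it in parallel
print(total.compute())        # 12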
Code Time — Connecting to Snowflake
In case you missed my last post on Snowflake Object Management in a CI/CD Pipeline, here is another example of how to build a reusable class for your Snowflake adventures. I don't see this code much online, but it creates a reusable way to connect to a Snowflake environment. Please note that I am using the authenticator parameter because the environment I am testing from has SSO enabled. In this mode I do not need a password, but I leave that code here in case you need it.
import snowflake.connector


class Connect:
    def snow_connect(self, **kwargs):
        """
        :param kwargs: takes credentials from a source
        :return: a cursor object to query
        """
        conn = snowflake.connector.connect(
            user=kwargs['user'],
            password=kwargs.get('password'),       # optional when SSO handles auth
            account=kwargs['account'],
            authenticator=kwargs['authenticator'],
            warehouse=kwargs['warehouse'],
            database=kwargs['database'],
            schema=kwargs.get('schema')            # forward the schema if one is supplied
        )
        cursor = conn.cursor()
        return cursor
This code makes Snowflake easy to interact with from Python. The object returned is a cursor that we can run SQL statements against to get our subset of data. In our example, we are going to query the Hashmap AWS costs to give us some data to analyze.
C = Connect()  # init connection from the Connect class

cursor = C.snow_connect(
    user="s0m3_Us34",
    password='s0m3_P4sS',
    account='OuR_4CC0UN7',
    authenticator='externalbrowser',
    warehouse="s0m3_wH4R3H0U63",
    database='s0m3_Db',
    schema='AWS_COST_MONITORING'
)  # Do you really think I would D0XX us?

# our query to get the data
sql_stuff = cursor.execute("""SELECT cols(n) FROM AWS_COSTS""")
Data Discovery
Now that we have our data, we can perform some fundamental data discovery on our dataset. First, let’s see what our data contains from a column perspective. I’m looking for some information on our AWS costs — maybe some anomalies in our prices could be detected.
import pandas as pd
import dask.dataframe as dd

# take the cursor's query result and turn it into a pandas dataframe
df = pd.DataFrame.from_records(iter(sql_stuff), columns=[x[0] for x in sql_stuff.description])

# convert the pandas df to a dask df; you need to specify npartitions,
# which helps with distributing the dataframe
ddf = dd.from_pandas(df, npartitions=3)

# see the list of columns in your dataframe
for items in ddf.columns:
    print(items)

The result is a list of columns:
_FILE
_LINE
_MODIFIED
IDENTITY_LINE_ITEM_ID
IDENTITY_TIME_INTERVAL
BILL_INVOICE_ID
BILL_BILLING_ENTITY
BILL_BILL_TYPE
BILL_PAYER_ACCOUNT_ID
BILL_BILLING_PERIOD_START_DATE
BILL_BILLING_PERIOD_END_DATE
LINE_ITEM_USAGE_ACCOUNT_ID
... 1000's of columns not shown.
Ok, now that we know which columns live within this data, it is time to narrow down our search and rerun the SQL. I wanted to show how to perform some basic data discovery techniques for those just starting down the path of data engineering and data science. It is essential to do some rudimentary profiling so that you can source data from a data set yourself; this is the part of the job most people gloss over. I would argue that selecting features is just as important as picking a model.
sql_stuff = cursor.execute("""
    SELECT BILL_BILLING_PERIOD_START_DATE,
           BILL_BILLING_PERIOD_END_DATE,
           LINE_ITEM_UNBLENDED_COST,
           IDENTITY_LINE_ITEM_ID,
           PRODUCT_PRODUCT_NAME
    FROM AWS_COSTS
""")
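For a bit of that rudimentary profiling, the narrowed result can go through the same cursor-to-Dask conversion as before, and then a couple of one-liners give a feel for the data. This is just a sketch of the kind of checks I mean:

# same cursor -> pandas -> dask conversion as before
df = pd.DataFrame.from_records(iter(sql_stuff), columns=[x[0] for x in sql_stuff.description])
ddf = dd.from_pandas(df, npartitions=3)

print(ddf.dtypes)                                              # column types
print(ddf['LINE_ITEM_UNBLENDED_COST'].describe().compute())    # spread of the cost column
print(ddf.isna().sum().compute())                              # missing values per column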
Once we have this subset, I am going to save it as a view in our Snowflake service so that I do not have to traverse such a large table (1 million rows × 300 columns vs. 1 million rows × 5 columns). Alternatively, if the data were not very large, I would recommend saving it to a CSV or a Parquet file.
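Either route is only a line or two from where we already are; the view name and file paths below are made up for illustration:

# persist the narrow subset as a view in Snowflake (hypothetical view name)
cursor.execute("""
    CREATE OR REPLACE VIEW AWS_COSTS_SLIM AS
    SELECT BILL_BILLING_PERIOD_START_DATE,
           BILL_BILLING_PERIOD_END_DATE,
           LINE_ITEM_UNBLENDED_COST,
           IDENTITY_LINE_ITEM_ID,
           PRODUCT_PRODUCT_NAME
    FROM AWS_COSTS
""")

# or, if the data is small enough, write the dask dataframe out locally
ddf.to_csv("aws_costs_slim-*.csv")          # one CSV per partition
ddf.to_parquet("aws_costs_slim.parquet")    # needs pyarrow or fastparquet installed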
Finally, we can start to model our data. The simplest approach I can think of is a linear one, so let's start there and see where we land.
In this list of variables, I care most about cost as a function of time, and whether the cost is getting larger or smaller per item outlined in the data. In this example, we are using a local cluster to load and process the data; in a production environment, we would use a distributed cluster to spread the workload across many machines.
Dask Config + Regression Analysis
import dask
from dask.distributed import Client, LocalCluster

# if you had a cluster you could set this to multi-threaded
dask.config.set(scheduler="single-threaded")

client = Client(n_workers=1, threads_per_worker=1, processes=False,
                memory_limit='25GB', scheduler_port=0,
                silence_logs=True, diagnostics_port=0)
client
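For what it's worth, a production setup would look almost identical; the only real change is pointing the Client at a remote scheduler rather than spinning up a local one (the address below is a placeholder):

from dask.distributed import Client

# connect to an already-running distributed scheduler (placeholder address)
client = Client("tcp://dask-scheduler.internal:8786")

# everything downstream stays the same -- the dataframe and model calls
# simply get scheduled across the cluster's workers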
Now here is the fun part — the model in Dask! Here I am importing a model from dask_ml, which mirrors the familiar scikit-learn API, to provide some insight into our Amazon Web Services costs. You could also bring your own model here; I am choosing to use the native dask_ml one.
from sklearn.model_selection import train_test_split   # dask_ml.model_selection has a dask-native version too
from dask_ml.linear_model import LinearRegression

lr = LinearRegression()

# pull the feature and target out of the narrowed dask dataframe built above
# (a text column like PRODUCT_PRODUCT_NAME would need encoding before a real fit)
x = ddf[['PRODUCT_PRODUCT_NAME']]
y = ddf['LINE_ITEM_UNBLENDED_COST']

X_train, X_test, Y_train, Y_test = train_test_split(x, y, test_size=0.10, random_state=24)

lr.fit(X_train, Y_train)
lr.score(X_train, Y_train)
#### R^2 value of .03789.... that's pretty awful.

lr.coef_
lr.score(X_test, Y_test)
#### R^2 value of .04821.... better, but still terrible.
Our R² values clearly indicate that our Amazon costs are not scaling in any sort of linear fashion. This makes sense, considering we have hundreds of employees turning services on and off all the time.
Conclusion
In this example, we used Dask and Snowflake to ingest, compute, and understand our AWS costs a little better. I would test and try more models if I had more time, but I have spent enough time writing this article already. It would be interesting to see if there was more of a regular pattern to the spending here at Hashmap.
Fortunately, there is a managed service for Dask, Saturn Cloud, that might make this interaction with Snowflake even more native. If Saturn takes off and utilizes Dask DAGs to organize the compute warehouse of Snowflake under the hood, I could see a potential new Spark killer in the making. You could even potentially run SnowSQL on Dask, where Dask runs the compute while Snowflake is the storage layer. That would be some genuinely terrifying performance, especially considering a Data Scientist could use all of the ML packages they know and love.
Ready To Accelerate Your Digital Transformation?
At Hashmap, we work with our clients to build better, together.
If you’d like additional assistance in this area, Hashmap offers a range of enablement workshops and consulting service packages, and we would be glad to work through your specifics.
How does Snowflake compare to other data platforms? Our technical experts have implemented over 250 cloud/data projects in the last 3 years and conducted unbiased, detailed analyses across 34 business and technical dimensions, ranking each cloud data platform.
Feel free to share on other channels, and be sure to keep up with all new content from Hashmap here.
Kieran Healey is a Cloud and Data Engineer with Hashmap providing Data, Cloud, IoT, and AI/ML solutions and consulting expertise across industries with a group of innovative technologists and domain experts accelerating high-value business outcomes for our customers.