Amazon Redshift’s UDF

As you have probably heard, Amazon has released User Defined Functions (UDFs) for Amazon Redshift, so now you can write your own scalar functions in Python. I was looking forward to it myself. However, after conducting a few tests, I have developed mixed feelings about Amazon Redshift’s UDF feature.


Let’s do some simple tests!

In our data warehouse, we store client identifiers for every event in a character varying column. They can be literally anything, and we use the same condition in multiple SQL files to decode them into human-readable names.

This isn’t a good approach. In fact, it’s terribly risky: if we change the condition in only one file, our dataset becomes inconsistent. As all of us working with data know, data inconsistency is never good.

So we decided to try to fix the problem with a UDF. The original condition was the following:

CASE
    WHEN client_id = '784cc8a1ebf89f' THEN 'Mac'
    WHEN client_id = '39087b8db19be5' THEN 'iOS'
    WHEN client_id = '905eb7572309c0' THEN 'iOS'
    WHEN client_id = '9f6f660b58b49e' THEN 'Android'
    WHEN client_id = '58e7d0fd643292' THEN 'Android'
    WHEN client_id = '8383ce7c3eb0e5' THEN 'WindowsPhone'
    WHEN client_id = 'a2fa2bb0e727b7' THEN 'WindowsStore'
    WHEN client_id = '14eaa2a06ae9e5' THEN 'Web'
    WHEN client_id = '4d6012e4f7620e' THEN 'chrome'
    WHEN client_id = '52f0c56e495f6b' THEN 'WindowsWrapper'
    WHEN client_id = '8074ebdb3d5e9c' THEN 'Windows10'
    ELSE NULL
END

There are several ways to solve this problem:

  1. Create a new table with the values and names and join the tables when it’s necessary. The advantage is we only need to change the client_id/platform pairs in one table. The downside is the size of the tables. We have a lot of tables with more than 2 billion records and joining tables isn’t the quickest thing in Redshift.
  2. Create a User Defined Function and write these conditions in Python. No JOIN necessary.

My assumption was that a UDF would be slower than the original CASE expression but quicker than joining the tables with a LEFT OUTER JOIN.

The inconsistency problem was already there, so my test case was to count the misclassified records, i.e. events whose stored platform differs from the one the condition derives from client_id. I ran the queries against our aggregated events table, which has 471,895,959 records, on SSD.

The first query uses the original condition. I’ve replaced it with <...> below, but it’s the same as above. Its average execution time over several runs was 3,218.347 ms.

SELECT
    e.platform,
    CASE WHEN <...> END AS "valid_platform",
    SUM(e.count) AS "event_count"
FROM real.events_counted_month_2015_07_01 AS e
WHERE CASE WHEN <...> END != e.platform
GROUP BY 1, 2
ORDER BY 3 DESC;

I was really curious about the UDF feature, so I created the function right after I got the original result.

CREATE FUNCTION f_get_platform_by_client_id ( client_id varchar )
RETURNS varchar IMMUTABLE AS $$
    if client_id == '784cc8a1ebf89f': return 'Mac'
    elif client_id == '39087b8db19be5': return 'iOS'
    ...
    else: return None
$$ LANGUAGE plpythonu;
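The mapping is just a fixed set of client_id/platform pairs, so the function body could also be a single dictionary lookup instead of a long if/elif chain. Below is a minimal sketch using the pairs from the CASE expression above. The names PLATFORM_BY_CLIENT_ID and get_platform_by_client_id are my own for illustration, and I have not measured whether this changes the UDF's timing in Redshift. The code runs under both Python 2.7 and 3.

```python
# Hypothetical lookup table: the client_id/platform pairs from the CASE expression.
PLATFORM_BY_CLIENT_ID = {
    '784cc8a1ebf89f': 'Mac',
    '39087b8db19be5': 'iOS',
    '905eb7572309c0': 'iOS',
    '9f6f660b58b49e': 'Android',
    '58e7d0fd643292': 'Android',
    '8383ce7c3eb0e5': 'WindowsPhone',
    'a2fa2bb0e727b7': 'WindowsStore',
    '14eaa2a06ae9e5': 'Web',
    '4d6012e4f7620e': 'chrome',
    '52f0c56e495f6b': 'WindowsWrapper',
    '8074ebdb3d5e9c': 'Windows10',
}


def get_platform_by_client_id(client_id):
    # dict.get returns None for unknown (or NULL) IDs, mirroring ELSE NULL.
    return PLATFORM_BY_CLIENT_ID.get(client_id)


print(get_platform_by_client_id('905eb7572309c0'))  # iOS
print(get_platform_by_client_id('unknown'))  # None
```

Inside the $$ ... $$ body of the UDF, the same idea becomes a dict literal followed by a `.get(client_id)` call.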

… and executed the long-awaited query.

SELECT
    e.platform,
    f_get_platform_by_client_id(e.client_id) AS "valid_platform",
    SUM(e.count) AS "event_count"
FROM real.events_counted_month_2015_07_01 AS e
WHERE f_get_platform_by_client_id(e.client_id) != e.platform
GROUP BY 1, 2
ORDER BY 3 DESC;

The result was surprisingly awful. The query took about 54,761.971 ms, roughly 17 times slower than the original condition.

My third test case was the lookup table solution. I created the table, inserted the client_id and platform pairs, and joined it with the events table to get the counts. The average execution time was 3,414.535 ms.

CREATE TABLE temp.client_platform (
    client_id VARCHAR(50),
    platform VARCHAR(127)
) SORTKEY(client_id);

INSERT INTO temp.client_platform (client_id, platform) VALUES ...;

SELECT
    e.platform,
    p.platform AS "valid_platform",
    SUM(e.count) AS "event_count"
FROM real.events_counted_month_2015_07_01 AS e
LEFT OUTER JOIN temp.client_platform AS p ON (
    p.client_id = e.client_id )
WHERE p.platform != e.platform
GROUP BY 1, 2
ORDER BY 3 DESC;

Based on these results, I can’t recommend using UDFs for short scripts when the same logic can be written in pure SQL.


Let’s kill the snake.

Of course, there are other cases where UDF support can be a game changer. Until now, if you wanted to do something that was not possible in SQL, you had to:

  • dump your data from Redshift into S3
  • download it into your server or your local machine
  • transform and evaluate the data
  • upload into S3
  • load into Redshift

To be honest, it was a pain in the back. But now that UDFs are supported, we can do these transformations within Redshift. Heck yeah!

I started a new piece of research to get a quick overview of which languages people use in Wunderlist. We wanted to determine the language of every list we have. I chose lists because I believed the task could be done within a reasonable amount of time. My goal was to spend no more than a day on this research.

We had about 84 million lists and 11 million unique titles at the time.

I found an interesting and usable Python package on PyPI called langdetect. It is a port of a Java library and uses naive Bayes with character n-grams to estimate the probability of each language. It may not be the best tool, but it was good enough for this investigation.

So I crafted this function:

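CREATE OR REPLACE FUNCTION detect_language( title VARCHAR )
RETURNS VARCHAR IMMUTABLE
AS $$
    import langdetect
    # Empty and whitespace-only titles would make max() below fail on an empty list.
    words = title.split() if title else []
    if not words:
        return None
    # Skip very short titles: every word has at most 4 characters
    # and the whole title has at most 12.
    if max(map(len, words)) <= 4 and len(title) <= 12:
        return None
    try:
        return langdetect.detect(title.decode('utf-8'))
    except Exception:
        # Decoding can fail, and langdetect raises when it finds no usable features.
        return None
$$ LANGUAGE plpythonu;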

It’s a small, simple script. All that was left was to install the required library. Naively, I executed the following command without hesitation:

CREATE LIBRARY langdetect LANGUAGE plpythonu
FROM 'https://pypi.python.org/packages/source/l/langdetect/langdetect-1.0.5.zip';

It worked, but when I tried to test the function, I got the following error message:

ImportError: No module named langdetect.

The directory structure of a typical PyPI package doesn’t match the structure Redshift expects for UDF libraries.

Redshift expects a package with the following structure:

langdetect.zip
    __init__.py
    langdetect/
        __init__.py
        ...

… and the average Python package’s directory structure is something like this:

packagename.zip
    setup.cfg
    setup.py
    packagename/
        __init__.py
        ...

You probably noticed that the difference is the __init__.py file in the root. If you want to use a Python package, you have to:

  • Download it.
  • Create an empty __init__.py file in the root.
  • Compress the content into a zip file.
  • Upload it into S3.
  • Install the library.
  • Repeat all of this recursively for every dependency the package has.
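The repackaging steps above can be scripted. Here is a minimal sketch that uses only Python's standard library. It assumes you have already downloaded and unpacked the source distribution, and the function name build_redshift_zip and the paths are hypothetical. It covers only the zip step. Uploading to S3, running CREATE LIBRARY, and repeating the process for each dependency are still manual.

```python
import os
import zipfile


def build_redshift_zip(package_dir, zip_path):
    """Zip an unpacked package folder (e.g. .../langdetect-1.0.5/langdetect)
    so the archive has an empty __init__.py at its root, next to the package."""
    package_dir = os.path.abspath(package_dir)
    parent = os.path.dirname(package_dir)
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        zf.writestr("__init__.py", "")  # the extra root-level file
        for root, dirs, files in os.walk(package_dir):
            dirs[:] = [d for d in dirs if d != "__pycache__"]  # skip bytecode caches
            for name in files:
                if name.endswith(".pyc"):
                    continue
                full_path = os.path.join(root, name)
                # Paths relative to the parent keep the package folder: langdetect/...
                arcname = os.path.relpath(full_path, parent).replace(os.sep, "/")
                zf.write(full_path, arcname)
    return zip_path
```

setup.py and setup.cfg are left out because only the package folder is walked. For example, `build_redshift_zip("langdetect-1.0.5/langdetect", "langdetect.zip")` produces an archive with the layout Redshift expects.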

Plus, you have to be really careful. The AWS announcement says:

“Also, functions that write files or access the network are not supported.” (AWS Official Blog)

One thing is missing from that statement: they don’t let you read files either, at least not the ones shipped inside your library. If your Python package tries to open such a file, it fails instantly.

I created another test Python library, called mypkg, which does the following:

import os

def get_dirname():
    return os.path.dirname(__file__)

def get_file_content():
    path = os.path.join(get_dirname(), 'otherfile.txt')
    with open(path, 'r') as fd:
        return fd.read()

I also created otherfile.txt with a few words in it.

The get_dirname() function worked properly and returned a reasonable result:

/rdsdbdata/user_lib/0/0/197285.zip/mypkg

The get_file_content() function, however, didn’t work:

Error : ERROR: IOError: (20, 'Not a directory', '/rdsdbdata/user_lib/0/0/197285.zip/mypkg/otherfile.txt').
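You can reproduce the same failure locally, which is handy for checking a package before uploading it. The path in the error message suggests that Redshift imports the library straight from the zip archive. That is my assumption, not something AWS documents here. Python's built-in zipimport does the same thing when an archive is on sys.path. The archive and helper names below are hypothetical. On Linux or macOS, get_file_content() fails with the same errno 20 (ENOTDIR), because open() cannot descend into a .zip file as if it were a directory.

```python
import errno
import importlib
import os
import sys
import tempfile
import zipfile

# Same logic as the mypkg test package above.
MYPKG_INIT = """
import os

def get_dirname():
    return os.path.dirname(__file__)

def get_file_content():
    path = os.path.join(get_dirname(), 'otherfile.txt')
    with open(path, 'r') as fd:
        return fd.read()
"""


def import_mypkg_from_zip(workdir):
    """Zip mypkg together with otherfile.txt and import it from the archive."""
    zip_path = os.path.join(workdir, "mypkg_lib.zip")
    with zipfile.ZipFile(zip_path, "w") as zf:
        zf.writestr("__init__.py", "")  # root-level file, as in the Redshift layout
        zf.writestr("mypkg/__init__.py", MYPKG_INIT)
        zf.writestr("mypkg/otherfile.txt", "a few words\n")
    sys.path.insert(0, zip_path)  # zipimport loads packages straight from the archive
    importlib.invalidate_caches()
    return importlib.import_module("mypkg")


mypkg = import_mypkg_from_zip(tempfile.mkdtemp())
print(mypkg.get_dirname())  # on Linux/macOS: a path ending in mypkg_lib.zip/mypkg
try:
    mypkg.get_file_content()
except OSError as exc:  # IOError is an alias of OSError on Python 3
    print("open() failed:", errno.errorcode.get(exc.errno), exc.filename)
```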

Personally, I think it’s a shame, because some Python packages use JSON or pickle files to store extra data. If your package is like that, you have to modify it and move the related data blobs into the Python code itself.
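Moving the data into Python code can be automated. Here is a minimal sketch that assumes the data lives in a directory of JSON files, one per language as in langdetect. After generating the module, you would change the package to import PROFILES instead of opening files. The function name and the generated module are hypothetical, and this is not necessarily how I patched langdetect. Run the generator with Python 3 on your own machine. The generated module is plain ASCII and loads under both Python 2.7 and 3.

```python
import json
import os


def inline_json_profiles(profile_dir, module_path):
    """Load every JSON file in profile_dir and write them into one Python
    module as PROFILES = {file name: parsed data}. Returns the file count."""
    profiles = {}
    for name in sorted(os.listdir(profile_dir)):
        path = os.path.join(profile_dir, name)
        if os.path.isfile(path):
            with open(path, encoding="utf-8") as f:
                profiles[name] = json.load(f)
    with open(module_path, "w", encoding="ascii") as out:
        # ascii() escapes non-ASCII text as \xNN / \uXXXX; unicode_literals makes
        # those escapes produce unicode strings when the module runs on Python 2.7.
        out.write("from __future__ import unicode_literals\n")
        out.write("PROFILES = %s\n" % ascii(profiles))
    return len(profiles)
```

Writing the data with ascii() keeps the generated source free of raw non-ASCII bytes, which sidesteps one class of encoding problems. It does not guarantee that Redshift accepts every character once the strings are decoded.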

The langdetect package is one of them: it stores each language’s n-gram data in a separate JSON file. I was modifying the package to remove every file read when I ran into a new, interesting problem: Unicode support!

If you have ever used Python 2.7, you know exactly how painful UnicodeDecodeErrors can be. You probably also know how problematic Redshift’s Unicode support is. It was a strange lesson for me: I got mysterious errors in Redshift, while the same package worked well in the console.

Error : ERROR: Internal error while processing UDF request

Sadly, in half of the language profiles I couldn’t find the character that caused the error. Declaring the source encoding and using unicode literals, as below, works most of the time, though:

# -*- coding: utf-8 -*-
def get_unicode_chars():
    return u'нет or 沒有'
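When a UDF fails with nothing more than "Internal error while processing UDF request", one way to narrow down the culprit is to list the distinct non-ASCII characters in a profile and test them in Redshift in small batches. Here is a minimal sketch using the standard unicodedata module. The helper name is hypothetical, and the code cannot tell you which characters Redshift actually rejects. It only gives you a short, ordered list to bisect.

```python
import unicodedata


def non_ascii_chars(strings):
    """Return the distinct non-ASCII characters in `strings`, sorted by code
    point, as (character, 'U+XXXX', Unicode name) tuples."""
    found = set()
    for s in strings:
        found.update(ch for ch in s if ord(ch) > 127)
    return [
        (ch, "U+%04X" % ord(ch), unicodedata.name(ch, "<unnamed>"))
        for ch in sorted(found)  # single characters sort by code point
    ]


for row in non_ascii_chars([u"abc", u"нет or 沒有"]):
    print(row)
```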

I was more than happy when I finally had a working library that could detect languages within Redshift. However, because of internal server errors, I wasn’t able to port every language within an hour. In the end, I got a package that detects 24 languages (the original package detects 53).

Let’s look at a benchmark comparing the Redshift UDF with the same Python code running on a CSV file on Linux.

SELECT l.title, detect_language(l.title) AS "language"
FROM temp.latest_lists AS l;

The results were interesting and somewhat unpredictable. The execution time appears to grow with the size of the dataset, and limiting the records in a subquery first makes a large difference (compare the runs with and without a subquery). I ran this test on our 18 x dc1.large cluster on SSD.

  • 1,000/1,000 records: 1,377.072 ms
  • 1,000/10,000 records: 5,764.857 ms
  • 10,000/10,000 records: 5,790.040 ms
  • 1,000/100,000 records: 46,050.904 ms
  • 100,000/100,000 records: 46,032.125 ms
  • 1,000/1,000,000 records: 101,904.887 ms
  • 1,000/1,000,000 records with subquery: 11,161.097 ms
  • 1,000,000/1,000,000 records: 440,353.064 ms
  • 1,000/84,000,000 records: 106,628.889 ms
  • 1,000/84,000,000 records with subquery: 10,315.718 ms

So if you want to test your function, create a temporary table or use a subquery with a few records. Once it works as expected, run it on your original data.

After several executions of the same queries on the same dataset, the timings did not change drastically, so the results of the IMMUTABLE function don’t seem to be cached. I really miss caching for immutable data.
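IMMUTABLE promises that the same input always produces the same output, and that promise is what makes caching safe. To see what caching could buy here: we had about 84 million lists but only 11 million unique titles, so a per-title cache would run the detector at most about 11 million times. Below is a minimal local illustration of such a cache. It is not something Redshift does, and I haven't verified whether module-level state survives between rows inside a Redshift UDF. slow_detect is a hypothetical stand-in for langdetect.

```python
def memoize(func):
    """Cache results per argument. Only safe for deterministic (IMMUTABLE)
    functions, where the same input always gives the same output."""
    cache = {}

    def wrapper(arg):
        if arg not in cache:
            cache[arg] = func(arg)
        return cache[arg]

    wrapper.cache = cache
    return wrapper


calls = []


def slow_detect(title):
    # Hypothetical stand-in for langdetect.detect; records every real invocation.
    calls.append(title)
    return "en"


detect = memoize(slow_detect)
titles = ["Groceries", "Groceries", "Work", "Groceries"]
results = [detect(t) for t in titles]
print(len(titles), "rows,", len(calls), "real detections")  # 4 rows, 2 real detections
```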

To put these numbers into perspective, here is a baseline. Running the detection on a local computer (2.7 GHz Intel Core i5) on a single core with Python 3.5 processes 1,000 records in 7,594 ms and 10,000 records in 73.81 seconds, including the time needed to write the output CSV file.

I also ran the detection on 100,000 records on 4 virtual cores by splitting the input file and merging the results. The execution time was 287.49 seconds.
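The split-and-merge approach can also be done inside a single script. Here is a sketch that uses a multiprocessing pool instead of separate files, so it swaps in a different mechanism than the one I used. detect_language here is a hypothetical placeholder. Replace it with the real detector, for example langdetect.detect wrapped in the same checks as the UDF.

```python
import csv
from multiprocessing import Pool


def detect_language(title):
    """Hypothetical placeholder for the real detector (e.g. langdetect.detect)."""
    return "ascii" if all(ord(ch) < 128 for ch in title) else "non-ascii"


def split_rows(rows, parts):
    """Split rows into `parts` contiguous chunks whose sizes differ by at most one."""
    size, extra = divmod(len(rows), parts)
    chunks, start = [], 0
    for i in range(parts):
        end = start + size + (1 if i < extra else 0)
        chunks.append(rows[start:end])
        start = end
    return chunks


def detect_chunk(titles):
    return [(title, detect_language(title)) for title in titles]


def detect_parallel(titles, processes=4):
    """Run detection on `processes` worker processes and merge the results."""
    with Pool(processes) as pool:
        results = pool.map(detect_chunk, split_rows(titles, processes))
    # pool.map keeps the chunks in input order, so concatenation is the merge step.
    return [pair for chunk in results for pair in chunk]


def write_csv(pairs, path):
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(["title", "language"])
        writer.writerows(pairs)
```

Call it from a script's `if __name__ == "__main__":` block, e.g. `write_csv(detect_parallel(titles), "languages.csv")`. Depending on the platform, multiprocessing may re-import the main module in each worker process.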

The performance of Redshift’s UDFs sounds good at first. However, a single c3.8xlarge instance (32 vCPUs) would finish sooner than our Redshift cluster (36 vCPUs), while putting almost no load on Redshift.
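To compare the runs on equal terms, it helps to convert the timings above into records per second and records per second per vCPU. The arithmetic below uses only the numbers reported in this post. Keep in mind that the local runs used Python 3.5 on different hardware. The 32-vCPU line assumes linear scaling at the local 4-core rate, which is an assumption and only a rough estimate.

```python
# Timings reported above: (records, seconds, vCPUs used)
RUNS = {
    "Redshift UDF, 18 x dc1.large": (1000000, 440.353064, 36),
    "Local, 1 core (1,000 records)": (1000, 7.594, 1),
    "Local, 1 core (10,000 records)": (10000, 73.81, 1),
    "Local, 4 vCPUs (100,000 records)": (100000, 287.49, 4),
}


def throughput(records, seconds, vcpus):
    """Return (records per second, records per second per vCPU)."""
    per_second = records / seconds
    return per_second, per_second / vcpus


for name, (records, seconds, vcpus) in RUNS.items():
    total, per_vcpu = throughput(records, seconds, vcpus)
    print("%-34s %8.1f rec/s %7.1f rec/s per vCPU" % (name, total, per_vcpu))

# Assumption: a 32-vCPU machine scales linearly at the local 4-vCPU per-core rate.
_, local_per_vcpu = throughput(*RUNS["Local, 4 vCPUs (100,000 records)"])
print("32 vCPUs, linear scaling: %.0f rec/s" % (local_per_vcpu * 32))
```

On these numbers, the UDF processes roughly 2,271 records per second across the cluster, or about 63 per vCPU. The local runs manage about 87 to 135 records per second per core.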


Conclusion

Although I only checked these use cases, I have mixed feelings. My guess is that Redshift’s UDFs are really good for calculating metrics and doing complex math with pre-installed packages like numpy, pandas, and scipy. Otherwise, I don’t recommend them at the moment, because of the lack of Python package support and the difficulty of making them work.

We should test it again occasionally because it has the potential to be something really useful for our daily work.

If you know any other use cases that perform well, please let me know. I am really interested in hearing more about this topic.

I’m looking forward to seeing the following features:

  • better Unicode support and helpful error messages
  • compatibility with PyPI packages and automatic installation of dependencies
  • increased performance
  • caching of immutable function results

I am a Data & Applied Scientist at Microsoft in Berlin, Germany, working on Wunderlist’s data infrastructure. You can find me on Twitter and LinkedIn if you’d like to connect.