Easier Spark job logging with decorators

During one of my recent tasks at work, I had to figure out what was going on with a Spark job we were running daily. It goes without saying that the logs showed nothing of interest, and some jobs never even finished.

In one of Databricks' posts about debugging Spark jobs, they suggested sprinkling .count() calls on DataFrames here and there so you can see where your job is having issues. This technique proved super useful in my quest to find the bottleneck.
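
For context, the manual version of that technique looks something like this (the path and DataFrame names here are made up for illustration):

# Hypothetical pipeline with .count() calls sprinkled in to force execution
# at each stage, so you can see which step is the slow one.
events = spark.read.parquet('/data/events')
print('events:', events.count())          # forces the read to actually run

filtered = events.where(events.user_id.isNotNull())
print('filtered:', filtered.count())      # forces the filter stage

mapping = filtered.select('device_id', 'user_id').distinct()
print('mapping:', mapping.count())        # forces the final shuffle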

One thing that didn’t feel right, though, was leaving all those .count() calls in the code, or adding ifs all over the place. That’s when I thought a Python decorator could be a very suitable alternative, and so the with_logging decorator was born. The code is simple as hell:

from functools import wraps
import time
import os
import types

def logging_enabled():
    """Checks the environment at call time, so the flag can be toggled from a running notebook."""
    return 'WITH_LOGGING_ENABLED' in os.environ

def has_count_method(df):
    """Checks for the existence of a bound .count() method."""
    return hasattr(df, 'count') and isinstance(getattr(df, 'count'), types.MethodType)

def with_logging(f):
    @wraps(f)
    def wrapper(*args, **kwargs):
        if logging_enabled():
            start = time.time()
            result = f(*args, **kwargs)
            # If the result is a Spark DataFrame (we actually care about .count()),
            # execute the count so it is included in the timing.
            with_count = has_count_method(result)
            if with_count:
                # Calling .count() forces the Spark jobs to execute, which is
                # useful for debugging/troubleshooting, but will make the
                # execution of the notebook slower.
                count = result.count()
            elapsed = time.time() - start
            print(f.__name__, 'took', elapsed, 'seconds, and retrieved',
                  count if with_count else 'no', 'records')
            return result
        else:
            return f(*args, **kwargs)
    return wrapper

Notice how we can turn the logging on and off with an environment variable (checked at call time, so you can flip it from a running notebook), and how we print the result of calling .count() whenever the wrapped function returns something with a count method.
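
As a quick illustration of that duck-typed check (the Fake class here is made up just to show the behavior):

class Fake:
    def count(self):
        return 0

print(has_count_method(Fake()))  # True: count is a Python-level bound method
print(has_count_method([1, 2]))  # False: list.count is a builtin, not a MethodType
print(has_count_method(42))      # False: no count attribute at all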

The usage of this decorator is trivial, for example:

@with_logging
def user_id_device_id_mapping(df):
    return (
        df
        .select('device_id', 'user_id')
        .where(
            df.user_id.isNotNull() & df.device_id.isNotNull()
        )
        .distinct()
    )

Now you can do either:

mapping = user_id_device_id_mapping(data)

Or when you need to troubleshoot your job:

import os
os.environ['WITH_LOGGING_ENABLED'] = 'YES'
mapping = user_id_device_id_mapping(data)

And keep an eye on the standard output for lines like:

user_id_device_id_mapping took 1.5562841892242432 seconds, and retrieved 8339 records
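
Since the wrapper checks the environment on every call, you can switch the logging back off in the same session by removing the variable:

del os.environ['WITH_LOGGING_ENABLED']
mapping = user_id_device_id_mapping(data)  # runs silently again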

There’s certainly room for improvement, but I think it’s an easy-to-set-up helper that can bring some value to your troubleshooting sessions.
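
One such improvement, just as a sketch: routing the output through Python’s standard logging module instead of print, reusing the imports and the has_count_method helper from above:

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('with_logging')

def with_logging_v2(f):
    @wraps(f)
    def wrapper(*args, **kwargs):
        if 'WITH_LOGGING_ENABLED' not in os.environ:
            return f(*args, **kwargs)
        start = time.time()
        result = f(*args, **kwargs)
        # Same idea as before, but the output goes through the logging system,
        # so it can be filtered, formatted, or shipped somewhere else.
        count = result.count() if has_count_method(result) else None
        logger.info('%s took %.2f seconds, and retrieved %s records',
                    f.__name__, time.time() - start,
                    'no' if count is None else count)
        return result
    return wrapper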