Using pandas’ sort_values with dask can result in tears
For those who have been working with pandas and are finally looking to go beyond their local machine’s memory limit with parallelism (e.g. Spark) but haven’t heard of dask, I suggest that you give it a try. Setting it up is much easier than PySpark, and the developers have tried to make the transition between pandas and dask as smooth as possible. However, there will be issues (and tears) if you assume that whatever works with pandas will work with dask.
Although dask.dataframe itself copies most of the pandas API, the architecture supporting the two is completely different. According to the documentation:
A Dask DataFrame is a large parallel dataframe composed of many smaller Pandas dataframes, split along the index.
What happened to me is that I needed sorting. Sorting is actually a very expensive problem for a parallel system, because establishing a global order means comparing and moving data across every partition rather than working on each chunk independently. This is the reason why pandas’ sort_values is not supported in dask and sorting can only be done via set_index. The problem was that I didn’t know that.
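To make that concrete, here is a minimal, self-contained sketch of the set_index approach; the column names and toy data are made up purely for illustration.

import pandas as pd
import dask.dataframe as dd

# Toy, out-of-order event data (purely illustrative)
pdf = pd.DataFrame({
    'session_id': ['a', 'b', 'a'],
    'timestamp': pd.to_datetime(['2020-01-03', '2020-01-01', '2020-01-02']),
    'value': [3, 1, 2],
})
ddf = dd.from_pandas(pdf, npartitions=2)

# Sorting in dask is done by setting the index on the ordering column;
# this shuffles the data across partitions and sorts it globally
ddf_sorted = ddf.set_index('timestamp')
print(ddf_sorted.compute())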
If you happen to work with a session-based dataset, you might be tempted to set your session identifier as the index and then use groupby-and-apply to sort your data like so:
def sortgroup_bytime(a_df):
    a_df = a_df.sort_values(by='timestamp')  # problematic line
    # other operations
    return a_df

dask_df = dask_df.set_index('session_id')
sorted_groups = dask_df.groupby('session_id').apply(sortgroup_bytime, meta=meta_data)
Dask is lazy (i.e. it stacks up and postpones computations until you actually need the results), so an operation like groupby-and-apply is first recorded against the dask dataframe rather than executed on the underlying pandas dataframes. The code above will still run, but it returns a dataframe in which the index and the specified column are sorted while the rows of the other columns end up randomly shuffled relative to them, which renders the data completely useless.
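To see the laziness itself, here is a tiny sketch with toy data and illustrative names: nothing is actually computed until .compute() is called.

import pandas as pd
import dask.dataframe as dd

ddf = dd.from_pandas(
    pd.DataFrame({'session_id': ['a', 'a', 'b'], 'value': [1, 2, 3]}),
    npartitions=2,
)

lazy_sum = ddf.groupby('session_id')['value'].sum()
print(type(lazy_sum))      # still a dask object: just a recorded task graph
print(lazy_sum.compute())  # the graph only runs here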
To avoid this problem, I advise you to adopt the pattern of first setting the index on the column that determines the ordering of your data (e.g. time) and only then doing the groupby-and-apply, so you stay clear of the deathtrap:
def analyze(a_df):
    # extract useful metrics/features from your data
    return a_df

dask_df = dask_df.set_index('time')
sorted_groups = dask_df.groupby('session_id').apply(analyze, meta=meta_data)
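Following this pattern, rows within each group are already ordered by time when analyze sees them, so there is no sort_values call to go wrong inside it. The result above is still lazy; assuming meta_data describes the columns analyze returns, the actual work happens only when you materialize it:

# Materialize the lazily built result (this is where the computation actually runs)
result = sorted_groups.compute()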
