Using Dask/Dask Dashboard to play with COVID-19 vaccination data, just a little bit
In a prior post, we used Spark and briefly looked at partitioning and shuffling behaviors. Now we’re using Dask to do a similar thing. Again, this is not about feature engineering or visualization; we’re trying to look at what’s going on at the platform level.
A very short hands-on introduction to Dask can be found here. We’re using the Anaconda 2021.11 distribution, which already has Dask installed:
(conda202111base) [hadoop@tomato1 ~]$ conda list | grep dask
dask                      2021.10.0          pyhd3eb1b0_0
dask-core                 2021.10.0          pyhd3eb1b0_0

First, start the Dask scheduler on node fig and two Dask workers on nodes kale and onion:

[root@fig1 ~]# /shared/Anaconda202111/envs/conda202111base/bin/dask-scheduler
distributed.scheduler - INFO - -----------------------------------------------
distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO - Scheduler at: tcp://9.21.104.146:8786
distributed.scheduler - INFO - dashboard at: :8787

[root@kale1 ~]# /shared/Anaconda202111/envs/conda202111base/bin/dask-worker tcp://9.21.104.146:8786

[root@onion1 ~]# /shared/Anaconda202111/envs/conda202111base/bin/dask-worker tcp://9.21.104.146:8786
Next, start Jupyter Enterprise Gateway (JEG) on node beet and the notebook server on node tomato:
(conda202111base) [hadoop@beet1 ~]$ /shared/Anaconda202111/envs/conda202111base/bin/jupyter-enterprisegateway --ip=0.0.0.0 --log-level=DEBUG > jeg.log 2>&1 &

(conda202111base) [hadoop@tomato1 ~]$ jupyter notebook --gateway-url=http://beet1:8888 > /home/hadoop/ipython.log 2>&1 &
Now, from the notebook UI, we can connect to this Dask cluster: we have 2 workers and 8 cores in total.
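For reference, connecting the notebook to this cluster only requires pointing a distributed Client at the scheduler address printed in the scheduler log above; a minimal sketch (the address is the one from our setup, adjust as needed):

from dask.distributed import Client

# Attach to the remote scheduler started on node fig above.
client = Client("tcp://9.21.104.146:8786")
print(client)                  # summarizes the workers, threads and memory available
print(client.dashboard_link)   # URL of the Dask dashboard served on port 8787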

We’ll read the COVID-19 vaccination data file downloaded from Kaggle and run a simple “groupby” followed by “nlargest”, like the following:
%%time
# Build the lazy computation: read the CSV, group by state, sum, take the top 5.
import dask.dataframe as dd

df = dd.read_csv('file:///shared/data/COVID-19_Vaccinations_in_the_United_States_County.csv',
                 dtype={'Administered_Dose1_Recip': 'float64',
                        'Administered_Dose1_Recip_12Plus': 'float64',
                        'Administered_Dose1_Recip_18Plus': 'float64',
                        'Administered_Dose1_Recip_65Plus': 'float64',
                        'FIPS': 'object',
                        'Series_Complete_12Plus': 'float64'})
group_by = df.groupby('Recip_State')
group_by_sum = group_by.Series_Complete_Yes.sum()
group_by_sum_nlargest = group_by_sum.nlargest(5)

%%time
# Trigger the actual execution on the Dask cluster.
group_by_sum_nlargest.compute()
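As a side note, these Dask objects are lazy: nothing is read or computed until compute() is called. Before that, we can already peek at what Dask plans to do; a small sketch using the objects from the cell above (the visualize() call assumes the graphviz package is installed):

print(df.npartitions)               # number of partitions, i.e. how many chunks the CSV is split into
group_by_sum_nlargest.visualize()   # render the lazy task graph (requires graphviz)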
When we run the compute() cell, we can see the top 5 states with the largest vaccinated populations.

What happened under the hood in the Dask cluster? A look at the Dask dashboard shows:

- In the “Task Stream” section, each colored rectangle/block is one Dask task, and each horizontal line represents a core/thread executing these tasks over time. Red blocks usually represent data-transfer tasks that move data between Dask workers.
- There are three “read-csv” tasks reading the data file in parallel, so three data chunks are kept in memory, each referenced by one of those tasks (the chunk count is simply the partition count chosen by dd.read_csv; see the sketch after this list).
- After each data chunk is read, a “series-groupby-sum-chunk” task processes that chunk, grouping the data by state and summing the counts.
- After all three “series-groupby-sum-chunk” tasks finish, a red task called “transfer-series-groupby-sum-agg” starts to transfer the “groupby-sum” results of two chunks to the worker where the third chunk’s result is located.
- On that worker, another task called “series-groupby-sum-agg” aggregates the per-chunk results into the sums for all three chunks.
- Next, a “series-nlargest-chunk” task computes nlargest on that aggregated chunk. Since there is only one chunk at this point, the following “series-nlargest-agg” task returns the final results to the Dask scheduler.
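The three chunks above simply reflect how dd.read_csv partitioned the input file. As an illustration (not something our run above did), the blocksize argument of dd.read_csv controls that split, so a smaller value would produce more partitions and therefore more “read-csv” and “series-groupby-sum-chunk” tasks in the task stream; the 32MB value below is just an arbitrary example:

# Illustrative only: smaller blocks -> more partitions -> more parallel tasks.
df_small_blocks = dd.read_csv(
    'file:///shared/data/COVID-19_Vaccinations_in_the_United_States_County.csv',
    blocksize='32MB',    # example value; by default Dask picks a block size automatically
    dtype={'FIPS': 'object'})
print(df_small_blocks.npartitions)   # typically more partitions than the three we saw before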
Happy Reading!