A Distributed Dask Quickstart… that makes Pandas faster!

Russell Jurney
5 min read · Feb 9, 2023

The easy way shown in the Dask Quickstart has never worked for me! Nor has the first file I loaded, in an unpartitioned state, ever processed quickly. So this is my own attempt at a working Dask Quickstart for just about anyone processing just about any data, using the package manager you’re probably using: Poetry.

Readers of this post may also like my other post, Python and Parquet Performance, since this post covers data partitioning and Parquet. That post goes into much more detail.

Installing Dask with Poetry

If you aren’t using Poetry to manage packages for your project, please consider it. Read up on project setup, the add and remove commands for packages, and the update command for when you edit pyproject.toml directly. That’s about it.

Use Poetry’s features when you need them. Move on with your life. Poetry makes life easy!

To install Dask into a project for general-purpose use with Poetry, you will want dask[complete], so edit pyproject.toml and add the dask line below as the last line of your tool.poetry.dependencies section:

[tool.poetry]
name = "graphlet"
version = "0.1.1"

[tool.poetry.dependencies]
...
dask = { version = ">=2023.1.1", extras = ["complete"] }

Then update the project dependencies via poetry update, and commit the poetry.lock and pyproject.toml files in your project.

Single-Node Dask… the Working Way

The Dask documentation shows a Dask cluster as looking like a Client that works through a Scheduler to access Pandas processes in more than one Worker. That’s a simple way to think about it.

Remember the way those orange workers line up side-by-side. We’re going to come back to it when I talk about partitioning Parquet files below to give one partition to each worker :)

Welcome to the Dask Tutorial: https://tutorial.dask.org/00_overview.html

Some moons 🌝 ago I came up with this start_dask.sh script to make the tutorial work for me in booting a new cluster. It starts the scheduler and as many workers as you have CPU cores and sends their STDOUT and STDERR logs into /tmp/dask.log. Then you can tail -f /tmp/dask.log to see what is going on in your cluster.
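The script itself is not reproduced here, so what follows is only a rough Python sketch of the behavior described above, not the original start_dask.sh. It assumes the dask command-line tool installed by dask[complete] is on your PATH, that the scheduler listens on localhost at the default port 8786, and that one single-threaded worker process per CPU core is what you want. The function names dask_commands and start_dask are hypothetical.

```python
import os
import subprocess


def dask_commands(host="127.0.0.1", port=8786, n_workers=None):
    """Build (but do not run) the scheduler and worker command lines."""
    # Assumption: one worker process per CPU core, as described in the post.
    n_workers = n_workers or os.cpu_count() or 1
    address = f"tcp://{host}:{port}"
    scheduler = ["dask", "scheduler", "--host", host, "--port", str(port)]
    workers = ["dask", "worker", address,
               "--nworkers", str(n_workers), "--nthreads", "1"]
    return scheduler, workers


def start_dask(log_path="/tmp/dask.log"):
    """Launch both processes, appending their STDOUT and STDERR to log_path."""
    log = open(log_path, "ab")
    return [
        subprocess.Popen(cmd, stdout=log, stderr=subprocess.STDOUT)
        for cmd in dask_commands()
    ]


# procs = start_dask()   # uncomment to launch, then: tail -f /tmp/dask.log
```

Keeping the command construction separate from the launch makes it easy to print the commands and check them before starting anything.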

No Partitions… No Parallelism

So now you read your single partition Parquet file and you get a big speedup in performance across the 16 cores of an m6i.4xlarge, right? Wrong! Without columnar partitioning, even on a random identifier or via an underlying mechanism like a random partitioner in PySpark, you won’t get parallelism by moving from Pandas to Dask.

Let’s look at what I mean with an example. You’re chugging along in Pandas and you generate a large Parquet file (half a gigabyte, in this case, is all it took) without partitioning. Now you want to perform a more expensive operation in Dask across multiple cores, machines or GPUs. After all, this is what it is designed for…

df.to_parquet(
    "data/dblp.nodes.parquet",
    compression="snappy",
    engine="pyarrow",
)

And load it back using Dask…

import dask.dataframe as dd

df = dd.read_parquet("data/dblp.nodes.parquet", engine="pyarrow")

# Run a basic computation; execution is explicit in Dask, so call compute()
df.count().compute()

This htop output is about as much parallelism as you will ever see… not much! Workers tend to run out of RAM and die. Dask is broke.

Despite the machine having 16 cores and 16 workers… which Dask can use… we don’t achieve parallelism in loading 1 Parquet partition :( It takes a long time to compute a simple count and eventually the compute() dies. Dask on one pid sucks 😩
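One way to see why is to compare the number of partitions Dask found with the number of workers. The sketch below is a simplification: it assumes each partition is handled by one task at a time, and that Dask maps the single unpartitioned file to a single partition, which matches what I observed but can vary with the Dask version and the file’s row groups. The helper names are hypothetical, and the Dask import is deferred so the arithmetic can be run on its own.

```python
def max_busy_workers(npartitions, n_workers):
    """Upper bound on workers that can be busy on per-partition tasks."""
    return min(npartitions, n_workers)


def count_partitions(path):
    """Ask Dask how many partitions it sees in a Parquet dataset."""
    import dask.dataframe as dd  # imported lazily; requires dask[complete]
    return dd.read_parquet(path, engine="pyarrow").npartitions


# One partition on a 16-worker machine keeps at most one worker busy.
print(max_busy_workers(npartitions=1, n_workers=16))
```

Calling count_partitions("data/dblp.nodes.parquet") on your own file is a quick check before blaming the cluster.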

To address this issue, let’s revisit columnar partitioning from our post on Parquet to fix Dask up!

Columnar Partitioning

One way Parquet makes data more efficient is by partitioning data on the unique values within one or more columns. Each unique value in a column-wise partitioning scheme is called a key. Using a format originally defined by Apache Hive, one folder is created for each key, with additional keys stored in sub-folders. This is called columnar partitioning, and it combines with columnar storage and columnar compression to dramatically improve I/O performance when loading part of a dataset corresponding to a partition key.

A Parquet dataset partitioned on gender and country would look like this:

path
└── to
    └── table
        ├── gender=male
        │   ├── …
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── …

Each unique value for the columns gender and country gets a folder and sub-folder, respectively. What we need then is a random ID to partition on. This really should be a feature of pandas.DataFrame.to_parquet in my opinion… in order to automatically partition a file above a certain size into N partitions… but it isn’t. Pandas is firmly rooted in a single-threaded world.
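To make the key=value folder convention concrete, here is a minimal sketch that recovers the partition keys from a file path in such a dataset. The function name hive_partition_keys is hypothetical, and real readers such as pyarrow do this for you; the sketch only shows that the partition values live in the folder names rather than in the data files.

```python
from pathlib import PurePosixPath


def hive_partition_keys(path):
    """Return the key=value pairs encoded in a Hive-style partition path."""
    keys = {}
    for part in PurePosixPath(path).parts:
        key, sep, value = part.partition("=")
        if sep and key:  # only folder names of the form key=value
            keys[key] = value
    return keys


print(hive_partition_keys("path/to/table/gender=male/country=US/data.parquet"))
# {'gender': 'male', 'country': 'US'}
```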

For more information, see my original Python and Parquet Performance post.

Random Partitioning a Pandas DataFrame

A couple of posts back, I was using Pandas and needed to generate random IDs for randomly partitioning a Parquet file. Let’s revisit that now that the complete pipeline is working!

import numpy as np

# Generate a random numpy.ndarray of integers to create a pandas.Series
# Note: high is exclusive, so the IDs run from 1 to 15
df["random_id"] = np.random.randint(low=1, high=16, size=len(df))

# Now write it to a Parquet file with one partition folder per random_id value
df.to_parquet(
    "data/dblp.nodes.partitioned.parquet",
    compression="snappy",
    engine="pyarrow",
    partition_cols=["random_id"],
)

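Looking at our data, we can see there are 15 partitions in sub-folders of the data/dblp.nodes.partitioned.parquet folder, one per random_id value from 1 to 15. The high argument of np.random.randint is exclusive, so use low=0, high=16 if you want exactly 16 partitions, one per core.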

ls data/dblp.nodes.partitioned.parquet/

'random_id=1' 'random_id=11' 'random_id=13' 'random_id=15' 'random_id=3' 'random_id=5' 'random_id=7' 'random_id=9'
'random_id=10' 'random_id=12' 'random_id=14' 'random_id=2' 'random_id=4' 'random_id=6' 'random_id=8'
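The listing shows folders for random_id 1 through 15, that is, 15 partitions rather than 16. That follows from np.random.randint treating high as exclusive, the same half-open convention as Python’s range. The sketch below spells that out and shows one way to draw IDs for exactly N partitions; the helper names are hypothetical, and the NumPy import is deferred so the first function runs without it.

```python
def possible_ids(low, high):
    """IDs that np.random.randint(low=low, high=high) can return: low <= id < high."""
    return list(range(low, high))


def random_partition_ids(n_rows, n_partitions, seed=None):
    """Draw one ID in [0, n_partitions) per row. Requires NumPy."""
    import numpy as np  # imported lazily so possible_ids works without it
    rng = np.random.default_rng(seed)
    return rng.integers(low=0, high=n_partitions, size=n_rows)


print(len(possible_ids(1, 16)))   # 15 partitions, as in the listing above
print(len(possible_ids(0, 16)))   # 16 partitions, one per core
```

Assigning df["random_id"] = random_partition_ids(len(df), 16) before writing would give one partition per core on a 16-core machine.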

Now I want to load that back using Dask and do some more expensive computation.

# Load the partitioned Parquet dataset using Dask
ddf = dd.read_parquet(
    "data/dblp.nodes.partitioned.parquet",
    engine="pyarrow",
)

ddf.count().compute()

Here we see htop light up like a Christmas tree! Now Dask is doing useful work.

This always throws me as it feels like PySpark handles this better… but maybe I am wrong. Maybe I am just relying on S3 to do that for me while I work with Dask locally?

Conclusion

Hang in there, big data kiddos, as we move from a CPU-centric to a GPU-centric mindset! Next up I will be using RAPIDS and cuDF to do some more computation. The finale will be Dask-cuDF!

Russell Jurney

I am into LLMs meet knowledge graphs and GNNs, huge networks, network motifs, graphlets and hypergraphs. Founder of Graphlet AI: Knowledge Graph Factory