Essential Python for Machine Learning: Dask

The Scalability Enabler

Dagang Wei
May 28, 2024

This article is part of my book Essential Python for Machine Learning.

Introduction

Python’s rich ecosystem of machine learning libraries is a treasure trove for data scientists. However, as your datasets grow and model complexity increases, you might hit the limitations of your machine’s memory or processing power. This is where Dask comes in, offering a powerful way to scale your Python workflows.

What is Dask?

Dask is an open-source library designed to parallelize and distribute your Python code seamlessly. It extends familiar Python libraries like NumPy, Pandas, and scikit-learn, allowing you to process datasets that exceed your machine’s memory capacity and accelerate computations.

Why Use Dask for Machine Learning?

  • Scalability: Dask’s ability to distribute computations across multiple cores or even a cluster of machines means you can tackle larger datasets and more complex models without hitting memory limitations.
  • Familiarity: Dask mirrors the APIs of NumPy, Pandas, and scikit-learn. If you’re comfortable with these libraries, adopting Dask is remarkably easy.
  • Performance: By parallelizing tasks, Dask can dramatically speed up machine learning operations like data preprocessing, feature engineering, model training, and hyperparameter tuning.
  • Flexibility: Dask is versatile and can be used for a wide range of machine learning tasks, from simple data cleaning to complex model ensembles.

Dask’s Architecture

Dask operates on a client-scheduler-worker architecture:

  • Client: This is your interface to Dask. You write your code using Dask collections (like Dask DataFrames or Dask Arrays), and the client translates this code into task graphs.
  • Scheduler: The scheduler manages the task graph, deciding which tasks to execute, on which workers, and in what order. It also monitors task progress and handles failures.
  • Workers: These are the processes (or threads) that actually execute the individual tasks. They can run on the same machine as the client and scheduler, or they can be distributed across a cluster of machines.

This architecture provides flexibility and scalability. You can start with a simple single-machine setup and then seamlessly scale to a cluster if needed.
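For example, on a single machine you can start all three components in one process with `LocalCluster` (shown later), or run them separately and connect by address. A minimal sketch of the separated setup (the address shown is the scheduler's default; this is an illustration, not a full deployment guide):

# Terminal 1: start a scheduler (listens on tcp://<host>:8786 by default)
#   $ dask scheduler
# Terminal 2..N: start workers, pointing each at the scheduler
#   $ dask worker tcp://127.0.0.1:8786

from dask.distributed import Client

# The client connects to the scheduler and submits task graphs to it
client = Client("tcp://127.0.0.1:8786")
print(client)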

How Does Dask Work?

  1. Task Graphs: Dask breaks down your code into a graph of tasks, each representing a smaller piece of the overall computation.
  2. Scheduling: Dask intelligently schedules these tasks, determining which ones can be run in parallel and managing dependencies between them.
  3. Execution: Tasks are executed on available cores or distributed across a cluster, depending on your setup.
  4. Results: Dask aggregates the results from individual tasks to produce the final output, just as if you had run your code sequentially.
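You can watch these steps with a small example: building an expression only constructs a task graph, and nothing runs until `.compute()` is called. A minimal sketch:

import dask.array as da

# A 4x4 grid of 1000x1000 chunks -> many small tasks
x = da.ones((4000, 4000), chunks=(1000, 1000))
y = (x + x.T).sum(axis=0)

# y is still lazy: so far only a task graph has been built
print(y)                              # a lazy dask.array, not a NumPy result
print(len(dict(y.__dask_graph__())))  # number of tasks in the graph

# .compute() schedules the tasks, runs them, and gathers the result
result = y.compute()
print(result[:5])  # [8000. 8000. 8000. 8000. 8000.]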

When to Consider Dask

  • Large Datasets: If you’re working with datasets that exceed your machine’s memory, Dask is a game-changer.
  • Computationally Intensive Tasks: Dask’s parallelization can dramatically speed up tasks like hyperparameter tuning or large-scale simulations (see the sketch after this list).
  • Cloud Computing: Dask integrates well with cloud environments, allowing you to scale your machine learning workflows effortlessly.
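As an example of the second point, scikit-learn's parallel routines can ship their work to a Dask cluster through the joblib backend. A sketch, with a made-up dataset and parameter grid purely for illustration:

import joblib
from dask.distributed import Client
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV

client = Client()  # local cluster; point at a remote scheduler to scale out

# Toy data and a small grid, purely for illustration
X, y = make_classification(n_samples=10_000, n_features=20, random_state=0)
params = {"n_estimators": [50, 100], "max_depth": [5, 10]}
search = GridSearchCV(RandomForestClassifier(random_state=0), params, cv=3)

# Each fit in the grid search runs as a task on Dask workers
with joblib.parallel_backend("dask"):
    search.fit(X, y)

print(search.best_params_)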

Key Dask Data Structures and Operations

Dask Array

A Dask Array is a grid of smaller NumPy arrays (chunks) that together behave like one large array, letting you work with data that does not fit into memory.

Operations:

  • Creating Dask Arrays: Use `da.from_array()`, `da.ones()`, `da.zeros()`, etc.
  • Computing: Write computations as you would with NumPy, then call `.compute()` to materialize the result.
  • Element-wise Operations: Addition, subtraction, and other element-wise operations are supported.
  • Reduction Operations: Reductions such as `sum` and `mean` operate chunk by chunk and aggregate the results.

Example:

import dask.array as da
# Create a Dask array
x = da.random.random((10000, 10000), chunks=(1000, 1000))
# Perform operations
y = x + x.T - x.mean(axis=0)
# Compute the result
result = y.compute()
print(result)
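The `chunks` argument controls how the work is split: each block becomes its own set of tasks. You can inspect the chunking without computing anything:

import dask.array as da

x = da.random.random((10000, 10000), chunks=(1000, 1000))
print(x.numblocks)  # (10, 10): a 10x10 grid of 1000x1000 blocks
print(f"{x.nbytes / 1e9:.1f} GB if fully materialized")  # ~0.8 GB of float64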

Dask DataFrame

A Dask DataFrame is composed of many Pandas DataFrames (partitions) and is suitable for tabular datasets that don’t fit into memory.

Operations:

  • Creating Dask DataFrames: Use `dd.from_pandas()`, `dd.read_csv()`, etc.
  • GroupBy Operations: Similar to Pandas.
  • Merging and Joining: Similar to Pandas.
  • Aggregation: Operations like sum, mean, etc.
  • Applying Functions: Similar to Pandas.

Example:

import dask.dataframe as dd
import pandas as pd

# Create a Pandas DataFrame
pdf = pd.DataFrame({'x': range(100000), 'y': range(100000)})
# Convert to Dask DataFrame
ddf = dd.from_pandas(pdf, npartitions=10)
# Perform operations
grouped = ddf.groupby('x').y.mean()
# Compute the result
result = grouped.compute()
print(result)
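In practice you would more often build the DataFrame directly from files on disk, so the data stays out of memory until it is needed. A minimal sketch (the file pattern and column names are hypothetical):

import dask.dataframe as dd

# Each matching CSV becomes one or more partitions; nothing is loaded yet
ddf = dd.read_csv("trips-*.csv")

# A lazy, Pandas-style pipeline, executed only on .compute()
result = (
    ddf[ddf.trip_distance > 1.0]
    .groupby("passenger_count")
    .fare_amount.mean()
    .compute()
)
print(result)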

Dask Bag

Dask Bags are like parallel lists and are suitable for processing large collections of Python objects.

Operations:

  • Creating Dask Bags: Use `db.from_sequence()`, `db.read_text()`, etc.
  • Functional Programming: Operations like map, filter, etc.
  • Aggregation: Operations like sum, mean, etc.
  • Flattening and Filtering: Operations like `flatten` and `filter` for working with nested or filtered data.

Example:

import dask.bag as db

# Create a Dask Bag
data = db.from_sequence(range(10))
# Perform operations
squares = data.map(lambda x: x ** 2)
filtered = squares.filter(lambda x: x > 10)
# Compute the result
result = filtered.compute()
print(result)

Dask Delayed

Dask Delayed lets you build task graphs from ordinary Python functions, deferring execution until you explicitly request it.

Operations:

  • Decorating Functions: Use `dask.delayed` to delay computations.
  • Building Task Graphs: Combine multiple delayed computations.
  • Computing: Use `.compute()` to trigger the actual computation.

Example:

from dask import delayed

# Define functions
@delayed
def add(x, y):
    return x + y

@delayed
def multiply(x, y):
    return x * y

# Create delayed computations
a = add(1, 2)
b = multiply(a, 10)
# Compute the result
result = b.compute()
print(result)
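A common pattern is to build a larger graph in a loop and collapse it with a single delayed reduction; the independent tasks can then run in parallel. A minimal sketch:

from dask import delayed

@delayed
def process(x):
    # Stand-in for an expensive, independent computation
    return x * 2

# One task per input; none of them has run yet
tasks = [process(i) for i in range(10)]

# Combine into one graph and execute everything at once
total = delayed(sum)(tasks)
print(total.compute())  # 90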

Examples

Local Cluster

import dask.array as da
from dask.distributed import Client, LocalCluster

# Create a local Dask cluster
cluster = LocalCluster()
client = Client(cluster)

# Information about your local cluster
print(f"Dashboard Link: {client.dashboard_link}")
print(f"Workers: {len(client.scheduler_info()['workers'])}")

rand_array = da.random.random(1000)

print(da.sum(rand_array).compute())

Output:

Dashboard Link: http://127.0.0.1:43457/status
Workers: 2
504.32328940496313

Dask YARN

import dask.array as da
from dask.distributed import Client
from dask_yarn import YarnCluster

cluster = YarnCluster()
client = Client(cluster)

cluster.adapt() # Dynamically scale Dask resources

rand_array = da.random.random(1000)

da.sum(rand_array).compute()

Word Count

import dask.bag as db

def word_count(text):
    return len(text.split())

text_files = ["file1.txt", "file2.txt", ...]

bag = db.read_text(text_files)
counts = bag.map(word_count).sum().compute()

print("Total word count:", counts)

Linear Regression

The code is available in this Colab notebook.

import dask.array as da
import dask.dataframe as dd
import numpy as np
from dask_ml.model_selection import train_test_split
from dask_ml.linear_model import LinearRegression
from dask_ml.metrics import mean_squared_error

# Set random seed for reproducibility
np.random.seed(42)

# Generate data
size = 100000
passenger_count = da.from_array(np.random.randint(1, 6, size), chunks=(size // 4,))
trip_distance = da.from_array(np.random.uniform(0.5, 20, size), chunks=(size // 4,))
fare_amount = 0.50 * (passenger_count - 1) + 1.50 * trip_distance + 2.50
# Add noise
fare_amount = fare_amount + da.random.normal(0, 0.05 * fare_amount)

# Stack Dask arrays into a single 2D array
data = da.stack([passenger_count, trip_distance, fare_amount], axis=1)

# Create Dask DataFrame
df = dd.from_dask_array(
    data, columns=["passenger_count", "trip_distance", "fare_amount"]
)

# Split the dataset into training and testing sets
X = df[["passenger_count", "trip_distance"]]
y = df["fare_amount"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

# Convert to Dask arrays
X_train_array = X_train.to_dask_array(lengths=True)
y_train_array = y_train.to_dask_array(lengths=True)
X_test_array = X_test.to_dask_array(lengths=True)
y_test_array = y_test.to_dask_array(lengths=True)

# Create and train the linear regression model
model = LinearRegression()
model.fit(X_train_array, y_train_array)

# Make predictions on the test set
y_pred_array = model.predict(X_test_array)

# Evaluate the model using Mean Squared Error (MSE)
mse = mean_squared_error(y_test_array, y_pred_array)
print(f"Mean Squared Error: {mse:.2f}")

# (Optional) Display model coefficients
print("Coefficients:", model.coef_)
print("Intercept:", model.intercept_)

print("\nTest Cases:")
test_indices = np.random.choice(len(X_test_array), 5, replace=False) # Get 5 random indices
for i in test_indices:
    passenger = X_test_array[i, 0].compute()
    distance = X_test_array[i, 1].compute()
    actual_fare = y_test_array[i].compute()
    predicted_fare = y_pred_array[i].compute()
    print(f"- Passengers: {passenger}, Distance: {distance:.2f}, Actual Fare: {actual_fare:.2f}, Predicted Fare: {predicted_fare:.2f}")

Output:

Mean Squared Error: 1.10
Coefficients: [0.50430978 1.49927447]
Intercept: 1.983858464857498

Test Cases:
- Passengers: 5.0, Distance: 7.90, Actual Fare: 16.89, Predicted Fare: 16.35
- Passengers: 1.0, Distance: 16.16, Actual Fare: 27.70, Predicted Fare: 26.72
- Passengers: 2.0, Distance: 12.51, Actual Fare: 21.87, Predicted Fare: 21.75
- Passengers: 5.0, Distance: 12.16, Actual Fare: 23.14, Predicted Fare: 22.74
- Passengers: 2.0, Distance: 6.58, Actual Fare: 13.13, Predicted Fare: 12.86

Conclusion

Dask empowers you to scale your Python-based machine learning projects, breaking free from memory constraints and accelerating your computations. If you’re dealing with large datasets or computationally intensive tasks, adding Dask to your toolbox is a wise investment.
