Dask for Python and Machine Learning

Shachi Kaul · Analytics Vidhya · Jul 31, 2020


Recently I encountered a very interesting Python library called Dask. It is an open-source Python library whose distinguishing features are parallelism and scalability: it can scale up on a single machine or out across a cluster of machines. It has carved out its space both in general Python development and in building ML/AI models.

So, let’s start with it. 🍵

Tea-Talk on Dask

  • Dask is an open-source Python library that brings parallelism and scalability to Python code.
  • It is included by default in the Anaconda distribution.
  • Dask reuses existing Python libraries such as pandas and NumPy and enhances them with scalability. ML algorithms in the style of scikit-learn are implemented as well.
  • Dask outperforms pandas at data wrangling on huge datasets (GBs in size) and, like Spark, can handle live-streaming data.
  • Dask follows a lazy computation mechanism: computations are performed only when explicitly requested.

For example,

Suppose one painter takes four days to paint the four walls of a room, whereas two painters working together finish in at most two days. This gain in speed and time is exactly what Dask adopts.

Why Is Dask a Showstopper?

  • Exploits parallelism in the PyData stack
    Libraries in the PyData stack such as pandas and NumPy are built for a single CPU core, which results in the following problems:
    1. Reading a Python object (e.g., a dataframe) larger than RAM throws an out-of-memory error and crashes the system.
    2. Computations may take a long time to run.
    Dask bypasses the Global Interpreter Lock (GIL) and utilizes multiple CPU cores by internally chunking Python objects and processing the chunks in parallel.
  • Scalability and distributed computing
    Dask can handle large datasets by exploiting the multiple cores of a single machine, or by computing in parallel across a cluster of machines.
  • It lifts the memory constraint pandas hits when huge data has to be read and handled.

This is demoed in Optimized Ways to Read Large CSVs in Python; glance over that blog for more insights.

  • Dask allows real-time data streaming, as Spark does, which is not possible in pandas.
  • Easy debugging and diagnostics
    A beautiful Dask dashboard lets you constantly keep an eye on running progress and performance across machines.
  • Reading multiple CSVs at once
    pandas' read_csv() reads a single CSV into memory, but reading multiple CSVs into one dataframe is not possible. Dask can do it with the help of wildcard characters, like Spark, as sketched below.
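
A minimal sketch, assuming a hypothetical folder of monthly CSV files:

```python
import dask.dataframe as dd

# Hypothetical file pattern; pandas' read_csv would need a manual loop plus concat.
df = dd.read_csv("data/sales-2020-*.csv")  # the glob reads all matches as one dataframe
print(df.npartitions)                      # split into one or more partitions per file
```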

Dask Workloads

What is Dask Overall?

Figure1

Dask is essentially constructed from high- and low-level APIs, with schedulers monitoring the process. This section discusses the schedulers; the next section covers the APIs.

Workload Mechanism

  • Dask uses the concept of lazy evaluation, which means results are generated only when requested, i.e., when compute() is invoked. Execution then follows a DAG coordinated by the task schedulers, as sketched below.
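
A minimal sketch of this laziness, using dask.delayed on two toy functions:

```python
import dask

@dask.delayed
def inc(x):
    return x + 1

@dask.delayed
def add(a, b):
    return a + b

a = inc(1)              # nothing runs yet; `a` is a lazy placeholder in the task graph
b = inc(2)
total = add(a, b)       # the DAG now holds three pending tasks
print(total.compute())  # only now does the scheduler execute the graph -> 5
```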

The Marvel: Task Scheduler

  • The task scheduler shown in Figure1 coordinates the “divide and conquer” computations, exploiting parallelism across CPU cores.
  • As demonstrated in Figure2, a task graph is created in memory, holding the sequence of tasks to be executed once compute() is invoked.
  • The scheduler creates and manages a Directed Acyclic Graph (DAG) to model the logical representation of the tasks, aka the task graph. It distributes the tasks to worker nodes that compute in parallel and combine into a single result. The scheduler backend can even be picked per call, as sketched after Figure2.
Figure2
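
A small sketch of choosing a scheduler backend per compute() call (the array here is a toy):

```python
import dask.array as da

x = da.random.random((4000, 4000), chunks=(1000, 1000))
y = x.mean()

# The same task graph can be handed to different schedulers:
print(y.compute(scheduler="threads"))    # thread pool, the default for arrays and dataframes
print(y.compute(scheduler="processes"))  # separate worker processes, useful around the GIL
```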

Another Marvel: DAG

  • An acyclic directed graph with nodes and edges representing relationships.
  • Constructed by Task Scheduler representing steps to compute a result.
  • Directed as one-way movement upto its terminal node
  • visualize() is used to visualize it as shown below.
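A tiny sketch of visualize(); it writes the task graph to an image and requires the optional graphviz dependency:

```python
import dask.array as da

x = da.arange(12, chunks=4)  # three chunks -> three parallel branches in the graph
y = (x + 1).sum()

# Render the DAG to a file instead of computing it (needs graphviz installed).
y.visualize(filename="dag.png")
```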

Dask Collections

Figure3

The collections named in Figure3 are distributed across the multiple cores of a single CPU and then computed in parallel.

High-Level APIs

  1. Dask Arrays: (Parallel NumPy, i.e., numpy + threading)
  • A single large Dask array is chunked into small NumPy arrays, distributed across multiple CPU cores, and run in parallel.
  • It loads arrays even when they are larger than RAM.
  • Any operation on a Dask array triggers the same operation on the small chunks, each utilizing a core, which speeds up the computation, as sketched below.
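A minimal sketch of chunking with da.from_array (the chunk size here is arbitrary):

```python
import numpy as np
import dask.array as da

data = np.arange(1_000_000)
x = da.from_array(data, chunks=100_000)  # one large array backed by ten small numpy chunks
print(x.numblocks)                       # (10,)

total = (x ** 2).sum()  # each chunk is squared and partially summed on its own core
print(total.compute())  # the partial results are combined into a single number
```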

2. Dask Dataframes: (Parallel pandas, i.e., pandas + threading)

  • As with Dask arrays, a large dataframe is chunked into small pandas dataframes along the index, as shown in Figure4, distributed across multiple CPU cores, and run in parallel.
  • It loads a dataframe even when it is larger than RAM. For hands-on practice, check Optimized Ways to Read Large CSVs in Python.
  • It likewise speeds up computation by operating on all the chunks at once, as sketched below.
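
A small sketch using dd.from_pandas on a toy dataframe (in practice the partitions would come from read_csv or read_parquet):

```python
import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"city": ["Pune", "Delhi", "Pune", "Agra"],
                    "sales": [10, 20, 30, 40]})
df = dd.from_pandas(pdf, npartitions=2)  # two pandas dataframes under the hood

result = df.groupby("city")["sales"].sum()  # lazy, and mirrors the pandas API
print(result.compute())                     # per-partition results are combined here
```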

3. Dask Bags: (Parallel list, i.e., list + threading)

  • A parallel collection of Python objects, i.e., partitions of a larger collection.
  • Used when dealing with semi-structured or unstructured data,
    e.g., preprocessing JSON, log files, or text data, as well as user-defined Python objects.
  • Benefits:
    When reading nested JSON, bags are helpful and can later be converted into a Dask dataframe, as sketched below.
  • Limitations:
    1. The elements of a bag can’t be changed, since it is immutable.
    2. The groupby function is very slow.
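
A minimal sketch with a few toy records standing in for parsed JSON:

```python
import dask.bag as db

# Toy semi-structured records, e.g. parsed JSON log lines.
records = [{"user": "a", "ok": True},
           {"user": "b", "ok": False},
           {"user": "c", "ok": True}]
bag = db.from_sequence(records, npartitions=2)

ok_users = bag.filter(lambda r: r["ok"]).map(lambda r: r["user"])
print(ok_users.compute())  # ['a', 'c']

df = bag.to_dataframe()    # a bag of dicts flattens into a dask dataframe for tabular work
```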

4. Dask-ML: (Parallel scikit-learn, i.e., sklearn + threading)

  • This Dask library mirrors sklearn for building ML models.
  • It uses the internal Dask collections to parallelize ML algorithms.
  • Dask reuses existing sklearn estimators and algorithms that expose an n_jobs argument for parallelization via Joblib. The Joblib Dask backend parallelizes across a cluster when computations are too heavy to run on a single machine, as sketched below.
  • It can also make use of external libraries such as XGBoost or TensorFlow.
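
A sketch of the Joblib pattern; Client() starts a local cluster here, and the dataset is synthetic:

```python
import joblib
from dask.distributed import Client
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier

client = Client()  # local cluster; a remote cluster address works here too

X, y = make_classification(n_samples=10_000, random_state=0)
model = RandomForestClassifier(n_estimators=200, n_jobs=-1)

# Any estimator that fans work out through joblib can target Dask workers:
with joblib.parallel_backend("dask"):
    model.fit(X, y)
```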

Personally, I would say Dask is still maturing for ML. Many issues remain open, even when building a simple linear model.

Low-Level APIs

  1. Dask Delayed: (Lazy parallel objects)
  • Delayed Dask objects are lazy in nature, meaning they are computed only when the compute() function is explicitly invoked.
  • Wrapping a function in a delayed object makes each call equivalent to a DAG node.
  • The dependencies of the tasks on each other create a chain of steps to perform, which constructs the DAG.
  • Invoking compute() on a delayed object asks the scheduler to manage the parallel resources and compute the DAG step by step, generating a result, as sketched below.
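
A small sketch that builds a five-task DAG, with delayed() used as a plain wrapper this time:

```python
from dask import delayed

def double(x):
    return 2 * x

lazy = [delayed(double)(i) for i in range(5)]  # five independent DAG nodes
total = delayed(sum)(lazy)                     # one final node depending on all five
print(total.compute())                         # the scheduler walks the chain -> 20
```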

2. Dask Futures: (Eager parallel objects)

  • The futures interface computes immediately, unlike delayed objects.
  • Execution is performed in real time.
  • Unlike Dask arrays, dataframes, etc., a Dask client is needed to use the futures interface, as sketched below.
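
A minimal sketch; Client() starts a local cluster (inside a plain script you may need an `if __name__ == "__main__":` guard):

```python
from dask.distributed import Client

client = Client()  # futures require a client

def square(x):
    return x ** 2

future = client.submit(square, 4)  # starts executing immediately, unlike delayed
print(future.result())             # block until the eager computation finishes -> 16

futures = client.map(square, range(5))  # eager fan-out across workers
print(client.gather(futures))           # [0, 1, 4, 9, 16]
```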

Find my hands-on work in this GitHub repository.

Dark Side of Dask

There are a few limitations to be considered when working with Dask.

  1. Unlike Dask, pandas is good at optimizing complex SQL-like queries.
  2. Dask is not good at operations that are challenging to parallelize, such as sort and shuffle, or setting a new index on an unsorted dataframe.

Spark v/s Dask

Dask's parallelism and scalability features are broadly similar to Spark's, except for a few differences.



Feel free to follow this author if you liked the blog, as she assures you she will be back with more interesting ML/AI-related stuff.
Thanks,
Happy Learning! 😄

You can get in touch via LinkedIn.
