Working with HDFS, Parquet and Dask

Giacomo Veneri
Published in digitalindustry
Apr 23, 2019

Based on http://jugsi.blogspot.com/2019/04/working-with-hdfs-parquet-and-dask.html

In this simple exercise we will use Dask to connect to a small Hadoop file system (HDFS) with 3 data nodes. Dask is a popular Python framework for parallel computing that provides advanced parallelism for analytics.

Full code is available at https://github.com/venergiac/dask-hadoop

architecture

To avoid complex configuration we use Docker and Docker Compose to start a 7-node Hadoop cluster plus a Jupyter notebook container:

  1. 1 name node
  2. 3 data nodes
  3. 1 resource manager
  4. 1 data node manager
  5. 1 history manager
  6. 1 Jupyter notebook with Dask, XGBoost, fastparquet and the HDFS driver installed

Starting the cluster.

First of all clone the repository:

git clone https://github.com/venergiac/dask-hadoop

Then start the cluster with Docker Compose:

docker-compose up

When you are done, remember to stop the cluster with:

docker-compose down

To check that everything started correctly, run

docker ps

and you should see something similar to this:

expected output of docker ps

Working with Jupyter

Now you can point your browser to

http://localhost:8888/notebooks/notebook1.ipynb#

Jupyter Notebook

Generating a random dataset.

The first step is to generate a random dataset of 3 columns × 10,000 rows:

hdfs_path = 'hdfs://namenode:8020/'

import pandas as pd
import numpy as np
import datetime

# 10,000 rows x 3 columns (a, b, c) of random integers in [0, 10**6)
df_src = pd.DataFrame(np.random.randint(0, 10**6, (10**4, 3)), columns=list('abc'))
df_src.head()

Writing into HDFS using CSV

Using a Dask DataFrame we can save our dataset to HDFS in 10 partitions.

The following code also performs a simple operation: it adds 10 to the first column.

import dask
dask.config.set({"hdfs_driver": "hdfs3"})
# pyarrow wants hadoop client locally installed
#dask.config.set({"hdfs_driver": "pyarrow"})
import dask.dataframe as dd

# split the pandas DataFrame into 10 partitions
df_dask = dd.from_pandas(df_src, npartitions=10)

def sum_10(df):
    # add 10 to column 'a' and keep only the new column
    df['new'] = df['a'] + 10
    return df[['new']]

now = datetime.datetime.now()
# the '*' in the file name is replaced by the partition number
df_dask.pipe(sum_10).to_csv(hdfs_path + 'large_ds1_*.csv')
print("sum_10 done in:", datetime.datetime.now() - now)

Now open the web UI of the local NameNode to see the exported CSV files:

http://localhost:9870

Reading back

To read the dataset back, a single line of code is enough.

df_dask2 = dd.read_csv(hdfs_path + 'large_ds1_*.csv')

Keep in mind that Dask evaluates lazily: at this point the dataset has not actually been loaded, only “connected”. The data is read only when the first computation is triggered.

Using Apache Parquet

Our last exercise is to work with Apache Parquet, a popular columnar storage format.

# save using parquet
df_dask.pipe(sum_10).to_parquet(hdfs_path + 'large_ds_parquet')
# read it again
df_parquet = dd.read_parquet(hdfs_path + 'large_ds_parquet')

This simple exercise is not meant to teach Parquet, Dask or Hadoop in depth; it is a simple starting point for experimenting with and testing this kind of architecture.

Notes
The architecture is based on Hadoop 3.1.1.
