A Big Data Processing Technique

Mehmet Vergili
Sep 5, 2018

The ability to process large volumes of data in a short period of time is a big advantage in today's data-driven world. If we have to process data within a certain time window and a single process cannot get through it in that window, then the data is big data. Among the many definitions out there, this is my favorite description of big data.

Big data processing can be scaled vertically, with a single machine that has a lot of CPU and memory, or horizontally, with a system that combines the power of multiple machines.

In this tutorial we offer a framework for processing a relatively large volume of data in a short time with a horizontally scaled Hadoop cluster, Hive, and Sqoop, all packaged in Cloudera CDH [1][2].

The data we use for this example is delimited text with 10M rows x 30 columns (~4.1 GB), created with a simple Python script that you can find in reference [4]. The Hadoop cluster has 2 nodes, each an AWS instance with 4 CPUs, 32 GiB of memory, and SSD storage.
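
The exact generator script is the one linked in reference [4]; as a rough sketch of how such a file could be produced, something like the snippet below works. The column names and random values here are only illustrative placeholders, not the script used for the benchmark.

# generate_mydata.py -- illustrative sketch only (Python 3); the script actually
# used for this example is linked in reference [4].
import csv
import random
import string

N_ROWS = 10000000   # 10M rows, as in this example
N_COLS = 30

def random_value(length=12):
    # Random alphanumeric token standing in for a real field value.
    return ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(length))

with open('mydata.csv', 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(['column%d' % i for i in range(1, N_COLS + 1)])  # header row
    for _ in range(N_ROWS):
        writer.writerow([random_value() for _ in range(N_COLS)])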

Step 1

To start the process we first need to move the delimited text data into the Hadoop Distributed File System (HDFS). To do that:

sudo su hdfs
hdfs dfs -mkdir /data
hdfs dfs -put mydata.csv /data/mydata.csv

The file you copied to HDFS can be seen at http://your_hadoop_host:50070/explorer.html#/data

The next step is moving the data into Hive. A simple Hive query will load the delimited text data from HDFS into a Hive table using a SerDe library. Hive processes the data on the Hadoop cluster via MapReduce jobs. After the Hive table is created, we convert it to Parquet format, since processing Parquet files is faster.

CREATE EXTERNAL TABLE mydata_tmp (
column1 STRING,
column2 STRING,
column3 STRING,
.
.
.
column30 STRING
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
STORED AS TEXTFILE
LOCATION 'hdfs_path'
TBLPROPERTIES('skip.header.line.count'='1');

CREATE TABLE mydata STORED AS PARQUET
AS
SELECT *
FROM mydata_tmp;

DROP TABLE mydata_tmp;

mydata_tmp is a temporary Hive table in text format; it is dropped at the end of the query.
mydata is the actual table, stored in Parquet format.
hdfs_path is the HDFS directory that holds your data, which is the "/data" directory where we put mydata.csv in this example.
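
As an optional sanity check, not part of the original steps, you can query the new table from Python. The sketch below assumes the PyHive package is installed and HiveServer2 is reachable on its default port 10000; neither assumption comes from this tutorial.

# Quick row-count check after the Parquet table is created.
# Assumes `pip install pyhive[hive]` and HiveServer2 on port 10000 (assumptions).
from pyhive import hive

conn = hive.Connection(host='your_hadoop_host', port=10000, database='default')
cursor = conn.cursor()
cursor.execute('SELECT COUNT(*) FROM mydata')
print('row count:', cursor.fetchone()[0])   # expect 10M rows for this example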

Step 2

Now mydata exists as a Hive table, and we can take full advantage of Hive to process this high volume of data.

Let's say we want to apply a cleaning function written in Python to each row of mydata and create a new, clean table. To do this we use a Hive UDF (User Defined Function). UDFs provide a way of extending the functionality of Hive with a function (written in Python, Java, etc.) that can be evaluated in Hive query statements.

Below we call the UDF (my_udf_function.py) in a Hive query to apply it to each row of the mydata table.

-- The transform script must be accessible on every node; a common approach is to
-- distribute it with ADD FILE /your_udfs_path/my_udf_function.py; and then reference
-- it in USING by its file name.
CREATE TABLE clean_mydata STORED AS PARQUET AS
SELECT TRANSFORM (
  column1, column2, column3, ...... column30
)
USING 'python /your_udfs_path/my_udf_function.py'
AS (
  column1, column2, column3, ........columnN
)
FROM mydata;

clean_mydata is your new clean table.
mydata is the source table that we created from the delimited text.
my_udf_function.py is the UDF script.

# my_udf_function.py

import sys

sys.path.append('/your_udfs_path')

# Read the Hive columns: each input row arrives on stdin as a tab-delimited line.
for line in sys.stdin:
    fields = line.strip('\n').split('\t')

    column1 = fields[0].strip()
    column2 = fields[1].strip()
    column3 = fields[2].strip()
    # ...
    column30 = fields[29].strip()

    # Do cleaning, normalization, validation, etc. here and build the output
    # columns. The number of output columns can be more or less than the input.
    out_columns = [
        new_column1,
        new_column2,
        new_column3,
        # ...
        new_columnN,
    ]

    # Emit the row back to Hive as a tab-delimited line.
    print('\t'.join(str(v) for v in out_columns))

You can collect your exceptions/errors with sys.stderr at any point in the Python script.

sys.stderr.write("something happen: {}\n".format(error))

To see the stderr output we need to track the MapReduce jobs on the Hadoop cluster. The job history can be seen at http://your_hadoop_host:19888. In addition, when you run the Hive query, Hadoop prints a temporary URL on your screen for tracking the running MapReduce tasks.

In this tutorial we used Python to write the UDF. You can also use Java, C, or other languages; this flexibility is one of the advantages of UDFs. Language choice can matter for performance if the function does complex calculations.

Step 3

Now we have clean data to move into a relational database such as MySQL, MSSQL, etc. To do this we could write our own function that reads the clean_mydata table from Hive, distributes the work across the Hadoop cluster, and writes to the destination database. Or we can use Apache Sqoop to do all of this for us.

Apache Sqoop(TM) is a tool designed for efficiently transferring bulk data between Apache Hadoop and structured datastores such as relational databases.

Sqoop needs the Hive data in text format. Since we used Parquet for all our Hive tables, we first need to convert the data back to text. To do that:

CREATE TABLE {tmp_table}
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES(
"separatorChar" = {delimiter},
"quoteChar" = {quote_char},
"escapeChar" = {escape_char}
) STORED AS TEXTFILE
AS SELECT {column_list}
FROM {clean_mydata};

delimiter, quote_char, and escape_char are the standard CSV settings used to separate and quote fields.
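
The curly-brace placeholders read like Python format fields, so one way to run this step, sketched here as an assumption rather than the tooling used in the article, is to fill the template in Python and submit it with beeline. The host, table names, and CSV settings below are illustrative only.

# Fill the templated CTAS statement and submit it with beeline.
# The HiveServer2 host, table names, and CSV settings are illustrative assumptions.
import subprocess

QUERY_TEMPLATE = """
CREATE TABLE {tmp_table}
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES(
  "separatorChar" = "{delimiter}",
  "quoteChar" = "{quote_char}",
  "escapeChar" = "{escape_char}"
) STORED AS TEXTFILE
AS SELECT {column_list}
FROM {clean_table};
"""

query = QUERY_TEMPLATE.format(
    tmp_table='clean_mydata_txt',
    delimiter=',',
    quote_char="'",       # single quote as the CSV quote character
    escape_char='\\\\',   # a literal backslash, escaped for both Python and Hive
    column_list='*',
    clean_table='clean_mydata',
)

subprocess.check_call(['beeline', '-u', 'jdbc:hive2://your_hadoop_host:10000', '-e', query])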

Now we are ready to move the data to the destination database. Here we used MSSQL, but you can use any relational database.

/usr/bin/sqoop export --connect conn_str \
  --username sql_conn_login \
  --password sql_conn_password \
  --table mssql_table \
  --export-dir hdfs_export_dir \
  -m number_of_mappers \
  --input-fields-terminated-by delimiter \
  --input-enclosed-by quote_char \
  --input-escaped-by escape_char \
  --input-null-string '\\\N'

conn_str is shown below; mssql_table is your destination table; hdfs_export_dir is your table's location on HDFS; number_of_mappers is the number of parallel tasks; and delimiter, quote_char, and escape_char are the CSV settings we defined above.

conn_str = jdbc:sqlserver://{host};databaseName={database}
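
For example, with hypothetical host and database names (not values from this tutorial), the filled-in connection string looks like this:

# Hypothetical example of a filled-in MSSQL JDBC connection string.
conn_str = 'jdbc:sqlserver://{host};databaseName={database}'.format(
    host='mssql_host.example.com:1433',
    database='mydatabase',
)
# -> jdbc:sqlserver://mssql_host.example.com:1433;databaseName=mydatabase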

Performance

The complete process took about 10 minutes for 10M rows and 30 columns of data on the 2 AWS nodes described above.

Performance can be much better with more nodes and more mappers for the MapReduce tasks. To make a better decision, read more about Hadoop MapReduce and HDFS.

Hadoop Cluster

For this tutorial we used the Cloudera CDH distribution, which includes Hadoop, Hive, Sqoop, etc.: https://www.cloudera.com/downloads/cdh/5-13-0.html.

Cloudera Impala is also a very useful tool for improving performance, but we did not use it in this tutorial.

Data Flow Control

To control our data flow we use Apache Airflow, a Python-based tool. Airflow is a platform to programmatically author, schedule, and monitor workflows.

We do not have to use Airflow to run these steps; they can all be run manually. But using Airflow makes things easier for developers. We use Airflow to author workflows as directed acyclic graphs (DAGs) of tasks. The Airflow scheduler executes the tasks on an array of workers while respecting the specified dependencies. Rich command-line utilities make performing complex surgeries on DAGs a snap, and the rich user interface makes it easy to visualize pipelines running in production, monitor progress, and troubleshoot issues when needed.
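
Just to make the idea concrete, here is a minimal sketch of what a DAG for the three steps above could look like; the operators, commands, paths, and schedule are assumptions on my part rather than the pipeline we actually run.

# dag_mydata_pipeline.py -- minimal sketch of the three steps as an Airflow DAG (Airflow 1.x).
# Task commands, file paths, and the schedule are illustrative assumptions.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    dag_id='mydata_pipeline',
    start_date=datetime(2018, 1, 1),
    schedule_interval='@daily',
)

# Step 1: copy the delimited file into HDFS
load_to_hdfs = BashOperator(
    task_id='load_to_hdfs',
    bash_command='hdfs dfs -put -f /local_path/mydata.csv /data/mydata.csv',
    dag=dag,
)

# Step 2: create the Parquet table and apply the cleaning UDF in Hive
clean_in_hive = BashOperator(
    task_id='clean_in_hive',
    bash_command='beeline -u jdbc:hive2://your_hadoop_host:10000 -f /your_sql_path/clean_mydata.sql',
    dag=dag,
)

# Step 3: export the clean table to the relational database with sqoop
export_with_sqoop = BashOperator(
    task_id='export_with_sqoop',
    bash_command='bash /your_scripts_path/sqoop_export.sh',
    dag=dag,
)

load_to_hdfs >> clean_in_hive >> export_with_sqoop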

Using Airflow in detail is not part of this article; more information will be provided in another article. See Building Data Pipeline with Airflow on Medium, where we build the data pipeline for this tutorial step by step with Airflow.

[Figure: Airflow DAG]
[Figure: Airflow monitoring tools]

References:

[1] Cloudera CDH: https://www.cloudera.com/products/open-source/apache-hadoop/key-cdh-components.html
[2] CDH version 5.13.0: https://www.cloudera.com/downloads/cdh/5-13-0.html
[3] Airflow: http://pythonhosted.org/airflow/
[4] Create Random Data
