Apache Spark, Hadoop vs. Apache Spark, and the Parquet & ORC Formats

M. Cagri AKTAS
10 min read · Oct 6, 2023

  1. Before Apache Spark
  2. What Is Apache Spark?
  3. The Differences Between Apache Spark and Hadoop
  4. Apache Spark Introduction with Code
  5. Apache Spark Data Cleaning with Code
  6. Parquet and ORC Formats

PART — I

Hello everyone! This time I’m here to explain Apache Spark, in an article with two parts. In the first part, I will cover the Apache Spark ecosystem, and in the second part, we will discuss data manipulation with Apache Spark, with a focus on SparkSQL. But before delving into Apache Spark, let’s first understand why people need it. Let’s take a closer look at that:

Made by myself in Microsoft Visio

Before Apache Spark

Before Apache Spark, it’s important to understand the context of distributed data processing. Distributed data processing refers to the practice of processing and analyzing large volumes of data by distributing the work across multiple computers or nodes within a network or cluster. This approach is commonly used for handling big data, which is data that is too large or complex to be processed on a single machine within a reasonable amount of time.

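That is why many technologies have been developed for big data processing. MapReduce was born for this purpose: Google introduced the model in the early 2000s, and Hadoop MapReduce followed in the mid-2000s as its open-source implementation.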

MapReduce: Before Apache Spark, the most popular big data processing framework was Hadoop MapReduce. MapReduce is a batch processing framework that is well-suited for large-scale data processing, but it is not as efficient for iterative processing or interactive data analysis.

MapReduce is a programming model and processing framework designed for distributed and parallel processing of large datasets. It was introduced by Google in a research paper titled “MapReduce: Simplified Data Processing on Large Clusters” published by Jeffrey Dean and Sanjay Ghemawat in 2004. MapReduce has been influential and served as the foundation for many distributed data processing systems, including Hadoop MapReduce.
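To make the model concrete, here is a minimal single-machine sketch of the classic word-count job in plain Python. It is not Hadoop code: the function names (`map_phase`, `shuffle_phase`, `reduce_phase`) are made up for illustration, and a real cluster would run the map and reduce steps on many nodes in parallel.

```python
from collections import defaultdict


def map_phase(document):
    """Map: emit a (word, 1) pair for every word in one document."""
    return [(word.lower(), 1) for word in document.split()]


def shuffle_phase(pairs):
    """Shuffle: group all emitted values by their key."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped


def reduce_phase(grouped):
    """Reduce: combine the values of each key into a single result."""
    return {key: sum(values) for key, values in grouped.items()}


def word_count(documents):
    # On a real cluster, each document (or block) would be mapped on a different node.
    pairs = [pair for doc in documents for pair in map_phase(doc)]
    return reduce_phase(shuffle_phase(pairs))


docs = ["spark is fast", "hadoop is reliable", "spark is popular"]
print(word_count(docs))
```

The point of the three-step split is that every `map_phase` call is independent of the others, which is what lets a framework spread them across machines.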

Please check my other article on Hadoop, which discusses the MapReduce ecosystem and distributed data processing.

APACHE SPARK

Apache Spark is an open-source, distributed computing framework designed for fast and efficient big data processing. It was initially developed at the University of California, Berkeley’s AMPLab, and later donated to the Apache Software Foundation. Spark has gained significant popularity in the world of big data analytics due to its speed, scalability, ease of use and versatility.

  1. Speed: Spark is one of the fastest big data processing engines available. It can process large amounts of data very quickly, even when the data is distributed across multiple machines.
  2. Scalability: Spark can scale to handle very large datasets. It can be used to process datasets that are too large for traditional databases.
  3. Ease of use: Spark is relatively easy to use, even for users who are not familiar with big data processing. It provides a number of high-level APIs that make it easy to develop data processing applications.
  4. Versatility: Spark can be used for a wide range of data processing tasks, including batch processing, real-time stream processing, machine learning, and graph processing.
Made by myself in Microsoft Visio

Data Loading and Preparation:

  • The first step is to load and prepare the data for processing. Spark can fetch data from various data sources (HDFS, Apache HBase, Cassandra, file systems, etc.) and represent this data as RDDs (Resilient Distributed Datasets).
  • Data cleaning, transformation, and filtering operations are performed in this stage as needed.

RDD Creation and Transformations:

  • Data processing operations are performed using RDDs. An RDD is a distributed data structure designed for parallel processing.
  • RDDs are derived from the initial data and processed through transformations such as map, filter, reduceByKey, join, and more.
  • These transformations work in parallel to achieve the desired results.

Cache Usage:

  • Spark provides fast access to data during processing thanks to its ability to store data in memory (in-memory caching). This is particularly beneficial in scenarios involving repetitive operations and iterative algorithms, leading to improved processing speed.
  • Cache settings can be adjusted by users to enhance performance further, allowing for customization based on specific processing requirements.

Actions:

  • Transformations on RDDs are lazy: they only describe the computation, and nothing runs until you call an action to get the results.
  • Actions trigger the execution of Spark and return results to the user. Examples of actions include operations like count, collect, and saveAsTextFile.
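The split between transformations and actions can be sketched with a toy class in plain Python. This is only an analogy, not Spark's API: `LazyDataset` is a hypothetical name, and it runs on one machine. It does show the key behavior, though: `map` and `filter` only record what to do, and the work happens when `collect` or `count` is called.

```python
class LazyDataset:
    """Toy stand-in for an RDD: records transformations, runs them only on an action."""

    def __init__(self, source, steps=()):
        self._source = source  # the original data (must be re-iterable)
        self._steps = steps    # pending transformations, in order

    # Transformations: return a new LazyDataset and compute nothing.
    def map(self, fn):
        return LazyDataset(self._source, self._steps + (("map", fn),))

    def filter(self, predicate):
        return LazyDataset(self._source, self._steps + (("filter", predicate),))

    # Actions: replay the recorded steps and return a result.
    def _run(self):
        items = iter(self._source)
        for kind, fn in self._steps:
            items = map(fn, items) if kind == "map" else filter(fn, items)
        return items

    def collect(self):
        return list(self._run())

    def count(self):
        return sum(1 for _ in self._run())


calls = []


def square(x):
    calls.append(x)  # record when the function really executes
    return x * x


numbers = LazyDataset(range(1, 6))
even_squares = numbers.map(square).filter(lambda x: x % 2 == 0)

print(len(calls))              # 0: no transformation has run yet
print(even_squares.collect())  # [4, 16]: the action triggers the work
```

Note that calling a second action replays every step from the start, which is the same reason caching helps in Spark when a dataset is reused.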

Parallel Processing and Distributed Execution:

  • Spark distributes tasks across multiple executor nodes in parallel. Each executor node can run many tasks simultaneously.
  • Tasks are executed by the Spark core to process data, and the results are collected.

Error Handling and Data Recovery:

  • Spark has internal error handling mechanisms to address potential errors and failures during processing.
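  • Spark tracks the lineage of each RDD (the transformations used to build it), so lost partitions can be recomputed after a failure. Caching RDDs and keeping operations idempotent make this recovery cheaper and safer.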

Collecting and Processing Results:

  • When actions collect their results or store data, users can retrieve these results and process them as they see fit. For example, results can be used for reporting or further processing as needed.

Why Apache Spark Is More Popular Than Hadoop MapReduce

Made by myself in Microsoft Visio

The diagram shows why Apache Spark can be up to 100x faster than Hadoop MapReduce: Spark keeps intermediate data in RAM, whereas Hadoop MapReduce writes it to disk and reads it back between steps.

Hadoop MapReduce processes data in two steps called “map” and “reduce.” In the “map” phase, data is read from disk and prepared for processing. In the “reduce” phase, the processed data is aggregated, and the final result is computed. (Check my other article about map and reduce.)

By storing data in RAM, Spark spends less time reading data from disk during the “map” phase and can merge results in memory during the “reduce” phase instead of writing them to disk. This contributes to Spark’s faster performance compared to Hadoop MapReduce.

If we need to categorize and explain these points, we can express them as follows:

Storage: Apache Spark isn’t tied to a single storage technology, whereas Hadoop MapReduce is typically run on top of the Hadoop Distributed File System (HDFS). Therefore, Apache Spark is generally more flexible than Hadoop in terms of storage.

Speed: The Apache Spark project has published figures comparing the speed of Hadoop MapReduce and Spark. According to these, Apache Spark can be up to 100 times faster than Hadoop MapReduce for in-memory workloads.

Ease of use: Apache Spark offers high-level APIs such as RDDs, DataFrames, and SQL, making it easier to use than Hadoop MapReduce.

Management: Spark ships with its own standalone cluster manager (and can also run on YARN or Kubernetes), while Hadoop MapReduce relies on YARN for resource management.

Real-time streaming analysis: Apache Spark can process streams in near real time (throughputs as high as 100 million records per second have been reported, depending on the cluster), whereas Hadoop MapReduce is a batch framework and is not designed for real-time streaming analysis.

Community: Apache Spark has a vast library, including machine learning algorithms such as Logistic Regression, K-Means, and Naive Bayes. In contrast, Hadoop MapReduce is considered an older technology and may not be as forward-looking.

PART — II

Data Cleaning with Apache Spark

When starting Apache Spark, we need to initialize it with the following code:

# Import the Apache Spark session module.
from pyspark.sql import SparkSession

# Start the Apache Spark session.
# Parentheses replace the backslashes, so inline comments no longer break the chain.
spark = (
    SparkSession.builder
    .master("local[2]")  # run locally with 2 cores; adjust to your computer
    .appName("medium")
    .getOrCreate()
)

# Read the dataset.
df = (
    spark.read
    .option("header", True)       # first row holds the column names
    .option("inferSchema", True)  # set the data types automatically
    .csv("dirty_store.csv")
)

As I said, we can use SQL in our session, which makes Apache Spark very easy to use. Let’s check out <pyspark.sql>. But before doing that, I strongly suggest looking at the official Apache Spark documentation for some code examples. You'll find the most helpful functions on the left side of the page.

Then we should inspect the data and locate the values that need cleaning. If you’re familiar with Python Pandas, cleaning data in PySpark is quite similar to Pandas, but let’s explain this further:

Pandas is designed for single computers (local machines) and is suitable for handling simple and small datasets. I provided a link in the previous lines so you can explore the big data ecosystem. If you attempt to process a 10 GB dataset in Pandas, your computer will likely struggle and may even crash. This is where Apache Spark comes in handy, as it is specifically designed for big data processing.

#We can use Pandas-style output in Jupyter Lab to view data in a more visually appealing format.
df.limit(5).toPandas()
df.printSchema()
root
|-- STORE_ID: string (nullable = true)
|-- STORE_LOCATION: string (nullable = true)
|-- PRODUCT_CATEGORY: string (nullable = true)
|-- PRODUCT_ID: string (nullable = true)
|-- MRP: string (nullable = true)
|-- CP: string (nullable = true)
|-- DISCOUNT: string (nullable = true)
|-- SP: string (nullable = true)
|-- Date: date (nullable = true)

As you can see, the datasets are in the classic .csv file format. When we work with these datasets using PySpark, we can also use SQL. If you’re interested in performing operations with RDDs, simply visit the official Apache Spark website for more information.

Now, since we’ve decided to clean with SparkSQL, first, we need to create a table from the variable where we’ve defined the dataset.

df.createOrReplaceTempView("table")

With this created table, we can now execute SQL queries. We will perform the cleaning operations for each column in a single line.

sqldf = spark.sql("""
SELECT
TRIM(STORE_ID) AS STORE_ID,
TRIM(REGEXP_REPLACE(STORE_LOCATION, '[^a-zA-Z0-9 ]', '')) AS STORE_LOCATION,
TRIM(PRODUCT_CATEGORY) AS PRODUCT_CATEGORY,
TRIM(PRODUCT_ID) AS PRODUCT_ID,
CAST(TRIM(SUBSTRING(MRP, 2, 6)) AS INTEGER) AS MRP,
ROUND(CAST(TRIM(SUBSTRING(CP, 2, 6)) AS FLOAT), 2) AS CP,
CAST(TRIM(SUBSTRING(DISCOUNT, 2, 6)) AS FLOAT) AS DISCOUNT,
ROUND(CAST(TRIM(SUBSTRING(SP, 2, 6)) AS FLOAT), 2) AS SP,
CAST(TRIM(DATE) AS DATE) AS DATE
FROM table
""")

As you can see, we can simply use functions like TRIM, REGEXP_REPLACE, CAST, ROUND, and SUBSTRING to obtain clean data.
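If you want to see what these functions do to a single value, here is a rough plain-Python imitation. The raw values are hypothetical (the article does not show the raw rows), and the helper names `sql_substring`, `clean_price`, and `clean_location` are made up for this sketch. Returning `None` stands in for the NULL that a failed CAST yields in Spark's default (non-ANSI) mode.

```python
import re


def sql_substring(text, pos, length):
    """Imitate SQL SUBSTRING(text, pos, length), where pos is 1-based."""
    start = pos - 1
    return text[start:start + length]


def clean_price(raw):
    """Imitate CAST(TRIM(SUBSTRING(raw, 2, 6)) AS FLOAT)."""
    # SUBSTRING runs first and drops the first character, then TRIM removes spaces.
    candidate = sql_substring(raw, 2, 6).strip(" ")
    try:
        return float(candidate)
    except ValueError:
        return None  # stand-in for NULL when the text is not a number


def clean_location(raw):
    """Imitate TRIM(REGEXP_REPLACE(raw, '[^a-zA-Z0-9 ]', ''))."""
    return re.sub(r"[^a-zA-Z0-9 ]", "", raw).strip(" ")


print(clean_price("$31.5 "))           # 31.5
print(clean_price(" $31.5"))           # None
print(clean_location(" New York(! "))  # New York
```

The second call shows why the order matters: with a leading space, SUBSTRING removes the space instead of the symbol, so the cast fails. The DataFrame version later in the article trims MRP before taking the substring, which avoids that case for that column.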

sqldf.show(5)

You can use the <show()> method to display data, but it’s not ideal for wide datasets with many columns. If you have trouble viewing columns when reading the dataset, consider using the <toPandas()> method. However, please keep in mind that this method pulls the whole dataset onto your machine and can consume all your resources. To mitigate this, you can add a limit as follows: <df.limit(5).toPandas()> to display a limited number of rows.

After cleaning the data, we assigned it to the <sqldf> variable. Now, let's inspect its schema with <sqldf.printSchema()>:

root
|-- STORE_ID: string (nullable = true)
|-- STORE_LOCATION: string (nullable = true)
|-- PRODUCT_CATEGORY: string (nullable = true)
|-- PRODUCT_ID: string (nullable = true)
|-- MRP: integer (nullable = true)
|-- CP: float (nullable = true)
|-- DISCOUNT: float (nullable = true)
|-- SP: float (nullable = true)
|-- DATE: date (nullable = true)

Below, I’m providing a few more code snippets for your review.

sqldf.createOrReplaceTempView("clean_table")

# ROUND may not appear to take effect in toPandas() output; if that happens, try show() instead.
# PS: I did not use " REGEXP_REPLACE(PRODUCT_ID, '[^0-9]', '') AS PRODUCT_LOCATION " in pyspark.sql,
# because to change one column in pyspark.sql you must list every column name in the SELECT statement!

print("Rows:", sqldf.count(), "Columns:", len(sqldf.columns))

sqldf_clean = spark.sql("""
SELECT
*
FROM clean_table
WHERE
(MRP > 0) OR (CP > 0) OR (DISCOUNT > 0) OR (SP > 0)
""")

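# These names are needed here, before the wildcard imports used later on.
from pyspark.sql.functions import col, regexp_replace
from pyspark.sql.types import IntegerType

# Chain from sqldf_clean (not sqldf) so the WHERE filter above is not discarded.
sqldf_clean = sqldf_clean.withColumn("PRODUCT_ID", regexp_replace(col("PRODUCT_ID"), "[^0-9]", "").cast(IntegerType()))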

print("Rows:", sqldf_clean.count(), "Columns:", len(sqldf_clean.columns))

sqldf_clean.createOrReplaceTempView("clean_table_2")

spark.sql("""
SELECT * FROM clean_table_2
""").show(5)

sqldf_clean.printSchema()

Data cleaning with the PySpark DataFrame API follows nearly the same steps as the ones we followed with PySparkSQL above. :)

from pyspark.sql.functions import *
from pyspark.sql.types import *

# Wrap column names in col(); otherwise you get: 'str' object has no attribute 'alias'
cleaned_df = df.select(
trim(col("STORE_ID")).alias("STORE_ID"),
trim(regexp_replace(col("STORE_LOCATION"), "[^a-zA-Z0-9 ]", "")).alias("STORE_LOCATION"),
trim(col("PRODUCT_CATEGORY")).alias("PRODUCT_CATEGORY"),
trim(regexp_replace(col("PRODUCT_ID"), "[^a-zA-Z0-9 ]", "")).alias("PRODUCT_ID"),
trim(col("MRP")).substr(2, 6).cast(IntegerType()).alias("MRP"),
round(substring(trim(col("CP")), 2, 6).cast(FloatType()), 2).alias("CP"),
round(substring(trim(col("DISCOUNT")), 2, 6).cast(FloatType()), 2).alias("DISCOUNT"),
round(substring(trim(col("SP")), 2, 6).cast(FloatType()), 2).alias("SP"),
col("Date").cast(DateType()).alias("DATE")
)

cleaned_df.show(5)

# PS: The "withColumn" method is used to create a new DataFrame by adding a new column or replacing an existing
# column with a modified version.
# The "select" method returns a new DataFrame that includes only the specified columns.
# Therefore we use "withColumn" for the modification below!

print("Rows:", cleaned_df.count(), "Columns:", len(cleaned_df.columns))

conditions = (
(col("MRP") > 0) &
(col("CP") > 0) &
(col("DISCOUNT") > 0) &
(col("SP") > 0))

cleaned_df_2 = cleaned_df.filter(conditions)
# Chain from cleaned_df_2 (not cleaned_df) so the filter above is not discarded.
cleaned_df_2 = cleaned_df_2.withColumn("PRODUCT_ID", regexp_replace(col("PRODUCT_ID"), "[^0-9]", "").cast(FloatType()))

print("Rows:", cleaned_df_2.count(), "Columns:", len(cleaned_df_2.columns))

cleaned_df_2.show(5)

cleaned_df_2.printSchema()

I didn’t explain this code in detail because if you check the documentation, you can easily understand the logic. Most of it is quite similar to Pandas.
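One detail is worth checking in your own runs: the SQL query joins its four conditions with OR, while the DataFrame filter joins them with &, so the two versions do not necessarily keep the same rows. The plain-Python sketch below shows the difference on three hypothetical rows (the values are invented, and NULL handling is left out).

```python
rows = [
    {"MRP": 31, "CP": 20.77, "DISCOUNT": 1.86, "SP": 29.14},  # all positive
    {"MRP": 0, "CP": 20.77, "DISCOUNT": 1.86, "SP": 29.14},   # one zero
    {"MRP": 0, "CP": 0.0, "DISCOUNT": 0.0, "SP": 0.0},        # all zero
]
PRICE_COLUMNS = ("MRP", "CP", "DISCOUNT", "SP")


def keep_if_any_positive(row):
    """Same logic as the SQL WHERE clause: conditions joined with OR."""
    return any(row[name] > 0 for name in PRICE_COLUMNS)


def keep_if_all_positive(row):
    """Same logic as the DataFrame filter: conditions joined with &."""
    return all(row[name] > 0 for name in PRICE_COLUMNS)


kept_with_or = [row for row in rows if keep_if_any_positive(row)]
kept_with_and = [row for row in rows if keep_if_all_positive(row)]

print(len(kept_with_or), len(kept_with_and))  # 2 1
```

If the goal is to drop every row with a missing or zero price, the AND form is the stricter choice; pick one and use it in both versions so the row counts match.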

Parquet and ORC Formats

If you want to write this cleaned data in Parquet or ORC format, it’s also straightforward to do. Just be cautious when using “OVERWRITE” because if you set your desktop as the output location, you’ll lose everything in that folder. So, make sure to set an empty folder as the destination to avoid data loss.

sqldf_clean.write.mode("overwrite").orc("spark_output_format/orc")
sqldf_clean.write.mode("overwrite").parquet("spark_output_format/parquet")

Lastly, we can explore Parquet and ORC formats. :)

Parquet: Parquet is an open-source columnar storage file format commonly used in the big data ecosystem, including tools like Apache Spark, Hive, and Impala. It is designed to optimize data storage and query performance. It helps organizations save storage space, reduce query times, and work with evolving data schemas.

ORC (Optimized Row Columnar): ORC is another columnar storage file format used primarily in the big data ecosystem, similar to Parquet. It’s designed to provide high compression and fast query performance for big data workloads.

Overall, ORC and Parquet are similar in many respects, and the choice between them often depends on the specific tools and platforms you’re using and your organization’s preferences. Both formats excel at optimizing storage and query performance for large datasets, making them valuable options for big data processing.
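To see why a columnar layout helps, here is a small plain-Python sketch. It is not how Parquet or ORC are actually implemented (the real formats use more elaborate encodings plus compression), and the sample records are invented. It only shows the two basic ideas: a query can read just the column it needs, and repeated values that sit next to each other encode compactly.

```python
from itertools import groupby

# Hypothetical rows: (STORE_ID, PRODUCT_CATEGORY, MRP)
records = [
    ("S1", "Electronics", 31),
    ("S1", "Electronics", 45),
    ("S1", "Furniture", 120),
    ("S2", "Furniture", 99),
]


def to_columns(rows, names):
    """Pivot row tuples into one list per column, as a columnar format stores them."""
    return {name: [row[i] for row in rows] for i, name in enumerate(names)}


def run_length_encode(values):
    """Collapse consecutive repeats into (value, count) pairs."""
    return [(value, len(list(group))) for value, group in groupby(values)]


columns = to_columns(records, ("STORE_ID", "PRODUCT_CATEGORY", "MRP"))

# Summing one column touches only that column's values, not every full row.
total_mrp = sum(columns["MRP"])

# Similar values sit together in a column, so a simple encoding shrinks them.
encoded_store_ids = run_length_encode(columns["STORE_ID"])

print(total_mrp)          # 295
print(encoded_store_ids)  # [('S1', 3), ('S2', 1)]
```

A row-based format such as CSV has to read every field of every row even when a query needs a single column, which is the main reason columnar files tend to be faster for analytics.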

https://community.cloudera.com/t5/Support-Questions/ORC-vs-Parquet-When-to-use-one-over-the-other/td-p/95942

Cheers in peace everyone :) I hope the article could help you…
