How to Run Spark on Ubuntu Machine in Google Cloud (PySpark: Basic) 🚀

Kang Zhi Yong
6 min readFeb 22, 2024

--

Notes on Data Engineering Zoomcamp Module 5: Batch Processing. Check it out if you’re interested.

What is batch processing?

  • Processing of huge chunk data according to schedule, from hourly, daily, weekly, 3 times per hour or every 5 minutes depend on the need of organization.

What technologies are commonly used?

  • Python Scripts ( Can be run anywhere, Kurbunetes/AWS Batch)
  • SQL (dbt)
  • Spark
  • Flink

What is the pros and cons of batch processing?

  • It is easy to manage and retry if the job failed
  • Ease of scaling. Change to larger machine is computation power hits the bottleneck.
  • Delay in update as main disadvantages.

What is Apache Spark?

A data processing engine in cluster which pull data from data lake, perform data transformation and load into data warehouse for analytical purpose. It support multiple language, such as JAVA, Scala, Python and R. In most of the company, pyspark is much popular. Can be used for batch jobs and streaming.

When to use Spark?

If you able to use SQL to transform data directly from the data lake like S3 and Google Cloud Storage, you can use software or service like AWS Athena, HIVE or PRESTO to transform data and load it into another dedicated data lake. If it’s not able to do so in SQL, it is where Spark come in to solve the problem, for example machine learning problem which Spark has dedicated library supporting the ML training, as shown in the figure below:

Installing spark in linux, taking Virtual Machine in Google Cloud as example

  • Assuming that you have already had anaconda installed in the virtual machine, you can first create a directory called spark , then download java which is the pre-requisite to run Spark from this url using wget. Remember that only choose between OpenJDK 11 or Oracle JDK 11 as spark requires version 8 or 11 specifically.
  • After unpacking the file using command tar xzfv , we should define JAVA home and add it to PATH by running this two commands,
export JAVA_HOME="${HOME}/spark/jdk-11.0.2"
export PATH="${JAVA_HOME}/bin:${PATH}"
  • You might need to define it every time login to the virtual machine if the environment variable get clear out.
  • Check the installation via java --version
  • Download spark from https://spark.apache.org/downloads.html and unzip it through tar xzfv
  • Add it to PATH as well like what we did for JAVA
  • Remember to check the compatability issue of the lastest Apache Spark with the python module. For example, avoiding the use of spark-3.3.2 with pandas 2.1.4 version.
export SPARK_HOME="${HOME}/spark/spark-3.4.2-bin-hadoop3"
export PATH="${SPARK_HOME}/bin:${PATH}"
  • To avoid keying the PATH everytime log in, we can add the environment variable into the .bashrc file. So, it get run everytime when we log in to the virtual machine.
  • Then, we have to add in PYTHONPATH. Remember to check the py4j-0.10.9-src.zip version
export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.7-src.zip:$PYTHONPATH"

Creating a SparkSession

To start with pyspark, first import the SparkSession from pyspark.sql

from pyspark.sql import SparkSession

spark = SparkSession.builder \
.master("local[*]")\
.appName('test')\
.getOrCreate()

By specifying the .master("local[*]") , we can define the number of cpu used for the spark job. In this case, we use all the available resources to tun the jobs.

To test out the pyspark module, we load the data from a .csv file and write into .parquet file as follow. The file can be downloaded using this url.

ds = spark.read\
.option("header","true")\
.csv("taxi+_zone_lookup.csv")

ds.write.parquet('zones')

By forwarding the port to 4040 in the vs-studio, we can access all the command running in spark as UI as below:

First look on Pyspark

After reading the data directly using the spark session, we can check the schema of the data via ds.schema . In response, it return the datatype used as follow:

StructType([
StructField('hvfhs_license_num', StringType(), True),
StructField('dispatching_base_num', StringType(), True),
StructField('pickup_datetime', StringType(), True),
StructField('dropoff_datetime', StringType(), True),
StructField('PULocationID', StringType(), True),
StructField('DOLocationID', StringType(), True),
StructField('SR_Flag', StringType(), True)
])

You may notice that all the datatype are in string which is not good to use for further transformation. There is a way in nferring the schema directlt, by explicitly declare inferschema = True . Taking a small part of the data as example via the code below:

# take 1000 example of the data 
! head -n 10001 fhvhv_tripdata_2021-01.csv > head.csv

# we can read it directly from spark using inferScehema
spark_df = spark.read.csv("head.csv", header=True, inferSchema=True)

spark_df.schema

The schema returned will be:

StructType([
StructField('hvfhs_license_num', StringType(), True),
StructField('dispatching_base_num', StringType(), True),
StructField('pickup_datetime', TimestampType(), True),
StructField('dropoff_datetime', TimestampType(), True),
StructField('PULocationID', IntegerType(), True),
StructField('DOLocationID', IntegerType(), True),
StructField('SR_Flag', StringType(), True)
])

It returned the data with correct datatype. Aside from that, we can also use pandas.DataFrame to create spark.DataFrame as follow:

# read in pandas dataframe
pandas_df = pd.read_csv('head.csv')

# create spark dataframe from pandas dataframe
spark_df = spark.createDataFrame(pandas_df)

# print the schema
spark_df.schema

The schema returned will be

StructType([
StructField('hvfhs_license_num', StringType(), True),
StructField('dispatching_base_num', StringType(), True),
StructField('pickup_datetime', StringType(), True),
StructField('dropoff_datetime', StringType(), True),
StructField('PULocationID', LongType(), True),
StructField('DOLocationID', LongType(), True),
StructField('SR_Flag', DoubleType(), True)
])

From the inferred schema shown here, LongType() might not the good datatype compared to IntegerType() and the previos took up 8 bytes while latter took up 4 bytes. We might need to enforce a custom schema.

How to infer custom schema in spark dataframe?

To provide the custom schema, we can explicity define the datatype used as follow:

from pyspark.sql import types

# define schema
schema = types.StructType([
types.StructField('hvfhs_license_num', types.StringType(), True),
types.StructField('dispatching_base_num', types.StringType(), True),
types.StructField('pickup_datetime', types.TimestampType(), True),
types.StructField('dropoff_datetime', types.TimestampType(), True),
types.StructField('PULocationID', types.IntegerType(), True),
types.StructField('DOLocationID', types.IntegerType(), True),
types.StructField('SR_Flag', types.StringType(), True)
])

ds = spark.read\
.option("header","true")\
.schema(schema)\
.csv("fhvhv_tripdata_2021-01.csv")

ds.head(5)

It return data with correct schema as shown which dropoff_datetime now is in TIMESTAMP :

Save the large file into parquet with multiple partition

Partitioning the large file into several partitions instead of one is extremely useful, as the worker in the spark cluster can work together, saving file into different partitions that they pointed to. No workers node will stay idle and wasted the processing power. Imagine you have only one file to be pointed to by multiple workers, only one worker is able to write to the location while others stay idle to wait the worker to get the job done.

Repartition of the large dataset can be achieved via ds.repartition(24) which will divide the dataset into 24 parts to be processed and write to the system. The code stay lazy until it is executed (save).

While writing to the .parquet file, the 4 core machine will get 6 tasks as we have 24 partitions

Partitioning is an expensive operation, which expected to take up several time while running the commands. Total of 24 tasks were run to partitioned the data into 24 files.

Files in the directory.

End

Here, i stopped my notes on the first parts of batch processing in Data Engineering Zoomcamp 2024, that given by Alexey Grigorev. The figure explaining Spark is also by Alexey Grigorev. If you’re interested, check out my Github repo as well🚀.

--

--