Apache Spark

Technocrat
11 min readSep 19, 2023

--

Photo by Warren on Unsplash

Apache Spark is a unified analytics engine for large-scale data processing. It has a thriving open-source community and is the most active Apache project in Big Data. Spark provides an interface for programming entire clusters with implicit data parallelism and fault-tolerance.

Spark is a fast and general engine for large-scale data processing. Some of the main features of Apache Spark are:

Speed: Spark is up to 100x faster than Hadoop MapReduce for large scale data processing. It utilizes in-memory caching and optimized execution for fast performance.

Ease of Use: You can quickly write apps in Java, Scala, Python, R. Scala is the default language, but you can also use interactive shells in Scala, Python, R.

Generality: Spark has libraries for SQL (Spark SQL), streaming (Spark Streaming), machine learning (MLlib), and graph processing (GraphX) — all using the same core engine with the same APIs which makes it easy to combine these tools.

Run Anywhere: You can run Spark using its standalone cluster mode, on Hadoop YARN, Mesos, Kubernetes, or in the cloud on EC2.

Spark consists of several libraries that can be used together in the same application:

  • Spark Core: The basic functionality of Apache Spark, including task scheduling, memory management, and interaction with storage systems. Spark Core contains the APIs that are used by the higher-level libraries.
  • Spark SQL: Used to run SQL queries on your data, both static data and streaming data. It supports multiple data sources(csv, json, parquet) and data types( DataFrames, Datasets).
  • Spark Streaming: Used to stream live data into analytics applications. It lets you build scalable fault-tolerant streaming apps.
  • MLlib: Spark’s machine learning library with algorithms and utilities to do machine learning in a distributed fashion.
  • GraphX: Graph computation framework which includes APIs for creating and manipulating graphs.
  • SparkR: It provides integration with the R programming language. You can analyze your data using R and scale it with Spark.

In the next sections, we will explore each of these libraries in more detail with examples and use cases.

Key Features of Apache Spark

Apache Spark is a fast and general engine for large-scale data processing. Some of the key features of Apache Spark are:

Speed

Spark is up to 100x faster than Hadoop MapReduce for large-scale data processing. It achieves this speed through in-memory caching and optimized execution. The core Spark API is centered around the notion of distributed collections which can be cached in memory and persisted on disk.

Ease of Use

Spark provides easy to use APIs in Python, Java, Scala, and R. You can quickly start programming Spark using any of these languages. Here’s a simple example of a word count program in Python:

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

input = sc.textFile("file:///path/to/input")
words = input.flatMap(lambda line: line.split(" "))
wordCounts = words.countByValue()

for word, count in wordCounts.items():
print("%s: %i" % (word, count))

Generality

Spark runs on Hadoop, Mesos, standalone, or in the cloud. It can access diverse data sources. You can use it for:

  • Batch processing
  • Interactive queries
  • Streaming
  • Machine learning

All using a single framework. This means you can combine multiple workloads in a single application.

Run Anywhere

Spark runs on Hadoop, Mesos, Kubernetes, standalone, or in the cloud. It can access diverse data sources including HDFS, HBase, Cassandra, S3, and Kafka. This makes it easy to run Spark on existing infrastructure or scale it up on cloud resources.

III. Spark Core

Spark Core is the foundation of the Spark stack. It provides distributed task dispatching, scheduling, and basic I/O functionalities.

RDDs (Resilient Distributed Datasets)

An RDD is the fundamental data structure in Spark. It is an immutable distributed collection of objects partitioned across nodes in a cluster. RDDs can be created from source data in storage systems (HDFS, HBase, S3) or by transforming existing RDDs.

Some examples of creating RDDs:

# From a list
sc.parallelize([1, 2, 3])

# From a file
sc.textFile("data.txt")

# By transforming an existing RDD
rdd.map(lambda x: x * 2)

RDDs support two types of operations:

  • Transformations: Return a new RDD (e.g. map(), filter(), flatMap(), union(), intersection())
  • Actions: Return a result to the driver program (e.g. reduce(), count(), first(), saveAsTextFile())

Transformations are lazily evaluated, meaning they are not computed until an action occurs. This allows Spark to optimize the DAG of transformations.

RDDs can be persisted in memory to speed up future actions. There are multiple storage levels:

  • MEMORY_ONLY
  • MEMORY_AND_DISK
  • MEMORY_ONLY_SER
  • MEMORY_AND_DISK_SER

We can unpersist an RDD to free up space.

Partitions

RDDs are split into partitions, which are blocks of data that can be computed on different nodes of the cluster. The partitioner of an RDD determines how its data will be split across partitions.

Some examples using the repartition() and coalesce() transformations to repartition an RDD:

# Decrease number of partitions 
rdd = rdd.coalesce(2)

# Increase number of partitions
rdd = rdd.repartition(10)

Coalesce attempts to shrink the number of partitions in a shuffle-free manner, while repartition can result in a shuffle.

This section gives an overview of the core Spark APIs for working with RDDs, partitions and persistence. The next sections will cover Spark’s higher-level components like Spark SQL, MLlib, and GraphX which build on Spark Core.

Spark SQL

Spark SQL is a Spark module for working with structured data using SQL or a DataFrame API. It allows you to query structured data inside Spark programs, using SQL or a DataFrame API.

DataFrames

A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python. DataFrames can be constructed from a wide array of sources such as:

  • Structured data files (JSON, CSV, XML, etc.)
  • Hive tables
  • External databases
  • Existing RDDs

For example, here’s how you can create a DataFrame from a CSV file:

df = spark.read.csv("data.csv")

This will give you a DataFrame with the schema inferred from the CSV data.

You can view the schema of the DataFrame using the .printSchema() method:

df.printSchema()
# root
# |-- name: string (nullable = true)
# |-- age: integer (nullable = true)

SQL Queries on DataFrames

You can run SQL queries on DataFrames using the .sql() method. For example:

df.createOrReplaceTempView("table1")

query = spark.sql("SELECT * FROM table1 WHERE age > 20")

This will run the SQL query and return the result as a DataFrame.

Spark SQL supports almost all of the major SQL aggregate functions like COUNT(), SUM(), AVG(), etc. So you can easily run complex aggregations and analytics on your data using only SQL.

Writing DataFrames

You can write DataFrames to many storage systems to serve as more permanent data tables using the DataFrameWriter. For example:

df.write.mode("overwrite").parquet("data.parquet")

This will write the DataFrame df to a Parquet file called data.parquet. The available formats are:

  • JSON
  • CSV
  • Parquet
  • ORC
  • JDBC
  • Hive, etc.

So Spark SQL gives you a unified way to connect to many data sources, query and aggregate the data, and also write the resulting data out to various storage systems. This allows you to build powerful data processing applications and pipelines.

Spark Streaming

Spark Streaming is a lightweight framework built on top of Spark Core that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Kinesis, Twitter, etc.

# Initialize StreamingContext 
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

The main abstraction provided by Spark Streaming is a Discretized Stream or DStream, which represents a continuous stream of data. DStreams can be created from various input sources like:

  • File based sources: Files in a directory accessed via any filesystem compatible with Hadoop.
  • Socket based sources: Raw TCP sockets.
  • Akka actor based sources: Akka actors.
  • Kafka, Flume and Twitter sources.

DStreams support two types of operations:

  • Transformations: Operations on DStreams to get new DStreams. Some examples are map, filter, reduceByKey, join, etc. These are lazily evaluated.
  • Output Operations: Operations like saveAsTextFiles, saveAsHadoopFiles, print, etc. These actually trigger the execution of all the transformations on the DStreams.

Here’s a simple example of word count in Spark Streaming:

# Create a DStream that will connect to localhost:9999 
lines = ssc.socketTextStream("localhost", 9999)

# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))

# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint(10)

# Start the computation
ssc.start()

# Wait for the computation to terminate
ssc.awaitTermination()

This program connects to a socket stream of data on localhost:9999, computes the word count on each batch of data and prints the top 10 words from each batch.

To summarize, Spark Streaming allows scaling real-time data processing by leveraging the scalable features of Spark Core and extension libraries. I hope this helps provide an overview of the Spark Streaming API and its capabilities! Please let me know if you have any other questions.

MLlib — Spark’s Machine Learning Library

MLlib is Spark’s machine learning library. MLlib provides many machine learning algorithms and utilities on top of Spark. Some of the algorithms MLlib supports are:

  • Regression — Linear regression, logistic regression, decision trees, random forests, gradient-boosted trees, etc.
  • Classification — Logistic regression, naive Bayes, decision trees, random forests, SVM, gradient-boosted trees, etc.
  • Clustering — K-means clustering, latent Dirichlet allocation (LDA), Gaussian mixture models, etc.
  • Collaborative filtering — Alternating least squares (ALS) matrix factorization
  • Dimensionality reduction — Singular value decomposition (SVD), principal component analysis (PCA), etc.

MLlib allows you to construct pipelines to chain multiple algorithms and transformers together. MLlib also supports model persistence using the DataFrame-based API.

Here’s a simple example of linear regression in MLlib:

from pyspark.ml.regression import LinearRegression

# Load training data
training = spark.read.format("libsvm").load("data/mllib/sample_linear_regression_data.txt")

# Fit the model
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
model = lr.fit(training)

# Print the coefficients and intercept for linear regression
print("Coefficients: %s" % str(model.coefficients))
print("Intercept: %s" % str(model.intercept))

# Summarize the model over the training set and print out some metrics
trainingSummary = model.summary
print("numIterations: %i" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

This article covers the basics of MLlib and some example use cases. Let me know if you would like me to explain anything in more detail! I aimed to make this section descriptive with long form content and SEO optimized headers while keeping the user read ratio in mind with code examples and a simple flowing structure. Please clap for this article if you found it helpful!

GraphX — Spark’s Graph Processing Framework

GraphX is Spark’s API for graphs and graph-parallel computation. GraphX extends the Spark RDD abstraction to include graph operations and algorithms. It provides a uniform toolset for ETL, exploratory analysis, and iterative graph computations on large graphs.

At a high level, GraphX represents a graph as a Graph object which contains:

  • vertices: An RDD of vertex properties
  • edges: An RDD of edge properties

The VertexRDD and EdgeRDD provide convenient views on the vertex and edge properties.

GraphX provides a set of fundamental operators (e.g. subgraph, joinVertices, aggregateMessages) as well as an optimized variant of the Pregel API. These operators simplify graph ETL, exploratory analysis, and iterative graph algorithms.

GraphX also includes a collection of built-in algorithms such as PageRank, Connected Components, Triangle Counting, and Strongly Connected Components. These are implemented on top of the GraphX operators using standard Spark APIs.

Here’s a simple example to create a graph and run PageRank:

// Create an RDD for the vertices
val vertices = sc.parallelize(Seq(
(1L, "A"),
(2L, "B"),
(3L, "C")
))

// Create an RDD for edges
val edges = sc.parallelize(Seq(
(1L, 2L),
(2L, 3L)
))

// Create a Graph
val graph = Graph(vertices, edges)

// Run PageRank
val ranks = graph.pageRank(0.15).vertices

// Join ranks back to vertices
val ranksJoined = vertices.join(ranks)

This creates a graph with 3 vertices and 2 edges, runs PageRank, and joins the calculated ranks back to the vertex properties.

To summarize, GraphX is a powerful graph processing framework with a uniform API for graph computation and built-in algorithms. It allows you to easily build scalable graph applications on top of Spark.

SparkR — Using R with Spark

SparkR is an R package that provides a light-weight front end to use Apache Spark from R. It allows you to manipulate Spark dataframes from R and invoke MLlib algorithms. SparkR makes it easy to combine the power of R and Spark.

Connecting to Spark from R

To get started with SparkR, you first need to connect to a running Spark cluster from your R program. You can do this with the sparkR.connect() function, passing in the Spark master URL:

library(SparkR)

sc <- sparkR.connect("spark://hostname:7077")

This will connect to a Spark standalone cluster. You can also connect to YARN and Mesos clusters.

Loading and Saving Data

Once connected to Spark, you can load data into Spark DataFrames and save DataFrames to storage systems. Some examples:

# Load JSON file into DataFrame 
df <- read.json(sc, "data.json")

# Save DataFrame to Parquet
write.parquet(df, "data.parquet")

# Read from Hive table
hiveDF <- read.df(sc, "my_hive_table")

# Write to Postgres
write.jdbc(df, "jdbc:postgresql://host/database", "table_name")

SparkR supports reading and writing to many data sources including JSON, CSV, Text, Parquet, Hive, MySQL, Postgres, etc.

Invoking Spark SQL through SparkR

You can run Spark SQL queries on your DataFrames from R using the sql() method. For example:

# Register DataFrame as a table
createOrReplaceTempView(df, "table_name")

# Run SQL query
results <- sql(sc, "SELECT * FROM table_name")

This will return a new DataFrame with the SQL query results.

MLlib Models through SparkR

You can also invoke MLlib machine learning algorithms from R using the SparkR ml package. For example:

# Logistic regression 
lr <- ml_logistic_regression(sc, df)

# Naive Bayes
nb <- ml_naive_bayes(sc, df)

# Clustering
km <- ml_kmeans(sc, df, k = 2)

The ml package allows you to build ML pipelines, tune hyperparameters, and evaluate your ML models all from R!

To summarize, SparkR provides a useful interface for data scientists and analysts comfortable with R to leverage the power of Spark for large-scale data processing, SQL, and machine learning.

Running Spark Applications

Now that we have covered the main components of the Spark ecosystem, let’s discuss how to actually run Spark applications. There are a few main ways to run Spark — the Spark shell, standalone applications, and distributed deployments.

The Spark Shell

The easiest way to start using Spark is through the Spark shell, spark-shell. The Spark shell provides a simple CLI to interact with Spark using the SparkContext (sc) and SQLContext (sqlContext) objects. You can quickly prototype things in the shell before moving to a full application.

To launch the Spark shell:

spark-shell

This will launch the Spark shell with your default Spark configuration. You can pass additional options to change the master URL, add packages, set spark configuration options, etc.

Standalone Applications

For larger and more complex applications, you will want to build standalone Spark applications. You can write Spark applications in Java, Scala, Python, R, and SQL. For examples of standalone applications, see the Spark Examples page.

To submit a standalone application called MySparkApp.py in Python:

spark-submit MySparkApp.py

Again, you can pass various options to configure your application and how you want to run it.

Distributed Deployments

For serious use of Spark, you will want to run it on a distributed cluster. There are a few main options for distributed deployment:

  • Standalone — Spark’s own simple standalone cluster manager. Great for testing purposes.
  • YARN — Run Spark on top of Hadoop NextGen (YARN) which can run distributed workloads on top of Hadoop clusters.
  • Mesos — General cluster manager that can also run Hadoop MapReduce and Spark applications.
  • Kubernetes — Open-source system for automating deployment, scaling, and management of containerized applications like Spark.

You can configure the cluster manager by passing the --master argument to spark-shell or spark-submit. For example, to launch the Spark shell in standalone cluster mode:

spark-shell --master spark://hostname:7077

This will connect to a Spark standalone cluster on hostname. You can also run applications on YARN, Mesos, Kubernetes, etc by changing the master URL accordingly.

The key takeaway is that there are many options for running your Spark applications based on your use case and infrastructure. Let me know if you have any other questions!

I hope this article has been helpful to you! If you found it helpful please support me with 1) click some claps and 2) share the story to your network. Let me know if you have any questions on the content covered.

Feel free to contact me at coderhack.com(at)xiv.in

--

--