Spark’s Structured APIs

ravi g
Feb 9, 2019


We can access Spark from a variety of languages, and each of them exposes the Spark APIs. Spark has two fundamental sets of APIs: the low-level “unstructured” APIs and the higher-level structured APIs. Before discussing them, let us look at SparkSession.

Important Note:

Spark can run in two modes: cluster mode and local mode.

Cluster mode: many computers are connected through a network, with one driver and many workers that execute tasks in parallel.

Local mode: the driver and the executors run as threads on a single machine instead of being spread across a cluster.

How to start Spark?

If Spark is not installed on your system, click here for a tutorial.

Note: I am using Python here, and PySpark provides the interactive Spark shell for Python.

To run the Spark shell, execute the following commands in a terminal from the downloaded Spark folder.

To open the Spark shell in Python: ./bin/pyspark

To open the Spark shell in Scala: ./bin/spark-shell

To submit a Spark application in production, we use spark-submit.

What is SparkSession?

We control a Spark application through a driver process, which exposes itself to us as an object called the SparkSession. The SparkSession instance is the way Spark executes user-defined operations across the cluster. There is a one-to-one relationship between a SparkSession and a Spark application: one application has one SparkSession.

If we use Spark in interactive mode, a SparkSession is created for us by default and is available as the variable spark.

Click here to read more about SparkSession.

DataFrames & Datasets

The Spark structured APIs consist mainly of two APIs: Datasets and DataFrames. DataFrames are called untyped because Spark checks that the types line up with the schema only at runtime. Datasets are typed, meaning the check happens at compile time, and they are available only in the statically typed JVM languages (Java and Scala). A DataFrame is simply a Dataset of type Row. The “Row” type is Spark’s internal representation of its optimized in-memory format for computation.

Via [https://image.slidesharecdn.com/spark2zinovievforit-subbotnik-161028213227/95/joker16-spark-2-api-changes-structured-streaming-encoders-77-638.jpg?cb=1483965803]

What is a DataFrame?

The DataFrame is the most common structured API in Spark. It is a table of data with rows and columns, partitioned across the cluster. The DataFrame concept is not unique to Spark: if you have ever worked with pandas in Python, you may already be familiar with DataFrames.

Features:

1. Immutable: we cannot change a DataFrame once it is created, but we can apply transformations to it to produce a new DataFrame.

2. Lazy evaluation: transformations are not executed until an action is performed.

How to create a DataFrame?

A DataFrame can be created from multiple sources such as RDBMS, CSV, JSON, and many other formats.

Via [https://www.analyticsvidhya.com/wp-content/uploads/2016/10/DataFrame-in-Spark.png]

Spark can read from a large number of data sources. To read data we use the DataFrameReader associated with the SparkSession (spark.read). Before reading, we can specify any options we want.

For example,

code for reading data

In the above code, we read a CSV file that contains crime information for cities around the world.

We specified a few options:

inferSchema: every DataFrame has a schema, and with this option we ask Spark to take its best guess at what it should be.

To get the schema information, Spark reads a little of the data and then tries to parse the values in those rows according to the types available in Spark.

header: specifies that the first row of the file is the header.

Every DataFrame contains some columns and an unspecified number of rows. The number of rows is unspecified because reading data is itself a transformation and is therefore lazy.

Our DataFrame is stored across the cluster, divided into partitions. To get the number of partitions, we use the following code.

The number of partitions in the DataFrame is 8.
Via [https://i.stack.imgur.com/dx93c.png]

Our DataFrame is stored in 8 partitions, as in the picture above.

In the above code, we convert our DataFrame to an RDD to get the number of partitions, because in the end Spark performs its manipulations on RDDs. When we use DataFrames, Spark takes care of the optimization and hides these low-level details. If we want more control over the physical distribution of the data we can use RDDs directly, but then we have to take care of most of the optimization ourselves.

We can get the inferred Schema of the DataFrame by calling printSchema() on the DataFrame.

Output of the above code

The output is self-explanatory: Spark has made its best guess at the data type of each column of the DataFrame.

Let’s try some more DataFrame functions. One of the most commonly used is select().

If we perform the take action on the DataFrame, it returns a list with the specified number of rows.

If you are familiar with SQL, you can run the same transformation in SQL as well. All we have to do is create a temporary view from the DataFrame with a single function call.

data.createOrReplaceTempView("data")  # registers the DataFrame as a temporary view for SQL
spark.sql('select district from data')  # runs a SQL query against the view

That is the power of Spark: even if you only know SQL, you get all the features that DataFrames offer.

The DataFrame and SQL versions return the same output and, interestingly, have the same execution plan. We can observe this by calling .explain() on each transformation.

DataFrame explain() function description.
SQL explain() function.

If you compare the two screenshots, the DataFrame and SQL versions produce the same plan.

There is no performance difference between using SQL and DataFrames. Both use exactly the same execution engine and internal data structures, so in the end it comes down to personal preference.

  • DataFrame queries are much easier to construct programmatically.
  • With DataFrames, one can break a query into multiple statements, which helps with debugging, enhancements, and code maintenance.

Structured API Execution Plan

Via [https://databricks.com/wp-content/uploads/2015/04/Screen-Shot-2015-04-12-at-8.41.26-AM-1024x235.png]

Steps involved:

  1. We write DataFrame or SQL queries.
  2. If the code is valid, Spark converts it into a logical plan.
  3. Spark converts the logical plan into a physical plan.

Once we have written the code, Catalyst analyzes the logical plan to resolve its references. Although our code might be valid, the tables and columns it refers to might or might not exist. Catalyst rejects the logical plan if the references cannot be resolved. If they can be resolved, the plan is passed to the Catalyst optimizer, which optimizes the logical plan.

After creating an optimized logical plan, Spark begins the physical planning process. The physical plan, often called a Spark plan, specifies how the logical plan will execute on the cluster. Spark generates different physical execution strategies, compares them through a cost model, and chooses the best one. Finally, the physical plan results in a series of RDDs and transformations. I made this point earlier when calculating the number of partitions of a DataFrame.

Conclusion

In this article, I covered Spark Structured APIs and how Spark transforms your code into what will physically execute on the cluster. In the next article, I will write about structured operations.

Happy learning!

References:

1. https://techvidvan.com/tutorials/spark-catalyst-optimizer/

2. https://stackoverflow.com/questions/35222539/spark-sql-queries-vs-dataframe-functions

3. https://techvidvan.com/tutorials/spark-partition/

4. Spark: The Definitive Guide
