A practical introduction to Spark’s Column- part 2

achilleus
7 min readMar 22, 2019

--

This is a continuation of the last article wherein I covered some basic and commonly used Column functions. In this post, we will discuss some other common functions available.

7) cast

Let’s say you have the below dataset and you want to validate the status based on the start and end date.

scala> val df = spark.sparkContext.parallelize(List(
| ("2019-01-10", "2018-12-01", "2020-12-31"),
| ("2019-02-12", "2010-01-01", "2018-12-12")
| )).toDF("needsVerified", "startDate", "endDate")
df: org.apache.spark.sql.DataFrame = [needsVerified: string, startDate: string ... 1 more field]
scala> df.show
+-------------+----------+----------+
|needsVerified| startDate| endDate|
+-------------+----------+----------+
| 2019-01-10|2018-12-01|2020-12-31|
| 2019-02-12|2010-01-01|2018-12-12|
+-------------+----------+----------+
scala> df.printSchema
root
|-- needsVerified: string (nullable = true)
|-- startDate: string (nullable = true)
|-- endDate: string (nullable = true)

If you notice, the startDate and endDate here are in the string format but we need them to be in date format. For this, we can use the cast function .

scala> df.select(col("needsVerified").cast("date"), col("startDate").cast("date"), col("endDate").cast("date"))
res95: org.apache.spark.sql.DataFrame = [needsVerified: date, startDate: date ... 1 more field]
scala> inputDF.printSchema
root
|-- needsVerified: date (nullable = true)
|-- startDate: date (nullable = true)
|-- endDate: date (nullable = true)

8)gt , > , lt ,< , geq , >= , leq , <=

There are greater than(gt, >), less than(lt, <), greater than or equal to(geq, >=) and less than or equal to (leq, <=)methods which we can use to check if the needsVerified date is valid or not.

scala> inputDF.withColumn("isValid", 'needsVerified >= 'startDate && 'needsVerified <= 'endDate).show
+-------------+----------+----------+-------+
|needsVerified| startDate| endDate|isValid|
+-------------+----------+----------+-------+
| 2019-01-10|2018-12-01|2020-12-31| true|
| 2019-02-12|2010-01-01|2018-12-12| false|
+-------------+----------+----------+-------+
scala> inputDF.withColumn("isValid", 'needsVerified .geq('startDate).and('needsVerified.leq('endDate))).show

Also, if you have noticed there are && , and methods that perform logical AND operation. Similarly, we have || , or for logical OR operation . There are bitwise operators also available which can perform bitwise OR(bitwiseOR), bitwise AND(bitwiseAND) and bitwise XOR(bitwiseXOR) operations.

9) Logical Operation:

asc, asc_nulls_first, asc_nulls_last, desc, desc_nulls_first, desc_nulls_last

There are methods to sort the data based on the ascending or descending order and also methods whether to return data that have null values in the sort column first or last.

gotData.sort(gotData("house").asc)+---+------+---+--------+
| id| name|age| house|
+---+------+---+--------+
|221| Jon| 16| null|
| 21|Tyrion| 40|Lanister|
|101| Bran| 10| Stark|
| 11| Ned| 50| Stark|
+---+------+---+--------+
gotData.sort(gotData("house").asc_nulls_first)+---+------+---+--------+
| id| name|age| house|
+---+------+---+--------+
|221| Jon| 16| null|
| 21|Tyrion| 40|Lanister|
|101| Bran| 10| Stark|
| 11| Ned| 50| Stark|
+---+------+---+--------+
gotData.sort(gotData("house").asc_nulls_last)+---+------+---+--------+
| id| name|age| house|
+---+------+---+--------+
| 21|Tyrion| 40|Lanister|
| 11| Ned| 50| Stark|
|101| Bran| 10| Stark|
|221| Jon| 16| null|
+---+------+---+--------+
gotData.sort(gotData("house").desc)
gotData.sort(gotData("house").desc_nulls_first)
gotData.sort(gotData("house").desc_nulls_last)

10) Equality operation

There are === and equalTo methods for the Equality test to check if 2 columns have the same data.

scala>  inputDF.withColumn("equalsTo", 'needsVerified === 'endDate ).show
+-------------+----------+----------+--------+
|needsVerified| startDate| endDate|equalsTo|
+-------------+----------+----------+--------+
| 2019-01-10|2018-12-01|2020-12-31| false|
| 2019-02-12|2010-01-01|2018-12-12| false|
+-------------+----------+----------+--------+
scala> inputDF.withColumn("equalsTo", 'needsVerified equalTo 'endDate ).show

11) Inequality operation

There are =!=, notEqual and !== functions for the Inequality test.Note that !== is deprecated in favor of =!= .

scala>  inputDF.withColumn("equalsTo", 'needsVerified =!= 'endDate ).show
+-------------+----------+----------+--------+
|needsVerified| startDate| endDate|equalsTo|
+-------------+----------+----------+--------+
| 2019-01-10|2018-12-01|2020-12-31| true|
| 2019-02-12|2010-01-01|2018-12-12| true|
+-------------+----------+----------+--------+
scala> inputDF.withColumn("equalsTo", 'needsVerified !== 'endDate ).show
warning: there was one deprecation warning; re-run with -deprecation for details
scala> inputDF.withColumn("equalsTo", 'needsVerified notEqual 'endDate ).show

These functions can be used for joins. But let’s say if you are joining on columns that have null data.

scala> val df1 = spark.createDataFrame(Seq(("123","name1"),("001","name4"),("","name3"),(null,"name2"))).toDF("id","name")scala> df1.show
+----+-----+
| id| name|
+----+-----+
| 123|name1|
| 001|name4|
| |name3|
|null|name2|
+----+-----+
scala> val df2 = spark.createDataFrame(Seq((null,"sales"),("123","sales"),("223","Legal"),("","IT"))).toDF("id","dept")scala> df2.show
+----+-----+
| id| dept|
+----+-----+
|null|sales|
| 123|sales|
| 223|Legal|
| | IT|
+----+-----+

When you join using the regular === equality test function , you will lose the null data.

scala> df1.join(df2, df1.col("id") === df2.col("id")).show
+---+-----+---+-----+
| id| name| id| dept|
+---+-----+---+-----+
|123|name1|123|sales|
| |name3| | IT|
+---+-----+---+-----+

Hence there is another equality test ie <=> method that is safe for null values and doesn’t get rid of null data. Also, eqNullSafe is the equivalent method for this for Java but is available in Scala as well.

scala> df1.join(df2, df1.col("id") <=> df2.col("id")).show
+----+-----+----+-----+
| id| name| id| dept|
+----+-----+----+-----+
| 123|name1| 123|sales|
| |name3| | IT|
|null|name2|null|sales|
+----+-----+----+-----+

12) substr

This method returns a substring for a column. It requires a starting position and the length of the required substring.

scala> gotData.withColumn("short_house", col("house").substr(0, 3)).show()
+---+------+---+--------+-----------+
| id| name|age| house|short_house|
+---+------+---+--------+-----------+
|101| Bran| 10| Stark| Sta|
|221| Jon| 16| null| null|
| 11| Ned| 50| Stark| Sta|
| 21|Tyrion| 40|Lanister| Lan|
+---+------+---+--------+-----------+

Note that, for Jon the short_house returns a null. It is the default behavior to return null when the input is null.

13) contains

This method returns a boolean and is similar to other contains method of programming languages. Note that contains method is case sensitive ie contains(“stark”) would return false. Also, contains needs an exact match for it to return and can’t have wild cards in it ie contains(“Sta*”) would return false as well.You might wanna use like or rlike in such a scenario.

gotData.withColumn("isStark", col("house").contains("Stark")).show()
+---+------+---+--------+-------+
| id| name|age| house|isStark|
+---+------+---+--------+-------+
|101| Bran| 10| Stark| true|
|221| Jon| 16| null| null|
| 11| Ned| 50| Stark| true|
| 21|Tyrion| 40|Lanister| false|
+---+------+---+--------+-------+

14) when/otherwise

when and other wise can be used to simulate the case statements of SQL or if/else of any programming language on the columns data. It evaluates a list of conditions and returns 1 of the possible matching condition’s result expression.We can use otherwise to set a default value if none of the conditions match.If otherwise is not defined at the end of our when expression, a null will be returned in case of no matching when conditions.

gotData.withColumn("isSnow", when(gotData("house").startsWith("St"), "Stark")
.when(gotData("house").endsWith("ter") ,"Lannister")
.otherwise("snow")).show
+---+------+---+--------+---------+
| id| name|age| house| isSnow|
+---+------+---+--------+---------+
|101| Bran| 10| Stark| Stark|
|221| Jon| 16| null| snow|
| 11| Ned| 50| Stark| Stark|
| 21|Tyrion| 40|Lanister|Lannister|
+---+------+---+--------+---------+

Also, note that we have used the startsWith and endsWith method here on the house column object to check if the name of the house column starts with St or ends with ter. These are couple of other handy methods available in Column object.

Gotcha: This when can be applied only for the column that was previously generated by the org.apache.spark.sql.functions.when function and for imtellij to recognise the 1st when method, you might wanna import it.

import org.apache.spark.sql.functions.when

15) — , !

The Unary minus( -) basically negates the numeric type(int, long, double) values and the Unary not (!) negates only a boolean type.

val people = Seq((1,"male",false),(2,"female",true)).toDF("id","gender","alive")scala>  people.withColumn("dead",!people("alive")).show
+---+------+-----+-----+
| id|gender|alive| dead|
+---+------+-----+-----+
| 1| male|false| true|
| 2|female| true|false|
+---+------+-----+-----+
scala> people.withColumn("neg id",-people("id")).show
+---+------+-----+------+
| id|gender|alive|neg id|
+---+------+-----+------+
| 1| male|false| -1|
| 2|female| true| -2|
+---+------+-----+------+

16) between

It returns a boolean, ie a true if the column’s value is in between the upper bound and the lower bound value. The upper and lower bound values are inclusive.

scala> people.withColumn("between",people("id").between(1,2)).show
+---+------+-----+-------+
| id|gender|alive|between|
+---+------+-----+-------+
|1.0| male|false| true|
|2.0|female| true| true|
+---+------+-----+-------+

17) over

This is one of the most important function that is used in many of the window operations.We can talk about the window function in detail when discuss about aggregation in spark but for now, it will be fair enough to say that over method provides a way to apply an aggregation over a window specification which in turn can be used to specify partition, order and frame boundaries of the aggregation.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.avg
val w: WindowSpec = Window.partitionBy("name").orderBy("id")
gotData.select(avg('age).over(w).as("avg_age")).show
+-------+
|avg_age|
+-------+
| 40.0|
| 50.0|
| 10.0|
| 16.0|
+-------+

In this example, all we are doing is calculating average age from our dataset. The over method is applied to notify spark that the average function should be applied over the window specification mentioned in the window spec w . Also, if you notice here we are aliasing our final output column as avg_age by using the as function of the column object. We can use alias or name functions to achieve the same result.

Thanks for reading! Please do share the article, if you liked it. Any comments or suggestions are welcome! Check out my other articles here.

--

--