This is a continuation of the last article wherein I covered some basic and commonly used Column functions. In this post, we will discuss some other common functions available.
7) cast
Let’s say you have the below dataset and you want to validate the status based on the start and end date.
scala> val df = spark.sparkContext.parallelize(List(
| ("2019-01-10", "2018-12-01", "2020-12-31"),
| ("2019-02-12", "2010-01-01", "2018-12-12")
| )).toDF("needsVerified", "startDate", "endDate")
df: org.apache.spark.sql.DataFrame = [needsVerified: string, startDate: string ... 1 more field]

scala> df.show
+-------------+----------+----------+
|needsVerified| startDate| endDate|
+-------------+----------+----------+
| 2019-01-10|2018-12-01|2020-12-31|
| 2019-02-12|2010-01-01|2018-12-12|
+-------------+----------+----------+
scala> df.printSchema
root
|-- needsVerified: string (nullable = true)
|-- startDate: string (nullable = true)
|-- endDate: string (nullable = true)
If you notice, the startDate and endDate here are in string format, but we need them to be in date format. For this, we can use the cast function.
scala> val inputDF = df.select(col("needsVerified").cast("date"), col("startDate").cast("date"), col("endDate").cast("date"))
inputDF: org.apache.spark.sql.DataFrame = [needsVerified: date, startDate: date ... 1 more field]

scala> inputDF.printSchema
root
|-- needsVerified: date (nullable = true)
|-- startDate: date (nullable = true)
|-- endDate: date (nullable = true)
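cast also accepts an org.apache.spark.sql.types.DataType instance instead of a type-name string; a small sketch, assuming the same df as above:

```scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DateType

// Equivalent to cast("date"), but using the DataType object
// instead of parsing the type name from a string.
val typedDF = df.select(
  col("needsVerified").cast(DateType),
  col("startDate").cast(DateType),
  col("endDate").cast(DateType)
)
```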
8) gt, >, lt, <, geq, >=, leq, <=
There are greater than (gt, >), less than (lt, <), greater than or equal to (geq, >=) and less than or equal to (leq, <=) methods which we can use to check whether the needsVerified date is valid.
scala> inputDF.withColumn("isValid", 'needsVerified >= 'startDate && 'needsVerified <= 'endDate).show
+-------------+----------+----------+-------+
|needsVerified| startDate| endDate|isValid|
+-------------+----------+----------+-------+
| 2019-01-10|2018-12-01|2020-12-31| true|
| 2019-02-12|2010-01-01|2018-12-12| false|
+-------------+----------+----------+-------+

scala> inputDF.withColumn("isValid", 'needsVerified.geq('startDate).and('needsVerified.leq('endDate))).show
Also, if you have noticed, there are && and and methods that perform a logical AND operation. Similarly, we have || and or for a logical OR operation. There are also bitwise operators available that can perform bitwise OR (bitwiseOR), bitwise AND (bitwiseAND) and bitwise XOR (bitwiseXOR) operations.
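As a quick sketch of the bitwise methods, here is a hypothetical flags dataset (the perms name and values are made up purely for illustration; toDF assumes spark.implicits._ is in scope, as it is in spark-shell):

```scala
import org.apache.spark.sql.functions.{col, lit}

// Hypothetical permission flags, purely for illustration.
val perms = Seq((1, 5), (2, 2)).toDF("id", "flags")

perms.select(
  col("id"),
  col("flags").bitwiseAND(lit(4)).as("and4"), // 5 & 4 = 4, 2 & 4 = 0
  col("flags").bitwiseOR(lit(1)).as("or1"),   // 5 | 1 = 5, 2 | 1 = 3
  col("flags").bitwiseXOR(lit(1)).as("xor1")  // 5 ^ 1 = 4, 2 ^ 1 = 3
).show
```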
9) Sort operations:
asc, asc_nulls_first, asc_nulls_last, desc, desc_nulls_first, desc_nulls_last
There are methods to sort the data in ascending or descending order, and also methods that control whether rows with null values in the sort column are returned first or last.
gotData.sort(gotData("house").asc)
+---+------+---+--------+
| id| name|age| house|
+---+------+---+--------+
|221| Jon| 16| null|
| 21|Tyrion| 40|Lanister|
|101| Bran| 10| Stark|
| 11| Ned| 50| Stark|
+---+------+---+--------+

gotData.sort(gotData("house").asc_nulls_first)
+---+------+---+--------+
| id| name|age| house|
+---+------+---+--------+
|221| Jon| 16| null|
| 21|Tyrion| 40|Lanister|
|101| Bran| 10| Stark|
| 11| Ned| 50| Stark|
+---+------+---+--------+

gotData.sort(gotData("house").asc_nulls_last)
+---+------+---+--------+
| id| name|age| house|
+---+------+---+--------+
| 21|Tyrion| 40|Lanister|
| 11| Ned| 50| Stark|
|101| Bran| 10| Stark|
|221| Jon| 16| null|
+---+------+---+--------+
gotData.sort(gotData("house").desc)
gotData.sort(gotData("house").desc_nulls_first)
gotData.sort(gotData("house").desc_nulls_last)
10) Equality operation
There are === and equalTo methods for the equality test, to check whether two columns have the same data.
scala> inputDF.withColumn("equalsTo", 'needsVerified === 'endDate ).show
+-------------+----------+----------+--------+
|needsVerified| startDate| endDate|equalsTo|
+-------------+----------+----------+--------+
| 2019-01-10|2018-12-01|2020-12-31| false|
| 2019-02-12|2010-01-01|2018-12-12| false|
+-------------+----------+----------+--------+

scala> inputDF.withColumn("equalsTo", 'needsVerified equalTo 'endDate).show
11) Inequality operation
There are =!=, notEqual and !== functions for the inequality test. Note that !== is deprecated in favor of =!=.
scala> inputDF.withColumn("equalsTo", 'needsVerified =!= 'endDate ).show
+-------------+----------+----------+--------+
|needsVerified| startDate| endDate|equalsTo|
+-------------+----------+----------+--------+
| 2019-01-10|2018-12-01|2020-12-31| true|
| 2019-02-12|2010-01-01|2018-12-12| true|
+-------------+----------+----------+--------+
scala> inputDF.withColumn("equalsTo", 'needsVerified !== 'endDate ).show
warning: there was one deprecation warning; re-run with -deprecation for details
scala> inputDF.withColumn("equalsTo", 'needsVerified notEqual 'endDate ).show
These functions can be used in joins. But let's say you are joining on columns that have null data.
scala> val df1 = spark.createDataFrame(Seq(("123","name1"),("001","name4"),("","name3"),(null,"name2"))).toDF("id","name")

scala> df1.show
+----+-----+
| id| name|
+----+-----+
| 123|name1|
| 001|name4|
| |name3|
|null|name2|
+----+-----+

scala> val df2 = spark.createDataFrame(Seq((null,"sales"),("123","sales"),("223","Legal"),("","IT"))).toDF("id","dept")

scala> df2.show
+----+-----+
| id| dept|
+----+-----+
|null|sales|
| 123|sales|
| 223|Legal|
| | IT|
+----+-----+
When you join using the regular === equality test, you will lose the rows with null data.
scala> df1.join(df2, df1.col("id") === df2.col("id")).show
+---+-----+---+-----+
| id| name| id| dept|
+---+-----+---+-----+
|123|name1|123|sales|
| |name3| | IT|
+---+-----+---+-----+
Hence there is another equality test, the <=> method, which is null-safe and doesn't get rid of the null data. Also, eqNullSafe is the equivalent method intended for the Java API, but it is available in Scala as well.
scala> df1.join(df2, df1.col("id") <=> df2.col("id")).show
+----+-----+----+-----+
| id| name| id| dept|
+----+-----+----+-----+
| 123|name1| 123|sales|
| |name3| | IT|
|null|name2|null|sales|
+----+-----+----+-----+
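Since <=> is the operator form of eqNullSafe, the same null-safe join can be written as a method call and should give the same output:

```scala
// eqNullSafe is the named equivalent of the <=> operator
df1.join(df2, df1.col("id").eqNullSafe(df2.col("id"))).show
```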
12) substr
This method returns a substring of a column. It requires a starting position and the length of the required substring. Note that the starting position is 1-based (a position of 0 is treated the same as 1).
scala> gotData.withColumn("short_house", col("house").substr(0, 3)).show()
+---+------+---+--------+-----------+
| id| name|age| house|short_house|
+---+------+---+--------+-----------+
|101| Bran| 10| Stark| Sta|
|221| Jon| 16| null| null|
| 11| Ned| 50| Stark| Sta|
| 21|Tyrion| 40|Lanister| Lan|
+---+------+---+--------+-----------+
Note that for Jon, the short_house column is null. It is the default behavior to return null when the input is null.
13) contains
This method returns a boolean and is similar to the contains methods of other programming languages. Note that the contains method is case sensitive, i.e. contains("stark") would return false. Also, contains needs an exact match and can't have wildcards in it, i.e. contains("Sta*") would return false as well. You might wanna use like or rlike in such a scenario.
gotData.withColumn("isStark", col("house").contains("Stark")).show()
+---+------+---+--------+-------+
| id| name|age| house|isStark|
+---+------+---+--------+-------+
|101| Bran| 10| Stark| true|
|221| Jon| 16| null| null|
| 11| Ned| 50| Stark| true|
| 21|Tyrion| 40|Lanister| false|
+---+------+---+--------+-------+
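For reference, a sketch of the like/rlike alternative mentioned above: like uses SQL-style wildcards (% and _), while rlike takes a regular expression, which also makes a case-insensitive match possible (assuming the same gotData):

```scala
import org.apache.spark.sql.functions.col

gotData.select(
  col("house"),
  col("house").like("Sta%").as("likeSta"),           // SQL wildcard match
  col("house").rlike("(?i)^stark$").as("rlikeStark") // case-insensitive regex
).show
```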
14) when/otherwise
when and otherwise can be used to simulate the case statements of SQL or the if/else of any programming language on column data. when evaluates a list of conditions and returns the first matching condition's result expression. We can use otherwise to set a default value if none of the conditions match. If otherwise is not defined at the end of our when expression, null will be returned when no condition matches.
gotData.withColumn("isSnow", when(gotData("house").startsWith("St"), "Stark")
.when(gotData("house").endsWith("ter") ,"Lannister")
.otherwise("snow")).show
+---+------+---+--------+---------+
| id| name|age| house| isSnow|
+---+------+---+--------+---------+
|101| Bran| 10| Stark| Stark|
|221| Jon| 16| null| snow|
| 11| Ned| 50| Stark| Stark|
| 21|Tyrion| 40|Lanister|Lannister|
+---+------+---+--------+---------+
Also, note that we have used the startsWith and endsWith methods here on the house column object to check whether the value of the house column starts with St or ends with ter. These are a couple of other handy methods available on the Column object.
Gotcha: a chained when can be applied only on a column that was previously generated by the org.apache.spark.sql.functions.when function, and for IntelliJ to recognise the first when method, you might wanna import it.
import org.apache.spark.sql.functions.when
15) -, !
The unary minus (-) negates numeric type (int, long, double) values, and the unary not (!) negates only a boolean type.
scala> val people = Seq((1,"male",false),(2,"female",true)).toDF("id","gender","alive")

scala> people.withColumn("dead", !people("alive")).show
+---+------+-----+-----+
| id|gender|alive| dead|
+---+------+-----+-----+
| 1| male|false| true|
| 2|female| true|false|
+---+------+-----+-----+

scala> people.withColumn("neg id", -people("id")).show
+---+------+-----+------+
| id|gender|alive|neg id|
+---+------+-----+------+
| 1| male|false| -1|
| 2|female| true| -2|
+---+------+-----+------+
16) between
It returns a boolean: true if the column's value lies between the lower bound and the upper bound. Both bound values are inclusive.
scala> people.withColumn("between",people("id").between(1,2)).show
+---+------+-----+-------+
| id|gender|alive|between|
+---+------+-----+-------+
|  1|  male|false|   true|
|  2|female| true|   true|
+---+------+-----+-------+
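Note that between also accepts columns as its bounds, so the earlier isValid check from section 8 could be rewritten with it (assuming the same inputDF and the spark-shell implicits for the symbol syntax):

```scala
// Equivalent to 'needsVerified >= 'startDate && 'needsVerified <= 'endDate
inputDF.withColumn("isValid", 'needsVerified.between('startDate, 'endDate)).show
```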
17) over
This is one of the most important functions, used in many window operations. We can talk about window functions in detail when we discuss aggregation in Spark, but for now it is fair enough to say that the over method provides a way to apply an aggregation over a window specification, which in turn can be used to specify the partition, order and frame boundaries of the aggregation.
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.functions.avg
val w: WindowSpec = Window.partitionBy("name").orderBy("id")
gotData.select(avg('age).over(w).as("avg_age")).show
+-------+
|avg_age|
+-------+
| 40.0|
| 50.0|
| 10.0|
| 16.0|
+-------+
In this example, all we are doing is calculating the average age from our dataset. The over method notifies Spark that the average function should be applied over the window specification w. Also, if you notice, here we are aliasing our final output column as avg_age by using the as function of the Column object. We can use the alias or name functions to achieve the same result.
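For completeness, all three aliasing methods below should produce an output column named avg_age (assuming the same gotData and window spec w, with the spark-shell implicits in scope for the 'age symbol syntax):

```scala
import org.apache.spark.sql.functions.avg

// as, alias and name are interchangeable ways to rename a result column.
gotData.select(avg('age).over(w).as("avg_age")).show
gotData.select(avg('age).over(w).alias("avg_age")).show
gotData.select(avg('age).over(w).name("avg_age")).show
```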
Thanks for reading! Please do share the article, if you liked it. Any comments or suggestions are welcome! Check out my other articles here.