A practical introduction to Spark’s Column- part 1

A table in Spark SQL

Frequently used simple, important and common column methods for effective dataframe/dataset manipulation.

Most of the operations that we do on Spark generally involve high usage of the column objects. Spark has rich functions to do manipulation and transformation over the column data. A column of a Dataframe/Dataset in Spark is similar to a column in a traditional database.

Consider the below dataframe example, id and name are the columns here and they represent a column in a Dataset/dataframe which is a container for a Catalyst expression(We can talk about catalyst some other day). In a nutshell, the catalyst is a tree manipulation framework and expressions are the nodes of the expression tree. For the Spark API user, id and name are just like the columns of User table like good old SQL.

scala> val got = Seq((1,"Bran"),(2,"Jon")).toDF("id","name")
got: org.apache.spark.sql.DataFrame = [id: int, name: string]
scala> got.show
+---+----+
| id|name|
+---+----+
| 1|Bran|
| 2| Jon|
+---+----+

Spark has a variety of methods to support a wide variety of operations on the columns.

To access the column from a dataset/dataframe:

Say you wanna access just the name from the got dataframe.There are multiple ways to do it.

scala>  import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.col
scala>   got.select(col("id"))
res17: org.apache.spark.sql.DataFrame = [id: int]
scala>   import org.apache.spark.sql.functions.column
import org.apache.spark.sql.functions.column
scala>   got.select(column("id"))
res18: org.apache.spark.sql.DataFrame = [id: int]
scala>   import spark.implicits.StringToColumn
import spark.implicits.StringToColumn
scala>   got.select($"id")
res19: org.apache.spark.sql.DataFrame = [id: int]
scala>   import spark.implicits.symbolToColumn
import spark.implicits.symbolToColumn
scala>   got.select('id)
res20: org.apache.spark.sql.DataFrame = [id: int]
scala>   got.select('id).show
+---+
| id|
+---+
| 1|
| 2|
+---+

Note: If you are in spark-shell all these imports come for free. Otherwise, some imports have to be done manually. The implicit imports can be tricky.

Generally, I believe people prefer to do import spark.implicits._ and be done with it. Also, if you are curious about what implicits are being used in scala. There is a handy way of doing it.

scala> import scala.reflect.runtime.universe.reify
import scala.reflect.runtime.universe.reify
scala> println(reify(Seq((1,"Bran"),(2,"Jon")).toDF("id","name")))
Expr[org.apache.spark.sql.DataFrame]($iw.$line3$read.$iw.$iw.spark.implicits.localSeqToDatasetHolder(Seq.apply(Tuple2.apply(1, "Bran"), Tuple2.apply(2, "Jon")))($iw.$line3$read.$iw.$iw.spark.implicits.newProductEncoder(Predef.this.implicitly)).toDF("id", "name"))

Also, we can have typed columns which is basically a column with an expression encoder specified for the expected input and return type.

scala> val name = $"name".as[String]
name: org.apache.spark.sql.TypedColumn[Any,String] = name
scala> val name = $"name"
name: org.apache.spark.sql.ColumnName = name

There are more than 50 methods(67 the last time I counted ) that can be used for transformations on the column object. We will be covering some of the important methods that are generally used.

!==, +, <=, >, apply, asc_nulls_last, bitwiseXOR, desc_nulls_first, eqNullSafe, expr, gt, isNotNull, like, multiply, otherwise, startsWith, unary_-, %, -, <=>, >=, as, between, cast, desc_nulls_last, equalTo, geq, hashCode, isNull, lt, name, over, substr, when, &&, /, =!=, alias, asc, bitwiseAND, contains, divide, equals, getField, isInCollection, isin, minus, notEqual, plus, toString, ||, *, <, ===, and, asc_nulls_first, bitwiseOR, desc, endsWith, explain, getItem, isNaN, leq, mod, or, rlike, unary_!

Let’s assume that we have GOT dataset!

scala>  val gotData = Seq((101,"Bran",10,"Stark"),(221,"Jon",16,null),(11,"Ned",50,"Stark"),(21,"Tyrion",40,"Lanister")).toDF("id","name","age","house")
gotData: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]
scala> gotData.show
+---+------+---+--------+
| id| name|age| house|
+---+------+---+--------+
|101| Bran| 10| Stark|
|221| Jon| 16| null|
| 11| Ned| 50| Stark|
| 21|Tyrion| 40|Lanister|
+---+------+---+--------+

1) +

Let’s say you wanted the current age of the GOT stars and add 8 years to each of character’s age and have a new column with the current age. We have + for this:

scala> gotData.withColumn("char_current_age", col("age").+(8)).show()
scala> gotData.withColumn("char_current_age", col("age") +8 ).show()
+---+------+---+--------+----------------+
| id| name|age| house|char_current_age|
+---+------+---+--------+----------------+
|101| Bran| 10| Stark| 18|
|221| Jon| 16| null| 24|
| 11| Ned| 50| Stark| 58|
| 21|Tyrion| 40|Lanister| 48|
+---+------+---+--------+----------------+

You could also, create a new dataframe by adding to columns. But the adding datatypes should or else resulting dataframe would be null. Say if you do gotData(“age”) + gotData(“name”) , the resulting dataframe would be dataframe of nulls. Also, theplus is the java equivalent method.

scala> gotData.select( gotData("age") + gotData("id") ).show
+----------+
|(age + id)|
+----------+
| 111|
| 237|
| 61|
| 61|
+----------+
scala> gotData.select( gotData("age") plus gotData("id") )

2) -

scala> gotData.withColumn("older_than_bran", col("age") - 10  ).show()
+---+------+---+--------+---------------+
| id| name|age| house|older_than_bran|
+---+------+---+--------+---------------+
|101| Bran| 10| Stark| 0|
|221| Jon| 16| null| 6|
| 11| Ned| 50| Stark| 40|
| 21|Tyrion| 40|Lanister| 30|
+---+------+---+--------+---------------+
scala> gotData.withColumn("older_than_bran", col("age") minus 10 )

3)

Similarly, we have * to multiply, / to divide and % to get the mod of a column.

scala> gotData.withColumn("multiply_10", col("age") *  10)
scala> gotData.withColumn("multiply_10", col("age") multiply 10)
scala> gotData.withColumn("divide_10", col("age") / 10)
scala> gotData.withColumn("divide_10", col("age") divide 10)
scala> gotData.withColumn("mod_10", col("age") mod 10)
scala> gotData.withColumn("mod_10", col("age") % 10)

4) isin

It creates a new column with boolean value and the provided collection is evaluated._* is required when we are checking if the element is present in a collection as the asin method expects variable arguments(Any*). It is a special case of type ascription which tells the Scala compiler to treat a single argument of a sequence type as a varargs.

scala> val teenage = (15 to 25)
teenage: scala.collection.immutable.Range.Inclusive = Range(15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25)
scala>   gotData.withColumn("teenage",col("age").isin(teenage:_*)).show()
+---+------+---+--------+-------+
| id| name|age| house|teenage|
+---+------+---+--------+-------+
|101| Bran| 10| Stark| false|
|221| Jon| 16| null| true|
| 11| Ned| 50| Stark| false|
| 21|Tyrion| 40|Lanister| false|
+---+------+---+--------+-------+
scala> gotData.withColumn("teenage",col("age").isin(15 to 25:_*))
scala> gotData.withColumn("is_Stark",col("house").isin("Stark"))

We can use isInCollection , if you want to just reference the collection. It gives the same output.isInCollection does the conversion to varargs notation for us.

scala>   gotData.withColumn("teenage",col("age").isInCollection(15 to 25))

5) like

This is the SQL like expression to check if the column has that particular data and returns a boolean. It is also case sensitive. A like(“b%”) would return a false.

scala> gotData.withColumn("starts with B",col("name").like("B%")).show()
+---+------+---+--------+-------------+
| id| name|age| house|starts with B|
+---+------+---+--------+-------------+
|101| Bran| 10| Stark| true|
|221| Jon| 16| null| false|
| 11| Ned| 50| Stark| false|
| 21|Tyrion| 40|Lanister| false|
+---+------+---+--------+-------------+
scala> gotData.withColumn("has O ",col("name").like("%o%")).show()
+---+------+---+--------+------+
| id| name|age| house|has O |
+---+------+---+--------+------+
|101| Bran| 10| Stark| false|
|221| Jon| 16| null| true|
| 11| Ned| 50| Stark| false|
| 21|Tyrion| 40|Lanister| true|
+---+------+---+--------+------+

Similarly, there is a rlike function that can be used to match the regex.

scala> gotData.withColumn("has O ",col("name").rlike("^(Bran|Jon)")) .show()
+---+------+---+--------+------+
| id| name|age| house|has O |
+---+------+---+--------+------+
|101| Bran| 10| Stark| true|
|221| Jon| 16| null| true|
| 11| Ned| 50| Stark| false|
| 21|Tyrion| 40|Lanister| false|
+---+------+---+--------+------+

Adding a ! negates the boolean output and can be used to implement not like and not rlike features.

gotData.withColumn("has O ",!col("name").like("%o%"))
gotData.withColumn("has O ",!col("name").rlike("^(Bran|Jon)"))

6) isNull and isNotNull

scala>gotData.withColumn("house_unknown",col("house").isNull).show()
+---+------+---+--------+-------------+
| id| name|age| house|house_unknown|
+---+------+---+--------+-------------+
|101| Bran| 10| Stark| false|
|221| Jon| 16| null| true|
| 11| Ned| 50| Stark| false|
| 21|Tyrion| 40|Lanister| false|
+---+------+---+--------+-------------+
scala>gotData.withColumn("houseNotnull",col("house").isNotNull)
.show()

I will be covering some other important methods that can come handy in the next part of this article. Thanks for reading! Please do share the article, if you liked it. Any comments or suggestions are welcome! Check out my other articles here.