Top 100 Pyspark Features: Master ETL with Pyspark 1 — Fundamental

Minh Tran
11 min read · May 7, 2020


Become a master of Pyspark for ETL, one of the most widely used Python tools for big data analysis

Pyspark ETL

As we know, the volume of data is increasing every hour; we are dealing with really, really big data. The amount of newly created data in 2020 was predicted to grow 44X to reach 35 zettabytes (35 trillion gigabytes). Most of it is raw data, so the scientific problem is how to research and investigate it to find the meaning hidden inside (data mining). Fortunately, excellent developers have built many tools and frameworks for processing and ETL (Extract, Transform, Load) of big data, and one of them is Spark.

So What is Spark and Why?

Spark is a general-purpose distributed data processing engine that is suitable for use in a wide range of circumstances. On top of the Spark core data processing engine, there are libraries for SQL, machine learning, graph computation, and stream processing, which can be used together in an application. Programming languages supported by Spark include: Java, Python, Scala, and R. Application developers and data scientists incorporate Spark into their applications to rapidly query, analyze, and transform data at scale. Tasks most frequently associated with Spark include ETL and SQL batch jobs across large data sets, processing of streaming data from sensors, IoT, or financial systems, and machine learning tasks.
You can read more about Spark here. Some topics worth digging into are MapReduce, how Spark runs on clusters, Spark Streaming, and how it runs pipelines.
In this fundamental topic, I focus only on Spark SQL for data processing and how to use it from Python (Pyspark), with examples.
The dataset we will use for practice is the movie dataset from the Megogo Challenge on Kaggle. You can view and download it here.
Note: We assume you have some knowledge of data frames (maybe a little Pandas). If not, you can read my blog about Pandas here first. If you already know Pandas, I think you will learn Pyspark for ETL more easily and quickly.

Installing

You can follow this document from Spark to install it. (It is not complex to install, so I really believe you can do it by yourself :D ).

Generic Load/Save Functions

In the simplest form, the default data source (parquet unless otherwise configured by spark.sql.sources.default) will be used for all operations.

Simple Load and Save data from a file with Pyspark
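As a rough sketch (the path and column names below are placeholders, not from our movie dataset), it could look like this:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("pyspark-etl-fundamentals").getOrCreate()

# No format is given, so the default source (parquet) is used for both read and write.
df = spark.read.load("users.parquet")                                # placeholder path
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
```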

You can also manually specify the data source that will be used along with any extra options that you would like to pass to the data source. Data sources are specified by their fully qualified name (i.e., org.apache.spark.sql.parquet), but for built-in sources you can also use their short names (json, parquet, jdbc, orc, libsvm, csv, text). DataFrames loaded from any data source type can be converted into other types using this syntax.

Load json file and save to parquet with pyspark
Load csv file with pyspark
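Roughly, naming the source explicitly looks like this (file paths are placeholders):

```python
# Load a JSON file and save it back out as parquet by naming the formats explicitly.
df_json = spark.read.format("json").load("people.json")              # placeholder path
df_json.write.format("parquet").save("people.parquet")

# Load a CSV file; header and inferSchema are optional reader options.
df_csv = (spark.read.format("csv")
               .option("header", "true")
               .option("inferSchema", "true")
               .load("people.csv"))                                   # placeholder path
```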

What's more, Spark can load data from many kinds of sources. Not only local files: we can load data from AWS S3, GCS, Redshift, Athena, SFTP, Google BigQuery …. with one magic line (spark.read.load) 💁, so amazing….

Actually, we can load, transform, and write data in one chained query. But normally I don't recommend doing that, because it is not easy to trace back and validate the data. Example:
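A hedged sketch of what such a single chained statement could look like (the path and column names are hypothetical):

```python
# Read, transform, and write in one chained expression. It works, but intermediate
# results are hard to inspect or validate, which is why I avoid this style.
(spark.read.format("csv")
      .option("header", "true")
      .load("movies.csv")                  # hypothetical path
      .select("Title", "Director")         # hypothetical column names
      .dropDuplicates()
      .write.mode("overwrite")
      .format("parquet")
      .save("movies_clean.parquet"))
```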

Oki lah, now we will read our movie dataset and go into the details.

Read movie data csv with pyspark
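A minimal sketch of that read, assuming you saved the Kaggle CSV to a local path of your choosing:

```python
# Read the movie CSV into a DataFrame and peek at the inferred schema.
df = (spark.read.format("csv")
           .option("header", "true")
           .option("inferSchema", "true")
           .load("movies.csv"))            # placeholder path to the downloaded file
df.printSchema()
```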

It seems OK. Now we will get to the main part of this fundamental topic.

TOP 100 Pyspark Features

pyspark.sql.DataFrame

1. show(n) : Prints the first n rows of this DataFrame to the console.
Show n first row with Pyspark

2. collect() : Returns all the records as a list of Row.

Collect method in Pyspark

3. columns (property) : Returns all column names as a list.

4. count() : Returns the number of rows in this DataFrame.

Count number of rows in Pyspark

5. describe() : Computes basic statistics for numeric and string columns.
These include count, mean, stddev, min, and max. If no columns are given, this function computes statistics for all numerical or string columns.
Actually, this is not the best example because this dataset has no numeric columns, but I think you can still understand it easily.

Describe in pyspark

6. distinct() : Returns a new DataFrame containing the distinct rows in this DataFrame.
In the example you can see that the number of records decreased, because the distinct method dropped the duplicate rows from the original DataFrame.

Distinct method in Pyspark

7. drop(*cols) : Returns a new DataFrame that drops the specified column. This is a no-op if schema doesn’t contain the given column name(s).
For example, you can see that the 3 columns ['Director', 'Genre', 'Cast'] were deleted.

Drop columns in pyspark
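A small sketch of that drop, assuming df is the movie DataFrame read earlier:

```python
# drop() takes column names as separate arguments and ignores names it can't find.
df_small = df.drop("Director", "Genre", "Cast")
print(df_small.columns)
```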

8. dropDuplicates() : Return a new DataFrame with duplicate rows removed, optionally only considering certain columns.

dropDuplicate in Pyspark

9. dropna(how=’any’, thresh=None, subset=None) : Returns a new DataFrame omitting rows with null values.

Drop NaN in Pyspark
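A hedged sketch of how the three parameters interact (the subset columns are hypothetical):

```python
# how="any"  -> drop a row if ANY of the considered columns is null
# how="all"  -> drop a row only if ALL of the considered columns are null
# thresh=n   -> keep rows having at least n non-null values (overrides how)
df_any  = df.dropna(how="any", subset=["Director", "Cast"])   # hypothetical columns
df_min2 = df.dropna(thresh=2)
```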

10. explain() : Prints the (logical and physical) plans to the console for debugging purpose.
Actually, to become an expert at Pyspark ETL, you have to understand this feature clearly. It shows how complex your query is, and you can optimize query performance based on it. I will discuss this in a dedicated advanced topic. Here, I only show you an example:

Explain query with Pyspark
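For instance, a tiny sketch (the filter column is hypothetical):

```python
# extended=True prints the parsed, analyzed, and optimized logical plans
# as well as the physical plan.
df.filter(df["Genre"] == "Drama").explain(True)
```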

11. fillna(): Replace null values, alias for na.fill()

Fill missing value in Pyspark
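Something like this, with hypothetical column names:

```python
# Replace nulls with a constant for selected columns, or per column with a dict.
df_filled = df.fillna("unknown", subset=["Director"])
df_filled = df_filled.fillna({"Genre": "unknown", "Cast": ""})
```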

12. filter(condition) : Filters rows using the given condition.

Filter by condition in Pyspark
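A minimal sketch, assuming hypothetical Genre and Year columns:

```python
from pyspark.sql import functions as F

# Conditions can be Column expressions or SQL strings.
drama  = df.filter(F.col("Genre") == "Drama")
recent = df.filter("Year >= 2010")
```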

13. first() : Returns the first row as a Row.

First in Pyspark

14. foreach(f) : Applies the f function to each Row of this DataFrame.

foreach in pyspark

15. groupBy(*cols) : Groups the DataFrame using the specified columns, so we can run aggregation on them. See GroupedData for all the available aggregate functions.

Groupby in Pyspark
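A short sketch of a groupBy followed by an aggregation (Genre is a hypothetical column):

```python
from pyspark.sql import functions as F

# Group by Genre and count titles per group; aggregation methods live on GroupedData.
(df.groupBy("Genre")
   .agg(F.count("*").alias("n_movies"))
   .orderBy(F.desc("n_movies"))
   .show(10))
```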

16. head(n) : Returns the first n rows.

Head method in pyspark

17. join(): Joins with another DataFrame, using the given join expression.
It is quite similar to a SQL join or a Pandas merge, so I only show you a simple example. (But we will talk about how to optimize joins in the next topic: advanced Pyspark.)

Simple Join in Pyspark
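A simple sketch, joining the movie DataFrame with a small made-up lookup table on a hypothetical Genre column:

```python
# Build a tiny lookup DataFrame and left-join it onto the movies.
genres = spark.createDataFrame(
    [("Drama", "serious"), ("Comedy", "funny")],
    ["Genre", "Mood"],
)
joined = df.join(genres, on="Genre", how="left")
joined.show(5)
```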

18. limit() : Limits the result count to the number specified.

Limitation in Pyspark

19. orderBy(*cols, **kwargs): Returns a new DataFrame sorted by the specified column(s).

OrderBy in Pyspark

20. printSchema(): Prints out the schema in the tree format.

Display Schema in pyspark

21. repartition() : Returns a new DataFrame partitioned by the given partitioning expressions. The resulting DataFrame is hash partitioned.
One of the ways to improve the performance of an ETL query is partitioning. If you can choose a good number of partitions, performance will be better. (More about this in the advanced topic.)
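A rough sketch (the column name is hypothetical):

```python
# Repartition by an explicit partition count, by a column, or by both.
df_16     = df.repartition(16)
df_by_gen = df.repartition("Genre")
print(df_16.rdd.getNumPartitions())
```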

22. replace() : Returns a new DataFrame replacing a value with another value.

Replace value in Pyspark

23. rollup(*cols) : Create a multi-dimensional rollup for the current DataFrame using the specified columns, so we can run aggregation on them.

Rollup in Pyspark

24. select() : Projects a set of expressions and returns a new DataFrame.

Select columns in Pyspark

25. selectExpr() : Projects a set of SQL expressions and returns a new DataFrame.
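For example (hypothetical column names), selectExpr takes SQL expression strings instead of Column objects:

```python
# Each argument is parsed as a SQL expression.
df.selectExpr("Title", "upper(Director) AS director_upper").show(5)
```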

26. sort(*cols, **kwargs) : Returns a new DataFrame sorted by the specified column(s).

Sort in Pyspark

27. summary(*statistics) : Computes the specified statistics for numeric and string columns. Available statistics are: count, mean, stddev, min, max, and arbitrary approximate percentiles specified as a percentage (e.g., 75%).

Summary statistic in pyspark

28. toPandas() : Returns the contents of this DataFrame as a pandas.DataFrame. This is only available if Pandas is installed and available.

Pyspark Dataframe to Pandas DataFrame

29. union() : Returns a new DataFrame containing the union of rows in this and another DataFrame.

30. unpersist() : Marks the DataFrame as non-persistent and removes all blocks for it from memory and disk. When you use Pyspark in a pipeline, this feature is useful for clearing memory after each stage. Remember to use it if you run into memory problems.
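A sketch of the typical pattern:

```python
# Cache a DataFrame for a heavy stage, then release memory/disk when finished.
df.cache()
df.count()        # an action that materializes the cache
# ... do more work with df ...
df.unpersist()    # drop the cached blocks
```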

31. withColumn() : Returns a new DataFrame by adding a column or replacing the existing column that has the same name.
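A quick sketch with hypothetical column names:

```python
from pyspark.sql import functions as F

# Add a new boolean column and overwrite an existing one in the same chain.
df2 = (df.withColumn("is_drama", F.col("Genre") == "Drama")
         .withColumn("Title", F.trim(F.col("Title"))))
```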

32. withColumnRenamed() : Returns a new DataFrame by renaming an existing column. This is a no-op if schema doesn’t contain the given column name.

Rename Column in Pyspark

pyspark.sql.GroupedData

A set of methods for aggregations on a DataFrame, created by DataFrame.groupBy().

33. agg() : Computes aggregates and returns the result as a DataFrame.

Aggregate function in Pyspark

34. apply(udf): Maps each group of the current DataFrame using a pandas udf and returns the result as a DataFrame. The user-defined function should take a pandas.DataFrame and return another pandas.DataFrame. For each group, all columns are passed together as a pandas.DataFrame to the user function, and the returned pandas.DataFrames are combined into a DataFrame.

User define function in Pyspark
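A self-contained sketch in the Spark 2.4 style described above, using a tiny made-up ratings DataFrame (since the exact movie columns depend on your download); in newer Spark versions the same idea is spelled groupBy(...).applyInPandas(func, schema):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.getOrCreate()

# A tiny stand-in DataFrame so the grouped-map example is self-contained.
ratings = spark.createDataFrame(
    [("Drama", 7.0), ("Drama", 9.0), ("Comedy", 6.0)],
    ["Genre", "rating"],
)

@pandas_udf("Genre string, rating double, rating_centered double", PandasUDFType.GROUPED_MAP)
def center_rating(pdf):
    # pdf holds every row of one Genre group as a pandas.DataFrame
    return pdf.assign(rating_centered=pdf["rating"] - pdf["rating"].mean())

ratings.groupBy("Genre").apply(center_rating).show()
```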

35. avg(), count(), max(), mean(), min(), sum()
These are really simple functions to apply after groupBy, so I only give you an example with count():
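Something like this (Director is a hypothetical column name):

```python
# Count rows per group and sort by the generated "count" column.
df.groupBy("Director").count().orderBy("count", ascending=False).show(10)
```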

36. pivot(): Pivots a column of the current DataFrame and performs the specified aggregation. There are two versions of the pivot function: one that requires the caller to specify the list of distinct values to pivot on, and one that does not. The latter is more concise but less efficient, because Spark needs to first compute the list of distinct values internally.

Pivot table in Pyspark
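A rough sketch with hypothetical Genre, Year, and Title columns:

```python
from pyspark.sql import functions as F

# One row per Genre, one column per Year, counting titles in each cell.
# Passing the value list explicitly avoids an extra pass to compute distinct Years.
(df.groupBy("Genre")
   .pivot("Year", [2018, 2019, 2020])
   .agg(F.count("Title"))
   .show())
```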

pyspark.sql.Column

37. alias(): Returns this column aliased with a new name or names (in the case of expressions that return more than one column, such as explode).

Alias in Pyspark

38. asc() : Returns a sort expression based on ascending order of the column.

ascending in Pyspark

39. between() : A boolean expression that is evaluated to true if the value of this expression is between the given lower and upper bounds.

Between in Pyspark

40. cast(dataType): Convert the column into type dataType.

Cast DataType in Pyspark

41. contains() : Contains the other element. Returns a boolean Column based on a string match.

Contains in Pyspark

42. desc() : Returns a sort expression based on the descending order of the column.

Descending in Pyspark

43. endswith(): String ends with. Returns a boolean Column based on a string match.

Endswith in pyspark

44. isNotNull() : True if the current expression is NOT null.

check data is not null in pyspark

45. isNull() : True if the current expression is null. (Equivalent to ~isNotNull().)

check data is null in pyspark

46. isin(*) : A boolean expression that is evaluated to true if the value of this expression is contained by the evaluated values of the arguments.

check data is in a list with pyspark

47. like() : SQL like expression. Returns a boolean Column based on a SQL LIKE match.

Like in Pyspark

48. otherwise(): Evaluates a list of conditions and returns one of multiple possible result expressions. If Column.otherwise() is not invoked, None is returned for unmatched conditions.

Other and when in pyspark
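A small sketch, assuming a hypothetical numeric Runtime column:

```python
from pyspark.sql import functions as F

# Rows that match no when() condition fall through to otherwise().
df_buckets = df.withColumn(
    "length_bucket",
    F.when(F.col("Runtime") < 90, "short")
     .when(F.col("Runtime") < 150, "medium")
     .otherwise("long"),
)
```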

49. over() : Define a windowing column.

Ranking in Pyspark
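For example, ranking within each group with a window specification (Genre and Year are hypothetical columns):

```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Rank movies inside each Genre by Year, newest first.
w = Window.partitionBy("Genre").orderBy(F.desc("Year"))
df_ranked = df.withColumn("rank_in_genre", F.rank().over(w))
```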

50. startswith() : String starts with. Returns a boolean Column based on a string match. It works just like endswith(), so I won't give you an example.
You can practice and try some queries like mine here.

I think this is enough for you to cover basic ETL with Pyspark. You will use most of these a lot. I hope this fundamental topic is useful for you. (Clap for this blog if you want to =))) ).

See you in the advanced topic on ETL with Pyspark.

If you want to contact me, here:
LinkedIn, Facebook.
