Automated preliminary feature selection for a machine learning model with PySpark

Dmitry Petukhov
Published in IT’s Tinkoff
Aug 18, 2022

This study describes an approach to feature selection that is intended for filtering the most useful features from the feature search space in supervised machine learning problems.

Problem statement

Traditional feature engineering on tabular data usually means hand-crafted statistical research based on your intuition and the knowledge of the people responsible for the service for which you are developing a model. However, as practice shows, this can be slow and sometimes misleading, and it does not necessarily yield the best features or the best model.

If you follow this routine according to the Recency-Frequency-Monetary (RFM) method [1] or in some similar way, at some point you may end up with a large number of candidate features. This variety arises because even a single feature can be constructed from any of the available categorical attributes and, of course, from their various combinations.

Given this, you may prefer to identify the most useful features out of the variety you can create. Here, filter methods come into play, allowing you to get a fast approximate estimate of feature importance [2].

It is preferable to construct features using SQL, which makes them understandable to people with no programming experience, and especially to you yourself after a while.

Along with that, a problem-specific limitation arises: the selected features must be straightforward and easy to calculate in the service where you will deploy the model.

Problem solution

The pipeline begins with the pipeline() function:
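(The sketch below is illustrative rather than exact: the max_workers parameter, the way candidates are assembled, and the exact signature of get_estimate() are assumptions; get_estimate() itself is described further down.)

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from itertools import combinations


def pipeline(attrs_map, partitions, timeout=600, max_workers=4):
    """Concurrently estimate single attributes and their pairwise combinations."""
    # Single attributes plus their pairwise combinations expand the feature search space
    candidates = list(attrs_map) + list(combinations(attrs_map, 2))

    estimates_out = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(get_estimate, attrs, attrs_map, partitions, timeout): attrs
            for attrs in candidates
        }
        for future in as_completed(futures):
            result = future.result()
            if result:  # get_estimate() returns an empty value on timeout
                estimates_out.append(result)
    return estimates_out
```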

As the first argument, the function takes a map of attributes, attrs_map. It links an attribute name to the corresponding SQL expression, where every name stands for a single categorical column or a concatenation of several of them, including numerical columns transformed into categorical ones. An example of such a map is shown below:
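(A hypothetical example; mcc_code, channel and amount are placeholders for columns from your own schema.)

```python
# Attribute name -> SQL expression producing a categorical value
attrs_map = {
    "mcc_code": "CAST(mcc_code AS STRING)",
    "channel": "channel",
    # A numerical column bucketed into categories
    "amount_bucket": "CASE WHEN amount < 100 THEN 'low' "
                     "WHEN amount < 1000 THEN 'mid' ELSE 'high' END",
    # Two categorical columns concatenated into one attribute
    "mcc_channel": "CONCAT(CAST(mcc_code AS STRING), '_', channel)",
}
```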

The second argument of the pipeline() function is the partitions. As Spark naturally works with partitions of data, it makes sense to estimate feature importance per partition; hence it is also a good idea to specify a reasonable number of partitions so that the feature estimate is representative. The simplest example of such partitions is dates, if you have data over time as I do. In this case, partitions can be specified as follows:
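(An illustrative list; in practice you would spread the dates over the whole observation period.)

```python
# Each partition is a reporting date on which the features are built and estimated
partitions = ["2022-05-01", "2022-06-01", "2022-07-01"]
```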

To speed up the process, the pipeline() function concurrently [3] runs a function called get_estimate(), producing a quantitative measure of feature importance.

It is suggested to use combinations() from itertools to create new, concatenated attributes and thereby expand the feature search space. Here, combinations of two attributes are used.

If get_estimate() completes successfully, then the function produces a tuple containing the name of a single or compound attribute, feature estimates, and the average execution time of the SQL query across all partitions. The tuple is added to the list estimates_out, which can be processed further by the pandas library.

The code below shows the get_estimate() function, where the ss variable hides a Spark session instance:
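(A sketch with illustrative details: the exact argument list, the job-group naming, and the timing bookkeeping are assumptions, and the estimation step is left as three dots; collect() and get_sql() are shown right after.)

```python
import time
from threading import Thread


def get_estimate(attrs, attrs_map, partitions, timeout):
    """Collect features for every partition and estimate them with the filter criteria."""
    # ss is the Spark session; get_sql() and collect() are defined below
    group_id = "_".join(attrs) if isinstance(attrs, tuple) else attrs
    ss.sparkContext.setJobGroup(group_id, "feature estimation", interruptOnCancel=True)

    data, durations = [], []
    for partition in partitions:
        sql = get_sql(attrs, attrs_map, partition)
        started = time.time()
        thread = Thread(target=collect, args=(sql, data, group_id))
        thread.start()
        thread.join(timeout)
        if thread.is_alive():
            # Even one partition exceeding the timeout cancels the whole job group
            ss.sparkContext.cancelJobGroup(group_id)
            return None
        durations.append(time.time() - started)

    # Cancel once more: occasionally a job keeps holding resources
    # even after the data for all partitions has been collected
    ss.sparkContext.cancelJobGroup(group_id)

    estimates = ...  # task-specific estimation of the features gathered in `data`
    return group_id, estimates, sum(durations) / len(durations)
```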

The main idea of this function is to gather some data as features and estimate them according to filter criteria. Data is gathered within a specific Spark job group across all partitions. After successful completion of this stage, the estimates are produced and passed back into the pipeline() function.

One of the important arguments of this function, passed in from pipeline(), is a timeout that limits the SQL execution and data collection time. If the timeout is exceeded for even one partition, the Spark job is canceled and get_estimate() returns an empty value, despite the partitions already processed successfully within the same job group.

The feature estimation method is shown as three dots, as it strongly depends on the machine learning task you are solving. It is supposed to estimate all the features that can be created with the SQL query.

This function uses a fairly simple thread function, collect(), that executes the SQL code and then fetches the data as features to the driver via the toPandas() method:
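(Again a sketch: the group_id argument and the job description string are illustrative; the repeated setJobGroup() call is explained in the Discussion section.)

```python
def collect(sql, data, group_id):
    """Thread target: run the SQL query and fetch the result to the driver as pandas."""
    # Re-set the job group: a JVM thread spawned from a Python thread
    # does not necessarily inherit it (see the Discussion section)
    ss.sparkContext.setJobGroup(group_id, "feature collection", interruptOnCancel=True)
    data.append(ss.sql(sql).toPandas())
    return None
```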

As a thread, the function collect() stores collected data in the list data from get_estimate() and returns None. It also requires a parameterized SQL query from the get_sql() function to shape the features created. An example of get_sql() is shown below:
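(The table, key and target column names below are placeholders for your own schema.)

```python
def get_sql(attrs, attrs_map, partition):
    """Build the SQL query that shapes the features for one partition."""
    names = attrs if isinstance(attrs, tuple) else (attrs,)
    exprs = ", ".join(f"{attrs_map[name]} AS {name}" for name in names)
    return f"""
        SELECT customer_id,
               target,
               {exprs}
        FROM my_db.transactions
        WHERE report_dt = '{partition}'
    """
```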

Putting all the above together shapes the solution to the problem as the following block of code:
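(A condensed sketch of the top-level invocation; the application name, timeout and max_workers values are illustrative.)

```python
import pandas as pd
from pyspark.sql import SparkSession

# ss is the Spark session used by get_estimate() and collect() above
ss = SparkSession.builder.appName("feature-selection").getOrCreate()

estimates_out = pipeline(attrs_map, partitions, timeout=600, max_workers=4)

# A pandas DataFrame makes further filtering and sorting of the estimates trivial
report = pd.DataFrame(estimates_out, columns=["attribute", "estimate", "avg_sql_seconds"])
print(report.sort_values("estimate", ascending=False).head(20))
```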

Discussion

Why do I specify setJobGroup() and cancelJobGroup() methods several times?

While developing this pipeline, I occasionally saw jobs in the Spark UI that did not belong to any group and could run forever, since they were not canceled when the timeout was reached. The second call of setJobGroup() in the collect() function fixes this. At the same time, some jobs may hang and hold resources even after the data has been collected for all partitions within the job group. The second call of cancelJobGroup() ensures that these jobs are terminated correctly.

The noted issues arise because a JVM thread does not necessarily inherit properties from the Python thread that spawned it, which is a well-known problem in Spark 2 [4, 5]. If you use PySpark 3, I strongly recommend using InheritableThread [6] instead of a plain Python thread, as it ensures that PVM thread properties are inherited by the JVM thread and that these threads are terminated correctly [5]. It is unnecessary to enable pinned thread mode, since it has been enabled by default since Spark 3.2 [7]. If you want to dive deeper into this, especially if you haven’t worked with it before, you can start with a nice read about Python threads in Spark [8].
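With PySpark 3, the thread creation inside get_estimate() then needs only a one-line change, sketched here against the snippet above:

```python
from pyspark import InheritableThread

# Drop-in replacement for threading.Thread: the PVM thread's properties,
# including the job group, are mirrored to the corresponding JVM thread
thread = InheritableThread(target=collect, args=(sql, data, group_id))
thread.start()
thread.join(timeout)
```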

It is important to note that you should carefully balance the timeout and the number of tasks in ThreadPoolExecutor against the Spark session resources, namely the number of executor instances, their memory, and the driver memory. Basically, this helps you avoid out-of-memory errors that cause YARN to kill executors and eventually ruin the whole pipeline. This balancing can be tricky, because it also depends on the data you have and the SQL queries you write.

Given these notes, someone might prefer to use the FAIR scheduler to ensure fairness in the distribution of resources between Spark jobs. However, I didn’t notice much difference between the FIFO and FAIR scheduling modes when solving my problem. Basically, FAIR is meant to avoid a situation where one job takes all the resources while all the others are waiting for them [9]. If this sounds intricate to you, you can read some details about the FAIR scheduler [10], which will make things clearer.

As I mentioned earlier, the method for estimating feature importance strongly depends on the machine learning task you are solving and your personal preferences, so it wasn’t shown here. I suggest using AUC ROC or the Kolmogorov-Smirnov test for binary classification tasks, which take up most of my daily routine.
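Purely as an illustration (not the code used in my pipeline), a per-feature estimate over the collected pandas frames could look like this, assuming numeric feature columns and a binary target column named target:

```python
import pandas as pd
from scipy.stats import ks_2samp
from sklearn.metrics import roc_auc_score


def estimate_features(data, feature_cols, target_col="target"):
    """Rough filter scores: AUC ROC and KS statistic of each feature against a binary target."""
    df = pd.concat(data, ignore_index=True)
    scores = {}
    for col in feature_cols:
        auc = roc_auc_score(df[target_col], df[col])
        ks = ks_2samp(df.loc[df[target_col] == 1, col],
                      df.loc[df[target_col] == 0, col]).statistic
        # An AUC below 0.5 is just as informative as its mirror above 0.5
        scores[col] = {"auc": max(auc, 1 - auc), "ks": ks}
    return scores
```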

Lastly, I recommend optimizing your SQL code. If you have time-ordered data and use a self-join over relatively large time windows, it may be better to use window functions. In this case, do not forget to specify a sufficiently large spark.sql.windowExec.buffer.in.memory.threshold parameter in the Spark session, since the default value is small [11]. If your data is skewed because of null values in the join keys, it is better to pre-join without the null keys. I can recommend a detailed read on ways to tackle this problem in the spark-syntax guide on GitHub [12].
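For example, the threshold can be raised on the Spark session used above (the value here is illustrative and depends on your data):

```python
# Raise the in-memory row buffer for window functions before running wide window queries
ss.conf.set("spark.sql.windowExec.buffer.in.memory.threshold", 1000000)
```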

Conclusion

Here is a general idea of the approach to preliminary feature selection using PySpark, which currently provides the most straightforward way of feature construction and estimation on the fly in a Hadoop cluster. It is worth noting that this idea can be adapted to any SQL database engine, but that is out of the scope of this article.

The value of this approach is to reduce the number of features that can be created for the model by selecting the most useful ones from the feature search space according to the filter criteria.

It also has the following positive aspects. Since the entire feature construction process may require considerable time and computing power, reducing the number of created features helps to cut down these costs and consequently shorten model development time. This subset of the most valuable features can be filtered further by a model-based approach like select_features() in CatBoost, resulting in the best possible model.
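For instance, a minimal sketch of such further filtering with CatBoost might look like this, assuming X_train, y_train, X_val, y_val hold only the shortlisted features and that the parameter values are illustrative:

```python
from catboost import CatBoostClassifier, EFeaturesSelectionAlgorithm, Pool

# Hypothetical: X_train/X_val contain only the features shortlisted by the filter stage
train_pool = Pool(X_train, y_train)
eval_pool = Pool(X_val, y_val)

model = CatBoostClassifier(iterations=500, verbose=False)
summary = model.select_features(
    train_pool,
    eval_set=eval_pool,
    features_for_select=list(X_train.columns),
    num_features_to_select=20,
    steps=3,
    algorithm=EFeaturesSelectionAlgorithm.RecursiveByShapValues,
    train_final_model=True,
)
print(summary)
```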

How can it be improved?

When you create combinations of attributes, you may come across attributes that consistently make jobs get stuck and canceled. To avoid unnecessary calculations, you can count the number of job group cancellations for every attribute and, once the failure counter of an attribute exceeds some threshold value, exclude it from the process.
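A minimal sketch of such bookkeeping (the helper names and the threshold are my own, illustrative choices) could be:

```python
from collections import Counter

failures = Counter()
MAX_FAILURES = 3  # illustrative threshold

def should_skip(attrs):
    """Skip a candidate if any of its attributes has already timed out too often."""
    names = attrs if isinstance(attrs, tuple) else (attrs,)
    return any(failures[name] >= MAX_FAILURES for name in names)

def register_failure(attrs):
    """Call this whenever get_estimate() returns an empty value for a candidate."""
    names = attrs if isinstance(attrs, tuple) else (attrs,)
    failures.update(names)
```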

Using hyperparameter tuning frameworks like Optuna can make this approach more directed. For example, you can define a list of attributes with an additional empty string and then define a few hyperparameters over that list in the Optuna objective using the suggest_categorical() method. The values of these hyperparameters can be combined during the trial in a specific way: if the combination contains the same attribute twice, you keep only one; if one of the values is an empty string, you keep only the non-empty ones; otherwise, you use all of them. Finally, you run the trials to find the best attribute, or combination of attributes, according to the estimate.
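A sketch of such an Optuna objective, assuming two attribute slots and that the estimate returned by get_estimate() reduces to a single number to maximize:

```python
import optuna

# The empty string lets a slot stay unused
choices = list(attrs_map) + [""]

def objective(trial):
    picked = [trial.suggest_categorical(f"attr_{i}", choices) for i in range(2)]
    # Drop empty slots and duplicates
    attrs = tuple(dict.fromkeys(a for a in picked if a))
    if not attrs:
        raise optuna.TrialPruned()
    result = get_estimate(attrs, attrs_map, partitions, timeout=600)
    if result is None:  # timed out: prune rather than reward the trial
        raise optuna.TrialPruned()
    _, estimate, _ = result
    return estimate

study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=50)
print(study.best_params, study.best_value)
```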

References

  1. https://en.wikipedia.org/wiki/RFM_(market_research)
  2. https://arxiv.org/abs/2111.12140
  3. https://docs.python.org/3/library/concurrent.futures.html
  4. https://issues.apache.org/jira/browse/SPARK-29017
  5. https://spark.apache.org/docs/latest/job-scheduling.html#concurrent-jobs-in-pyspark
  6. https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.InheritableThread.html#pyspark.InheritableThread
  7. https://issues.apache.org/jira/browse/SPARK-35303
  8. https://www.linkedin.com/pulse/threads-pyspark-douglas-leal/
  9. https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application
  10. https://towardsdatascience.com/apache-spark-sharing-fairly-between-concurrent-jobs-d1caba6e77c2
  11. https://issues.apache.org/jira/browse/SPARK-21595
  12. https://ericxiao251.github.io/spark-syntax/Chapter%207%20-%20High%20Performance%20Code/Section%201.2%20-%20Joins%20on%20Skewed%20Data%20%3CNull%20Keys%3E.html
