Time Series Feature Engineering in Apache Spark for Python with Scala

Maximizing Performance in Data Science

Holetz
17 min readApr 3, 2024

In the realm of machine learning and predictive modeling, feature engineering stands as a pivotal step in harnessing the full potential of data. As datasets become increasingly complex and diverse, the art of crafting meaningful features becomes ever more essential, facilitating the extraction of valuable insights and enhancing model performance. While feature engineering has long been recognized as a cornerstone in traditional data analysis, its significance in the domain of time series data is particularly profound.

In this article, we introduce the intricacies of feature engineering with time series data and demonstrate how we can leverage the power of Spark in a Databricks environment, with the assistance of Scala, to deliver a robust and efficient solution for extracting hidden insights from our data.

Upon completing this article, readers will have acquired a comprehensive understanding of the challenges faced in a big data scenario with thousands of columns in Spark. Additionally, they will learn strategies for crafting and deploying Scala-based User Defined Functions (UDFs) to effectively address these challenges.

The importance of Feature Engineering

Before delving into the intricacies of time series feature engineering, it is essential to grasp the broader scope of feature engineering techniques. From handling missing values and encoding categorical variables to normalizing numerical features and selecting relevant attributes, a variety of methodologies exists to shape data into a suitable format for machine learning algorithms. While these techniques are fundamental across various domains, their application to time series data presents unique challenges and opportunities.

Time series feature engineering emerges as a specialized domain within the broader framework of feature engineering. Unlike static datasets where observations are independent, time series data exhibits temporal dependencies and intrinsic patterns that require specific preprocessing and feature extraction methods. By leveraging the temporal structure inherent in sequential data, time series feature engineering goes beyond traditional approaches, offering a diverse range of methodologies to capture dynamic relationships and temporal dependencies.

For instance, consider a dataset containing a client’s account balance over a month. Unlike a traditional static model, where we conduct independent analyses between observations and have limited options for generating new insights, evaluating the information as a time series enables us to extract a significant amount of new information. This includes measurements representing the amount, direction, location, frequency, and even shape of values shifting over time.

Getting Practical

Defining Our Data

To lay the foundation for our examples, let’s begin by generating a dataset randomly that reflects the monthly dynamics of various features for a group of users. This random dataset covers a duration of 12 months, providing us with a platform to explore the complexities of temporal patterns and dependencies within the data. Each entry in this dataset represents a snapshot of multiple attributes or characteristics associated with individual users at the end of each month.

For this example, we will utilize the newly implemented feature in Spark 3.5.0 known as UDTF (User Defined Table Functions). These functions are designed to function as data sources that accept parameters and, in return, yield rows as defined by Python code logic.

from pyspark.sql import functions as F
from datetime import date
import pandas as pd

# Let's start by defining the shape and location in time of our dataset
USERS_COUNT = 1000
RAW_FEATURES_COUNT = 100
MONTHLY_OBSERVATIONS_COUNT = 12
LAST_DATE = date(2023,12,1)

# For our dataframe source we will use an UDTF.
# As Spark can't be sure of what the function is returning, we
# must specify it through the 'returnType' parameter.
@F.udtf(returnType="user_id: int, obs_date: date, obs_lag: int")
class TimeSeriesObservations:
def eval(self, users_count:int, dates_count:int, last_date:date):
for user_id in range(users_count):
for obs_lag, obs_date in enumerate(
pd.date_range(
end=last_date,
periods=dates_count,
freq='MS'
)[::-1]
):
yield (user_id, obs_date.date(), obs_lag)

# Now we can call our UDFT directly followed by a 'select'
# statement where we generate our feature columns and a
# 'write' to save our dataframe to disk
(
TimeSeriesObservations(
F.lit(USERS_COUNT),
F.lit(MONTHLY_OBSERVATIONS_COUNT),
F.lit(LAST_DATE)
)
.select(
'*',
*[
F.when(F.rand()>0.1,F.rand()).alias(i)
for i in map(str,range(RAW_FEATURES_COUNT))
]
)
.write
.save('/tmp/df_timeseries')
)

# Let's then define a dataframe as a load from the previous saved data.
# This allows us to measure further transformation executions
# without them being affected by the definition step.
df_timeseries = spark.read.load('/tmp/df_timeseries')

# Finaly, we can display a sample of our dataframe.
(
df_timeseries
.where('user_id = 0')
.select(
'user_id',
'obs_date',
'obs_lag',
# We will select 6 out of the 100 feature columns and
# round them for a better partial view of our sample
*[F.round(i,2).alias(i) for i in map(str,range(6))]
)
.show(truncate=False)
)

# Output:
# +-------+----------+-------+----+----+----+----+----+----+----
# |user_id|obs_date |obs_lag|0 |1 |2 |3 |4 |5 | -->
# +-------+----------+-------+----+----+----+----+----+----+----
# |0 |2023-12-01|0 |0.45|NULL|0.82|0.62|0.53|0.41|
# |0 |2023-11-01|1 |0.12|0.75|0.86|0.88|0.87|0.9 |
# |0 |2023-10-01|2 |0.43|0.98|0.7 |0.86|NULL|0.08|
# |0 |2023-09-01|3 |0.2 |0.97|0.1 |NULL|0.61|0.25|
# |0 |2023-08-01|4 |0.78|0.69|0.27|0.71|0.03|0.6 |
# |0 |2023-07-01|5 |0.92|NULL|0.16|0.61|0.18|0.48| -->
# |0 |2023-06-01|6 |0.55|0.96|NULL|0.61|NULL|0.49|
# |0 |2023-05-01|7 |NULL|NULL|0.1 |0.16|0.56|0.74|
# |0 |2023-04-01|8 |NULL|0.95|0.38|0.91|0.91|0.03|
# |0 |2023-03-01|9 |NULL|NULL|0.91|0.62|0.04|0.08|
# |0 |2023-02-01|10 |0.7 |0.15|0.98|0.5 |0.74|0.24|
# |0 |2023-01-01|11 |0.52|NULL|0.09|NULL|0.64|0.95|
# +-------+----------+-------+----+----+----+----+----+----+----

Understanding Our Problem

Now that we’ve established a dataframe representing the monthly dynamics of various features for a group of users, it’s crucial to comprehend the challenges we face so we can devise more effective solutions, ultimately yielding more meaningful insights.

First and foremost, our dataset exhibits low frequency and a limited number of observations per user. This characteristic necessitates an approach that prioritizes capturing the evolution of features over time rather than focusing on seasonal or cyclical patterns.

Moreover, the challenge of determining the relevant time window for each feature in relation to the target variable adds complexity to our task, especially given the small number of observations. To address this challenge, we’ve adopted a flexible strategy, exploring multiple potential time windows for each feature, which we can later refine during the feature selection stage. This adaptive approach allows us to account for varying temporal influences on different features and potentially uncover significant relationships that might otherwise remain obscured.

With this strategy in mind, let’s now begin developing our solution.

Starting from the basics

As our initial step in this study, we will utilize Spark and its native functions to tackle our problem and use it as a benchmark for evaluating the performance of other methods. In most scenarios, this approach is anticipated to be the fastest.

from pyspark.sql import functions as F
from pyspark.sql import Window as W
from pyspark.sql.functions import col as C
from more_itertools import collapse

# The windows we will calculate our features with
TIMESERIES_WINDOWS = [2,5,8,11]
RAW_FEATURES_COUNT = 100

# List of aggregations to be calculated
agg_functions = [
'count','count_num','count_unique','count_val','sum','max',
'mean','min','mode','perc_20','perc_50','perc_80','var_s','var_p',
'stddev_s','stddev_p','skew','ni','nd','lrinter','lrslope'
]

# Our window specifications
w = W().partitionBy('user_id').orderBy('obs_lag')

# Lambda funtion that returns the value if the window is included
lag_filter = lambda f:F.when(C('obs_lag').isin(*TIMESERIES_WINDOWS),f)

df_features_native = (
df_timeseries
# Creates an auxiliar column with lag information for use in the next steps
.select(
'*',
*[
F.lag(i,1).over(w).alias(f'{i}_lag')
for i in map(str,range(RAW_FEATURES_COUNT))
]
)
# Calculates every feature over every lag window
.select(
'user_id',
'obs_date',
'obs_lag',
*collapse([
[
lag_filter(F.row_number().over(w)).alias(f'{i}_count'),
lag_filter(F.count(i).over(w)).alias(f'{i}_count_num'),
# Can't use cout_distinct on window aggregation, so we use collect_set instead
lag_filter(F.size(F.collect_set(i).over(w))).alias(f'{i}_count_unique'),
lag_filter(F.count_if(C(i)>0).over(w)).alias(f'{i}_count_val'),
lag_filter(F.sum(i).over(w)).alias(f'{i}_sum'),
lag_filter(F.max(i).over(w)).alias(f'{i}_max'),
lag_filter(F.mean(i).over(w)).alias(f'{i}_mean'),
lag_filter(F.min(i).over(w)).alias(f'{i}_min'),
lag_filter(F.mode(i).over(w)).alias(f'{i}_mode'),
lag_filter(F.percentile(i,percentage=0.2).over(w)).alias(f'{i}_perc_20'),
lag_filter(F.percentile(i,percentage=0.5).over(w)).alias(f'{i}_perc_50'),
lag_filter(F.percentile(i,percentage=0.8).over(w)).alias(f'{i}_perc_80'),
lag_filter(F.var_samp(i).over(w)).alias(f'{i}_var_s'),
lag_filter(F.var_pop(i).over(w)).alias(f'{i}_var_p'),
lag_filter(F.stddev_samp(i).over(w)).alias(f'{i}_stddev_s'),
lag_filter(F.stddev_pop(i).over(w)).alias(f'{i}_stddev_p'),
lag_filter(F.skewness(i).over(w)).alias(f'{i}_skew'),
lag_filter(F.count_if(C(f'{i}_lag')<C(i)).over(w)).alias(f'{i}_ni'),
lag_filter(F.count_if(C(f'{i}_lag')>C(i)).over(w)).alias(f'{i}_nd'),
lag_filter(F.regr_intercept(i,'obs_lag').over(w)).alias(f'{i}_lrinter'),
lag_filter(F.regr_slope(i,'obs_lag').over(w)).alias(f'{i}_lrslope'),
] for i in map(str,range(100))
])
)
# Pivot the desired results to a columnar format to use in models
.where(C('obs_lag').isin(*TIMESERIES_LAGS)) # Only if the window is selected
.groupBy('user_id')
.agg(*[
F.collect_list(f'{i}_{agg}').alias(f'{i}_{agg}')
for i in map(str,range(100))
for agg in agg_functions
])
.select(
'user_id',
*[
C(f'{i}_{agg}')[n].alias(f'{i}_{agg}_{lag}')
for i in map(str,range(100))
for agg in agg_functions
for n,lag in enumerate(TIMESERIES_LAGS)
]
)
)

Although the code strikes a balance between complexity and performance by employing strategies such as calculating results only for desired lags and using a direct approach to pivot data into columns, it still fails to collect the expected results within a reasonable time frame. Measuring the time required for disk writing of the data using the IPython magic method %timeit reveals a total runtime of approximately 1 minute on our cluster.

Note:

Every measure in this article is taken, on a hot cluster (not the
first run), after clearing all cached data and within the
following cluster specifications:

Single Node m5d.xlarge (16 GB Memory, 4 Cores)
Databricks Runtime 14.3 LTS ML (includes Apache Spark 3.5.0, Scala 2.12)
Photon Disabled

TIP - To clear all cached data for more accurate measurements,
use the following command in Scala

%scala
for ((k,v) <- sc.getPersistentRDDs) {
v.unpersist()
}
%timeit -r 1 -n 1 _ = df_features_native.localCheckpoint()

# Output:
# 55.8 s per loop (mean ± std. dev. of 1 runs, 1 loop each)

Regarding the result, we have the following sampled values:

df_features_native.limit(1).select(df_features.columns[:10]).show(vertical=True)

# Output:
# -RECORD 0---------------
# user_id | 0
# 0_count_2 | 3
# 0_count_5 | 6
# 0_count_8 | 9
# 0_count_11 | 12
# 0_count_num_2 | 2
# 0_count_num_5 | 5
# 0_count_num_8 | 8
# 0_count_num_11 | 11
# 0_count_unique_2 | 2

Excessive DataFrame Columns

Not only did our initial approach fail to meet our expectations, but when we attempted to define our DataFrame, we encountered another challenge: it took a staggering 4 minutes solely to instantiate the DataFrame without executing any action.

In Spark, the primary cause of this type of performance degradation during DataFrame definition, especially in complex transformations like ours, is the excessive number of columns. While the framework can efficiently handle large datasets, it’s primarily designed to do so within a relatively small set of columns.

To illustrate this issue, let’s consider a brief example. We’ll run the following code to time the creation of multiple DataFrames with the same amount of stored data but varying dimensions, with and without aggregation:

from plotly.subplots import make_subplots
import plotly.graph_objects as go
import numpy as np

# 65536 random float values
np_array = np.random.rand(2**16)
last_time_average = 0
performance_select = []
performance_sum = []
performance_shape = []

for i in range(16):
new_shape = (2**(16-i), 2**i)
print(f'Shape: {new_shape}')

if last_time_average < 1:
t1 = %timeit -n 1 -o _df = spark.createDataFrame(np_array.reshape(new_shape)).collect()
t2 = %timeit -n 1 -o _df = spark.createDataFrame(np_array.reshape(new_shape)).groupBy().sum().collect()
else:
t1 = %timeit -r 1 -n 1 -o _df = spark.createDataFrame(np_array.reshape(new_shape)).collect()
t2 = %timeit -r 1 -n 1 -o _df = spark.createDataFrame(np_array.reshape(new_shape)).groupBy().sum().collect()

performance_select.append(t1.average)
performance_sum.append(t2.average)
performance_shape.append(str(new_shape))
last_time_average = max(t1.average,t2.average)

fig = make_subplots(rows=2, cols=1, vertical_spacing = 0.25)

for i in range(2):
fig.add_trace(
go.Scatter(
x=list(performance_shape),
y=performance_select,
mode="lines+markers",
name="Select",
line_color="blue",
showlegend=(i == 0),
),
col=1,
row=i + 1,
)
fig.add_trace(
go.Scatter(
x=list(performance_shape),
y=performance_sum,
mode="lines+markers",
name="Aggregate",
line_color="red",
showlegend=(i == 0),
),
col=1,
row=i + 1,
)
fig.update_xaxes(title_text="DataFrame Shape", col=1, row=i + 1)
fig.update_yaxes(
title_text=("Execution Time<br>" + ("(secounds, log scale)" if i == 1 else "(secounds)")),
type=("log" if i == 1 else "linear"),
col=1,
row=i + 1,
)
fig.update_layout(
width=800,
height=600,
title_text="<b>Execution Time of DataFrame Selection and Agregation</b>",
legend={
'orientation':"h",
'yanchor':"top",
'y':1.1,
'xanchor':"right",
'x':1
}
)
fig.show()

The graph illustrates that as the DataFrame’s dimensionality increases, starting from around 32 columns, its performance deteriorates. This trend is further aggravated by the growing number of transformations applied within the DataFrame.

An Array Solution

Given these limitations, a better approach is to avoid wide DataFrames altogether. Fortunately, we can still work with a large set of attributes within a confined number of columns by leveraging array columns. To achieve this, let’s restructure our previous code as follows:

from pyspark.sql import functions as F
from pyspark.sql import Window as W
from pyspark.sql.functions import col as C
from more_itertools import collapse

TIMESERIES_WINDOWS = [2,5,8,11]
RAW_FEATURES_COUNT = 100

w = W().partitionBy('user_id').orderBy('obs_lag')

df_features_array = (
df_timeseries
# Creates an auxiliar column with lag information for use in the next steps
.select(
'user_id',
'obs_date',
'obs_lag',
F.array(*[C(i) for i in map(str,range(100))]).alias('values'),
F.array(*[F.lag(i,1).over(w) for i in map(str,range(RAW_FEATURES_COUNT))]).alias('lag')
)
# Calculates every feature over every lag window
.select(
'user_id',
'obs_date',
'obs_lag',
F.when(
C('obs_lag').isin(*TIMESERIES_WINDOWS),
F.array(*collapse([
[
F.row_number().over(w), # count
F.count(C('values')[i]).over(w), # count_num
# Can't use cout_distinct on window aggregation, so we use collect_set instead
F.size(F.collect_set(C('values')[i]).over(w)), # count_unique
F.count_if(C('values')[i]>0).over(w), # count_val
F.sum(C('values')[i]).over(w), # sum
F.max(C('values')[i]).over(w), # max
F.mean(C('values')[i]).over(w), # mean
F.min(C('values')[i]).over(w), # min
F.mode(C('values')[i]).over(w), # mode
F.percentile(C('values')[i],percentage=0.2).over(w), # perc_20
F.percentile(C('values')[i],percentage=0.5).over(w), # perc_50
F.percentile(C('values')[i],percentage=0.8).over(w), # perc_80
F.var_samp(C('values')[i]).over(w), # var_s
F.var_pop(C('values')[i]).over(w), # var_p
F.stddev_samp(C('values')[i]).over(w), # stddev_s
F.stddev_pop(C('values')[i]).over(w), # stddev_p
F.skewness(C('values')[i]).over(w), # skew
F.count_if(C('lag')[i]<C('values')[i]).over(w), # ni
F.count_if(C('lag')[i]>C('values')[i]).over(w), # nd
F.regr_intercept(C('values')[i],'obs_lag').over(w), # lrinter
F.regr_slope(C('values')[i],'obs_lag').over(w), # lrslope
] for i in range(100)
]))
).alias('ar_features')
)
# Pivot the desired results to a columnar format to use in models
.where(C('obs_lag').isin(*TIMESERIES_WINDOWS))
.groupBy('user_id')
.agg(F.flatten(F.collect_list('ar_features')).alias('ar_features'))
)

With this conversion, we lose the ability to easily label each attribute as we did before with named columns, and we’ll need to map this programmatically instead. However, while moving away from the main problem, we also gain the ability to check the current window only once per observation and a much faster way to group our features into a single row. Overall, our DataFrame is now defined almost instantly, and the collection performance is greatly improved.

%timeit -r 1 -n 1 _ = df_features_array.localCheckpoint()

# Output:
# 35.8 s per loop (mean ± std. dev. of 1 runs, 1 loop each)

Our features, now ordered by row, are displayed as follows:

df_features_array.limit(1).show(vertical=True)

# Output:
# -RECORD 0---------------------------
# user_id | 1
# ar_features | [3.0, 3.0, 3.0, 3...

One step further

While engineering 8,400 features from 1,000 users in half the time signifies a notable improvement over our initial performance, it still fails to meet our expectations for an acceptable outcome. This becomes especially apparent when considering scenarios where our user portfolio could easily expand to include millions of users. In such cases, the time required to calculate resulting features would approach nearly 10 hours, significantly disrupting the flow of new model development or the maintenance of a feature store. For these situations, or even more demanding cases, we must tackle the primary cause of our code’s poor performance: redundant computations.

Redundant Computations

Redundant computation refers to the unnecessary repetition of calculations, resulting in inefficiencies in resource utilization and longer processing times. This issue is particularly evident in our window functions example, where we have multiple overlaps in frames.

Despite the capabilities of Spark in orchestrating optimized transformations, it will not be capable of achieving high performance when dealing with calculations in such complex structures and numerous implicit relationships where redundant computations are prevalent.

Entering UDFs

User Defined Functions (UDF) are not the preferred option, especially when performance is a priority. This is primarily because Spark excels at orchestrating an optimized plan for all transformations, achieving this without the need to serialize and deserialize data back and forth between the Tungsten backend, JVM, and Python.

However, this serialization overhead can be greatly mitigated if we reduce the quantity of calls to the UDF. One way to accomplish this is by using Vectorized UDFs like Pandas UDF in Python, where a chunk of data is passed to the function instead of each row at a time. Unfortunately, the Pandas library doesn’t fulfill all the needed capabilities to effectively solve our window problem, as it can’t vectorize transformations on arrays, and iterating over its rows would significantly impact performance.

Considering these limitations, and in addition to the fact that we can also leverage the ability to pass thousands of data points by aggregating them in an array, developing a custom optimized function in a high-performance language may be the best approach. For this purpose, we have Scala.

The Scala Language

Even though the Spark community has gradually shifted towards a greater focus on Python in recent years due to various positive aspects of the language and the constant reduction in the performance and capability gap compared to Scala, the latter still remains a strong tool for resolving specific problems with a strong dependency of performance for feasibility , as in our case.

In addition to these benefits, it’s crucial to highlight that selecting a development tool doesn’t limit its usage to Python projects. The Spark framework facilitates seamless integration between languages, enabling users to call custom functions and manage data flow through DataFrames effectively.

This capability is especially important for tailored solutions, such as the one in our example. It ensures that project choices aren’t confined to Scala, enabling the involvement of a multidisciplinary team and facilitating cost-effective implementation.

Defining the UDF

Now that we understand our problems and have the necessary tools to solve them, let’s try engineer the features with Scala, avoiding redundant comuptation where possible:

import spire.syntax.cfor._ // For a better performance with for iterations

def percentile(fraction: Float, c_count_num: Int, sort: Array[Float]): Float = {
if (c_count_num == 0) {Float.NaN} else if (c_count_num == 1) {sort(0)} else {
val i_fraction = fraction * (c_count_num - 1)
val i = i_fraction.toInt
sort(i) + (sort(i + 1) - sort(i)) * (i_fraction - i)
}
}

def features_agg(features: Array[Float], indice: Array[Int], windows: Array[Int]): Array[Float] = {
@inline val AGGREGATIONS_COUNT = 20
val indice_size:Int = indice.size
val cols_count:Int = (features.size / indice_size).toInt
val indice_max:Int = indice.max + 1
val windows_max:Int = windows.max + 1
val indice_windows_max = indice_max.max(windows_max)
val agg_size:Int = if (windows.contains(0)) {
(1 + AGGREGATIONS_COUNT * (windows.size - 1)) * cols_count
} else {
AGGREGATIONS_COUNT * windows.size * cols_count
}

// Arrays - We instantiate them with a fixed size for better performance.
var indice_align:Array[Int] = Array.fill[Int](indice_windows_max)(-1)
var windows_align:Array[Int] = Array.fill[Int](indice_windows_max)(-1)
var sort:Array[Float] = new Array[Float](indice_size)
var sort_index:Array[Int] = new Array[Int](indice_size)
var agg:Array[Float] = Array.fill[Float](agg_size)(Float.NaN)

// As Spark can shuffle our observations we have to align data
// In this step, if we don't have an observation at a given lag,
// the index reference will remain as -1, as defined above.
cforRange(0 until indice.size) {i => indice_align(indice(i)) = i}
cforRange(0 until windows.size) {i => windows_align(windows(i)) = i}

var ia:Int = 0
var c_mean = Float.NaN
var c_xmean = Float.NaN
var c_yminus_mean = Float.NaN
var c_xminus_mean = Float.NaN
var c_var_sum = Float.NaN
var c_lr_xysum = Float.NaN
var c_lr_xxsum = Float.NaN
var c_count_unique = 0.0f
var c_last_unique = Float.NaN
var c_last_unique_count = 0.0f
var c_mode = Float.NaN
var c_mode_count = 0.0f
var c_var_p = Float.NaN
var c_stdev_p = Float.NaN
var c_skewness = Float.NaN
// For each feature
cforRange(0 until cols_count) {c =>
var c_count_num:Int = 0
var c_count_val = 0.0f
var c_sum = Float.NaN
var c_xsum = 0.0f
var c_inc_count = 0.0f
var c_dec_count = 0.0f

var ir = c * indice_size
var v = Float.NaN
var v_isnan = true
var v_first = Float.NaN
var v_last = Float.NaN

// For 0 to the max defined windows
cforRange(0 until indice_windows_max) {r =>
var v_lag: Float = v
// If the indice is -1 then we don't have the observation
if (indice_align(r) != -1) {
v = features(ir + indice_align(r))
v_isnan = v.isNaN
} else if (!v_isnan) {
v = Float.NaN
v_isnan = true
}
if (v > v_lag) {c_inc_count += 1}
if (v < v_lag) {c_dec_count += 1}

// Insert Sort
// NaN values are not contained in the sorted
// array, thus we can skip if so.
if (!v_isnan) {
if (c_sum.isNaN) c_sum = v else c_sum += v
if (v > 0) c_count_val += 1

c_xsum += r
c_count_num += 1

var i = c_count_num.toInt - 1
while (i > 0 && sort(i - 1) > v) {
sort(i) = sort(i - 1)
sort_index(i) = sort_index(i - 1)
i -= 1
}
sort(i) = v
sort_index(i) = r
}

// We check if the current window is selected,
// if not, we skip it's definition.
if (windows_align(r) == 0) {
agg(ia + 0) = v
ia += 1
} else if (windows_align(r) != -1) {
if (c_count_num != 0) {
c_mean = c_sum / c_count_num
c_xmean = c_xsum / c_count_num
c_yminus_mean = sort(0) - c_mean
c_xminus_mean = sort_index(0) - c_xmean
c_var_sum = c_yminus_mean * c_yminus_mean
c_lr_xysum = c_yminus_mean * c_xminus_mean
c_lr_xxsum = c_xminus_mean * c_xminus_mean
c_count_unique = 1
c_last_unique = sort(0)
c_last_unique_count = 1
c_mode = sort(0)
c_mode_count = 1

cforRange(0 until c_count_num) {i =>
c_yminus_mean = sort(i) - c_mean
c_xminus_mean = sort_index(i) - c_xmean
c_var_sum += c_yminus_mean * c_yminus_mean
c_lr_xysum += c_yminus_mean * c_xminus_mean
c_lr_xxsum += c_xminus_mean * c_xminus_mean
if (sort(i) != c_last_unique) {
c_count_unique += 1
c_last_unique = sort(i)
c_last_unique_count = 1
} else {
c_last_unique_count += 1
}
if (c_last_unique_count > c_mode_count) {
c_mode = c_last_unique
c_mode_count = c_last_unique_count
}
}

c_var_p = c_var_sum / c_count_num
c_stdev_p = math.sqrt(c_var_p).toFloat
c_skewness = 0.0f
cforRange(0 until c_count_num) {i =>
c_skewness += math.pow(c_yminus_mean / c_stdev_p,3).toFloat
}
c_skewness = c_skewness / c_count_num

agg(ia + 0) = c_count_num // A:count_num - Count Numerics
agg(ia + 1) = c_count_unique // A:count_unique - Count Unique
agg(ia + 2) = c_count_val // A:count_val - Count Values ( > 0)
agg(ia + 3) = c_sum // A:sum - Sum
agg(ia + 4) = sort(c_count_num - 1) // A:max - Max
agg(ia + 5) = c_mean // A:mean - Mean
agg(ia + 6) = sort(0) // A:min - Min
agg(ia + 7) = c_mode // A:mode - Mode
agg(ia + 8) = percentile(0.2f, c_count_num, sort) // A:perc_20 - Percentile 20
agg(ia + 9) = percentile(0.5f, c_count_num, sort) // A:perc_50 - Percentile 50
agg(ia + 10) = percentile(0.8f, c_count_num, sort) // A:perc_80 - Percentile 80
agg(ia + 11) = if (c_count_num <= 1) 0.0f else c_var_sum / (c_count_num - 1) // A:var_s - Variance Sample
agg(ia + 12) = c_var_p // A:var_p - Variance Population
agg(ia + 13) = math.sqrt(agg(ia + 11)).toFloat // A:stdev_s - Standard Deviation Sample
agg(ia + 14) = c_stdev_p // A:stdev_p - Standard Deviation Population
agg(ia + 15) = c_skewness // A:skew - Skewness
agg(ia + 16) = c_inc_count // A:ni - Increments Count
agg(ia + 17) = c_dec_count // A:nd - Decrements Count
agg(ia + 19) = if (c_lr_xxsum != 0) c_lr_xysum / c_lr_xxsum else 0.0f // A:lrslope - Linear Regression Slope
agg(ia + 18) = if (c_lr_xxsum != 0) c_mean - agg(ia + 19) * c_xmean else 0.0f // A:lrinter - Linear Regression Intercept
}
ia += AGGREGATIONS_COUNT
}
}
}
agg
}

In an initial benchmark, passing random values with the same dimensions as our df_timeseries dataframe, we can observe a theoretical substantial gain, with the result being obtained in less than 200 milliseconds in total (225 times faster than the array solution):

import scala.util.Random
val random = new Random()

val features = Array.fill(100 * 12)(random.nextFloat())
val indice = Array(0,1,2,3,4,5,6,7,8,9,10,11,12)
val windows = Array(2,5,8,11)
val n = 10

val before = System.nanoTime
for (i <- 0 until n) {
for (i <- 0 until 1000) {
features_agg(features,indice,windows)
}
}
val totalTimeSecounds=((System.nanoTime-before).toFloat/n/1000000000)

// Output:
// ...
// totalTimeSecounds: Float = 0.15849303

Back to Python

Now that we’ve defined the function, the next step is to register it as a UDF, allowing us to call it from Python. This is crucial in many scenarios as it enables unrestricted usage across various projects by team members, irrespective of their familiarity with Scala. To do this, we execute the following code in Scala:

import org.apache.spark.sql.{functions => F}

// And with the function defined, we can now register it with Spark
spark.udf.register("features_agg", F.udf(features_agg _))

With that accomplished, we can now invoke the UDF using the call_udf function in Python:

from pyspark.sql import functions as F
from pyspark.sql.functions import col as C
from pyspark.sql.functions import lit as L
import numpy as np

df_features_scala = (
df_timeseries
.groupBy('user_id')
.agg(
F.call_udf(
'features_agg',
F.flatten(F.collect_list(F.array(*[F.nvl(i,L(np.NaN)) for i in map(str,range(100))]))).cast('array<float>'),
F.collect_list('obs_lag'),
F.array(L(2),L(5),L(8),L(11))
).alias('features')
)
)

df_features_scala.limit(1).show(vertical=True)

# Output:
# -RECORD 0---------------------------
# user_id | 1
# ar_features | [3.0, 3.0, 3.0, 3...

Testing the Performance

As our previous direct approach to measure our UDF performance can’t assess elements such as the overhead of data serialization and the benefits of distributing computation across the workers, let’s now measure from within Python by calling our registered UDF.

%timeit -r 1 -n 1 _ = df_features_scala.localCheckpoint()

# Output:
# 1.53 s per loop (mean ± std. dev. of 1 runs, 1 loop each)

With a small dataframe of only 1000 rows, the total execution time is now just under two seconds, marking a 23-fold performance increase from our previous array solution. However, as we approach such short execution times, our performance starts to be penalized by other overheads that we want to overlook. Therefore, for a better understanding of the real performance gain, let’s test it with a dataframe of 100,000 rows:

%timeit -r 1 -n 1 _ = df_features_scala_100000_rows.localCheckpoint()

# Output:
# 42.5 s per loop (mean ± std. dev. of 1 runs, 1 loop each)

With this we reduce the impact of some overheads and can reach a better confidence, with a final estimation of 84x gain in performance over our array solution and 131x from our initial proposal. Moreover, this performance should increase significantly as we integrate more calculations into our function, particularly for tasks with high computational redundancy, such as those involving sorting.

The Next Steps

Now that we have a performative solution at hand, we can advance its development in order to extract the maximum possible from the data we have in our projects. A great starting point is to consult existing libraries for engineering temporal features such as tsfresh. A listing of other related libraries can be found in the Awesome Time Series listing.

Furthermore, for easier integration with Python, consider compiling the function definition into a JAR and importing it as a dependency.

Wrapping Up

Time series feature engineering offers a wide range of opportunities to enhance projects by enriching existing information with temporal data and providing a deeper understanding of the phenomena under analysis. However, this approach can also present new challenges, as it quickly transforms simple problems into big data challenges due to the vastness of temporal datasets.

Although Apache Spark is a powerful tool for handling large volumes of data, it is important to recognize its limitations, such as performance degradation on DataFrames with numerous columns and the absence of native mechanisms for efficiently reducing computational redundancies.

Therefore, it is essential that we are aware of Spark’s customization capabilities and explore the versatility of different languages that interface with it when needed. By doing so, we will be equipped to tackle the challenges of large-scale data analysis effectively and efficiently.

--

--