Utilizing the Power of Pyspark Pipelines in Data Science Projects: Benefits and Limitations (2/2)

Zeid Zandi
12 min read · Oct 24, 2023

In Part 1, we covered the setup of PySpark using Docker, showcasing its usefulness for code experimentation. Moreover, we explored the crucial concept of data science pipelines, emphasizing their organized structure, modularity, and the advantages they offer in data processing workflows. We also addressed the limitations and constraints encountered when implementing these pipelines, providing illustrative code samples.

In this part, our focus will shift towards providing practical solutions to tackle the challenges we discussed. We will guide you through implementing a custom PySpark solution, tailored to overcome the obstacles we outlined earlier. While our examples will be specific to PySpark, it’s important to note that the underlying principles can be applied to similar frameworks such as scikit-learn.

Overcoming the limitation of passing data objects:

To address the challenge of passing data objects between different stages of a pipeline, a simple approach is introduced: the concept of a custom DataObject. This bespoke DataObject serves as a container for diverse attributes and information required throughout the pipeline’s execution. By crafting it, we achieve the capability to encapsulate and extract data as it journeys through the various stages of the pipeline.

class DataObject:
    def __init__(self):
        self.raw_data = None  # any needed attribute can be added
        # Store any additional attributes as needed
        self.additional_attributes = {}

    def pass_object(self, additional_attributes=None):
        # Merge new attributes into the container and return it for chaining
        if additional_attributes:
            self.additional_attributes.update(additional_attributes)
        return self
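
To make the behaviour concrete, here is a minimal usage sketch. The attribute names row_count and source are hypothetical and chosen only for illustration; the class is repeated so the snippet runs on its own.

```python
class DataObject:
    """Same container as above, repeated so this snippet is self-contained."""

    def __init__(self):
        self.raw_data = None
        self.additional_attributes = {}

    def pass_object(self, additional_attributes=None):
        if additional_attributes:
            self.additional_attributes.update(additional_attributes)
        return self


data_object = DataObject()
# A first stage records a (hypothetical) row count.
data_object.pass_object({"row_count": 3})
# A later stage adds another entry; earlier entries are preserved.
same_object = data_object.pass_object({"source": "demo"})

print(same_object is data_object)
print(data_object.additional_attributes)
```

Because pass_object returns the same instance, every stage that receives the object sees whatever earlier stages stored in it.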

Before we delve into the implementation of this approach, it’s crucial to understand the foundational step of creating a custom transformer.

Why Custom Transformers?

Although the PySpark ML library provides an extensive set of transformer APIs for data processing, there are cases where the out-of-the-box functionality does not meet specific feature needs. In such scenarios, creating custom transformers becomes a necessity.

Implementing Custom Transformers:

To create a custom transformer, PySpark provides a set of classes that can significantly reduce the code we need to write. These classes are part of the pyspark.ml.param.shared module and include HasInputCol, HasInputCols, HasOutputCol, and HasOutputCols. They enable us to define parameters for our custom transformer, such as inputCol and outputCol, which can be easily set, retrieved, and even saved to a file.

In addition to using these classes, we can enforce a consistent pattern by using Abstract Base Classes (ABC) to require a pass_object method on every custom transformer. Each custom transformer must implement both the pass_object method and the _transform method, the latter being mandated as an abstract method by PySpark. To get the benefits of the classes in pyspark.ml.param.shared and to enforce the pass_object method, we define a class called CommonParameters as follows:

from abc import ABCMeta, abstractmethod
from pyspark.ml.param.shared import (
    HasInputCol,
    HasOutputCol,
    HasInputCols,
    HasOutputCols,
)


class DataObject:
    def __init__(self):
        self.raw_data = None  # Additional attributes can be added as needed
        self.additional_attributes = {}  # Store any extra attributes as necessary

    def pass_object(self, additional_attributes=None):
        if additional_attributes:
            self.additional_attributes.update(additional_attributes)
        return self


class CommonParameters(
    HasInputCol,
    HasOutputCol,
    HasInputCols,
    HasOutputCols,
    metaclass=ABCMeta,
):
    def __init__(self):
        super().__init__()

    def check_params(self):
        # ensuring that the input and output columns are properly set
        # and match in length when using multiple input columns
        # this can be overridden if needed
        if self.isSet("inputCol") and self.isSet("inputCols"):
            raise ValueError("Only one of `inputCol` and `inputCols` must be set.")
        if not (self.isSet("inputCol") or self.isSet("inputCols")):
            raise ValueError("One of `inputCol` or `inputCols` must be set.")
        if self.isSet("inputCols"):
            if len(self.getInputCols()) != len(self.getOutputCols()):
                raise ValueError(
                    "The length of `inputCols` does not match the length of `outputCols`."
                )

    # The setters rely on setParams, which each concrete transformer defines
    def setInputCol(self, new_inputCol):
        return self.setParams(inputCol=new_inputCol)

    def setOutputCol(self, new_outputCol):
        return self.setParams(outputCol=new_outputCol)

    def setInputCols(self, new_inputCols):
        return self.setParams(inputCols=new_inputCols)

    def setOutputCols(self, new_outputCols):
        return self.setParams(outputCols=new_outputCols)

    @abstractmethod
    def pass_object(self, data_object: DataObject) -> DataObject:
        raise NotImplementedError()

This class not only streamlines the creation of custom PySpark transformers but also enforces a standardized structure, so that they integrate cleanly with our custom PySpark pipeline, which we will discuss later. Any custom transformer inheriting from the CommonParameters class can use the inherited functionality and, if needed, override the check_params method based on the specific use case or logic of the transformer.
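
The enforcement comes from Python's abc module rather than from PySpark. The sketch below isolates that mechanism; StageContract, IncompleteStage and CompleteStage are hypothetical stand-ins that leave out the PySpark mixins so the snippet runs without Spark.

```python
from abc import ABCMeta, abstractmethod


class StageContract(metaclass=ABCMeta):
    """Hypothetical stand-in for CommonParameters, without the PySpark mixins."""

    @abstractmethod
    def pass_object(self, data_object):
        raise NotImplementedError()


class IncompleteStage(StageContract):
    """Forgets to implement pass_object."""


class CompleteStage(StageContract):
    def pass_object(self, data_object):
        return data_object


try:
    IncompleteStage()
    instantiation_failed = False
except TypeError:
    # Abstract methods left unimplemented block instantiation.
    instantiation_failed = True

stage = CompleteStage()
payload = {"raw_data": None}
print(instantiation_failed, stage.pass_object(payload) is payload)
```

A transformer that omits pass_object therefore fails as soon as it is instantiated, rather than later in the middle of a pipeline run.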

To explain the subsequent steps for creating a custom PySpark transformer, we’ll develop a custom transformer called ReplaceCharacters that replaces characters in a string based on a regular expression (regex) pattern. To adhere to PySpark standards, we first create a class that inherits from the CommonParameters class defined earlier, as well as from DefaultParamsReadable and DefaultParamsWritable. These inherited traits facilitate reading and writing parameters to and from files, either as part of a pipeline or independently. Without them, saving the transformer or the pipeline can fail with a ValueError such as:

ValueError: ('Pipeline write will fail on this pipeline because stage %s of type %s is not MLWritable', 'StringAppender_281f47e48529', <class '__main__.StringAppender'>)

Below is the class definition for _ReplaceCharacters, which sets the pattern_replace parameter:

from pyspark.ml.param import Param, Params
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable


class _ReplaceCharacters(
    DefaultParamsReadable,
    DefaultParamsWritable,
    CommonParameters,
):
    pattern_replace = Param(
        Params._dummy(),
        "pattern_replace",
        "Dict with keys as matching pattern supported with regex and value is the replacement for the given pattern",
    )

    def __init__(self, *args):
        super().__init__(*args)
        # Default: strip every character that is not a letter or a digit
        self._setDefault(pattern_replace={"[^a-zA-Z0-9]": ""})

    def getPatternReplace(self):
        return self.getOrDefault(self.pattern_replace)

    def pass_object(self, data_object):
        # This transformer does not need the data object, so return it unchanged
        return data_object

In this class, pattern_replace is defined as a parameter. The _setDefault method initializes it with a default value. The getPatternReplace method retrieves the current value of the pattern_replace parameter. The pass_object method receives an instance of the custom data object as an argument and can be used to modify and update the data stored within the object. If needed, additional attributes can be added to the data object in this method. The purpose of the pass_object method will become evident in our later discussion on constructing a custom pipeline.

Finally, we create the custom transformer class ReplaceCharacters, which inherits from both Transformer and _ReplaceCharacters. This class implements the actual transformation:

import re
from functools import reduce

from pyspark import keyword_only
from pyspark.ml.base import Transformer
from pyspark.sql.functions import regexp_replace


class ReplaceCharacters(Transformer, _ReplaceCharacters):
    """Replaces the characters matched by each regex pattern."""

    @keyword_only
    def __init__(
        self,
        inputCol=None,
        outputCol=None,
        inputCols=None,
        outputCols=None,
        pattern_replace=None,
    ):
        super().__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(
        self,
        inputCol=None,
        outputCol=None,
        inputCols=None,
        outputCols=None,
        pattern_replace=None,
    ):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setPatternReplace(self, new_pattern_replace):
        return self.setParams(pattern_replace=new_pattern_replace)

    @staticmethod
    def check_pattern(pattern_replace):
        # Fail early if any key is not a valid regular expression
        for pattern in pattern_replace:
            try:
                re.compile(pattern)
            except re.error:
                raise ValueError("Non-valid pattern")

    def multi_replacements(self, col_name, pattern_replace):
        # Apply the replacements one after another on the same column
        for pattern, replace in pattern_replace.items():
            col_name = regexp_replace(col_name, pattern, replace)
        return col_name

    def _transform(self, dataset):
        self.check_params()
        pattern_replace = self.getPatternReplace()
        self.check_pattern(pattern_replace=pattern_replace)
        input_columns = (
            [self.getInputCol()] if self.isSet("inputCol") else self.getInputCols()
        )
        output_columns = (
            [self.getOutputCol()] if self.isSet("outputCol") else self.getOutputCols()
        )

        # Each (input, output) pair adds or overwrites one output column
        df = reduce(
            lambda memo_df, col_name: memo_df.withColumn(
                col_name[1], self.multi_replacements(col_name[0], pattern_replace)
            ),
            zip(input_columns, output_columns),
            dataset,
        )
        return df

In this class, we define methods for handling input parameters, checking regex patterns, and performing the character replacements. The _transform method is where the actual transformation occurs, using the defined parameters and regex patterns to replace characters in the specified columns. This method operates on the dataset that enters the stage. The primary goal is to process the data while also interacting with the custom data object, if required. Additionally, we can run validation checks before applying the transformation logic to avoid runtime errors and make the custom transformer more robust, as we have done here with self.check_params() and self.check_pattern().
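
To see what the sequential replacement does without starting Spark, the sketch below mirrors check_pattern and multi_replacements on a plain Python string using re.sub. This is only an analogy: Spark's regexp_replace uses Java regular expressions, which can differ from Python's re module in some details, and the rules dictionary is a made-up example.

```python
import re


def check_pattern(pattern_replace):
    """Raise ValueError if any key is not a valid regular expression."""
    for pattern in pattern_replace:
        try:
            re.compile(pattern)
        except re.error as exc:
            raise ValueError(f"Non-valid pattern: {pattern!r}") from exc


def multi_replacements(text, pattern_replace):
    """Apply each pattern in insertion order, feeding the result forward."""
    for pattern, replacement in pattern_replace.items():
        text = re.sub(pattern, replacement, text)
    return text


# Hypothetical rules: drop punctuation first, then collapse whitespace
rules = {"[^a-zA-Z0-9 ]": "", r"\s+": "_"}
check_pattern(rules)
cleaned = multi_replacements("Hello,   World! 42", rules)
print(cleaned)
```

Since each replacement works on the output of the previous one, the order of the dictionary entries matters.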

The @keyword_only decorator is used to enforce that the arguments for a function must be passed using keywords, not positional arguments. This can be particularly useful when dealing with functions that have multiple optional parameters and it's easy to confuse the order of the arguments. When @keyword_only is used, all keyword arguments are stored in self._input_kwargs. The setParams method is a convention in PySpark's MLlib library. It is used to set the values of parameters for an instance of a transformer or an estimator. In the context of custom transformer development, it's particularly useful when one wants to allow users to dynamically change the parameters of the transformer after it has been instantiated.
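
The mechanism is easier to follow with a simplified stand-in. The decorator below, keyword_only_sketch, is our own illustration of the behaviour described above and not PySpark's actual source code; the Demo class is likewise hypothetical.

```python
from functools import wraps


def keyword_only_sketch(func):
    """Simplified stand-in for pyspark.keyword_only (not the library code)."""

    @wraps(func)
    def wrapper(self, *args, **kwargs):
        if args:
            raise TypeError(f"Method {func.__name__} forces keyword arguments.")
        # Remember exactly what the caller passed, then call through
        self._input_kwargs = kwargs
        return func(self, **kwargs)

    return wrapper


class Demo:
    @keyword_only_sketch
    def __init__(self, inputCol=None, outputCol=None):
        # Only the arguments the caller actually passed are recorded.
        self.params = dict(self._input_kwargs)


demo = Demo(inputCol="text")

try:
    Demo("text")
    positional_rejected = False
except TypeError:
    positional_rejected = True

print(demo.params, positional_rejected)
```

Note that only explicitly passed arguments end up in _input_kwargs, which is why forwarding them to setParams leaves every other parameter at its default.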

Now that we’ve explored the process of creating a custom transformer, let’s return to our innovative approach for overcoming the challenge of passing data objects between stages of the pipeline.

Extending the PipelineModel:

We introduce a new class named CustomPipelineModel, an extension of the existing PySpark PipelineModel. This new class showcases the integration of our custom approach.

import logging
from typing import List, cast
from pyspark.ml.base import Estimator, Transformer
from pyspark.ml.pipeline import Pipeline, PipelineModel
from pyspark.ml.common import inherit_doc
from pyspark.sql.dataframe import DataFrame

logger = logging.getLogger(__name__)


class DataObject:
    def __init__(self):
        self.raw_data = None  # any needed attribute can be added
        # Store any additional attributes as needed
        self.additional_attributes = {}

    def pass_object(self, additional_attributes=None):
        if additional_attributes:
            self.additional_attributes.update(additional_attributes)
        return self


@inherit_doc
class CustomPipelineModel(PipelineModel):
    @staticmethod
    def get_data_object() -> DataObject:
        data_object = DataObject()
        return data_object

    def pass_object(self, data_object: DataObject) -> DataObject:
        return self.get_data_object()

    def _transform(self, dataset: DataFrame) -> DataFrame:
        data_object = self.get_data_object()
        data_object.raw_data = dataset
        data_object.logger = logger
        for t in self.stages:
            # Hand the shared object to the stage before it transforms the data
            data_object = t.pass_object(data_object)
            logger.info(f"transforming data with stage {t.__class__.__name__}")
            dataset = t.transform(dataset)
        return dataset


class CustomPipeline(Pipeline):
    def _fit(self, dataset: DataFrame) -> "PipelineModel":
        stages = self.getStages()
        for stage in stages:
            if not (isinstance(stage, Estimator) or isinstance(stage, Transformer)):
                raise TypeError(
                    "Cannot recognize a pipeline stage of type %s." % type(stage)
                )
        indexOfLastEstimator = -1
        for i, stage in enumerate(stages):
            if isinstance(stage, Estimator):
                indexOfLastEstimator = i
        transformers: List[Transformer] = []
        for i, stage in enumerate(stages):
            if i <= indexOfLastEstimator:
                if isinstance(stage, Transformer):
                    transformers.append(stage)
                    dataset = stage.transform(dataset)
                else:  # must be an Estimator
                    model = stage.fit(dataset)
                    transformers.append(model)
                    if i < indexOfLastEstimator:
                        dataset = model.transform(dataset)
            else:
                transformers.append(cast(Transformer, stage))
        # Same logic as Pipeline._fit, but returning our custom model
        return CustomPipelineModel(transformers)

  • get_data_object is a static method defined within CustomPipelineModel. It creates an empty instance of the custom DataObject and returns it. This method is the starting point for managing the custom DataObject’s lifecycle.
  • Within CustomPipelineModel, the pass_object method accepts an instance of the custom DataObject as an argument and returns a new, empty instance rather than the one it received, so the original instance is left untouched.
  • The _transform method contains the core logic of the custom pipeline’s transformation process. As it processes each stage in a for loop, it calls the stage’s pass_object method to carry the data object forward and potentially update it with new attributes.
  • Before the loop starts, the data object’s raw_data attribute is assigned the input dataset and its logger attribute is assigned the module logger, so every stage can access both.
  • Additionally, the _transform method writes a log entry for each transformation stage, which makes the run easier to trace and debug.
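
The hand-over pattern in _transform can be sketched without Spark. In the snippet below, plain lists of tuples stand in for DataFrames, and RecordLength, AppendLength and run_stages are hypothetical names used only to show the order of calls: pass_object first, then transform.

```python
class DataObject:
    def __init__(self):
        self.raw_data = None
        self.additional_attributes = {}


class RecordLength:
    """Hypothetical stage: stores the input length on the shared object."""

    def pass_object(self, data_object):
        self.data_object = data_object
        return data_object

    def transform(self, rows):
        self.data_object.additional_attributes["n_rows"] = len(rows)
        return rows


class AppendLength:
    """Hypothetical stage: reads what the previous stage stored."""

    def pass_object(self, data_object):
        self.data_object = data_object
        return data_object

    def transform(self, rows):
        n_rows = self.data_object.additional_attributes["n_rows"]
        return [row + (n_rows,) for row in rows]


def run_stages(stages, rows):
    data_object = DataObject()
    data_object.raw_data = rows
    for stage in stages:
        data_object = stage.pass_object(data_object)  # hand over shared state
        rows = stage.transform(rows)  # then transform the data
    return rows, data_object


result, shared = run_stages([RecordLength(), AppendLength()], [(1, 2), (3, 4)])
print(result, shared.additional_attributes)
```

The second stage can use a value computed by the first one even though that value never appears as a column in the data passed between them.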

CustomPipeline: A Solution for Passing Non-Tabular Data Formats

In Part 1, we explored the limitations of passing complex data objects between stages in PySpark ML pipelines, and above we created a custom DataObject to address this issue. Now, let’s look at how we can use the CustomPipeline together with the DataObject class to overcome these limitations in scenarios that involve passing non-tabular data formats.

import logging
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from typing import List, cast
from pyspark.ml.base import Estimator, Transformer
from pyspark.ml.pipeline import Pipeline, PipelineModel
from pyspark.ml.common import inherit_doc
from pyspark.sql.dataframe import DataFrame
from abc import ABCMeta, abstractmethod
from pyspark.ml.param.shared import (
    HasInputCol,
    HasInputCols,
    HasOutputCol,
    HasOutputCols,
)
from pyspark import keyword_only
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class DataObject:
    def __init__(self):
        self.raw_data = None
        self.json_schema = None
        self.row_count_initial = None
        self.row_count_final = None
        self.logger = None
        # Store any additional attributes as needed
        self.additional_attributes = {}

    def pass_object(self, additional_attributes=None):
        if additional_attributes:
            self.additional_attributes.update(additional_attributes)
        return self


class CommonParameters(
    HasInputCol,
    HasOutputCol,
    HasInputCols,
    HasOutputCols,
    metaclass=ABCMeta,
):
    def __init__(self):
        super().__init__()

    def check_params(self):
        if self.isSet("inputCol") and self.isSet("inputCols"):
            raise ValueError("Only one of `inputCol` and `inputCols` must be set.")
        if not (self.isSet("inputCol") or self.isSet("inputCols")):
            raise ValueError("One of `inputCol` or `inputCols` must be set.")
        if self.isSet("inputCols"):
            if len(self.getInputCols()) != len(self.getOutputCols()):
                raise ValueError(
                    "The length of `inputCols` does not match"
                    " the length of `outputCols`"
                )

    def setInputCol(self, new_inputCol):
        return self.setParams(inputCol=new_inputCol)

    def setOutputCol(self, new_outputCol):
        return self.setParams(outputCol=new_outputCol)

    def setInputCols(self, new_inputCols):
        return self.setParams(inputCols=new_inputCols)

    def setOutputCols(self, new_outputCols):
        return self.setParams(outputCols=new_outputCols)

    @abstractmethod
    def pass_object(self, data_object: DataObject) -> DataObject:
        raise NotImplementedError()


@inherit_doc
class CustomPipelineModel(PipelineModel):
    @staticmethod
    def get_data_object() -> DataObject:
        data_object = DataObject()
        return data_object

    def pass_object(self, data_object: DataObject) -> DataObject:
        return self.get_data_object()

    def _transform(self, dataset: DataFrame) -> DataFrame:
        data_object = self.get_data_object()
        data_object.raw_data = dataset
        data_object.logger = logger
        for t in self.stages:
            data_object = t.pass_object(data_object)
            logger.info(f"transforming data with stage {t.__class__.__name__}")
            dataset = t.transform(dataset)
        return dataset


class CustomPipeline(Pipeline):
    def _fit(self, dataset: DataFrame) -> "PipelineModel":
        stages = self.getStages()
        for stage in stages:
            if not (isinstance(stage, Estimator) or isinstance(stage, Transformer)):
                raise TypeError(
                    "Cannot recognize a pipeline stage of type %s." % type(stage)
                )
        indexOfLastEstimator = -1
        for i, stage in enumerate(stages):
            if isinstance(stage, Estimator):
                indexOfLastEstimator = i
        transformers: List[Transformer] = []
        for i, stage in enumerate(stages):
            if i <= indexOfLastEstimator:
                if isinstance(stage, Transformer):
                    transformers.append(stage)
                    dataset = stage.transform(dataset)
                else:  # must be an Estimator
                    model = stage.fit(dataset)
                    transformers.append(model)
                    if i < indexOfLastEstimator:
                        dataset = model.transform(dataset)
            else:
                transformers.append(cast(Transformer, stage))
        return CustomPipelineModel(transformers)


def make_api_call(json_schema, feature_name):
    # some dummy logic to make the api call and retrieve the needed information
    info_dict = json.loads(json_schema)
    if feature_name == "feature_one":
        return info_dict.get("numRows") + info_dict.get("numColumns")
    elif feature_name == "feature_two":
        return info_dict.get("numRows") * info_dict.get("numColumns")


class _AddFeatureOne(
    DefaultParamsReadable,
    DefaultParamsWritable,
    CommonParameters,
):

    def __init__(self, *args):
        super().__init__(*args)

    def pass_object(self, data_object: DataObject) -> DataObject:
        # Keep a reference so _transform can read from the shared object
        self.data_object = data_object
        return data_object


class AddFeatureOne(Transformer, _AddFeatureOne):
    """Adds a constant column computed from the JSON schema (feature_one)."""

    @keyword_only
    def __init__(
        self,
        inputCol=None,
        outputCol=None,
        inputCols=None,
        outputCols=None
    ):
        super().__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(
        self,
        inputCol=None,
        outputCol=None,
        inputCols=None,
        outputCols=None
    ):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        # get the json schema and make an api call to create a feature
        json_schema = self.data_object.json_schema
        api_result = make_api_call(json_schema=json_schema, feature_name="feature_one")
        df = dataset.withColumn(self.getOutputCol(), lit(api_result))
        return df


class _AddFeatureTwo(
    DefaultParamsReadable,
    DefaultParamsWritable,
    CommonParameters,
):

    def __init__(self, *args):
        super().__init__(*args)

    def pass_object(self, data_object: DataObject) -> DataObject:
        # Keep a reference so _transform can read from the shared object
        self.data_object = data_object
        return data_object


class AddFeatureTwo(Transformer, _AddFeatureTwo):
    """Adds a constant column computed from the JSON schema (feature_two)."""

    @keyword_only
    def __init__(
        self,
        inputCol=None,
        outputCol=None,
        inputCols=None,
        outputCols=None
    ):
        super().__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(
        self,
        inputCol=None,
        outputCol=None,
        inputCols=None,
        outputCols=None
    ):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        # get the json schema and make an api call to create a feature
        json_schema = self.data_object.json_schema
        api_result = make_api_call(json_schema=json_schema, feature_name="feature_two")
        df = dataset.withColumn(self.getOutputCol(), lit(api_result))
        # we count the number of rows for sanity check
        self.data_object.row_count_final = dataset.count()
        return df


class _GenerateJson(
    DefaultParamsReadable,
    DefaultParamsWritable,
    CommonParameters,
):

    def __init__(self, *args):
        super().__init__(*args)

    def pass_object(self, data_object: DataObject) -> DataObject:
        # Keep a reference so _transform can write to the shared object
        self.data_object = data_object
        return data_object


class GenerateJson(Transformer, _GenerateJson):
    """Stores a JSON summary of the dataset shape on the data object."""

    @keyword_only
    def __init__(
        self,
        inputCol=None,
        outputCol=None,
        inputCols=None,
        outputCols=None
    ):
        super().__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(
        self,
        inputCol=None,
        outputCol=None,
        inputCols=None,
        outputCols=None
    ):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        num_rows = dataset.count()  # Get the number of rows
        num_cols = len(dataset.columns)  # Get the number of columns
        json_schema = json.dumps({"numRows": num_rows, "numColumns": num_cols})
        # get the data_object instance and update the json_schema
        # attribute to be passed to the next stages
        self.data_object.json_schema = json_schema
        # we keep the number of rows for the sanity check
        self.data_object.row_count_initial = num_rows
        return dataset

# creating dummy data and dummy pipeline
# Create a SparkSession
spark = SparkSession.builder \
    .appName("JSON Schema Generation Pipeline") \
    .getOrCreate()
data = [(1, 2, 3), (4, 5, 6), (7, 8, 9)]
columns = ["col1", "col2", "col3"]
dataset = spark.createDataFrame(data, columns)
generate_json = GenerateJson(inputCols=["col1", "col2", "col3"], outputCols=["col1", "col2", "col3"])
# _transform writes to getOutputCol(), so the new column name is set via outputCol;
# otherwise the column would get PySpark's auto-generated default name
add_feature_one = AddFeatureOne(inputCols=["col1", "col2", "col3"], outputCol="feature_one")
add_feature_two = AddFeatureTwo(inputCols=["col1", "col2", "col3", "feature_one"], outputCol="feature_two")
pipeline = CustomPipeline(stages=[generate_json, add_feature_one, add_feature_two])
pipeline_fitted = pipeline.fit(dataset=dataset)
pipeline_fitted.transform(dataset=dataset)
# sanity check for missing rows
count_initial = generate_json.data_object.row_count_initial
count_final = add_feature_two.data_object.row_count_final
assert count_final - count_initial == 0

# executing this code will print
# INFO:__main__:transforming data with stage GenerateJson
# INFO:__main__:transforming data with stage AddFeatureOne
# INFO:__main__:transforming data with stage AddFeatureTwo

Addressing Memory Issues for Large Datasets:

While the proposed solution involving the DataObject and CustomPipeline effectively addresses the limitation of passing complex data objects between stages, it’s important to recognize that in situations involving large datasets, memory usage can become a significant concern. Storing intermediate results and metadata in memory, as the DataObject solution suggests, might lead to memory exhaustion and decreased performance.

To tackle memory issues when dealing with large datasets, one effective approach is to persist the data on disk within the pass_object method itself. By repartitioning the data for efficient disk storage and using formats like Parquet or Delta tables, it’s possible to balance the trade-off between memory utilization and performance. See the usage example below:

from pyspark.ml.base import Transformer
from pyspark.sql import SparkSession


class PersistDataStage(Transformer):
    def __init__(self, num_partitions=4):
        super().__init__()
        self.num_partitions = num_partitions

    def pass_object(self, data_object, persist=True):
        # Existing logic to perform transformations
        processed_data = data_object.raw_data.select("id", "value")

        if persist:
            # Repartition and persist data in Parquet format
            spark = SparkSession.builder.getOrCreate()
            # mode("overwrite") is our addition so that reruns do not fail
            processed_data.repartition(self.num_partitions) \
                .write.mode("overwrite").parquet("processed_data.parquet")
            # Update the data object with the processed data read back from disk
            data_object.processed_data = spark.read.parquet("processed_data.parquet")

        return data_object

    def _transform(self, dataset):
        # Perform transformations using the processed_data
        # This method will be invoked in the main pipeline
        # Placeholder: return the dataset unchanged so the pipeline can continue
        return dataset

Summary

In part 1 and part 2, we delved into the world of pipelines in data science projects, uncovering their significance, advantages, and drawbacks. As you saw, pipelines provide structured and systematic sequences of data processing and transformation steps, simplifying the way we handle complex workflows. They bring forth a range of benefits, including modularity, streamlined workflows, enhanced reproducibility, parameter tuning capabilities, and seamless deployment.

As an illustration, we walked through a practical example using PySpark ML to construct a machine learning pipeline for predicting housing prices based on various features. This showcased how pipelines simplify the development process, making it cohesive, efficient, and easy to reproduce.

Yet, pipelines have their limitations, as you observed. Passing data objects between stages can be a challenge, especially in scenarios involving dynamic calculations. To address this, we introduced a custom approach. By adding the DataObject class and implementing a CustomPipeline, we created a solution that enables the transfer of data objects between stages while preserving the pipeline’s modularity and adaptability.

It’s important to note that memory concerns, particularly with large datasets, need careful attention. As we explored, mitigating memory issues involves persisting data on disk through techniques like repartitioning and using optimized formats like Parquet or Delta tables. This balance between memory usage and performance is essential for managing sizable datasets effectively.

In conclusion, our journey through pipelines and the custom approach outlined in these articles showcases their transformative impact on data workflows. By embracing these concepts, data scientists can navigate intricate data processes, enhance reproducibility, and overcome challenges tied to data object transfers within pipelines.

Zeid Zandi

Senior data scientist, I passionately share my insights and expertise in overcoming practical challenges in the realm of data science.