Utilizing the Power of PySpark Pipelines in Data Science Projects: Benefits and Limitations (1/2)

Zeid Zandi
8 min read · Oct 24, 2023


AI-Generated Image

This sequence of articles offers an interpretation of pipelines, delves into the benefits they bring when incorporated into data science projects, examines the potential constraints they might introduce, and proposes solutions to overcome these challenges. Below, you’ll find a brief introduction to each article and its objectives.

  • Part 1, Docker Setup for PySpark, Exploring Challenges in Implementing Efficient Pipelines: Before delving into the topic of pipelines in data science, we provide a brief explanation of setting up a Docker container and exposing a Jupyter Notebook port for using PySpark. We then explore the concept of pipelines, outlining their structure, purpose, and the advantages they offer in efficiently processing data. We also discuss, through coding examples, the potential challenges and limitations faced when implementing pipelines in data science projects.
  • Part 2, Overcoming Challenges and Implementing Solutions within PySpark Pipelines: This part focuses on implementing custom pipelines and transformers in PySpark to overcome the challenges and limitations discussed in Part 1. We will then implement working solutions for the same example discussed there.

Docker Setup for PySpark, Exploring Challenges in Implementing Efficient Pipelines:

The most convenient way to experiment with PySpark and quickly try out code is to use Docker. Once Docker is installed, you can easily run the PySpark code presented in this sequence of articles.

Introduction to Docker:

Docker is a containerization platform that bundles an application and its dependencies into a standardized unit of software. This ensures a consistent environment for running applications across diverse setups, simplifying development and deployment.

Here’s how you can run the code from these articles:

To use the code examples featured in this sequence of articles, you need Docker installed on your system. Docker can be installed on a range of operating systems, and comprehensive installation instructions are available on the official Docker website.

After successfully installing Docker, execute the following command in your terminal or command prompt:

docker run -it --rm -p 8888:8888 -v "${PWD}":/home/jovyan/work jupyter/pyspark-notebook

Accessing http://<hostname>:8888/?token=<token> in a web browser launches JupyterLab, with:

  • hostname: Referring to the computer where Docker is running.
  • token: The confidential token displayed in the console.

Explanation of the Command:

docker run: This command initiates a Docker container.

-it: Allocates an interactive terminal session.

--rm: Cleans up the container and its file system upon container termination.

-p 8888:8888: Maps port 8888 in the container to port 8888 on your host system, facilitating access to Jupyter Notebook.

-v "${PWD}":/home/jovyan/work: Mounts your present working directory on the host system to the /home/jovyan/work directory within the container. This enables seamless file sharing between your host system and the container.

jupyter/pyspark-notebook: Specifies the Docker image to employ, which, in this context, is the Jupyter PySpark Notebook image.

For more information, please see the official Docker documentation and the Jupyter Docker Stacks documentation, which covers the jupyter/pyspark-notebook image.
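Once JupyterLab is running, a quick sanity check confirms that PySpark works inside the container. This is a minimal snippet, not part of the main example, and assumes the default jupyter/pyspark-notebook image:

from pyspark.sql import SparkSession

# Start a local SparkSession inside the container
spark = SparkSession.builder.appName("sanity-check").getOrCreate()

# Create a tiny DataFrame and display it to confirm that Spark is working
spark.range(5).show()

# Release resources when done
spark.stop()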

Pipelines in Data Science Projects:

In data science projects, where the volume and complexity of data continue to grow, effective management and processing of data are paramount. One of the most valuable tools that have emerged to streamline this process is the concept of pipelines. Data science pipelines, such as those provided by libraries like scikit-learn and PySpark MLlib, have become indispensable design patterns for data science projects. This design pattern offers a structured approach to data processing, transformation, and modeling, bringing a multitude of benefits that enhance efficiency, reproducibility, and readability.

Definition of Pipelines:

In the context of data science and machine learning, a pipeline refers to a systematic and organized sequence of data processing and transformation steps, often used to streamline and automate complex workflows. It is a structured approach that involves chaining together multiple stages, where each stage performs a specific operation on the data.

In a PySpark pipeline, each stage is represented by either a Transformer or an Estimator. A Transformer implements a transform() method that takes a DataFrame and returns a new DataFrame, typically by appending one or more columns. An Estimator implements a fit() method that learns from the data and returns a fitted Transformer (a model). When the pipeline is fitted, its stages are executed in order, so the data flows through each step, undergoing the necessary transformations, feature engineering, and modeling while adhering to the underlying design pattern.
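As a minimal illustration of this division of labor (a sketch using only stages that ship with PySpark), VectorAssembler is a Transformer, while StandardScaler is an Estimator whose fit() method returns a fitted Transformer:

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler

spark = SparkSession.builder.appName("stages-sketch").getOrCreate()
df = spark.createDataFrame([(1.0, 2.0), (3.0, 4.0), (5.0, 6.0)], ["x1", "x2"])

# Transformer: transform() takes a DataFrame and returns a new DataFrame with a 'features' column
assembled = VectorAssembler(inputCols=["x1", "x2"], outputCol="features").transform(df)

# Estimator: fit() learns the column statistics and returns a StandardScalerModel (a Transformer)
scaler_model = StandardScaler(inputCol="features", outputCol="scaled", withMean=True).fit(assembled)

# The fitted model transforms the DataFrame again, appending the 'scaled' column
scaler_model.transform(assembled).show(truncate=False)

spark.stop()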

Benefits of Pipelines:

1. Modularity and Reusability: Pipelines break down the complex data processing workflow into manageable stages, each encapsulating a specific task or transformation. These stages are modular, allowing for easy reuse in different projects. This modularity not only speeds up the development process but also promotes best practices by enforcing consistency and standardization across projects.

2. Streamlined Workflow: Pipelines facilitate a structured and streamlined workflow. Each stage in the pipeline operates sequentially, ensuring that data moves through a series of well-defined steps. This not only makes the process more understandable but also simplifies debugging and error tracking.

3. Enhanced Reproducibility: Reproducibility is a cornerstone of robust data science. Pipelines enable reproducibility by clearly outlining the sequence of data transformations and modeling steps. This makes it easier to recreate results, share analyses, and collaborate with team members.

4. Parameter Tuning and Experimentation: Pipelines provide a convenient framework for hyperparameter tuning and experimentation. By encapsulating different algorithms or models within pipeline stages, it becomes straightforward to compare their performance systematically.

5. Easier Deployment: A well-structured pipeline can seamlessly transition from data exploration and modeling to deployment. Once the pipeline is defined, it can be used to process new data or make predictions without extensive code modifications. Additionally, the entire fitted pipeline can be saved and reloaded, which simplifies deployment; a short sketch of this follows the list.
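To make the deployment point concrete, here is a minimal sketch of saving and reloading a fitted pipeline. The path is a placeholder, and model and new_data are assumed to be a fitted PipelineModel and a DataFrame of unseen records, as in the housing example below:

from pyspark.ml import PipelineModel

# Persist the fitted pipeline (all stages, including the trained model) to disk;
# the path is a placeholder
model.write().overwrite().save("/tmp/housing_pipeline_model")

# Later, or in a separate serving job, reload it and apply it to new data in one call
loaded_model = PipelineModel.load("/tmp/housing_pipeline_model")
predictions = loaded_model.transform(new_data)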

Example Usage of Pipelines:

Consider a machine learning project where the goal is to predict housing prices based on various features. A pipeline could be constructed using PySpark to streamline the entire process. The pipeline might consist of stages like data preprocessing (handling missing values, scaling features), feature selection, and a regression model (e.g., linear regression).

At the preprocessing stage, the pipeline could include transformers to handle missing data and scale features. Then, a feature selection stage might employ an estimator to select the most relevant features. Finally, the pipeline could end with a regression estimator trained on the selected features. By arranging these stages within a pipeline, the entire process becomes cohesive and easy to reproduce. The pipeline can be fitted on training data, automatically applying all necessary transformations and training steps, and then be used to predict housing prices on new, unseen data.

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import Imputer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from sklearn.datasets import fetch_california_housing
from pyspark.sql.types import StructType, StructField, FloatType

# Create a SparkSession
spark = SparkSession.builder \
    .appName("California Housing") \
    .getOrCreate()

# Load the California Housing dataset
data = fetch_california_housing()
feature_cols = list(data.feature_names)

# Create a dictionary of feature columns using a dictionary comprehension
data_dict = {feature: data.data[:, i] for i, feature in enumerate(feature_cols)}
# Add the target column
data_dict['Target'] = data.target

# Create a pandas DataFrame from the dictionary
df_pd = pd.DataFrame(data_dict)

# Create an explicit schema: all feature columns plus the target are floats
schema = StructType(
    [StructField(feature, FloatType(), True) for feature in feature_cols]
    + [StructField("Target", FloatType(), True)]
)

# Create a PySpark DataFrame from the pandas DataFrame
df = spark.createDataFrame(df_pd, schema=schema)

# Split the data into training and testing sets
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# Define stages for the pipeline
# Fill in missing values with the median of each feature
imputer = Imputer(strategy='median', inputCols=feature_cols, outputCols=feature_cols)
# Assemble the feature columns into a single vector column
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
# Scale the features to have zero mean and unit variance
scaler = StandardScaler(inputCol='features', outputCol='scaled_features', withStd=True, withMean=True)
# Regression algorithm used for prediction
lr = LinearRegression(featuresCol='scaled_features', labelCol='Target', predictionCol='prediction')

# Create the pipeline
pipeline = Pipeline(stages=[imputer, assembler, scaler, lr])

# Fit the pipeline on the training data
model = pipeline.fit(train_data)

# Make predictions on the testing data
predictions = model.transform(test_data)

# Evaluate the model
evaluator = RegressionEvaluator(labelCol='Target', predictionCol='prediction', metricName='mse')
mse = evaluator.evaluate(predictions)
print(f"Mean Squared Error: {mse}")

# Stop the SparkSession
spark.stop()

Please note that this is a simplified example for illustrative purposes. In real-world scenarios, we might want to include more advanced preprocessing steps, perform more thorough hyperparameter tuning, and possibly explore different models and techniques.
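As a sketch of what such hyperparameter tuning could look like (reusing pipeline, lr, train_data, test_data, and evaluator from the example above, and assuming it is run before spark.stop()), PySpark's CrossValidator can search a parameter grid over the pipeline as a whole:

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Grid over the regularization parameters of the LinearRegression stage
param_grid = (ParamGridBuilder()
              .addGrid(lr.regParam, [0.01, 0.1])
              .addGrid(lr.elasticNetParam, [0.0, 0.5])
              .build())

# Cross-validate the whole pipeline so that preprocessing is refit within each fold
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=param_grid,
                    evaluator=evaluator,
                    numFolds=3)

cv_model = cv.fit(train_data)
best_model = cv_model.bestModel  # a fitted PipelineModel with the best parameter combination
print(evaluator.evaluate(best_model.transform(test_data)))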

Limitation: Passing Complex Data Objects between Stages

While data science pipelines offer many benefits, they also come with certain limitations that need to be acknowledged. One major limitation is the restricted nature of data movement between stages: pipelines are designed primarily for applying transformations to DataFrames, which can lead to complexity and readability issues when a stage needs to produce something else. To illustrate the limitation of passing complex data objects between stages, let’s delve into a simple example that highlights the challenges posed by this restriction.

Passing non-tabular data formats

One scenario that underscores the limitation of passing complex data objects between stages in pipelines is when dealing with non-tabular data formats. Consider a situation where, instead of transforming a column within a dataset, our objective is to generate a JSON file. This JSON file is crucial as it serves as the schema for an API call to a third-party application to retrieve certain features.

Integrating this task within a pipeline poses a challenge. Each pipeline stage's fit and transform methods accept a DataFrame, and transform must return one, so handling intricate data objects such as JSON schema strings directly within a pipeline isn't feasible. Attempting to do so results in errors and disrupts the pipeline's functionality.

import json
from pyspark.sql import SparkSession
from pyspark.ml import Transformer, Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression


# Create a SparkSession
spark = SparkSession.builder \
    .appName("JSON Schema Generation Pipeline") \
    .getOrCreate()

# Sample DataFrame
data = [(1, 2, 3), (4, 5, 6), (7, 8, 9)]
columns = ["col1", "col2", "col3"]
df = spark.createDataFrame(data, columns)

# Attempt to generate a JSON schema within the pipeline
class GenerateJson(Transformer):

    def _transform(self, dataset):
        # Dummy example: return a JSON string instead of a DataFrame
        num_rows = dataset.count()       # Get the number of rows
        num_cols = len(dataset.columns)  # Get the number of columns
        json_schema = json.dumps({"numRows": num_rows, "numColumns": num_cols})
        return json_schema

# Define the stages (attempting to include a non-DataFrame stage)
assembler = VectorAssembler(inputCols=columns, outputCol="features")
generate_json_schema = GenerateJson()
lr = LinearRegression(featuresCol="json_schema", labelCol="label")

# Attempting to create a pipeline with a non-DataFrame stage
pipeline = Pipeline(stages=[assembler, generate_json_schema, lr])

# Attempt to fit the pipeline (this fails, because the GenerateJson stage
# returns a JSON string rather than a DataFrame for the next stage)
model = pipeline.fit(df)

# Stop the SparkSession
spark.stop()

In this dummy example, the generate_json_schema stage tries to build a JSON schema from the number of rows and columns of the dataset. However, this operation cannot be integrated into the pipeline as-is because it returns a JSON string instead of a DataFrame. Including this non-DataFrame operation in the pipeline results in an error and disrupts the pipeline's functionality.

This demonstrates the limitation of passing complex data objects (such as JSON schema strings) directly within a PySpark pipeline, due to the constraint that pipeline stages must accept and return DataFrames.

Now that we have covered the challenges of working with data science pipelines, the next part of our discussion will focus on practical solutions for overcoming these obstacles. See part 2.


Zeid Zandi

Senior data scientist. I passionately share my insights and expertise in overcoming practical challenges in the realm of data science.