So, You Want To Build A Pipeline?

Jesse Markowitz
Published in CodeX · 11 min read · Sep 20, 2021
*slaps pipe* This baby can hold so much data! | “Trans-canyon Pipeline (Historic) 2400” by Grand Canyon NPS is licensed under CC BY 2.0

Look, I’m only going to say this once: One of the cardinal rules of writing code is Don’t Repeat Yourself (DRY). It is a great universal rule that applies to all languages and it’s especially important when it comes to handling data. DRY is why we write for loops instead of copying and pasting; it’s why we declare functions instead of copying and pasting; and it’s why we use pipelines to pre-process data, fit models, and cross-validate them too.

Repetition in code isn’t just cluttering and confusing; it can lead to serious errors as you write and re-write over variable values. Remember that time that you forgot you had already pre-processed X_train and you ended up filling the entire DataFrame with np.nan, then spent an hour trying to figure out why your model had the accuracy of a potato? Start using pipelines and let that experience become a tale you tell your children to bore them to sleep.

Penguins in a pipeline

In this little how-to we’ll see some pipelines in action and learn a few strategies along the way to make pipelines even more convenient. We’ll use Allison Horst’s penguins dataset, imported via Seaborn, to build a model that can predict the species of a penguin. The features listed for each penguin are home island, bill and flipper measurements, body mass, and sex. The dataset contains three species of penguin: Adelie, Gentoo, and Chinstrap. All of them are adorable, but especially the Adelie.

Note: I usually see one giant code block at the top of a notebook for importing dependencies, but I think it’s more helpful to do it as we go so it’s easier to see where everything comes from.

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# To print some nice tables (https://pypi.org/project/tabulate/)
from tabulate import tabulate

# Load in the penguins
penguins = sns.load_dataset("penguins")
display(penguins.head())
print(penguins.shape)
(344, 7)

Let’s assume we’ve already done some exploratory data analysis (EDA) to see the distributions of each feature, the potential relationships between them, etc., and we’re ready to do some modeling. As usual, we’ll start by separating our features from our target classes, then split the data using Scikit-learn’s train_test_split. Since we're trying to predict the species of a penguin, that will be our target, y.

from sklearn.model_selection import train_test_split

# Separate features from target
X = penguins.drop('species', axis=1)
y = penguins['species']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.20, random_state=42)

# Print out the sizes
shape_table = [['Original', X.shape, y.shape],
               ['Training', X_train.shape, y_train.shape],
               ['Testing', X_test.shape, y_test.shape]]
print(tabulate(shape_table, headers=['Dataset', 'X shape', 'y shape']))
Dataset    X shape    y shape
---------  ---------  ---------
Original   (344, 6)   (344,)
Training   (275, 6)   (275,)
Testing    (69, 6)    (69,)

Before we get too far, we better take a look at a count of missing values.

penguins.isna().sum()
species               0
island                0
bill_length_mm        2
bill_depth_mm         2
flipper_length_mm     2
body_mass_g           2
sex                  11
dtype: int64

Hmm…not too many, but we still have to do something about them before we try to fit a model. Since we don’t have that much data to begin with, and because this tutorial depends on it, let’s fill them in instead of dropping them.

So the next steps we need to take are:

  1. Fill missing values with:
     • The mean for numerical features
     • The mode for categorical features
  2. Scale the numerical data
  3. One-hot-encode the categorical data
  4. Fit a model (we’ll just use a simple logistic regression)
  5. Evaluate the model
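
To make the first three steps concrete, here is a minimal sketch on a made-up toy frame. The values and the names `toy`, `num_filled`, `cat_filled`, `num_scaled`, and `cat_encoded` are assumptions for illustration only, not the penguins data.

```python
import numpy as np
import pandas as pd
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder, StandardScaler

# Toy data (made up for illustration): one numerical and one categorical column
toy = pd.DataFrame({
    "body_mass_g": [3000.0, 4000.0, np.nan, 5000.0],
    "sex": pd.Series(["Male", "Female", np.nan, "Female"], dtype=object),
})

# Step 1: fill missing values (mean for numbers, mode for categories)
num_filled = SimpleImputer(strategy="mean").fit_transform(toy[["body_mass_g"]])
cat_filled = SimpleImputer(strategy="most_frequent").fit_transform(toy[["sex"]])

# Step 2: scale the numerical column to zero mean and unit variance
num_scaled = StandardScaler().fit_transform(num_filled)

# Step 3: one-hot-encode the categorical column (one output column per category)
cat_encoded = OneHotEncoder(handle_unknown="ignore").fit_transform(cat_filled).toarray()

print(num_filled.ravel())  # the NaN is replaced by the column mean
print(cat_filled.ravel())  # the NaN is replaced by the most frequent value
print(cat_encoded.shape)   # rows x number of categories
```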

And we’re going to use pipelines to do it.

let's do this

But what’s wrong with the way I do things now?

Before pipelines, my workflow might have looked something like this:

from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.model_selection import cross_val_score

# I want to fill in missing values,
# but some of my columns are categorical and some are numerical
num_imputer = SimpleImputer(strategy='median')
cat_imputer = SimpleImputer(strategy='most_frequent')

# Apply each imputer to the correct columns by selecting datatypes
X_train_num_imputed = num_imputer.fit_transform(X_train.select_dtypes(include=['int64', 'float64']))
X_train_cat_imputed = cat_imputer.fit_transform(X_train.select_dtypes(include='object'))

# Might as well scale the numerical stuff...
ss = StandardScaler()
X_train_num_imputed_scaled = ss.fit_transform(X_train_num_imputed)

# ...and one-hot-encode the categorical stuff
ohe = OneHotEncoder(handle_unknown='ignore', sparse_output=False)  # on scikit-learn < 1.2, use sparse=False
X_train_cat_imputed_ohe = ohe.fit_transform(X_train_cat_imputed)

# Now I gotta put 'em back together
X_train_preprocessed = np.concatenate([X_train_num_imputed_scaled, X_train_cat_imputed_ohe], axis=1)

# And finally fit and evaluate the model
logreg = LogisticRegression(random_state=42)
logreg.fit(X_train_preprocessed, y_train)
initial_score = logreg.score(X_train_preprocessed, y_train)
initial_crossval_score = cross_val_score(logreg, X_train_preprocessed, y_train).mean()

# Print out scores
scores_table = [['Original', initial_score, initial_crossval_score]]
scores_headers = ['Dataset', 'Training score', 'Cross-val score']
print(tabulate(scores_table, headers=scores_headers))
Dataset      Training score    Cross-val score
---------  ----------------  -----------------
Original           0.996364           0.992727

What a mess!

If you skipped over that block of code, I can’t blame you. It’s repetitive and not well organized. I split and renamed my dataset half a dozen times, then put it all back together. The first time I tried to run it, I had to debug multiple errors, most of which resulted from simple typos because of all the different names I wrote for each new version of X_train. If I want to change or add anything later on, I'll have to hunt through my code and rename a bunch of things. And worst of all, I'll have to repeat that entire process with X_test in order to do a final evaluation of my model!

There’s got to be a better way!

Pipelines: A Better Way

Using pipelines will make that code simpler, cleaner, and less repetitive. When working with machine learning models, pipelines make it easier to preprocess data and fit models to training and testing sets. Pipelines can also prevent data leakage, especially when evaluating models via cross-validation.

Just as functions store processes you can run again and again, sklearn’s Pipeline class stores instances of other classes to run on your dataset. Some classes are transformers, while others are estimators. Transformers battle the Decepticons — wait, that’s not right.

Part of what makes pipelines so amazing is their intuitive use of sklearn’s consistent API. Anything you can do with a transformer or an estimator on its own you can do with a pipeline. That means you can use methods like .fit(), .transform(), or .predict() on a pipeline just like you can on each individual piece. You can also use a pipeline in cross-validation to evaluate the performance of a model.
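
As a rough sketch of that consistency, the snippet below calls the same methods on two small pipelines. The toy arrays and the names `prep_pipe` and `clf_pipe` are made up for illustration.

```python
import numpy as np
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Tiny made-up dataset: two well-separated groups
X_toy = np.array([[1.0], [2.0], [3.0], [10.0], [11.0], [12.0]])
y_toy = np.array([0, 0, 0, 1, 1, 1])

# A pipeline made only of transformers can .fit() and .transform(), like a transformer
prep_pipe = Pipeline(steps=[("imputer", SimpleImputer()), ("ss", StandardScaler())])
X_scaled = prep_pipe.fit(X_toy).transform(X_toy)

# A pipeline that ends in a classifier can .fit(), .predict(), and .score(), like a classifier
clf_pipe = Pipeline(steps=[("ss", StandardScaler()), ("logreg", LogisticRegression())])
clf_pipe.fit(X_toy, y_toy)
preds = clf_pipe.predict(X_toy)
accuracy = clf_pipe.score(X_toy, y_toy)

print(X_scaled.shape, preds, accuracy)
```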

How do you build a pipeline?

When you instantiate sklearn’s Pipeline class, the main parameter you need to define is steps, which takes a list of the transformers and estimator you'd like to include in the pipeline. Each step is written in the form of a tuple. The first item in the tuple is a string name for the transformer/estimator and the second item is the transformer/estimator itself.

A typical pipeline might contain multiple transformers and a final estimator, but you don’t necessarily need multiple components in every pipeline. In fact, the simplest possible pipeline contains just one thing:

simple pipeline
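
In scikit-learn terms, a one-step pipeline might look like the sketch below. It also shows the (name, object) tuple form of a step. The toy arrays and the name `one_step_pipe` are made up for illustration.

```python
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline

# A pipeline with a single step: a (name, estimator) tuple and nothing else
one_step_pipe = Pipeline(steps=[("logreg", LogisticRegression())])

# Made-up data, just to show that the pipeline fits and predicts like the estimator would
X_toy = np.array([[0.0], [1.0], [2.0], [3.0]])
y_toy = np.array([0, 0, 1, 1])
one_step_pipe.fit(X_toy, y_toy)

print(len(one_step_pipe.steps))          # number of steps in the pipeline
print(one_step_pipe.predict(X_toy).shape)
```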

In practice, pipelines usually have at least two steps. Ultimately, we’re going to build a pipeline that consists of other pipelines!

To build a pipeline, you need to know the steps you want to take to process your data and fit a model. That can be difficult to know entirely in advance, so it’s sometimes easier to start with some messy code and then refactor it. Another strategy is to build smaller pipelines before putting them together.

Pipe-by-numbers

Let’s start with a pipeline for just the numerical columns in the dataset. We’ll impute the null values using the mean for each column, then scale the data before fitting with a LogisticRegression() estimator.

from sklearn.pipeline import Pipeline

# Pipeline for numerical data only
num_pipe = Pipeline(steps=[
    ('num_imputer', SimpleImputer(strategy='mean')),
    ('ss', StandardScaler()),
    ('logreg', LogisticRegression(random_state=42))
])

# Select only the numerical columns (the pipeline's imputer handles the nulls)
X_train_numerical = X_train.select_dtypes(include='float64')

# Fit and score the pipeline
num_pipe.fit(X_train_numerical, y_train)
num_score = num_pipe.score(X_train_numerical, y_train)
num_crossval_score = cross_val_score(num_pipe, X_train_numerical, y_train).mean()

# Compare scores
scores_table.append(['Numerical', num_score, num_crossval_score])
print(tabulate(scores_table, headers=scores_headers))
Dataset      Training score    Cross-val score
---------  ----------------  -----------------
Original           0.996364           0.992727
Numerical          0.989091           0.985455

It makes sense that our scores dipped — remember that the Original score included all the features from the original dataset, while Numerical includes only the numerical features.

Compare the syntax to accomplish processing the data and fitting a model with and without a pipeline:

Without a pipeline:

imputer = SimpleImputer()
ss = StandardScaler()
logreg = LogisticRegression()

# Numerical columns only, since the default imputer fills with the mean
X_train_imp = imputer.fit_transform(X_train_numerical)
X_train_scl = ss.fit_transform(X_train_imp)
logreg.fit(X_train_scl, y_train)

With a pipeline:

pipe = Pipeline(steps=[
    ('num_imputer', SimpleImputer()),
    ('ss', StandardScaler()),
    ('logreg', LogisticRegression())
])

# Numerical columns only, since the default imputer fills with the mean
pipe.fit(X_train_numerical, y_train)

Look how using a pipeline reduced the overall amount of code and completely removed the need to create a new, renamed version of X_train for each step. Simpler, less risk of errors, and very DRY!

A Categorical Yes

Can we add back in our categorical columns? You betcha! For this we’ll pull in another class called ColumnTransformer. ColumnTransformer is incredibly useful in exactly these kinds of situations because it allows us to perform different operations on different columns, all in one go.

To use ColumnTransformer, we’ll refactor our code a bit and create two sub-pipelines: one for numerical data and one for categorical data.

# Sub-pipeline for the numerical columns
num_transformer = Pipeline(steps=[
    ('num_imputer', SimpleImputer(strategy='mean')),
    ('ss', StandardScaler())
])

# Sub-pipeline for the categorical columns
cat_transformer = Pipeline(steps=[
    ('cat_imputer', SimpleImputer(strategy='most_frequent')),
    ('ohe', OneHotEncoder(handle_unknown='ignore'))
])

Notice how neither of these pipelines ends with our LogisticRegression estimator! We’ll save that for our final pipeline. Instead, we’re going to join these two sub-pipelines together using a ColumnTransformer, which takes a list of the transformers you’d like to include in the pipeline. Each transformer is written in the form of a 3-tuple with the following items:

  1. The name for the transformer (a string)
  2. An instance of a transformer or sub-pipeline
  3. The columns to apply the transformer to

We can specify the columns by giving a list, such as ['bill_length_mm', 'bill_depth_mm'], but using make_column_selector is easier since we're selecting columns by datatype rather than by name.
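
To see what a selector actually hands back, here is a small sketch on a made-up two-row frame. The frame `toy` and its values are assumptions for illustration; the column names are borrowed from the penguins dataset.

```python
import pandas as pd
from sklearn.compose import make_column_selector

# Toy frame (made up): two float columns and one object (string) column
toy = pd.DataFrame({
    "bill_length_mm": [39.1, 46.5],
    "bill_depth_mm": [18.7, 17.9],
    "island": pd.Series(["Torgersen", "Dream"], dtype=object),
})

# A selector is a callable: give it a DataFrame, get back the matching column names
num_cols = make_column_selector(dtype_include=["float64"])(toy)
cat_cols = make_column_selector(dtype_include=["object"])(toy)

print(num_cols)
print(cat_cols)
```

Because the selector is only called when the ColumnTransformer is fit, the same definition keeps working if columns of those datatypes are added or removed later.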

from sklearn.compose import ColumnTransformer, make_column_selector

preprocessing = ColumnTransformer(
    transformers=[
        ('numerical sub-pipe', num_transformer, make_column_selector(dtype_include=['float64'])),
        ('categorical sub-pipe', cat_transformer, make_column_selector(dtype_include=['object']))
    ])

Putting it all together

Now we can make a complete pipeline that preprocesses all our features and ends with our estimator. Notice how in the 'preprocessing' step we're passing in the ColumnTransformer that contains the two sub-pipelines, then letting the LogisticRegression work its magic on our whole, completely processed dataset.

# A complete pipeline 
complete_pipe = Pipeline(steps=[
    ('preprocessing', preprocessing),
    ('logreg', LogisticRegression(random_state=42))
])

The complete pipeline now consists of a ColumnTransformer and a LogisticRegression classifier. Inside the ColumnTransformer are two sub-pipelines, one for each datatype in our dataset. Each sub-pipeline is made of a SimpleImputer and one other step: a StandardScaler for the numerical data and a OneHotEncoder for the categorical data.
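
That nesting can also be walked in code once a pipeline has been fit. The sketch below rebuilds the same structure on made-up toy data, so the names `toy_X`, `toy_y`, `toy_pipe`, and the short step names `num` and `cat` are assumptions for illustration, not the article's objects.

```python
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer, make_column_selector
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

# Toy data (made up), with one missing value per column
toy_X = pd.DataFrame({
    "mass": [3000.0, 3200.0, np.nan, 5000.0, 5200.0, 5400.0],
    "island": pd.Series(["A", "A", "A", "B", np.nan, "B"], dtype=object),
})
toy_y = ["small", "small", "small", "big", "big", "big"]

toy_pipe = Pipeline(steps=[
    ("preprocessing", ColumnTransformer(transformers=[
        ("num", Pipeline(steps=[
            ("num_imputer", SimpleImputer(strategy="mean")),
            ("ss", StandardScaler())]),
         make_column_selector(dtype_include=["float64"])),
        ("cat", Pipeline(steps=[
            ("cat_imputer", SimpleImputer(strategy="most_frequent")),
            ("ohe", OneHotEncoder(handle_unknown="ignore"))]),
         make_column_selector(dtype_include=["object"])),
    ])),
    ("logreg", LogisticRegression(random_state=42)),
])
toy_pipe.fit(toy_X, toy_y)

# Walk down the nesting: pipeline -> ColumnTransformer -> sub-pipeline -> step
fitted_prep = toy_pipe.named_steps["preprocessing"]
num_imputer = fitted_prep.named_transformers_["num"].named_steps["num_imputer"]
ohe = fitted_prep.named_transformers_["cat"].named_steps["ohe"]

print(num_imputer.statistics_)  # the column mean learned from the toy data
print(ohe.categories_)          # the categories the encoder saw
```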

No worries if that’s still a lot of pieces to put together. It might be easier to just see a visual:

# This will allow us to see a nice diagram of our pipeline
from sklearn import set_config
set_config(display='diagram')

complete_pipe
Note: If you’re running this code yourself in a Jupyter Notebook, see what happens when you click on each component in the diagram!

Now when we want to fit and score our pipeline, we don’t have to select columns or datatypes outside of it — it all happens within the pipe! We can now pass in X_train directly, without having to transform it by hand or meddle with it at all.

# Fit and score the pipeline
complete_pipe.fit(X_train, y_train)
complete_score = complete_pipe.score(X_train, y_train)
complete_crossval_score = cross_val_score(complete_pipe, X_train, y_train).mean()

# Compare scores
scores_table.append(['Complete', complete_score, complete_crossval_score])
print(tabulate(scores_table, headers=scores_headers))
Dataset      Training score    Cross-val score
---------  ----------------  -----------------
Original           0.996364           0.992727
Numerical          0.989091           0.985455
Complete           0.996364           0.996364

As expected, our Complete training score is identical to our Original, since we’re once again using all our features. But notice that the cross-val scores are different! (Hint: It has to do with accidental data leakage during the construction of the Original model! But we’ll save that for another post…)
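
Without spoiling that post, one way to picture the hint is the rough sketch below, on made-up numbers rather than the penguins data. It compares a scaler fit once on every training row with scalers refit on each fold's training rows, which is what happens when cross-validation clones and refits a whole pipeline. The plain `KFold` splitter and the names `X_toy`, `mean_all_rows`, and `fold_means` are assumptions for illustration.

```python
import numpy as np
from sklearn.model_selection import KFold
from sklearn.preprocessing import StandardScaler

# Made-up numerical feature; not the penguins data
rng = np.random.default_rng(0)
X_toy = rng.normal(loc=50.0, scale=10.0, size=(100, 1))

# By hand: the scaler is fit once on ALL training rows before cross-validation,
# so every validation fold has already influenced the scaling statistics
mean_all_rows = StandardScaler().fit(X_toy).mean_[0]

# In a pipeline: the scaler is refit on each fold's training rows only
fold_means = []
for train_idx, val_idx in KFold(n_splits=5).split(X_toy):
    fold_means.append(StandardScaler().fit(X_toy[train_idx]).mean_[0])

print(mean_all_rows)
print(fold_means)  # each fold's mean differs slightly from the all-rows mean
```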

you did it!

Not to repeat myself…

…but let’s take another look at both methods to see the full pipeline all together and to admire the power of the pipe to make our code simpler, cleaner, and as DRY as an article about pipelines.

Without pipelines:

# If you skipped over this code block before, this time try to identify each part that we included in our complete pipeline.
num_imputer = SimpleImputer(strategy='median')
cat_imputer = SimpleImputer(strategy='most_frequent')

X_train_num_imputed = num_imputer.fit_transform(X_train.select_dtypes(include=['int64', 'float64']))
X_train_cat_imputed = cat_imputer.fit_transform(X_train.select_dtypes(include='object'))

ss = StandardScaler()
X_train_num_imputed_scaled = ss.fit_transform(X_train_num_imputed)

ohe = OneHotEncoder(handle_unknown='ignore', sparse_output=False)  # on scikit-learn < 1.2, use sparse=False
X_train_cat_imputed_ohe = ohe.fit_transform(X_train_cat_imputed)

X_train_preprocessed = np.concatenate([X_train_num_imputed_scaled, X_train_cat_imputed_ohe], axis=1)

logreg = LogisticRegression(random_state=42)
logreg.fit(X_train_preprocessed, y_train)
initial_score = logreg.score(X_train_preprocessed, y_train)
initial_crossval_score = cross_val_score(logreg, X_train_preprocessed, y_train).mean()

scores_table = [['Original', initial_score, initial_crossval_score]]
scores_headers = ['Dataset', 'Training score', 'Cross-val score']
print(tabulate(scores_table, headers=scores_headers))
Dataset      Training score    Cross-val score
---------  ----------------  -----------------
Original           0.996364           0.992727

Without pipelines, we have to:

  • perform each step manually
  • keep track of various versions and splits of X_train
  • join them all back together in the end.

It’s complex, repetitive, and at high risk of errors from typos or putting steps in the wrong order. Not to mention that there’s data leakage (hint: it has to do with StandardScaler!). And worst of all, in order to evaluate our model on our holdout set, we’d have to repeat the entire process with a whole new set of versions and splits of X_test, plus remembering to change each .fit_transform() to .transform() and to remove logreg.fit() entirely. It’s a recipe for endless debugging and invalid results. No thanks!
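
For concreteness, here is roughly what that manual holdout bookkeeping looks like next to the pipeline equivalent. This is a sketch on made-up numerical data only; the arrays `X_tr`, `y_tr`, `X_te` and the other names are assumptions for illustration.

```python
import numpy as np
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Made-up numerical data with a couple of missing values
X_tr = np.array([[1.0], [2.0], [np.nan], [8.0], [9.0], [10.0]])
y_tr = np.array([0, 0, 0, 1, 1, 1])
X_te = np.array([[np.nan], [1.5], [9.5]])

# By hand: .fit_transform() on the training data, but ONLY .transform() on the holdout data
imputer, ss = SimpleImputer(strategy="mean"), StandardScaler()
logreg = LogisticRegression(random_state=42)
logreg.fit(ss.fit_transform(imputer.fit_transform(X_tr)), y_tr)
manual_preds = logreg.predict(ss.transform(imputer.transform(X_te)))

# With a pipeline: fit once, then predict; the same bookkeeping happens inside
pipe = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="mean")),
    ("ss", StandardScaler()),
    ("logreg", LogisticRegression(random_state=42)),
])
pipe.fit(X_tr, y_tr)
pipe_preds = pipe.predict(X_te)

print(manual_preds, pipe_preds)  # both routes give the same predictions
```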

With pipelines:

# Here's our pipeline, all in one go.
num_transformer = Pipeline(steps=[
    ('num_imputer', SimpleImputer(strategy='mean')),
    ('ss', StandardScaler())
])

cat_transformer = Pipeline(steps=[
    ('cat_imputer', SimpleImputer(strategy='most_frequent')),
    ('ohe', OneHotEncoder(handle_unknown='ignore'))
])

preprocessing = ColumnTransformer(
    transformers=[
        ('numerical sub-pipe', num_transformer, make_column_selector(dtype_include=['float64'])),
        ('categorical sub-pipe', cat_transformer, make_column_selector(dtype_include=['object']))
    ])

complete_pipe = Pipeline(steps=[
    ('preprocessing', preprocessing),
    ('logreg', LogisticRegression(random_state=42))
])

complete_pipe.fit(X_train, y_train)
complete_score = complete_pipe.score(X_train, y_train)
complete_crossval_score = cross_val_score(complete_pipe, X_train, y_train).mean()

scores_table.append(['Complete', complete_score, complete_crossval_score])
print(tabulate(scores_table, headers=scores_headers))
Dataset      Training score    Cross-val score
---------  ----------------  -----------------
Original           0.996364           0.992727
Complete           0.996364           0.996364

With pipelines, our code is cleaner and each step is clearly spelled out. Making changes or adding to any of the parts of the complete pipeline is easy and doesn’t require a cascade of renaming. Plus, in order to evaluate our model on our holdout set, all we have to do is:

final_score = complete_pipe.score(X_test, y_test)
print('Final score on holdout set: ', final_score)
Final score on holdout set: 0.9855072463768116
it's that simple

But wait, there’s more!

If you want to level-up your pipelines, check out these other methods:

I hope this helped you transition into using pipelines! Learning about pipelines brought a lot of clarity to my understanding of machine learning and a lot of improvements to my code. I’m still fairly new to Data Science, so please feel free to leave a suggestion or (especially) a correction in the comments!

Happy modeling!

See my code and try it yourself at: https://github.com/jmarkowi/build_a_pipeline
