Using Pipelines

Author: Nathan A. Mahynski

Date: 2023/08/31

Description: A brief introduction to using pipelines.


Pipelines are composite estimators which are composed of multiple intermediate steps. The point is to construct a reproducible workflow that performs various actions (transformations) on the X data before making a final prediction with it. For example, it is common to autoscale data before performing PCA; a pipeline can perform these steps in order for you.
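As a minimal sketch of the autoscale-then-PCA example, assuming scikit-learn's StandardScaler and PCA stand in for the two steps (the random data and the step names below are illustrative and do not come from this notebook):

```python
import numpy as np
from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Illustrative data: 50 samples, 4 features on very different scales.
rng = np.random.default_rng(0)
X_demo = rng.normal(size=(50, 4)) * np.array([1.0, 10.0, 100.0, 1000.0])

# The pipeline autoscales first, then projects onto 2 principal components.
scale_then_pca = Pipeline(
    steps=[
        ("autoscaler", StandardScaler()),
        ("pca", PCA(n_components=2)),
    ]
)

# fit_transform runs every step in order on the data.
scores = scale_then_pca.fit_transform(X_demo)
print(scores.shape)
```

Calling the steps by hand in the same order would give the same result; the pipeline simply records the order so it cannot be applied inconsistently.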

[Image: example pipeline]

There are numerous online resources discussing pipelines; they are a central part of the sklearn API and make producing models and workflows very easy. Here we will review the basic use of pipelines to make composite models. One of the main features of pychemauth is that it follows scikit-learn’s estimator API, so its models can be incorporated into pipelines.

It is important to view the entire pipeline as “the model” - everything that happens between the beginning and the end involves various hyperparameters that can be tuned using cross-validation. This allows you to quickly and easily test whether certain steps improve the performance of a pipeline.

The image at the right shows an example pipeline used in this paper. Pipelines can be more complicated, but the general workflow is a linear series of steps, each of which transforms the data it receives from the previous step until the end is reached. The final estimator can be another transformer or a model such as a classifier or regressor. The pipeline inherits the properties of the last step, so you can think of the pipeline as if it were that last step; it just operates on preprocessed data.

Note that pipelines only operate on the X (feature matrix) data. scikit-learn has other tools to handle transformations of the y (target) variable(s).
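One such tool is scikit-learn's TransformedTargetRegressor, which wraps a regressor and transforms y before fitting and back again after predicting. The sketch below is only an illustration on made-up regression data (the data and variable names are assumptions, not part of this notebook):

```python
import numpy as np
from sklearn.compose import TransformedTargetRegressor
from sklearn.linear_model import LinearRegression

# Illustrative data: y grows exponentially with x, so log(y) is linear in x.
X_reg = np.linspace(0.0, 2.0, 20).reshape(-1, 1)
y_reg = np.exp(2.0 * X_reg.ravel() + 1.0)

# The regressor is fit on log(y); predictions are mapped back with exp.
target_model = TransformedTargetRegressor(
    regressor=LinearRegression(),
    func=np.log,
    inverse_func=np.exp,
)
target_model.fit(X_reg, y_reg)
y_reg_pred = target_model.predict(X_reg)
print(np.allclose(y_reg_pred, y_reg))
```

The regressor argument could itself be a pipeline, so X and y transformations can be combined in one estimator.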

[1]:
if 'google.colab' in str(get_ipython()):
    !pip install git+https://github.com/mahynski/pychemauth@main
    import os
    os.kill(os.getpid(), 9) # Automatically restart the runtime to reload libraries
[2]:
try:
    import pychemauth
except ImportError as err:
    raise ImportError("pychemauth not installed") from err

import matplotlib.pyplot as plt
%matplotlib inline

import watermark
%load_ext watermark

%load_ext autoreload
%autoreload 2
[3]:
import sklearn
import imblearn
import numpy as np
from pychemauth.preprocessing.scaling import CorrectedScaler
from pychemauth.preprocessing.imbalanced import ScaledSMOTEENN
from pychemauth.preprocessing.missing import PCA_IA
from pychemauth.classifier.plsda import PLSDA
[4]:
%watermark -t -m -v --iversions
Python implementation: CPython
Python version       : 3.11.4
IPython version      : 8.14.0

Compiler    : GCC 12.2.0
OS          : Linux
Release     : 6.2.0-34-generic
Machine     : x86_64
Processor   : x86_64
CPU cores   : 40
Architecture: 64bit

json      : 2.0.9
watermark : 2.4.3
sklearn   : 1.3.0
imblearn  : 0.11.0
numpy     : 1.24.3
pychemauth: 0.0.0b3
matplotlib: 3.7.2

Scikit-learn Pipelines

Pipelines are composed of a series of estimators. These are objects that implement fit and transform methods (and a few others). Each intermediate step is fit and then calls transform on the output of the previous step; the final step only needs to implement fit.
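To make the chaining concrete, here is a toy sketch in plain Python. The classes Center, Scale, and ToyPipeline are hypothetical and are not scikit-learn's implementation; they only illustrate how fit and transform calls are passed along a list of ("name", estimator) pairs.

```python
class Center:
    """Toy transformer: subtracts the mean learned during fit."""

    def fit(self, X, y=None):
        self.mean_ = sum(X) / len(X)
        return self

    def transform(self, X):
        return [x - self.mean_ for x in X]


class Scale:
    """Toy transformer: divides by the largest absolute value seen during fit."""

    def fit(self, X, y=None):
        self.max_abs_ = max(abs(x) for x in X)
        return self

    def transform(self, X):
        return [x / self.max_abs_ for x in X]


class ToyPipeline:
    """Chains steps: each one is fit on the output of the step before it."""

    def __init__(self, steps):
        self.steps = steps  # list of ("name", estimator) pairs

    def fit(self, X, y=None):
        *intermediate, (_, final) = self.steps
        for _, step in intermediate:
            X = step.fit(X, y).transform(X)  # fit, then pass the result on
        final.fit(X, y)  # the last step is only fit
        return self

    def transform(self, X):
        for _, step in self.steps:
            X = step.transform(X)  # reuse what was learned during fit
        return X


toy = ToyPipeline([("center", Center()), ("scale", Scale())])
toy.fit([1.0, 2.0, 3.0, 6.0])
result = toy.transform([3.0, 6.0])
print(result)
```

Note that new data is transformed with the mean and scale learned from the training data, which is what keeps information from leaking between the two.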

[5]:
# Here is an example pipeline

# Each step is composed of a ("name", estimator) pair.

# The values each step should be instantiated with can be specified.  These can later be changed during cross-validation
# but these serve as defaults that will be used unless otherwise specified.
pipeline = sklearn.pipeline.Pipeline(
    steps=[
        ("fill_in_missing", PCA_IA( # First we are assuming we need to fill in missing values in X
                n_components=3,
                missing_values=np.nan
            )
        ),
        ("class_balancer", ScaledSMOTEENN( # Next, we balance the number of observations in each class
            k_smote=5,
            k_enn=3,
            )
        ),
        ("autoscaler", CorrectedScaler( # Then, we should center and scale the data
            with_mean=True,
            with_std=True,
            pareto=False
            )
        ),
        ("my_chosen_model", PLSDA( # Finally, we will pass the cleaned, balanced, and scaled data to the model
            n_components=1,
            style='hard',
            not_assigned='UNKNOWN'
            )
        )
    ]
)
[6]:
# In Jupyter notebooks the pipeline will even interactively render in HTML!
pipeline
[6]:
Pipeline(steps=[('fill_in_missing', PCA_IA(n_components=3)),
                ('class_balancer',
                 <pychemauth.preprocessing.imbalanced.ScaledSMOTEENN object at 0x7f149842a4d0>),
                ('autoscaler',
                 <pychemauth.preprocessing.scaling.CorrectedScaler object at 0x7f12b543bc50>),
                ('my_chosen_model',
                 PLSDA(not_assigned='UNKNOWN', style='hard'))])

A pipeline’s steps can be accessed as a Python list, as a dictionary, or even directly by name.

[7]:
pipeline.steps
[7]:
[('fill_in_missing', PCA_IA(n_components=3)),
 ('class_balancer',
  <pychemauth.preprocessing.imbalanced.ScaledSMOTEENN at 0x7f149842a4d0>),
 ('autoscaler',
  <pychemauth.preprocessing.scaling.CorrectedScaler at 0x7f12b543bc50>),
 ('my_chosen_model', PLSDA(not_assigned='UNKNOWN', style='hard'))]
[8]:
pipeline.named_steps
[8]:
{'fill_in_missing': PCA_IA(n_components=3),
 'class_balancer': <pychemauth.preprocessing.imbalanced.ScaledSMOTEENN at 0x7f149842a4d0>,
 'autoscaler': <pychemauth.preprocessing.scaling.CorrectedScaler at 0x7f12b543bc50>,
 'my_chosen_model': PLSDA(not_assigned='UNKNOWN', style='hard')}
[9]:
pipeline["fill_in_missing"]
[9]:
PCA_IA(n_components=3)

Each estimator in the pipeline has parameters which can be accessed using the name__parameter syntax. This is usually most important when doing cross-validation as will be explained later.
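As a small illustration of this naming scheme, the sketch below uses a stand-in pipeline built from scikit-learn's StandardScaler and PCA (the pipeline and its step names are assumptions chosen for brevity). get_params lists the available keys and set_params accepts the same keys.

```python
from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

demo = Pipeline(
    steps=[
        ("autoscaler", StandardScaler()),
        ("pca", PCA(n_components=3)),
    ]
)

# get_params() exposes each step's constructor parameters,
# keyed as "<step name>__<parameter>".
nested = sorted(key for key in demo.get_params() if "__" in key)
print(nested)

# set_params() accepts the same keys and updates the steps in place.
demo.set_params(pca__n_components=2, autoscaler__with_std=False)
print(demo.named_steps["pca"].n_components)
```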

[10]:
pipeline.set_params(fill_in_missing__n_components=2)
[10]:
Pipeline(steps=[('fill_in_missing', PCA_IA(n_components=2)),
                ('class_balancer',
                 <pychemauth.preprocessing.imbalanced.ScaledSMOTEENN object at 0x7f149842a4d0>),
                ('autoscaler',
                 <pychemauth.preprocessing.scaling.CorrectedScaler object at 0x7f12b543bc50>),
                ('my_chosen_model',
                 PLSDA(not_assigned='UNKNOWN', style='hard'))])

Cross-Validation

There are various forms of cross-validation that sklearn provides; KFold (for regression) and StratifiedKFold (for classification) are probably the most commonly used. To optimize the hyperparameters of a pipeline one can use GridSearchCV or other hyperparameter optimizers.

For a grid search, we simply specify all the parameter values we wish to try and every combination will be tested; the combination with the best CV score, as computed by the model (final pipeline step), is taken as the best.
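The phrase "every combination" can be sketched with the standard library alone. The grid below is an illustrative one for three parameters of the pipeline defined earlier; the enumeration is a plain-Python sketch of the idea rather than scikit-learn's own code.

```python
from itertools import product

# Illustrative grid: keys follow the "<step name>__<parameter>" convention.
grid = {
    "fill_in_missing__n_components": [1, 3],
    "autoscaler__pareto": [True, False],
    "my_chosen_model__n_components": [3, 4],
}

# Every choice of one value per key is a candidate to cross-validate.
keys = sorted(grid)
candidates = [
    dict(zip(keys, values))
    for values in product(*(grid[key] for key in keys))
]

print(len(candidates))  # 2 * 2 * 2 combinations
for candidate in candidates[:2]:
    print(candidate)
```

Because the count is the product of the list lengths, grids grow quickly; each candidate is also fit once per CV split.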

[11]:
# Let's get some data ready for this example
from sklearn.datasets import load_iris as load_data
from sklearn.model_selection import train_test_split, GridSearchCV

X, y = load_data(return_X_y=True, as_frame=True)

# Let's turn the indices into names
names = dict(zip(np.arange(3), ['setosa', 'versicolor', 'virginica']))
y = y.apply(lambda x: names[x])

# Split the data into a test/train partition
X_train, X_test, y_train, y_test = train_test_split(
    X.values,
    y.values, # The target is the iris species name
    shuffle=True,
    random_state=42,
    test_size=0.2,
    stratify=y # It is usually important to balance the test and train set so they have the same fraction of classes
)

We can also set entire steps to be skipped by specifying them as “passthrough”, as shown for the class_balancer step below.
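Outside of a grid search, the same thing can be done with set_params by assigning "passthrough" to the step name itself. The sketch below uses a stand-in pipeline of scikit-learn's StandardScaler and PCA on random data (both are assumptions for illustration) and checks that skipping the scaler is equivalent to running PCA on the raw data.

```python
import numpy as np
from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

rng = np.random.default_rng(0)
X_demo = rng.normal(size=(30, 4)) * np.array([1.0, 5.0, 25.0, 125.0])

demo = Pipeline(
    steps=[
        ("autoscaler", StandardScaler()),
        ("pca", PCA(n_components=2)),
    ]
)

# Replace the whole step (not one of its parameters) with "passthrough".
demo.set_params(autoscaler="passthrough")
skipped = demo.fit_transform(X_demo)

# With the scaler skipped, the pipeline reduces to PCA on the raw data.
direct = PCA(n_components=2).fit_transform(X_demo)
print(np.allclose(skipped, direct))
```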

[12]:
# We can specify the set of parameters to check over like this.  Note that anything we do not change will remain the
# same as what was originally specified when the pipeline was instantiated.
param_grid = [{
    'fill_in_missing__n_components': [1, 3],
    'class_balancer': ['passthrough'], # Skip class balancing altogether
    'autoscaler__pareto': [True, False],
    'my_chosen_model__n_components':[3, 4],
}]

gs = GridSearchCV(
    estimator=pipeline,
    param_grid=param_grid,
    n_jobs=-1,
    cv=sklearn.model_selection.StratifiedKFold(n_splits=3, shuffle=True, random_state=0), # Specify a form of CV
    error_score=0,
    refit=True # This will retrain the entire model on the whole training set after it determines the best hyperparameters
)

# Calling `fit` on the gridsearch object performs CV on the pipeline to find the best hyperparameters
_ = gs.fit(X_train, y_train)
[13]:
# Here are the best parameters found
gs.best_params_
[13]:
{'autoscaler__pareto': True,
 'class_balancer': 'passthrough',
 'fill_in_missing__n_components': 1,
 'my_chosen_model__n_components': 4}
[14]:
# All the detailed results from CV are stored here
gs.cv_results_
[14]:
{'mean_fit_time': array([0.01555355, 0.01575057, 0.01295646, 0.01605217, 0.01434684,
        0.01609238, 0.01260916, 0.01798193]),
 'std_fit_time': array([0.00363846, 0.00170496, 0.00145024, 0.00393951, 0.00045871,
        0.0052821 , 0.0003251 , 0.00686031]),
 'mean_score_time': array([0.02027122, 0.02102049, 0.01915995, 0.02233807, 0.01883252,
        0.02080957, 0.01766928, 0.02486396]),
 'std_score_time': array([0.0036506 , 0.00277361, 0.00274776, 0.00668645, 0.00148447,
        0.00517467, 0.00083951, 0.0082815 ]),
 'param_autoscaler__pareto': masked_array(data=[True, True, True, True, False, False, False, False],
              mask=[False, False, False, False, False, False, False, False],
        fill_value='?',
             dtype=object),
 'param_class_balancer': masked_array(data=['passthrough', 'passthrough', 'passthrough',
                    'passthrough', 'passthrough', 'passthrough',
                    'passthrough', 'passthrough'],
              mask=[False, False, False, False, False, False, False, False],
        fill_value='?',
             dtype=object),
 'param_fill_in_missing__n_components': masked_array(data=[1, 1, 3, 3, 1, 1, 3, 3],
              mask=[False, False, False, False, False, False, False, False],
        fill_value='?',
             dtype=object),
 'param_my_chosen_model__n_components': masked_array(data=[3, 4, 3, 4, 3, 4, 3, 4],
              mask=[False, False, False, False, False, False, False, False],
        fill_value='?',
             dtype=object),
 'params': [{'autoscaler__pareto': True,
   'class_balancer': 'passthrough',
   'fill_in_missing__n_components': 1,
   'my_chosen_model__n_components': 3},
  {'autoscaler__pareto': True,
   'class_balancer': 'passthrough',
   'fill_in_missing__n_components': 1,
   'my_chosen_model__n_components': 4},
  {'autoscaler__pareto': True,
   'class_balancer': 'passthrough',
   'fill_in_missing__n_components': 3,
   'my_chosen_model__n_components': 3},
  {'autoscaler__pareto': True,
   'class_balancer': 'passthrough',
   'fill_in_missing__n_components': 3,
   'my_chosen_model__n_components': 4},
  {'autoscaler__pareto': False,
   'class_balancer': 'passthrough',
   'fill_in_missing__n_components': 1,
   'my_chosen_model__n_components': 3},
  {'autoscaler__pareto': False,
   'class_balancer': 'passthrough',
   'fill_in_missing__n_components': 1,
   'my_chosen_model__n_components': 4},
  {'autoscaler__pareto': False,
   'class_balancer': 'passthrough',
   'fill_in_missing__n_components': 3,
   'my_chosen_model__n_components': 3},
  {'autoscaler__pareto': False,
   'class_balancer': 'passthrough',
   'fill_in_missing__n_components': 3,
   'my_chosen_model__n_components': 4}],
 'split0_test_score': array([0.75, 0.8 , 0.75, 0.8 , 0.75, 0.8 , 0.75, 0.8 ]),
 'split1_test_score': array([0.725, 0.775, 0.725, 0.775, 0.725, 0.775, 0.725, 0.775]),
 'split2_test_score': array([0.65 , 0.675, 0.65 , 0.675, 0.65 , 0.675, 0.65 , 0.675]),
 'mean_test_score': array([0.70833333, 0.75      , 0.70833333, 0.75      , 0.70833333,
        0.75      , 0.70833333, 0.75      ]),
 'std_test_score': array([0.04249183, 0.05400617, 0.04249183, 0.05400617, 0.04249183,
        0.05400617, 0.04249183, 0.05400617]),
 'rank_test_score': array([5, 1, 5, 1, 5, 1, 5, 1], dtype=int32)}

refit=True refits the entire pipeline on all of the training data. Here we used 3-fold CV, so during the search each model was only ever fit on 2/3 of the training data while the remaining 1/3 was held out to evaluate its performance. As a result of the refit, we can now use the grid search object directly to predict on the test set, etc., without any need for further training.
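The sample counts behind this can be worked out in a few lines. The sketch below assumes the 120 training samples that result from holding out 20% of the 150 iris samples, and uses plain K-fold indexing for simplicity (StratifiedKFold additionally shuffles and balances classes, which does not change the fold sizes here).

```python
n_train = 120  # 80% of the 150 iris samples (test_size=0.2)
n_splits = 3

indices = list(range(n_train))
# Deal the indices round-robin into folds. This is plain K-fold without the
# shuffling or stratification that StratifiedKFold adds.
folds = [indices[i::n_splits] for i in range(n_splits)]

split_sizes = []
for i, held_out in enumerate(folds):
    held_out_set = set(held_out)
    fit_on = [j for j in indices if j not in held_out_set]
    split_sizes.append((len(fit_on), len(held_out)))
    print(f"split {i}: fit on {len(fit_on)} samples, scored on {len(held_out)}")

# With refit=True the best candidate is then trained once more on everything.
print(f"refit: fit on {n_train} samples")
```

With 40 held-out samples per split, each split score moves in steps of 1/40 = 0.025, which is consistent with the values reported in cv_results_.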

[15]:
gs.predict(X_test)[:10]
[15]:
['setosa',
 'virginica',
 'versicolor',
 'versicolor',
 'setosa',
 'virginica',
 'setosa',
 'setosa',
 'virginica',
 'setosa']
[16]:
print(
    'The best model has: {}% accuracy on the test set and {}% accuracy on the training set'.format(
        '%.1f'%(100*gs.score(X_test, y_test)),
        '%.1f'%(100*gs.score(X_train, y_train)),
    )
)
The best model has: 76.7% accuracy on the test set and 75.8% accuracy on the training set

Imbalanced-learn Pipelines

The path the test data takes through the pipeline shown at the top of this document is different from that taken by the training data. This is because there is a class-balancing step in the pipeline. This particular example uses resampling to generate synthetic training examples of minority classes in the training set so the final model will be less biased. However, this should only act during training; at test time, we don’t want to create “fake” data on which to evaluate the model since this does not give us a real impression of how the model is handling data.

The imbalanced-learn package gets around this by employing fit_resample methods, which are only called during training. At test time, steps that implement fit_resample are skipped, while the remaining steps transform the data as usual. sklearn’s pipelines do not include this logic, but fortunately imblearn’s pipelines are drop-in replacements which can handle this! This is why it was actually necessary to specify ‘passthrough’ for the class_balancer in the last example.
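The difference between the training and test paths can be sketched in plain Python. All classes below are hypothetical toys and this is not imbalanced-learn's actual code; it only illustrates the idea of calling fit_resample while fitting and skipping such steps while predicting.

```python
class DuplicateMinority:
    """Toy sampler: repeats class-"b" samples once. It has no transform method."""

    def fit_resample(self, X, y):
        extra = [(x, label) for x, label in zip(X, y) if label == "b"]
        X_out = list(X) + [x for x, _ in extra]
        y_out = list(y) + [label for _, label in extra]
        return X_out, y_out


class Shift:
    """Toy transformer: subtracts the training mean."""

    def fit(self, X, y=None):
        self.mean_ = sum(X) / len(X)
        return self

    def transform(self, X):
        return [x - self.mean_ for x in X]


class SignClassifier:
    """Toy final model: predicts "b" for positive inputs, "a" otherwise."""

    def fit(self, X, y):
        self.n_seen_ = len(X)  # number of training samples after resampling
        return self

    def predict(self, X):
        return ["b" if x > 0 else "a" for x in X]


class ToyResamplingPipeline:
    def __init__(self, steps):
        self.steps = steps  # list of ("name", estimator) pairs

    def fit(self, X, y):
        *intermediate, (_, final) = self.steps
        for _, step in intermediate:
            if hasattr(step, "fit_resample"):
                X, y = step.fit_resample(X, y)  # resampling happens in training only
            else:
                X = step.fit(X, y).transform(X)
        final.fit(X, y)
        return self

    def predict(self, X):
        *intermediate, (_, final) = self.steps
        for _, step in intermediate:
            if hasattr(step, "fit_resample"):
                continue  # samplers are skipped at prediction time
            X = step.transform(X)
        return final.predict(X)


toy_pipe = ToyResamplingPipeline(
    [("balancer", DuplicateMinority()), ("shift", Shift()), ("model", SignClassifier())]
)
toy_pipe.fit([0.0, 1.0, 2.0, 9.0], ["a", "a", "a", "b"])
toy_preds = toy_pipe.predict([1.0, 8.0])
print(toy_pipe.steps[-1][1].n_seen_, toy_preds)
```

The model is trained on five samples (four originals plus one duplicate) but returns exactly one prediction per test sample, since no synthetic test data is ever created.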

[17]:
# Note the syntax is identical!
imblearn_pipeline = imblearn.pipeline.Pipeline(
    steps=[
        ("fill_in_missing", PCA_IA( # First we are assuming we need to fill in missing values in X
                n_components=3,
                missing_values=np.nan
            )
        ),
        ("class_balancer", ScaledSMOTEENN( # Next, we balance the number of observations in each class
            k_smote=5,
            k_enn=3,
            )
        ),
        ("autoscaler", CorrectedScaler( # Then, we should center and scale the data
            with_mean=True,
            with_std=True,
            pareto=False
            )
        ),
        ("my_chosen_model", PLSDA( # Finally, we will pass the cleaned, balanced, and scaled data to the model
            n_components=1,
            style='hard',
            not_assigned='UNKNOWN'
            )
        )
    ]
)
[18]:
param_grid = [{
    'fill_in_missing__n_components': [1, 3],
    'class_balancer': ['passthrough', ScaledSMOTEENN(k_smote=5, k_enn=3)], # Now we can try adding a class balancing step
    'autoscaler__pareto': [True, False],
    'my_chosen_model__n_components':[3, 4],
}]

gs = GridSearchCV(
    estimator=imblearn_pipeline,
    param_grid=param_grid,
    n_jobs=-1,
    cv=sklearn.model_selection.StratifiedKFold(n_splits=3, shuffle=True, random_state=0), # Specify a form of CV
    error_score=0,
    refit=True # This will retrain the entire model on the whole training set after it determines the best hyperparameters
)

# Calling `fit` on the gridsearch object performs CV on the pipeline to find the best hyperparameters
_ = gs.fit(X_train, y_train)
[19]:
gs.best_params_
[19]:
{'autoscaler__pareto': True,
 'class_balancer': <pychemauth.preprocessing.imbalanced.ScaledSMOTEENN at 0x7f146ff1d510>,
 'fill_in_missing__n_components': 1,
 'my_chosen_model__n_components': 4}
[21]:
print(
    'The best model has: {}% accuracy on the test set and {}% accuracy on the training set'.format(
        '%.1f'%(100*gs.score(X_test, y_test)),
        '%.1f'%(100*gs.score(X_train, y_train)),
    )
)
The best model has: 80.0% accuracy on the test set and 80.0% accuracy on the training set