Custom transformer for sklearn Pipeline that alters both X and y

You have to modify the internal code of sklearn Pipeline.

We define a transformer that, during fitting (fit_transform), removes the samples in which at least one feature value or the target is NaN, while during inference (transform) it removes only the samples in which at least one feature value is NaN. It is important to note that our transformer returns both X and y from fit_transform, so we need to handle this behaviour in the sklearn Pipeline.

import numpy as np

class Dropna():

    def fit(self, X, y):
        return self

    def fit_transform(self, X, y):
        
        mask = (np.isnan(X).any(-1) | np.isnan(y))
        if hasattr(X, 'loc'):
            X = X.loc[~mask]
        else:
            X = X[~mask]
        if hasattr(y, 'loc'):
            y = y.loc[~mask]
        else:
            y = y[~mask]
        
        return X, y   ###### make fit_transform return X and y
    
    def transform(self, X):
        
        mask = np.isnan(X).any(-1)
        if hasattr(X, 'loc'):
            X = X.loc[~mask]
        else:
            X = X[~mask]
        
        return X
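
For example, a quick sanity check of this behaviour on a tiny hand-made array (the names X_demo and y_demo are only used for this illustration):

X_demo = np.array([[1.0, 2.0], [np.nan, 3.0], [4.0, 5.0]])
y_demo = np.array([1.0, 2.0, np.nan])

X_fit, y_fit = Dropna().fit_transform(X_demo, y_demo)   # keeps only the first row
X_pred = Dropna().transform(X_demo)                     # keeps the first and third rows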

We only have to modify the original sklearn Pipeline at two specific points, in the fit and _fit methods. The rest remains unchanged.

from sklearn import pipeline
from sklearn.base import clone
from sklearn.utils import _print_elapsed_time
from sklearn.utils.validation import check_memory

class Pipeline(pipeline.Pipeline):

    def _fit(self, X, y=None, **fit_params_steps):
        self.steps = list(self.steps)
        self._validate_steps()
        memory = check_memory(self.memory)

        fit_transform_one_cached = memory.cache(pipeline._fit_transform_one)

        for (step_idx, name, transformer) in self._iter(
            with_final=False, filter_passthrough=False
        ):
                        
            if transformer is None or transformer == "passthrough":
                with _print_elapsed_time("Pipeline", self._log_message(step_idx)):
                    continue

            try:
                # joblib >= 0.12
                mem = memory.location
            except AttributeError:
                mem = memory.cachedir
            finally:
                cloned_transformer = clone(transformer) if mem else transformer

            X, fitted_transformer = fit_transform_one_cached(
                cloned_transformer,
                X,
                y,
                None,
                message_clsname="Pipeline",
                message=self._log_message(step_idx),
                **fit_params_steps[name],
            )
            
            if isinstance(X, tuple):    ###### unpack X if is tuple X = (X,y)
                X, y = X
            
            self.steps[step_idx] = (name, fitted_transformer)
        
        return X, y
    
    def fit(self, X, y=None, **fit_params):
        fit_params_steps = self._check_fit_params(**fit_params)
        Xt = self._fit(X, y, **fit_params_steps)
        
        if isinstance(Xt, tuple):    ###### unpack X if is tuple X = (X,y)
            Xt, y = Xt 
        
        with _print_elapsed_time("Pipeline", self._log_message(len(self.steps) - 1)):
            if self._final_estimator != "passthrough":
                fit_params_last_step = fit_params_steps[self.steps[-1][0]]
                self._final_estimator.fit(Xt, y, **fit_params_last_step)

        return self

This is required in order to unpack the values generated by Dropna().fit_transform(X, y) into the new X and y.

Here is the full pipeline at work:

from sklearn.linear_model import Ridge

X = np.random.uniform(0,1, (100,3))
y = np.random.uniform(0,1, (100,))
X[np.random.uniform(0,1, (100)) < 0.1] = np.nan
y[np.random.uniform(0,1, (100)) < 0.1] = np.nan

pipe = Pipeline([('dropna', Dropna()), ('model', Ridge())])
pipe.fit(X, y)

pipe.predict(X).shape

Another trial, with an additional intermediate preprocessing step:

from sklearn.preprocessing import StandardScaler

pipe = Pipeline([('dropna', Dropna()), ('scaler', StandardScaler()), ('model', Ridge())])
pipe.fit(X, y)

pipe.predict(X).shape

More complex behaviors can be achieved with other simple modifications according to your needs. If you are also interested in Pipeline().fit_transform or Pipeline().fit_predict, you need to apply the same changes there, as sketched below.
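
For example, a sketch of how fit_transform could be adapted in the same way inside the Pipeline subclass above (fit_predict would follow the same pattern); it mirrors the internals of the same sklearn version used above, so treat it as a starting point rather than a drop-in implementation:

    def fit_transform(self, X, y=None, **fit_params):
        fit_params_steps = self._check_fit_params(**fit_params)
        Xt = self._fit(X, y, **fit_params_steps)

        if isinstance(Xt, tuple):    ###### unpack X if is tuple X = (X,y)
            Xt, y = Xt

        last_step = self._final_estimator
        with _print_elapsed_time("Pipeline", self._log_message(len(self.steps) - 1)):
            if last_step == "passthrough":
                return Xt
            fit_params_last_step = fit_params_steps[self.steps[-1][0]]
            if hasattr(last_step, "fit_transform"):
                return last_step.fit_transform(Xt, y, **fit_params_last_step)
            return last_step.fit(Xt, y, **fit_params_last_step).transform(Xt)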


The package imblearn, which is built on top of sklearn, contains an estimator FunctionSampler that allows manipulating both the features array, X, and target array, y, in a pipeline step.

Note that using it in a pipeline step requires using the Pipeline class in imblearn that inherits from the one in sklearn. Furthermore, by default, in the context of Pipeline, the method resample does nothing when it is not called immediately after fit (as in fit_resample). So, read the documentation ahead of time.
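
As a minimal sketch (the helper name drop_nan_rows is only for illustration), the same NaN-dropping step could look like this with FunctionSampler; note that, as mentioned above, the sampling is only applied during fit:

import numpy as np
from imblearn import FunctionSampler
from imblearn.pipeline import Pipeline
from sklearn.linear_model import Ridge

def drop_nan_rows(X, y):
    # remove the samples where any feature or the target is NaN
    mask = np.isnan(X).any(axis=1) | np.isnan(y)
    return X[~mask], y[~mask]

# validate=False so that FunctionSampler itself does not reject the NaN values
pipe = Pipeline([
    ('dropna', FunctionSampler(func=drop_nan_rows, validate=False)),
    ('model', Ridge()),
])
pipe.fit(X, y)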


Modifying the sample axis, e.g. removing samples, does not (yet?) comply with the scikit-learn transformer API. So if you need to do this, you should do it outside any calls to scikit-learn, as preprocessing.

As it is now, the transformer API is used to transform the features of a given sample into something new. This can implicitly contain information from other samples, but samples are never deleted.

Another option is to attempt to impute the missing values. But again, if you need to delete samples, treat it as preprocessing before using scikit-learn.
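
For instance, a minimal sketch of both options (assuming the X and y arrays from the examples above): dropping the incomplete rows as a plain preprocessing step, or imputing the missing feature values instead:

import numpy as np
from sklearn.impute import SimpleImputer
from sklearn.linear_model import Ridge

# option 1: drop the incomplete samples before handing the data to scikit-learn
mask = np.isnan(X).any(axis=1) | np.isnan(y)
Ridge().fit(X[~mask], y[~mask])

# option 2: impute the missing feature values instead of dropping rows
# (the target still has to be complete, so samples with NaN targets are dropped here)
complete_target = ~np.isnan(y)
X_imputed = SimpleImputer(strategy='mean').fit_transform(X[complete_target])
Ridge().fit(X_imputed, y[complete_target])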