Source code for etna.transforms.math.sklearn
import warnings
from copy import deepcopy
from typing import Dict
from typing import List
from typing import Optional
from typing import Union
import numpy as np
import pandas as pd
from sklearn.base import TransformerMixin
from etna.core import StringEnumWithRepr
from etna.transforms.base import Transform
from etna.transforms.utils import match_target_quantiles
class TransformMode(StringEnumWithRepr):
    """Enum for the ways a transform can be applied over segments."""

    # Features of all segments are glued together and transformed globally.
    macro = "macro"
    # Features of each segment are transformed separately.
    per_segment = "per-segment"
class SklearnTransform(Transform):
    """Base class for different sklearn transforms.

    Wraps a :py:class:`sklearn.base.TransformerMixin` instance and applies it to the selected
    feature columns of a DataFrame whose columns have "segment" and "feature" levels.
    """

    def __init__(
        self,
        in_column: Optional[Union[str, List[str]]],
        out_column: Optional[str],
        transformer: TransformerMixin,
        inplace: bool = True,
        mode: Union[TransformMode, str] = "per-segment",
    ):
        """
        Init SklearnTransform.

        Parameters
        ----------
        in_column:
            columns to be transformed, if None - all columns will be transformed.
        out_column:
            base for the names of generated columns, uses ``self.__repr__()`` if not given.
            Ignored (with a warning) when ``inplace`` is True.
        transformer:
            :py:class:`sklearn.base.TransformerMixin` instance.
        inplace:
            if True, transformed values overwrite the input columns;
            otherwise they are added to the DataFrame as new columns.
        mode:
            "macro" or "per-segment", way to transform features over segments.

            * If "macro", transforms features globally, gluing the corresponding ones for all segments.

            * If "per-segment", transforms features for each segment separately.

        Raises
        ------
        ValueError:
            if incorrect mode given
        """
        if inplace and (out_column is not None):
            warnings.warn("Transformation will be applied inplace, out_column param will be ignored")

        self.transformer = transformer

        # A single column name is normalised to a one-element list; column names are kept sorted.
        if isinstance(in_column, str):
            in_column = [in_column]
        self.in_column = in_column if in_column is None else sorted(in_column)

        self.inplace = inplace
        self.mode = TransformMode(mode)
        self.out_column = out_column
        # Names of the result columns; filled in by ``fit``.
        self.out_columns: Optional[List[str]] = None

    def _get_column_name(self, in_column: str) -> str:
        """Build the name of the output column that corresponds to ``in_column``."""
        if self.out_column is None:
            # No base name given: use the repr of a copy of this transform narrowed to one column.
            new_transform = deepcopy(self)
            new_transform.in_column = [in_column]
            return repr(new_transform)
        return f"{self.out_column}_{in_column}"

    def fit(self, df: pd.DataFrame) -> "SklearnTransform":
        """
        Fit transformer with data from df.

        Parameters
        ----------
        df:
            DataFrame to fit transformer.

        Returns
        -------
        :
            the fitted transform itself.

        Raises
        ------
        ValueError:
            if ``self.mode`` is not a valid TransformMode
        """
        segments = sorted(set(df.columns.get_level_values("segment")))

        # With no explicit columns every feature present in df is selected.
        if self.in_column is None:
            self.in_column = sorted(set(df.columns.get_level_values("feature")))

        if self.inplace:
            self.out_columns = self.in_column
        else:
            self.out_columns = [self._get_column_name(column) for column in self.in_column]

        x = self._to_sklearn_input(df, segments)
        self.transformer.fit(X=x)
        return self

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Transform given data with fitted transformer.

        Parameters
        ----------
        df:
            DataFrame to transform with transformer.
            It is modified in place when ``inplace`` is True.

        Returns
        -------
        :
            transformed DataFrame.

        Raises
        ------
        ValueError:
            if the columns to transform are not known yet (transform is not fitted),
            or if ``self.mode`` is not a valid TransformMode
        """
        segments = sorted(set(df.columns.get_level_values("segment")))
        # Same guard as in ``inverse_transform``: without it the column selection below
        # fails with an unrelated indexing error.
        if self.in_column is None:
            raise ValueError("Transform is not fitted yet.")

        x = self._to_sklearn_input(df, segments)
        transformed = self._from_sklearn_output(df, self.transformer.transform(X=x))

        if self.inplace:
            df.loc[:, (segments, self.in_column)] = transformed
        else:
            # Keep the level names of df on the new columns: every method of this class
            # looks the levels up by name, so they must survive the concatenation.
            new_columns = pd.MultiIndex.from_product([segments, self.out_columns], names=df.columns.names)
            transformed_features = pd.DataFrame(transformed, columns=new_columns, index=df.index)
            df = pd.concat((df, transformed_features), axis=1)
            df = df.sort_index(axis=1)
        return df

    def inverse_transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Apply inverse transformation to DataFrame.

        Nothing is changed when ``inplace`` is False. When "target" is among the transformed
        columns, the columns selected by ``match_target_quantiles`` are inverse transformed too.

        Parameters
        ----------
        df:
            DataFrame to apply inverse transform.
            It is modified in place when ``inplace`` is True.

        Returns
        -------
        :
            transformed DataFrame.

        Raises
        ------
        ValueError:
            if the columns to transform are not known yet (transform is not fitted),
            or if ``self.mode`` is not a valid TransformMode
        """
        segments = sorted(set(df.columns.get_level_values("segment")))
        if self.in_column is None:
            raise ValueError("Transform is not fitted yet.")

        if "target" in self.in_column:
            quantiles = match_target_quantiles(set(df.columns.get_level_values("feature")))
        else:
            quantiles = set()

        if self.inplace:
            x = self._to_sklearn_input(df, segments)
            transformed = self._from_sklearn_output(df, self.transformer.inverse_transform(X=x))

            # Quantiles must be processed before df is overwritten below: each of them is
            # inverse transformed together with the still-transformed values of the other columns.
            quantiles_arrays: Dict[str, np.ndarray] = {
                quantile_column_nm: self._inverse_transform_quantile(df, segments, quantile_column_nm)
                for quantile_column_nm in quantiles
            }

            df.loc[:, (segments, self.in_column)] = transformed
            for quantile_column_nm, quantile_values in quantiles_arrays.items():
                df.loc[:, (segments, quantile_column_nm)] = quantile_values
        return df

    def _inverse_transform_quantile(
        self, df: pd.DataFrame, segments: List[str], quantile_column_nm: str
    ) -> np.ndarray:
        """Inverse transform one quantile column by running it through the transformer in place of "target".

        Returns the restored values, one array column per segment.
        """
        df_slice_copy = df.loc[:, (segments, self.in_column)].copy()
        df_slice_copy.loc[:, (segments, "target")] = df.loc[:, (segments, quantile_column_nm)].values
        x = self._to_sklearn_input(df_slice_copy, segments)
        restored = self._from_sklearn_output(df_slice_copy, self.transformer.inverse_transform(X=x))
        df_slice_copy.loc[:, (segments, self.in_column)] = restored
        return df_slice_copy.loc[:, (segments, "target")].values

    def _to_sklearn_input(self, df: pd.DataFrame, segments: List[str]) -> np.ndarray:
        """Build the array passed to the transformer according to ``self.mode``.

        Raises
        ------
        ValueError:
            if ``self.mode`` is not a valid TransformMode
        """
        if self.mode == TransformMode.per_segment:
            # Selected columns are used as they are: one array column per (segment, feature) pair.
            return df.loc[:, (segments, self.in_column)].values
        if self.mode == TransformMode.macro:
            return self._reshape(df)
        raise ValueError(f"'{self.mode}' is not a valid TransformMode.")

    def _from_sklearn_output(self, df: pd.DataFrame, transformed: np.ndarray) -> np.ndarray:
        """Bring the transformer output back to the column layout of ``df``; the inverse of ``_to_sklearn_input``."""
        if self.mode == TransformMode.macro:
            return self._inverse_reshape(df, transformed)
        return transformed

    def _reshape(self, df: pd.DataFrame) -> np.ndarray:
        """Stack the selected columns of every segment one under another, in sorted segment order."""
        segments = sorted(set(df.columns.get_level_values("segment")))
        selected = df.loc[:, (segments, self.in_column)]
        stacked = pd.concat([selected[segment] for segment in segments])
        return stacked.values

    def _inverse_reshape(self, df: pd.DataFrame, transformed: np.ndarray) -> np.ndarray:
        """Undo ``_reshape``: cut the rows into per-segment chunks of ``len(df)`` rows and put them side by side."""
        time_period_len = len(df)
        n_segments = len(set(df.columns.get_level_values("segment")))
        chunks = [
            transformed[i * time_period_len : (i + 1) * time_period_len, :] for i in range(n_segments)
        ]
        return np.concatenate(chunks, axis=1)