# Source code for etna.transforms.math.statistics

from abc import ABC
from abc import abstractmethod
from typing import Optional

import bottleneck as bn
import numpy as np
import pandas as pd

from etna.transforms.base import Transform


class WindowStatisticsTransform(Transform, ABC):
    """Base class for features computed as a statistic over a sliding window of one column.

    Subclasses provide the statistic itself by implementing :meth:`_aggregate`.
    """

    def __init__(
        self,
        in_column: str,
        out_column: str,
        window: int,
        seasonality: int = 1,
        min_periods: int = 1,
        fillna: float = 0,
        **kwargs,
    ):
        """Init WindowStatisticsTransform.

        Parameters
        ----------
        in_column: str
            name of the column the statistic is computed from
        out_column: str
            name of the column the result is written to
        window: int
            number of points aggregated per window; ``-1`` means that all history is used
        seasonality: int
            step between the points that are taken into one window
        min_periods: int
            minimal number of non-NaN points a window must contain; windows with fewer
            points yield NaN, which is then replaced with ``fillna``
        fillna: float
            value that replaces NaN results
        **kwargs:
            extra keyword arguments, stored as ``self.kwargs``
        """
        self.in_column = in_column
        self.out_column_name = out_column
        self.window = window
        self.seasonality = seasonality
        self.min_periods = min_periods
        self.fillna = fillna
        self.kwargs = kwargs

    def fit(self, *args) -> "WindowStatisticsTransform":
        """Do nothing and return the transform itself."""
        return self

    @abstractmethod
    def _aggregate(self, series: np.ndarray) -> np.ndarray:
        """Reduce the window axis of ``series`` to a single value per window.

        ``transform`` passes an array of shape (len(df), n_segments, window) and expects
        an array of shape (len(df), n_segments) back.
        """

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Compute the window statistic and add it to the dataframe.

        Parameters
        ----------
        df: pd.DataFrame
            dataframe to generate features for; its columns must have a ``segment`` level

        Returns
        -------
        result: pd.DataFrame
            ``df`` joined with one ``out_column`` feature per segment, columns sorted
        """
        if self.window != -1:
            history = self.seasonality * self.window
        else:
            history = len(df)
        segments = sorted(df.columns.get_level_values("segment").unique())

        # Rows are reversed in time, so the window that starts at a row covers
        # that point and the points preceding it.
        values = df.loc[pd.IndexSlice[:], pd.IndexSlice[segments, self.in_column]].values[::-1]
        # NaN padding gives every point, including the earliest ones, a window of length "history".
        padding = np.full((history - 1, values.shape[1]), np.nan)
        values = np.append(values, padding, axis=0)

        window_shape = (history, 1)

        nan_mask = np.lib.stride_tricks.sliding_window_view(np.isnan(values), window_shape=window_shape)
        nan_mask = np.squeeze(nan_mask[:, :, :: self.seasonality], axis=-1)  # (len(df), n_segments, window)
        valid_counts = bn.nansum(~nan_mask, axis=2)  # (len(df), n_segments)

        windows = np.lib.stride_tricks.sliding_window_view(values, window_shape=window_shape)
        windows = np.squeeze(windows[:, :, :: self.seasonality], axis=-1)  # (len(df), n_segments, window)

        aggregated = self._aggregate(series=windows)  # (len(df), n_segments)
        aggregated[valid_counts < self.min_periods] = np.nan
        # Replace NaNs and restore the original (chronological) row order.
        aggregated = np.nan_to_num(aggregated, copy=False, nan=self.fillna)[::-1]

        feature_columns = pd.MultiIndex.from_product([segments, [self.out_column_name]])
        features = pd.DataFrame(aggregated, columns=feature_columns, index=df.index)
        return df.join(features).sort_index(axis=1)
class MeanTransform(WindowStatisticsTransform):
    """MeanTransform computes average value for given window.

    The window for timestamp ``t`` contains ``x_t`` and the preceding values taken with
    step ``seasonality``. Every value is multiplied by a power of ``alpha`` and the
    non-NaN products are averaged:

    .. math::
       MeanTransform(x_t) = \\frac{1}{n}\\sum_{i=0}^{window - 1}{x_{t - i \\cdot seasonality}\\cdot\\alpha^{i}}

    where :math:`n` is the number of non-NaN values in the window.
    """

    def __init__(
        self,
        in_column: str,
        window: int,
        seasonality: int = 1,
        alpha: float = 1,
        min_periods: int = 1,
        fillna: float = 0,
        out_column: Optional[str] = None,
    ):
        """Init MeanTransform.

        Parameters
        ----------
        in_column: str
            name of processed column
        window: int
            size of window to aggregate; ``-1`` means that all history is used
        seasonality: int
            seasonality of lags to compute window's aggregation with
        alpha: float
            autoregressive coefficient
        min_periods: int
            min number of targets in window to compute aggregation;
            windows with fewer targets produce NaN, which is replaced with ``fillna``
        fillna: float
            value to fill results NaNs with
        out_column: str, optional
            result column name. If not given use ``self.__repr__()``
        """
        # Attributes are assigned before ``super().__init__`` because the default
        # column name is built by ``self.__repr__()`` in the call below.
        self.window = window
        self.in_column = in_column
        self.seasonality = seasonality
        self.alpha = alpha
        self.min_periods = min_periods
        self.fillna = fillna
        self.out_column = out_column
        # Weights alpha^0 .. alpha^(window - 1); rebuilt on every ``transform`` call.
        self._alpha_range: Optional[np.ndarray] = None
        super().__init__(
            in_column=in_column,
            window=window,
            seasonality=seasonality,
            min_periods=min_periods,
            out_column=self.out_column if self.out_column is not None else self.__repr__(),
            fillna=fillna,
        )

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Compute feature's value.

        Parameters
        ----------
        df: pd.DataFrame
            dataframe to generate features for

        Returns
        -------
        result: pd.DataFrame
            dataframe with results
        """
        if self.window != -1:
            n_lags = self.window
        else:
            # With all history the parent class takes every ``seasonality``-th point of
            # ``len(df)`` points, so a window holds ceil(len(df) / seasonality) values.
            # Using ``len(df)`` here would not match the window length when seasonality > 1.
            n_lags = -(-len(df) // self.seasonality)
        self._alpha_range = np.array([self.alpha ** i for i in range(n_lags)])
        self._alpha_range = np.expand_dims(self._alpha_range, axis=0)  # (1, window)
        return super().transform(df)

    def _aggregate(self, series: np.ndarray) -> np.ndarray:
        """Compute weighted average for window series.

        ``series`` has shape (len(df), n_segments, window); the result has shape
        (len(df), n_segments).
        """
        mean = np.zeros((series.shape[0], series.shape[1]))
        for segment in range(mean.shape[1]):
            # Loop prevents from memory overflow, 3d tensor is materialized after multiplication
            mean[:, segment] = bn.nanmean(series[:, segment] * self._alpha_range, axis=1)
        return mean
class StdTransform(WindowStatisticsTransform):
    """StdTransform computes the standard deviation over a window.

    Notes
    -----
    Note that ``pd.Series([1]).std()`` is ``np.nan``.
    """

    def __init__(
        self,
        in_column: str,
        window: int,
        seasonality: int = 1,
        min_periods: int = 1,
        fillna: float = 0,
        out_column: Optional[str] = None,
        ddof: int = 1,
    ):
        """Init StdTransform.

        Parameters
        ----------
        in_column: str
            name of the column to process
        window: int
            number of points in the aggregated window
        seasonality: int
            step between the lags that form the window
        min_periods: int
            minimal number of targets required in a window; windows with fewer targets
            produce NaN, which is replaced with ``fillna``
        fillna: float
            value that replaces NaN results
        out_column: str, optional
            name of the result column. If not given use ``self.__repr__()``
        ddof:
            delta degrees of freedom; the divisor used in calculations is N - ddof,
            where N is the number of elements
        """
        # Attributes must exist before ``self.__repr__()`` is evaluated below.
        self.ddof = ddof
        self.out_column = out_column
        self.fillna = fillna
        self.min_periods = min_periods
        self.seasonality = seasonality
        self.window = window
        self.in_column = in_column

        result_column = out_column if out_column is not None else self.__repr__()
        super().__init__(
            in_column=in_column,
            out_column=result_column,
            window=window,
            seasonality=seasonality,
            min_periods=min_periods,
            fillna=fillna,
        )

    def _aggregate(self, series: np.ndarray) -> np.ndarray:
        """Return the NaN-aware std along the window axis (axis 2) of ``series``."""
        return bn.nanstd(series, axis=2, ddof=self.ddof)
class QuantileTransform(WindowStatisticsTransform):
    """QuantileTransform computes a quantile over a window."""

    def __init__(
        self,
        in_column: str,
        quantile: float,
        window: int,
        seasonality: int = 1,
        min_periods: int = 1,
        fillna: float = 0,
        out_column: Optional[str] = None,
    ):
        """Init QuantileTransform.

        Parameters
        ----------
        in_column: str
            name of the column to process
        quantile: float
            quantile to calculate
        window: int
            number of points in the aggregated window
        seasonality: int
            step between the lags that form the window
        min_periods: int
            minimal number of targets required in a window; windows with fewer targets
            produce NaN, which is replaced with ``fillna``
        fillna: float
            value that replaces NaN results
        out_column: str, optional
            name of the result column. If not given use ``self.__repr__()``
        """
        # Attributes must exist before ``self.__repr__()`` is evaluated below.
        self.out_column = out_column
        self.fillna = fillna
        self.min_periods = min_periods
        self.seasonality = seasonality
        self.window = window
        self.quantile = quantile
        self.in_column = in_column

        result_column = out_column if out_column is not None else self.__repr__()
        super().__init__(
            in_column=in_column,
            out_column=result_column,
            window=window,
            seasonality=seasonality,
            min_periods=min_periods,
            fillna=fillna,
        )

    def _aggregate(self, series: np.ndarray) -> np.ndarray:
        """Return the NaN-aware quantile along the window axis (axis 2) of ``series``."""
        # There is no "nanquantile" in bottleneck, "apply_along_axis" can't be replaced with "axis=2"
        return np.apply_along_axis(np.nanquantile, axis=2, arr=series, q=self.quantile)
class MinTransform(WindowStatisticsTransform):
    """MinTransform computes the minimum over a window."""

    def __init__(
        self,
        in_column: str,
        window: int,
        seasonality: int = 1,
        min_periods: int = 1,
        fillna: float = 0,
        out_column: Optional[str] = None,
    ):
        """Init MinTransform.

        Parameters
        ----------
        in_column: str
            name of the column to process
        window: int
            number of points in the aggregated window
        seasonality: int
            step between the lags that form the window
        min_periods: int
            minimal number of targets required in a window; windows with fewer targets
            produce NaN, which is replaced with ``fillna``
        fillna: float
            value that replaces NaN results
        out_column: str, optional
            name of the result column. If not given use ``self.__repr__()``
        """
        # Attributes must exist before ``self.__repr__()`` is evaluated below.
        self.out_column = out_column
        self.fillna = fillna
        self.min_periods = min_periods
        self.seasonality = seasonality
        self.window = window
        self.in_column = in_column

        result_column = out_column if out_column is not None else self.__repr__()
        super().__init__(
            in_column=in_column,
            out_column=result_column,
            window=window,
            seasonality=seasonality,
            min_periods=min_periods,
            fillna=fillna,
        )

    def _aggregate(self, series: np.ndarray) -> np.ndarray:
        """Return the NaN-aware minimum along the window axis (axis 2) of ``series``."""
        return bn.nanmin(series, axis=2)
class MaxTransform(WindowStatisticsTransform):
    """MaxTransform computes the maximum over a window."""

    def __init__(
        self,
        in_column: str,
        window: int,
        seasonality: int = 1,
        min_periods: int = 1,
        fillna: float = 0,
        out_column: Optional[str] = None,
    ):
        """Init MaxTransform.

        Parameters
        ----------
        in_column: str
            name of the column to process
        window: int
            number of points in the aggregated window
        seasonality: int
            step between the lags that form the window
        min_periods: int
            minimal number of targets required in a window; windows with fewer targets
            produce NaN, which is replaced with ``fillna``
        fillna: float
            value that replaces NaN results
        out_column: str, optional
            name of the result column. If not given use ``self.__repr__()``
        """
        # Attributes must exist before ``self.__repr__()`` is evaluated below.
        self.out_column = out_column
        self.fillna = fillna
        self.min_periods = min_periods
        self.seasonality = seasonality
        self.window = window
        self.in_column = in_column

        result_column = out_column if out_column is not None else self.__repr__()
        super().__init__(
            in_column=in_column,
            out_column=result_column,
            window=window,
            seasonality=seasonality,
            min_periods=min_periods,
            fillna=fillna,
        )

    def _aggregate(self, series: np.ndarray) -> np.ndarray:
        """Return the NaN-aware maximum along the window axis (axis 2) of ``series``."""
        return bn.nanmax(series, axis=2)
class MedianTransform(WindowStatisticsTransform):
    """MedianTransform computes the median over a window."""

    def __init__(
        self,
        in_column: str,
        window: int,
        seasonality: int = 1,
        min_periods: int = 1,
        fillna: float = 0,
        out_column: Optional[str] = None,
    ):
        """Init MedianTransform.

        Parameters
        ----------
        in_column: str
            name of the column to process
        window: int
            number of points in the aggregated window
        seasonality: int
            step between the lags that form the window
        min_periods: int
            minimal number of targets required in a window; windows with fewer targets
            produce NaN, which is replaced with ``fillna``
        fillna: float
            value that replaces NaN results
        out_column: str, optional
            name of the result column. If not given use ``self.__repr__()``
        """
        # Attributes must exist before ``self.__repr__()`` is evaluated below.
        self.out_column = out_column
        self.fillna = fillna
        self.min_periods = min_periods
        self.seasonality = seasonality
        self.window = window
        self.in_column = in_column

        result_column = out_column if out_column is not None else self.__repr__()
        super().__init__(
            in_column=in_column,
            out_column=result_column,
            window=window,
            seasonality=seasonality,
            min_periods=min_periods,
            fillna=fillna,
        )

    def _aggregate(self, series: np.ndarray) -> np.ndarray:
        """Return the NaN-aware median along the window axis (axis 2) of ``series``."""
        return bn.nanmedian(series, axis=2)
class MADTransform(WindowStatisticsTransform):
    """MADTransform computes Mean Absolute Deviation over the window."""

    def __init__(
        self,
        in_column: str,
        window: int,
        seasonality: int = 1,
        min_periods: int = 1,
        fillna: float = 0,
        out_column: Optional[str] = None,
    ):
        """Init MADTransform.

        Parameters
        ----------
        in_column: str
            name of the column to process
        window: int
            number of points in the aggregated window
        seasonality: int
            step between the lags that form the window
        min_periods: int
            minimal number of targets required in a window; windows with fewer targets
            produce NaN, which is replaced with ``fillna``
        fillna: float
            value that replaces NaN results
        out_column: str, optional
            name of the result column. If not given use ``self.__repr__()``
        """
        # Attributes must exist before ``self.__repr__()`` is evaluated below.
        self.out_column = out_column
        self.fillna = fillna
        self.min_periods = min_periods
        self.seasonality = seasonality
        self.window = window
        self.in_column = in_column

        result_column = out_column if out_column is not None else self.__repr__()
        super().__init__(
            in_column=in_column,
            out_column=result_column,
            window=window,
            seasonality=seasonality,
            min_periods=min_periods,
            fillna=fillna,
        )

    def _aggregate(self, series: np.ndarray) -> np.ndarray:
        """Return the mean absolute deviation from the window mean, ignoring NaNs.

        ``series`` is indexed as (row, segment, window position); the result has one
        value per (row, segment).
        """
        window_mean = np.expand_dims(bn.nanmean(series, axis=2), axis=-1)  # (len(df), n_segments, 1)
        n_rows = series.shape[0]
        n_segments = series.shape[1]
        mad = np.zeros((n_rows, n_segments))
        for idx in range(n_segments):
            # Loop prevents from memory overflow, 3d tensor is materialized after subtraction
            deviations = np.abs(series[:, idx] - window_mean[:, idx])
            mad[:, idx] = bn.nanmean(deviations, axis=1)
        return mad
# Names exported by ``from ... import *``.
__all__ = [
    "MedianTransform",
    "MaxTransform",
    "MinTransform",
    "QuantileTransform",
    "StdTransform",
    "MeanTransform",
    "WindowStatisticsTransform",
    "MADTransform",
]