# Source code for etna.transforms.decomposition.change_points_trend
from copy import deepcopy
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import Type
import numpy as np
import pandas as pd
from ruptures.base import BaseEstimator
from sklearn.base import RegressorMixin
from etna.analysis.change_points_trend.search import _find_change_points_segment
from etna.transforms.base import PerSegmentWrapper
from etna.transforms.base import Transform
from etna.transforms.utils import match_target_quantiles
# Pair of (left border, right border) timestamps delimiting one interval of stable trend;
# such pairs are used as keys of the per-interval model mapping.
TTimestampInterval = Tuple[pd.Timestamp, pd.Timestamp]
# Alias used to annotate the detrend model accepted by the transforms in this module.
# NOTE(review): the alias is declared as ``Type[RegressorMixin]`` (a class), but the transform
# deep-copies the passed object and calls ``fit``/``predict`` on the copies, i.e. it is used
# as an instance -- confirm whether ``RegressorMixin`` was intended.
TDetrendModel = Type[RegressorMixin]
class _OneSegmentChangePointsTrendTransform(Transform):
    """_OneSegmentChangePointsTrendTransform subtracts multiple linear trend from series.

    The series is split into intervals by detected change points, a separate copy of the
    detrend model is fitted on each interval, and the predicted trend is subtracted from
    (``transform``) or added back to (``inverse_transform``) the column.
    """

    def __init__(
        self,
        in_column: str,
        change_point_model: BaseEstimator,
        detrend_model: TDetrendModel,
        **change_point_model_predict_params,
    ):
        """Init _OneSegmentChangePointsTrendTransform.

        Parameters
        ----------
        in_column:
            name of column to apply transform to
        change_point_model:
            model to get trend change points
        detrend_model:
            model to get trend in data
        change_point_model_predict_params:
            params for ``change_point_model.predict`` method
        """
        self.in_column = in_column
        # The transform works in place, so the output column is the input column.
        self.out_columns = in_column
        self.change_point_model = change_point_model
        self.detrend_model = detrend_model
        # Both attributes stay ``None`` until ``fit`` is called.
        self.per_interval_models: Optional[Dict[TTimestampInterval, TDetrendModel]] = None
        self.intervals: Optional[List[TTimestampInterval]] = None
        self.change_point_model_predict_params = change_point_model_predict_params

    @staticmethod
    def _build_trend_intervals(change_points: List[pd.Timestamp]) -> List[TTimestampInterval]:
        """Create list of stable trend intervals from list of change points.

        Parameters
        ----------
        change_points:
            timestamps of trend change points, in any order

        Returns
        -------
        :
            consecutive ``(left_border, right_border)`` pairs; the first one starts at
            ``pd.Timestamp.min`` and the last one ends at ``pd.Timestamp.max``, so there is
            always at least one interval
        """
        borders = [pd.Timestamp.min, *sorted(change_points), pd.Timestamp.max]
        # Pair every border with its successor: n change points give n + 1 intervals.
        return list(zip(borders[:-1], borders[1:]))

    def _init_detrend_models(self, intervals: List[TTimestampInterval]) -> Dict[TTimestampInterval, TDetrendModel]:
        """Create copy of detrend model for each timestamp interval."""
        # deepcopy keeps the models independent: fitting one must not affect the others
        # or the template stored in ``self.detrend_model``.
        per_interval_models = {interval: deepcopy(self.detrend_model) for interval in intervals}
        return per_interval_models

    def _get_timestamps(self, series: pd.Series) -> np.ndarray:
        """Convert ETNA timestamp-index to a list of timestamps to fit regression models.

        Each index entry is converted with its ``timestamp()`` method and wrapped into a
        one-element row, giving one feature column for the regression models.
        """
        timestamps = series.index
        timestamps = np.array([[ts.timestamp()] for ts in timestamps])
        return timestamps

    def _fit_per_interval_model(self, series: pd.Series):
        """Fit per-interval models with corresponding data from series.

        Raises
        ------
        ValueError:
            if intervals or per-interval models have not been initialised
        """
        if self.intervals is None or self.per_interval_models is None:
            raise ValueError("Something went wrong on fit! Check the parameters of the transform.")
        for interval in self.intervals:
            # NOTE(review): pandas label slicing includes both endpoints, so a change point
            # appears to be used by both neighbouring intervals -- confirm this is intended.
            tmp_series = series[interval[0] : interval[1]]
            x = self._get_timestamps(series=tmp_series)
            y = tmp_series.values
            self.per_interval_models[interval].fit(x, y)

    def _predict_per_interval_model(self, series: pd.Series) -> pd.Series:
        """Apply per-interval detrending to series.

        Parameters
        ----------
        series:
            series whose index defines the timestamps to predict the trend for

        Returns
        -------
        :
            trend values indexed like ``series``

        Raises
        ------
        ValueError:
            if the transform has not been fitted
        """
        if self.intervals is None or self.per_interval_models is None:
            raise ValueError("Transform is not fitted! Fit the Transform before calling transform method.")
        # Explicit float dtype: building a data-less Series without ``dtype`` triggers a pandas
        # deprecation warning and its resulting dtype depends on the pandas version.
        trend_series = pd.Series(index=series.index, dtype=float)
        for interval in self.intervals:
            tmp_series = series[interval[0] : interval[1]]
            if tmp_series.empty:
                # No timestamps of ``series`` fall into this interval.
                continue
            x = self._get_timestamps(series=tmp_series)
            trend = self.per_interval_models[interval].predict(x)
            trend_series[tmp_series.index] = trend
        return trend_series

    def fit(self, df: pd.DataFrame) -> "_OneSegmentChangePointsTrendTransform":
        """Fit _OneSegmentChangePointsTrendTransform.

        Find trend change points in ``df`` and fit detrend models with data from intervals
        of stable trend.

        Parameters
        ----------
        df:
            one segment dataframe indexed with timestamp

        Returns
        -------
        :
            fitted transform (``self``)

        Raises
        ------
        ValueError:
            if ``in_column`` contains NaNs between its first and last valid values
        """
        # Leading and trailing NaNs are cut off; only NaNs inside the series are an error.
        series = df.loc[df[self.in_column].first_valid_index() : df[self.in_column].last_valid_index(), self.in_column]
        if series.isnull().values.any():
            raise ValueError("The input column contains NaNs in the middle of the series! Try to use the imputer.")
        change_points = _find_change_points_segment(
            series=series, change_point_model=self.change_point_model, **self.change_point_model_predict_params
        )
        self.intervals = self._build_trend_intervals(change_points=change_points)
        self.per_interval_models = self._init_detrend_models(intervals=self.intervals)
        self._fit_per_interval_model(series=series)
        return self

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Split df to intervals of stable trend and subtract trend from each one.

        The passed dataframe is modified in place and returned.

        Parameters
        ----------
        df:
            one segment dataframe to subtract trend

        Returns
        -------
        detrended df: pd.DataFrame
            df with detrended in_column series
        """
        # Presumably silences pandas' chained-assignment warning for the in-place update below.
        df._is_copy = False
        series = df[self.in_column]
        trend_series = self._predict_per_interval_model(series=series)
        df.loc[:, self.in_column] -= trend_series
        return df

    def inverse_transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Split df to intervals of stable trend according to previous change point detection and add trend to each one.

        The passed dataframe is modified in place and returned.

        Parameters
        ----------
        df:
            one segment dataframe to turn trend back

        Returns
        -------
        df: pd.DataFrame
            df with restored trend in in_column
        """
        # Presumably silences pandas' chained-assignment warning for the in-place update below.
        df._is_copy = False
        series = df[self.in_column]
        trend_series = self._predict_per_interval_model(series=series)
        df.loc[:, self.in_column] += trend_series
        if self.in_column == "target":
            # Columns reported by ``match_target_quantiles`` get the same trend added back
            # (presumably these are the target quantile columns -- verify against the helper).
            quantiles = match_target_quantiles(set(df.columns))
            for quantile_column_nm in quantiles:
                df.loc[:, quantile_column_nm] += trend_series
        return df
class ChangePointsTrendTransform(PerSegmentWrapper):
    """ChangePointsTrendTransform subtracts multiple linear trend from series.

    The work is delegated to ``_OneSegmentChangePointsTrendTransform``, which is handed
    to ``PerSegmentWrapper``.

    Warning
    -------
    This transform can suffer from look-ahead bias. For transforming data at some timestamp
    it uses information from the whole train part.
    """

    def __init__(
        self,
        in_column: str,
        change_point_model: BaseEstimator,
        detrend_model: TDetrendModel,
        **change_point_model_predict_params,
    ):
        """Init ChangePointsTrendTransform.

        Parameters
        ----------
        in_column:
            name of column to apply transform to
        change_point_model:
            model to get trend change points
        detrend_model:
            model to get trend in data
        change_point_model_predict_params:
            params for ``change_point_model.predict`` method
        """
        # Keep the constructor arguments available as attributes of the wrapper itself.
        self.in_column = in_column
        self.change_point_model = change_point_model
        self.detrend_model = detrend_model
        self.change_point_model_predict_params = change_point_model_predict_params

        # Template transform built from the very same arguments.
        one_segment_transform = _OneSegmentChangePointsTrendTransform(
            in_column=in_column,
            change_point_model=change_point_model,
            detrend_model=detrend_model,
            **change_point_model_predict_params,
        )
        super().__init__(transform=one_segment_transform)