Source code for etna.transforms.math.differencing
from typing import Dict
from typing import List
from typing import Optional
from typing import Set
from typing import Union
import numpy as np
import pandas as pd
from etna.transforms.base import Transform
from etna.transforms.utils import match_target_quantiles
[docs]class _SingleDifferencingTransform(Transform):
"""Calculate a time series differences of order 1.
This transform can work with NaNs at the beginning of the segment, but fails when meets NaN inside the segment.
Notes
-----
To understand how transform works we recommend:
`Stationarity and Differencing <https://otexts.com/fpp2/stationarity.html>`_
"""
def __init__(
self,
in_column: str,
period: int = 1,
inplace: bool = True,
out_column: Optional[str] = None,
):
"""Create instance of _SingleDifferencingTransform.
Parameters
----------
in_column:
name of processed column
period:
number of steps back to calculate the difference with, it should be >= 1
inplace:
* if True, apply transformation inplace to in_column,
* if False, add transformed column to dataset
out_column:
* if set, name of added column, the final name will be '{out_column}';
* if isn't set, name will be based on ``self.__repr__()``
Raises
------
ValueError:
if period is not integer >= 1
"""
self.in_column = in_column
if not isinstance(period, int) or period < 1:
raise ValueError("Period should be at least 1")
self.period = period
self.inplace = inplace
self.out_column = out_column
self._train_timestamp: Optional[pd.DatetimeIndex] = None
self._train_init_dict: Optional[Dict[str, pd.Series]] = None
self._test_init_df: Optional[pd.DataFrame] = None
def _get_column_name(self) -> str:
if self.out_column is None:
return self.__repr__()
else:
return self.out_column
[docs] def fit(self, df: pd.DataFrame) -> "_SingleDifferencingTransform":
"""Fit the transform.
Parameters
----------
df:
dataframe with data.
Returns
-------
result: _SingleDifferencingTransform
"""
segments = sorted(set(df.columns.get_level_values("segment")))
fit_df = df.loc[:, pd.IndexSlice[segments, self.in_column]].copy()
self._train_timestamp = fit_df.index
self._train_init_dict = {}
for current_segment in segments:
cur_series = fit_df.loc[:, pd.IndexSlice[current_segment, self.in_column]]
cur_series = cur_series.loc[cur_series.first_valid_index() :]
if cur_series.isna().sum() > 0:
raise ValueError(f"There should be no NaNs inside the segments")
self._train_init_dict[current_segment] = cur_series[: self.period]
self._test_init_df = fit_df.iloc[-self.period :, :]
# make multiindex levels consistent
self._test_init_df.columns = self._test_init_df.columns.remove_unused_levels()
return self
[docs] def transform(self, df: pd.DataFrame) -> pd.DataFrame:
"""Make a differencing transformation.
Parameters
----------
df:
dataframe with data to transform.
Returns
-------
result: pd.Dataframe
transformed dataframe
"""
if self._train_init_dict is None or self._test_init_df is None or self._train_timestamp is None:
raise AttributeError("Transform is not fitted")
segments = sorted(set(df.columns.get_level_values("segment")))
transformed = df.loc[:, pd.IndexSlice[segments, self.in_column]].copy()
for current_segment in segments:
to_transform = transformed.loc[:, pd.IndexSlice[current_segment, self.in_column]]
start_idx = to_transform.first_valid_index()
# make a differentiation
transformed.loc[start_idx:, pd.IndexSlice[current_segment, self.in_column]] = to_transform.loc[
start_idx:
].diff(periods=self.period)
if self.inplace:
result_df = df.copy()
result_df.loc[:, pd.IndexSlice[segments, self.in_column]] = transformed
else:
transformed_features = pd.DataFrame(
transformed, columns=df.loc[:, pd.IndexSlice[segments, self.in_column]].columns, index=df.index
)
column_name = self._get_column_name()
transformed_features.columns = pd.MultiIndex.from_product([segments, [column_name]])
result_df = pd.concat((df, transformed_features), axis=1)
result_df = result_df.sort_index(axis=1)
return result_df
def _make_inv_diff(self, to_transform: Union[pd.DataFrame, pd.Series]) -> Union[pd.DataFrame, pd.Series]:
"""Make inverse difference transform."""
for i in range(self.period):
to_transform.iloc[i :: self.period] = to_transform.iloc[i :: self.period].cumsum()
return to_transform
def _reconstruct_train(self, df: pd.DataFrame, columns_to_inverse: Set[str]) -> pd.DataFrame:
"""Reconstruct the train in ``inverse_transform``."""
segments = sorted(set(df.columns.get_level_values("segment")))
result_df = df.copy()
# impute values for reconstruction and run reconstruction
for current_segment in segments:
init_segment = self._train_init_dict[current_segment] # type: ignore
for column in columns_to_inverse:
cur_series = result_df.loc[:, pd.IndexSlice[current_segment, column]]
cur_series[init_segment.index] = init_segment.values
cur_series = self._make_inv_diff(cur_series)
result_df.loc[cur_series.index, pd.IndexSlice[current_segment, column]] = cur_series
return result_df
def _reconstruct_test(self, df: pd.DataFrame, columns_to_inverse: Set[str]) -> pd.DataFrame:
"""Reconstruct the test in ``inverse_transform``."""
segments = sorted(set(df.columns.get_level_values("segment")))
result_df = df.copy()
# check that test is right after the train
expected_min_test_timestamp = pd.date_range(
start=self._test_init_df.index.max(), # type: ignore
periods=2,
freq=pd.infer_freq(self._train_timestamp),
closed="right",
)[0]
if expected_min_test_timestamp != df.index.min():
raise ValueError("Test should go after the train without gaps")
# we can reconstruct the values by concatenating saved fit values before test values
for column in columns_to_inverse:
to_transform = df.loc[:, pd.IndexSlice[segments, column]].copy()
init_df = self._test_init_df.copy() # type: ignore
init_df.columns.set_levels([column], level="feature", inplace=True)
to_transform = pd.concat([init_df, to_transform])
# validate values inside the series to transform
if to_transform.isna().sum().sum() > 0:
raise ValueError(f"There should be no NaNs inside the segments")
# run reconstruction and save the result
to_transform = self._make_inv_diff(to_transform)
result_df.loc[:, pd.IndexSlice[segments, column]] = to_transform
return result_df
[docs] def inverse_transform(self, df: pd.DataFrame) -> pd.DataFrame:
"""Apply inverse transformation to DataFrame.
Parameters
----------
df:
DataFrame to apply inverse transform.
Returns
-------
result: pd.DataFrame
transformed DataFrame.
"""
if self._train_init_dict is None or self._test_init_df is None or self._train_timestamp is None:
raise AttributeError("Transform is not fitted")
if not self.inplace:
return df
columns_to_inverse = {self.in_column}
# if we are working with in_column="target" then there can be quantiles to inverse too
if self.in_column == "target":
columns_to_inverse.update(match_target_quantiles(set(df.columns.get_level_values("feature"))))
# determine if we are working with train or test
if self._train_timestamp.shape[0] == df.index.shape[0] and np.all(self._train_timestamp == df.index):
# we are on the train
result_df = self._reconstruct_train(df, columns_to_inverse)
elif df.index.min() > self._train_timestamp.max():
# we are on the test
result_df = self._reconstruct_test(df, columns_to_inverse)
else:
raise ValueError("Inverse transform can be applied only to full train or test that should be in the future")
return result_df
[docs]class DifferencingTransform(Transform):
"""Calculate a time series differences.
This transform can work with NaNs at the beginning of the segment, but fails when meets NaN inside the segment.
Notes
-----
To understand how transform works we recommend:
`Stationarity and Differencing <https://otexts.com/fpp2/stationarity.html>`_
"""
def __init__(
self,
in_column: str,
period: int = 1,
order: int = 1,
inplace: bool = True,
out_column: Optional[str] = None,
):
"""Create instance of DifferencingTransform.
Parameters
----------
in_column:
name of processed column
period:
number of steps back to calculate the difference with, it should be >= 1
order:
number of differences to make, it should be >= 1
inplace:
* if True, apply transformation inplace to in_column,
* if False, add transformed column to dataset
out_column:
* if set, name of added column, the final name will be '{out_column}';
* if isn't set, name will be based on ``self.__repr__()``
Raises
------
ValueError:
if period is not integer >= 1
ValueError:
if order is not integer >= 1
"""
self.in_column = in_column
if not isinstance(period, int) or period < 1:
raise ValueError("Period should be at least 1")
self.period = period
if not isinstance(order, int) or order < 1:
raise ValueError("Order should be at least 1")
self.order = order
self.inplace = inplace
self.out_column = out_column
# add differencing transforms for each order
result_out_column = self._get_column_name()
self._differencing_transforms: List[_SingleDifferencingTransform] = []
# first transform should work like this transform but with prepared out_column name
self._differencing_transforms.append(
_SingleDifferencingTransform(
in_column=self.in_column, period=self.period, inplace=self.inplace, out_column=result_out_column
)
)
# other transforms should make differences inplace
for _ in range(self.order - 1):
self._differencing_transforms.append(
_SingleDifferencingTransform(in_column=result_out_column, period=self.period, inplace=True)
)
def _get_column_name(self) -> str:
if self.inplace:
return self.in_column
if self.out_column is None:
return self.__repr__()
else:
return self.out_column
[docs] def fit(self, df: pd.DataFrame) -> "DifferencingTransform":
"""Fit the transform.
Parameters
----------
df:
dataframe with data.
Returns
-------
result: DifferencingTransform
"""
# this is made because transforms of high order may need some columns created by transforms of lower order
result_df = df.copy()
for transform in self._differencing_transforms:
result_df = transform.fit_transform(result_df)
return self
[docs] def transform(self, df: pd.DataFrame) -> pd.DataFrame:
"""Make a differencing transformation.
Parameters
----------
df:
dataframe with data to transform.
Returns
-------
result: pd.Dataframe
transformed dataframe
"""
result_df = df.copy()
for transform in self._differencing_transforms:
result_df = transform.transform(result_df)
return result_df
[docs] def inverse_transform(self, df: pd.DataFrame) -> pd.DataFrame:
"""Apply inverse transformation to DataFrame.
Parameters
----------
df:
DataFrame to apply inverse transform.
Returns
-------
result: pd.DataFrame
transformed DataFrame.
"""
if not self.inplace:
return df
result_df = df.copy()
for transform in self._differencing_transforms[::-1]:
result_df = transform.inverse_transform(result_df)
return result_df