Source code for etna.transforms.nn.pytorch_forecasting
import inspect
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union
import pandas as pd
from sklearn.preprocessing import RobustScaler
from sklearn.preprocessing import StandardScaler
from etna import SETTINGS
from etna.datasets.tsdataset import TSDataset
from etna.transforms.base import Transform
# pytorch-forecasting is only imported when ``SETTINGS.torch_required`` is truthy.
# Otherwise the same names are bound to ``None`` so that the annotations and the
# ``NORMALIZER`` alias below can still be evaluated at import time.
if SETTINGS.torch_required:
    from pytorch_forecasting.data import TimeSeriesDataSet
    from pytorch_forecasting.data.encoders import (
        EncoderNormalizer,
        NaNLabelEncoder,
        TorchNormalizer,
    )
else:
    TimeSeriesDataSet = EncoderNormalizer = NaNLabelEncoder = TorchNormalizer = None  # type: ignore

# Alias for the encoder/normalizer types used in the ``target_normalizer`` annotation.
NORMALIZER = Union[TorchNormalizer, NaNLabelEncoder, EncoderNormalizer]
class PytorchForecastingTransform(Transform):
    """Transform for models from PytorchForecasting library.

    Notes
    -----
    This transform should be added at the very end of ``transforms`` parameter.
    """

    def __init__(
        self,
        max_encoder_length: int = 30,
        min_encoder_length: Optional[int] = None,
        min_prediction_idx: Optional[int] = None,
        min_prediction_length: Optional[int] = None,
        max_prediction_length: int = 1,
        static_categoricals: Optional[List[str]] = None,
        static_reals: Optional[List[str]] = None,
        time_varying_known_categoricals: Optional[List[str]] = None,
        time_varying_known_reals: Optional[List[str]] = None,
        time_varying_unknown_categoricals: Optional[List[str]] = None,
        time_varying_unknown_reals: Optional[List[str]] = None,
        variable_groups: Optional[Dict[str, List[int]]] = None,
        constant_fill_strategy: Optional[Dict[str, Union[str, float, int, bool]]] = None,
        allow_missing_timesteps: bool = True,
        lags: Optional[Dict[str, List[int]]] = None,
        add_relative_time_idx: bool = True,
        add_target_scales: bool = True,
        add_encoder_length: Union[bool, str] = True,
        target_normalizer: Union[NORMALIZER, str, List[NORMALIZER], Tuple[NORMALIZER]] = "auto",
        categorical_encoders: Optional[Dict[str, NaNLabelEncoder]] = None,
        scalers: Optional[Dict[str, Union[StandardScaler, RobustScaler, TorchNormalizer, EncoderNormalizer]]] = None,
    ):
        """Init transform.

        Parameters here are used for initialization of
        :py:class:`pytorch_forecasting.data.timeseries.TimeSeriesDataSet` object.
        Every list- or dict-valued parameter left as ``None`` (or passed empty) is
        stored as a fresh empty list / dict.
        """
        super().__init__()
        self.max_encoder_length = max_encoder_length
        self.min_encoder_length = min_encoder_length
        self.min_prediction_idx = min_prediction_idx
        self.min_prediction_length = min_prediction_length
        self.max_prediction_length = max_prediction_length
        self.static_categoricals = static_categoricals if static_categoricals else []
        self.static_reals = static_reals if static_reals else []
        self.time_varying_known_categoricals = (
            time_varying_known_categoricals if time_varying_known_categoricals else []
        )
        self.time_varying_known_reals = time_varying_known_reals if time_varying_known_reals else []
        self.time_varying_unknown_categoricals = (
            time_varying_unknown_categoricals if time_varying_unknown_categoricals else []
        )
        self.time_varying_unknown_reals = time_varying_unknown_reals if time_varying_unknown_reals else []
        self.variable_groups = variable_groups if variable_groups else {}
        self.add_relative_time_idx = add_relative_time_idx
        self.add_target_scales = add_target_scales
        self.add_encoder_length = add_encoder_length
        self.allow_missing_timesteps = allow_missing_timesteps
        self.target_normalizer = target_normalizer
        self.categorical_encoders = categorical_encoders if categorical_encoders else {}
        # The parameter is annotated as a dict, so the empty fallback is a dict as well.
        self.constant_fill_strategy = constant_fill_strategy if constant_fill_strategy else {}
        self.lags = lags if lags else {}
        self.scalers = scalers if scalers else {}
        self.pf_dataset_predict: Optional[TimeSeriesDataSet] = None

    def fit(self, df: pd.DataFrame) -> "PytorchForecastingTransform":
        """
        Fit TimeSeriesDataSet.

        The frequency inferred from ``df.index``, the minimal timestamp of the
        NaN-free flattened data and the parameters of the fitted
        ``TimeSeriesDataSet`` are stored on the instance for later use in
        :py:meth:`transform`.

        Parameters
        ----------
        df:
            data to be fitted.

        Returns
        -------
        PytorchForecastingTransform
        """
        self.freq = pd.infer_freq(df.index)
        ts = TSDataset(df, self.freq)
        df_flat = ts.to_pandas(flatten=True)
        df_flat = df_flat.dropna()
        self.min_timestamp = df_flat.timestamp.min()

        self._cast_known_categoricals(df_flat)
        self._add_time_idx(df_flat)

        pf_dataset = TimeSeriesDataSet(
            df_flat,
            time_idx="time_idx",
            target="target",
            group_ids=["segment"],
            static_categoricals=self.static_categoricals,
            # static_reals and time_varying_unknown_categoricals are accepted by
            # __init__, so they are forwarded here instead of being silently dropped.
            static_reals=self.static_reals,
            time_varying_known_categoricals=self.time_varying_known_categoricals,
            time_varying_known_reals=self.time_varying_known_reals,
            time_varying_unknown_categoricals=self.time_varying_unknown_categoricals,
            time_varying_unknown_reals=self.time_varying_unknown_reals,
            max_encoder_length=self.max_encoder_length,
            max_prediction_length=self.max_prediction_length,
            min_encoder_length=self.min_encoder_length,
            min_prediction_length=self.min_prediction_length,
            min_prediction_idx=self.min_prediction_idx,
            add_relative_time_idx=self.add_relative_time_idx,
            add_target_scales=self.add_target_scales,
            add_encoder_length=self.add_encoder_length,
            allow_missing_timesteps=self.allow_missing_timesteps,
            target_normalizer=self.target_normalizer,
            variable_groups=self.variable_groups,
            constant_fill_strategy=self.constant_fill_strategy,
            lags=self.lags,
            categorical_encoders=self.categorical_encoders,
            scalers=self.scalers,
        )
        self.pf_dataset_params = pf_dataset.get_parameters()
        return self

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Transform raw df to TimeSeriesDataSet.

        Parameters
        ----------
        df:
            data to be transformed.

        Returns
        -------
        DataFrame
            the input ``df``, returned unchanged.

        Notes
        -----
        We save TimeSeriesDataSet in instance to use it in the model.
        It's not the right pattern of using Transforms and TSDataset.

        The dataset is stored in ``pf_dataset_predict`` when this method is called
        directly from a function named ``make_future`` and in ``pf_dataset_train``
        otherwise.
        """
        ts = TSDataset(df, self.freq)
        df_flat = ts.to_pandas(flatten=True)
        # Keep only the timestamps that are not earlier than the first one seen in fit.
        df_flat = df_flat[df_flat.timestamp >= self.min_timestamp]
        df_flat["target"] = df_flat["target"].fillna(0)

        self._add_time_idx(df_flat)
        self._cast_known_categoricals(df_flat)

        # The mode is chosen by the name of the direct caller. ``context=0`` skips
        # loading source lines for every frame; only the function name is needed.
        # This lookup has to stay in this method: moving it into a helper would
        # shift the frame index.
        caller_name = inspect.stack(context=0)[1].function
        if caller_name == "make_future":
            self.pf_dataset_predict = TimeSeriesDataSet.from_parameters(
                self.pf_dataset_params, df_flat, predict=True, stop_randomization=True
            )
        else:
            self.pf_dataset_train = TimeSeriesDataSet.from_parameters(self.pf_dataset_params, df_flat)
        return df

    def _cast_known_categoricals(self, df_flat: pd.DataFrame) -> None:
        """Cast every ``time_varying_known_categoricals`` column of ``df_flat`` to ``str`` in place."""
        for feature_name in self.time_varying_known_categoricals:
            df_flat[feature_name] = df_flat[feature_name].astype(str)

    def _add_time_idx(self, df_flat: pd.DataFrame) -> None:
        """Add the ``time_idx`` column to ``df_flat`` in place.

        ``time_idx`` is needed by pytorch-forecasting for a proper train-test split
        and should be incremented by 1 for every new timestamp: whole-second offsets
        from ``self.min_timestamp`` are replaced by their rank among the distinct
        offsets present in ``df_flat``.
        """
        seconds_offset = (df_flat["timestamp"] - self.min_timestamp) // pd.Timedelta("1s")
        encoded_unix_times = self._time_encoder(list(seconds_offset.unique()))
        df_flat["time_idx"] = seconds_offset.map(encoded_unix_times)

    def _time_encoder(self, values: List[int]) -> Dict[int, int]:
        """Map each value to its position in the ascending order of ``values``."""
        return {value: idx for idx, value in enumerate(sorted(values))}