Source code for etna.pipeline.autoregressive_pipeline

import warnings
from typing import Sequence

import pandas as pd

from etna.datasets import TSDataset
from etna.models.base import BaseModel
from etna.pipeline.base import BasePipeline
from etna.transforms import Transform


class AutoRegressivePipeline(BasePipeline):
    """Pipeline that makes regressive models autoregressive.

    The forecast for the whole horizon is built iteratively: at every iteration the model
    predicts at most ``step`` timestamps, and these predictions are appended to the history
    that is used to build the features for the next iteration.

    Examples
    --------
    >>> from etna.datasets import generate_periodic_df
    >>> from etna.datasets import TSDataset
    >>> from etna.models import LinearPerSegmentModel
    >>> from etna.transforms import LagTransform
    >>> classic_df = generate_periodic_df(
    ...     periods=100,
    ...     start_time="2020-01-01",
    ...     n_segments=4,
    ...     period=7,
    ...     sigma=3
    ... )
    >>> df = TSDataset.to_dataset(df=classic_df)
    >>> ts = TSDataset(df, freq="D")
    >>> horizon = 7
    >>> transforms = [
    ...     LagTransform(in_column="target", lags=list(range(1, horizon+1)))
    ... ]
    >>> model = LinearPerSegmentModel()
    >>> pipeline = AutoRegressivePipeline(model, horizon, transforms, step=1)
    >>> _ = pipeline.fit(ts=ts)
    >>> forecast = pipeline.forecast()
    >>> pd.options.display.float_format = '{:,.2f}'.format
    >>> forecast[:, :, "target"]
    segment    segment_0 segment_1 segment_2 segment_3
    feature       target    target    target    target
    timestamp
    2020-04-10      9.00      9.00      4.00      6.00
    2020-04-11      5.00      2.00      7.00      9.00
    2020-04-12      0.00      4.00      7.00      9.00
    2020-04-13      0.00      5.00      9.00      7.00
    2020-04-14      1.00      2.00      1.00      6.00
    2020-04-15      5.00      7.00      4.00      7.00
    2020-04-16      8.00      6.00      2.00      0.00
    """

    def __init__(self, model: BaseModel, horizon: int, transforms: Sequence[Transform] = (), step: int = 1):
        """
        Create instance of AutoRegressivePipeline with given parameters.

        Parameters
        ----------
        model:
            Instance of the etna Model
        horizon:
            Number of timestamps in the future for forecasting
        transforms:
            Sequence of the transforms
        step:
            Size of prediction for one step of forecasting
        """
        self.model = model
        self.transforms = transforms
        self.step = step
        super().__init__(horizon=horizon)

    def fit(self, ts: TSDataset) -> "AutoRegressivePipeline":
        """Fit the AutoRegressivePipeline.

        Fit and apply given transforms to the data, then fit the model on the transformed data.
        After the model is fitted, ``inverse_transform`` is called on the stored dataset.

        Parameters
        ----------
        ts:
            Dataset with timeseries data; it is stored on the pipeline as ``self.ts``

        Returns
        -------
        :
            Fitted Pipeline instance
        """
        self.ts = ts
        ts.fit_transform(self.transforms)
        self.model.fit(ts)
        self.ts.inverse_transform()
        return self

    def _create_predictions_template(self) -> pd.DataFrame:
        """Create dataframe to fill with forecasts.

        Returns
        -------
        :
            The "target" slice of the fitted dataset, reindexed so that its index also contains
            the future timestamps generated with the dataset frequency; the index is named
            "timestamp".

        Raises
        ------
        ValueError:
            If the pipeline has not been fitted (``self.ts`` is None).
        """
        if self.ts is None:
            raise ValueError(
                "AutoRegressivePipeline is not fitted! Fit the AutoRegressivePipeline before calling forecast method."
            )
        prediction_df = self.ts[:, :, "target"]
        last_timestamp = prediction_df.index.max()
        # The range starts at the last known timestamp, so one extra period is requested and
        # only the points strictly after it are kept. This replaces ``closed="right"``, which
        # was deprecated in pandas 1.4 and removed in pandas 2.0.
        candidate_dates = pd.date_range(start=last_timestamp, periods=self.horizon + 1, freq=self.ts.freq)
        future_dates = candidate_dates[candidate_dates > last_timestamp]
        prediction_df = prediction_df.reindex(prediction_df.index.append(future_dates))
        prediction_df.index.name = "timestamp"
        return prediction_df

    def _forecast(self) -> TSDataset:
        """Make predictions.

        The horizon is covered in chunks of at most ``self.step`` timestamps. On each iteration
        a dataset is built from the already known part of the template, its future is created
        and passed to the model, and the result is merged back into the template.

        Returns
        -------
        :
            Dataset whose ``df`` and ``raw_df`` are cut to the last ``self.horizon`` rows.

        Raises
        ------
        ValueError:
            If ``self.ts`` is None.
        """
        if self.ts is None:
            raise ValueError("Something went wrong, ts is None!")
        prediction_df = self._create_predictions_template()

        for idx_start in range(0, self.horizon, self.step):
            # the last chunk may be shorter than step
            current_step = min(self.step, self.horizon - idx_start)
            # number of rows of the template available on this iteration:
            # the fitted history plus everything forecasted on the previous iterations
            current_idx_border = self.ts.index.shape[0] + idx_start
            current_ts = TSDataset(
                df=prediction_df.iloc[:current_idx_border],
                freq=self.ts.freq,
                df_exog=self.ts.df_exog,
                known_future=self.ts.known_future,
            )
            # manually set transforms in current_ts, otherwise make_future won't know about them
            current_ts.transforms = self.transforms
            with warnings.catch_warnings():
                warnings.filterwarnings(
                    message="TSDataset freq can't be inferred",
                    action="ignore",
                )
                warnings.filterwarnings(
                    message="You probably set wrong freq.",
                    action="ignore",
                )
                current_ts_forecast = current_ts.make_future(current_step)
            current_ts_future = self.model.forecast(current_ts_forecast)
            # combine_first keeps existing non-null template values and fills the rest from the forecast
            prediction_df = prediction_df.combine_first(current_ts_future.to_pandas()[prediction_df.columns])

        # construct dataset and add all features
        prediction_ts = TSDataset(
            df=prediction_df, freq=self.ts.freq, df_exog=self.ts.df_exog, known_future=self.ts.known_future
        )
        prediction_ts.transform(self.transforms)
        prediction_ts.inverse_transform()
        # cut only last timestamps from result dataset
        prediction_ts.df = prediction_ts.df.tail(self.horizon)
        prediction_ts.raw_df = prediction_ts.raw_df.tail(self.horizon)
        return prediction_ts