Source code for etna.analysis.outliers.prediction_interval_outliers

from copy import deepcopy
from typing import TYPE_CHECKING
from typing import Dict
from typing import List
from typing import Type
from typing import Union

import numpy as np
import pandas as pd

if TYPE_CHECKING:
    from etna.datasets import TSDataset
    from etna.models import ProphetModel
    from etna.models import SARIMAXModel


[docs]def create_ts_by_column(ts: "TSDataset", column: str) -> "TSDataset": """Create TSDataset based on original ts with selecting only column in each segment and setting it to target. Parameters ---------- ts: dataset with timeseries data column: column to select in each. Returns ------- result: TSDataset dataset with selected column. """ from etna.datasets import TSDataset new_df = ts[:, :, [column]] new_columns_tuples = [(x[0], "target") for x in new_df.columns.tolist()] new_df.columns = pd.MultiIndex.from_tuples(new_columns_tuples, names=new_df.columns.names) return TSDataset(new_df, freq=ts.freq)
[docs]def get_anomalies_prediction_interval( ts: "TSDataset", model: Union[Type["ProphetModel"], Type["SARIMAXModel"]], interval_width: float = 0.95, in_column: str = "target", **model_params, ) -> Dict[str, List[pd.Timestamp]]: """ Get point outliers in time series using prediction intervals (estimation model-based method). Outliers are all points out of the prediction interval predicted with the model. Parameters ---------- ts: dataset with timeseries data(should contains all the necessary features). model: model for prediction interval estimation. interval_width: the significance level for the prediction interval. By default a 95% prediction interval is taken. in_column: column to analyze * If it is set to "target", then all data will be used for prediction. * Otherwise, only column data will be used. Returns ------- : dict of outliers in format {segment: [outliers_timestamps]}. Notes ----- For not "target" column only column data will be used for learning. """ if in_column == "target": ts_inner = ts else: ts_inner = create_ts_by_column(ts, in_column) outliers_per_segment = {} time_points = np.array(ts.index.values) model_instance = model(**model_params) model_instance.fit(ts_inner) lower_p, upper_p = [(1 - interval_width) / 2, (1 + interval_width) / 2] prediction_interval = model_instance.forecast( deepcopy(ts_inner), prediction_interval=True, quantiles=[lower_p, upper_p] ) for segment in ts_inner.segments: segment_slice = prediction_interval[:, segment, :][segment] anomalies_mask = (segment_slice["target"] > segment_slice[f"target_{upper_p:.4g}"]) | ( segment_slice["target"] < segment_slice[f"target_{lower_p:.4g}"] ) outliers_per_segment[segment] = list(time_points[anomalies_mask]) return outliers_per_segment