Source code for etna.loggers.base

from abc import ABC
from abc import abstractmethod
from contextlib import contextmanager
from typing import TYPE_CHECKING
from typing import Any
from typing import Dict
from typing import Union

import numpy as np
import pandas as pd

from etna.core.mixins import BaseMixin

if TYPE_CHECKING:
    from etna.datasets import TSDataset


[docs]class BaseLogger(ABC, BaseMixin): """Abstract class for implementing loggers.""" def __init__(self): """Create logger instance.""" pass
[docs] @abstractmethod def log(self, msg: Union[str, Dict[str, Any]], **kwargs): """ Log any event. e.g. "Fitted segment segment_name" Parameters ---------- msg: Message or dict to log kwargs: Additional parameters for particular implementation """ pass
[docs] @abstractmethod def log_backtest_metrics( self, ts: "TSDataset", metrics_df: pd.DataFrame, forecast_df: pd.DataFrame, fold_info_df: pd.DataFrame ): """ Write metrics to logger. Parameters ---------- ts: TSDataset to with backtest data metrics_df: Dataframe produced with :py:meth:`etna.pipeline.Pipeline._get_backtest_metrics` forecast_df: Forecast from backtest fold_info_df: Fold information from backtest """ pass
[docs] def start_experiment(self, *args, **kwargs): """Start experiment. Complete logger initialization or reinitialize it before the next experiment with the same name. """ pass
[docs] def finish_experiment(self, *args, **kwargs): """Finish experiment.""" pass
[docs] def log_backtest_run(self, metrics: pd.DataFrame, forecast: pd.DataFrame, test: pd.DataFrame): """ Backtest metrics from one fold to logger. Parameters ---------- metrics: Dataframe with metrics from backtest fold forecast: Dataframe with forecast test: Dataframe with ground truth """ pass
[docs]class _Logger(BaseLogger): """Composite for loggers.""" def __init__(self): """Create instance for composite of loggers.""" super().__init__() self.loggers = []
[docs] def add(self, logger: BaseLogger) -> int: """ Add new logger. Parameters ---------- logger: logger to be added Returns ------- result: int identifier of added logger """ self.loggers.append(logger) return len(self.loggers) - 1
[docs] def remove(self, idx: int): """ Remove logger by identifier. Parameters ---------- idx: identifier of added logger """ self.loggers.pop(idx)
[docs] def log(self, msg: Union[str, Dict[str, Any]], **kwargs): """Log any event.""" for logger in self.loggers: logger.log(msg, **kwargs)
[docs] def log_backtest_metrics( self, ts: "TSDataset", metrics_df: pd.DataFrame, forecast_df: pd.DataFrame, fold_info_df: pd.DataFrame ): """ Write metrics to logger. Parameters ---------- ts: TSDataset to with backtest data metrics_df: Dataframe produced with :py:meth:`etna.pipeline.Pipeline._get_backtest_metrics` forecast_df: Forecast from backtest fold_info_df: Fold information from backtest """ for logger in self.loggers: logger.log_backtest_metrics(ts, metrics_df, forecast_df, fold_info_df)
[docs] def log_backtest_run(self, metrics: pd.DataFrame, forecast: pd.DataFrame, test: pd.DataFrame): """ Backtest metrics from one fold to logger. Parameters ---------- metrics: Dataframe with metrics from backtest fold forecast: Dataframe with forecast test: Dataframe with ground truth """ for logger in self.loggers: logger.log_backtest_run(metrics, forecast, test)
[docs] def start_experiment(self, *args, **kwargs): """Start experiment. Complete logger initialization or reinitialize it before the next experiment with the same name. """ for logger in self.loggers: logger.start_experiment(*args, **kwargs)
[docs] def finish_experiment(self): """Finish experiment.""" for logger in self.loggers: logger.finish_experiment()
@property def pl_loggers(self): """Pytorch lightning loggers.""" return [logger.pl_logger for logger in self.loggers if "_pl_logger" in vars(logger)]
[docs] @contextmanager def disable(self): """Context manager for local logging disabling.""" temp_loggers = self.loggers self.loggers = [] yield self.loggers = temp_loggers
[docs]def percentile(n: int): """Percentile for pandas agg.""" def percentile_(x): return np.nanpercentile(a=x.values, q=n) percentile_.__name__ = "percentile_%s" % n return percentile_
[docs]def aggregate_metrics_df(metrics_df: pd.DataFrame) -> Dict[str, float]: """Aggregate metrics in :py:meth:`log_backtest_metrics` method. Parameters ---------- metrics_df: Dataframe produced with :py:meth:`etna.pipeline.Pipeline._get_backtest_metrics` """ # case for aggregate_metrics=False if "fold_number" in metrics_df.columns: metrics_dict = ( metrics_df.groupby("segment") .mean() .reset_index() .drop(["segment", "fold_number"], axis=1) .apply(["median", "mean", "std", percentile(5), percentile(25), percentile(75), percentile(95)]) .to_dict() ) # case for aggregate_metrics=True else: metrics_dict = ( metrics_df.drop(["segment"], axis=1) .apply(["median", "mean", "std", percentile(5), percentile(25), percentile(75), percentile(95)]) .to_dict() ) metrics_dict_wide = { f"{metrics_key}_{statistics_key}": value for metrics_key, values in metrics_dict.items() for statistics_key, value in values.items() } return metrics_dict_wide