Source code for etna.analysis.feature_selection.mrmr_selection

from enum import Enum
from typing import List

import numpy as np
import pandas as pd


class AggregationMode(str, Enum):
    """Enum for different aggregation modes."""

    mean = "mean"
    max = "max"
    min = "min"
    median = "median"

AGGREGATION_FN = {
    AggregationMode.mean: np.mean,
    AggregationMode.max: np.max,
    AggregationMode.min: np.min,
    AggregationMode.median: np.median,
}
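# Illustrative note (not part of the original module): a mode passed as a plain string is
# first normalized through the enum and then mapped to its numpy aggregation function, e.g.
#   AGGREGATION_FN[AggregationMode("median")]([1.0, 2.0, 10.0])  # -> 2.0
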
def mrmr(
    relevance_table: pd.DataFrame,
    regressors: pd.DataFrame,
    top_k: int,
    relevance_aggregation_mode: str = AggregationMode.mean,
    redundancy_aggregation_mode: str = AggregationMode.mean,
    atol: float = 1e-10,
) -> List[str]:
    """
    Maximum Relevance and Minimum Redundancy feature selection method.

    Here the relevance of each regressor is calculated as the per-segment aggregation of the relevance
    values in ``relevance_table``. The redundancy term for a regressor is calculated as the mean absolute
    correlation between this regressor and the other ones. The correlation between two regressors is an
    aggregated pairwise correlation of the regressors' values in each segment.

    Parameters
    ----------
    relevance_table:
        dataframe of shape n_segment x n_exog_series with the relevance table,
        where ``relevance_table[i][j]`` contains relevance of j-th ``df_exog`` series to i-th df series
    regressors:
        dataframe with regressors in etna format
    top_k:
        number of regressors to select; if there are not enough regressors, then all will be selected
    relevance_aggregation_mode:
        the method for relevance values per-segment aggregation
    redundancy_aggregation_mode:
        the method for redundancy values per-segment aggregation
    atol:
        the absolute tolerance to compare the float values

    Returns
    -------
    selected_features: List[str]
        list of ``top_k`` selected regressors, sorted by their importance
    """
    relevance_aggregation_fn = AGGREGATION_FN[AggregationMode(relevance_aggregation_mode)]
    redundancy_aggregation_fn = AGGREGATION_FN[AggregationMode(redundancy_aggregation_mode)]

    relevance = relevance_table.apply(relevance_aggregation_fn).fillna(0)

    all_features = relevance.index.to_list()
    selected_features: List[str] = []
    not_selected_features = all_features.copy()

    redundancy_table = pd.DataFrame(np.inf, index=all_features, columns=all_features)
    top_k = min(top_k, len(all_features))

    for i in range(top_k):
        score_numerator = relevance.loc[not_selected_features]
        score_denominator = pd.Series(1, index=not_selected_features)
        if i > 0:
            last_selected_feature = selected_features[-1]
            not_selected_regressors = regressors.loc[pd.IndexSlice[:], pd.IndexSlice[:, not_selected_features]]
            last_selected_regressor = regressors.loc[pd.IndexSlice[:], pd.IndexSlice[:, last_selected_feature]]

            redundancy_table.loc[not_selected_features, last_selected_feature] = (
                not_selected_regressors.apply(lambda col: last_selected_regressor.corrwith(col))
                .abs()
                .groupby("feature")
                .apply(redundancy_aggregation_fn)
                .T.groupby("feature")
                .apply(redundancy_aggregation_fn)
                .clip(atol)
                .fillna(np.inf)
                .loc[not_selected_features]
                .values.squeeze()
            )

            score_denominator = redundancy_table.loc[not_selected_features, selected_features].mean(axis=1)
            score_denominator[np.isclose(score_denominator, 1, atol=atol)] = np.inf
        score = score_numerator / score_denominator
        best_feature = score.index[score.argmax()]
        selected_features.append(best_feature)
        not_selected_features.remove(best_feature)
    return selected_features
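

# --- Usage sketch (not part of the original module) ---------------------------------
# A minimal, self-contained example of calling ``mrmr`` on synthetic data. The segment
# and feature names below are made up for illustration; ``regressors`` is expected to be
# a wide dataframe with ("segment", "feature") column levels, as in etna's wide format.
# Exact results and compatibility may depend on the installed pandas version.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    segments = ["segment_0", "segment_1"]
    features = ["regressor_a", "regressor_b", "regressor_c"]

    # relevance_table: one row per segment, one column per candidate regressor
    relevance_table = pd.DataFrame(
        rng.uniform(size=(len(segments), len(features))),
        index=segments,
        columns=features,
    )

    # regressors: 30 timestamps, MultiIndex columns (segment, feature)
    columns = pd.MultiIndex.from_product([segments, features], names=["segment", "feature"])
    regressors = pd.DataFrame(
        rng.normal(size=(30, len(columns))),
        index=pd.date_range("2021-01-01", periods=30, freq="D"),
        columns=columns,
    )

    # Select the two regressors with the best relevance/redundancy trade-off.
    selected = mrmr(relevance_table=relevance_table, regressors=regressors, top_k=2)
    print(selected)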