EvaluationWindow

EvaluationWindow dataclass

A single evaluation window on which the forecast accuracy is measured.

Corresponds to a single train/test split of the time series data at the provided cutoff.

You should never manually create EvaluationWindow objects. Instead, use Task.iter_windows() or Task.get_window() to obtain the evaluation windows corresponding to the task.
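For illustration, a minimal usage sketch: the dataset path, config and horizon below are placeholder values in the style of the fev quickstart and may need to be adapted to your setup.

import fev

# Illustrative task definition; dataset_path, dataset_config and horizon are assumptions.
task = fev.Task(
    dataset_path="autogluon/chronos_datasets",
    dataset_config="monash_m1_monthly",
    horizon=12,
)

# Obtain evaluation windows from the task instead of constructing EvaluationWindow directly.
for window in task.iter_windows():
    past_data, future_data = window.get_input_data()
    print(window.cutoff, len(past_data))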

Source code in src/fev/task.py
@dataclasses.dataclass
class EvaluationWindow:
    """
    A single evaluation window on which the forecast accuracy is measured.

    Corresponds to a single train/test split of the time series data at the provided `cutoff`.

    You should never manually create `EvaluationWindow` objects. Instead, use [`Task.iter_windows()`][fev.Task.iter_windows]
    or [`Task.get_window()`][fev.Task.get_window] to obtain the evaluation windows corresponding to the task.
    """

    full_dataset: datasets.Dataset = dataclasses.field(repr=False)
    cutoff: int | str
    horizon: int
    min_context_length: int | None
    max_context_length: int | None
    # Dataset info
    id_column: str
    timestamp_column: str
    target_columns: list[str]
    known_dynamic_columns: list[str]
    past_dynamic_columns: list[str]
    static_columns: list[str]

    def __post_init__(self):
        self._dataset_dict: datasets.DatasetDict | None = None

    def get_input_data(self, num_proc: int = DEFAULT_NUM_PROC) -> tuple[datasets.Dataset, datasets.Dataset]:
        """Get data available to the model at prediction time for this evaluation window.

        To convert the input data to a different format, use [`fev.convert_input_data`][fev.convert_input_data].

        Parameters
        ----------
        num_proc : int, default DEFAULT_NUM_PROC
            Number of processes to use when splitting the dataset.

        Returns
        -------
        past_data : datasets.Dataset
            Historical observations up to the cutoff point.
            Contains: id, timestamps, target values, static covariates, and all dynamic covariates.

            Columns corresponding to `id_column`, `timestamp_column`, `target_columns`, `static_columns`,
            `past_dynamic_columns`, `known_dynamic_columns`.
        future_data : datasets.Dataset
            Known future information for the forecast horizon.

            Columns corresponding to `id_column`, `timestamp_column`, `static_columns`, `known_dynamic_columns`.
        """
        if self._dataset_dict is None:
            self._dataset_dict = self._prepare_dataset_dict(num_proc=num_proc)
        return self._dataset_dict[TRAIN], self._dataset_dict[FUTURE]

    def get_ground_truth(self, num_proc: int = DEFAULT_NUM_PROC) -> datasets.Dataset:
        """Get ground truth future test data.

        **This data should never be provided to the model!**

        This is a convenience method that exists for debugging and additional evaluation.

        Parameters
        ----------
        num_proc : int, default DEFAULT_NUM_PROC
            Number of processes to use when splitting the dataset.
        """
        if self._dataset_dict is None:
            self._dataset_dict = self._prepare_dataset_dict(num_proc=num_proc)
        return self._dataset_dict[TEST]

    def compute_metrics(
        self,
        predictions: datasets.DatasetDict,
        metrics: list[Metric],
        seasonality: int,
        quantile_levels: list[float],
    ) -> dict[str, float]:
        """Compute accuracy metrics on the predictions made for this window.

        To compute metrics on your predictions, use [`Task.evaluation_summary`][fev.Task.evaluation_summary] instead.

        This is a convenience method that exists for debugging and additional evaluation.
        """
        test_data = self.get_ground_truth().with_format("numpy")
        past_data = self.get_input_data()[0].with_format("numpy")

        for target_column, predictions_for_column in predictions.items():
            if len(predictions_for_column) != len(test_data):
                raise ValueError(
                    f"Length of predictions for column {target_column} ({len(predictions)}) must "
                    f"match the length of test data ({len(test_data)})"
                )

        test_scores: dict[str, float] = {}
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", category=RuntimeWarning)
            for metric in metrics:
                scores = []
                for col in self.target_columns:
                    scores.append(
                        metric.compute(
                            test_data=test_data,
                            predictions=predictions[col],
                            past_data=past_data,
                            seasonality=seasonality,
                            quantile_levels=quantile_levels,
                            target_column=col,
                        )
                    )
                test_scores[metric.name] = float(np.mean(scores))
        return test_scores

    def _filter_short_series(
        self,
        dataset: datasets.Dataset,
        num_proc: int,
    ) -> datasets.Dataset:
        """Remove records from the dataset that are too short for the given task configuration.

        Filters out time series if they have either fewer than `min_context_length` observations before `cutoff`, or
        fewer than `horizon` observations after `cutoff`.
        """
        num_items_before = len(dataset)
        filtered_dataset = dataset.filter(
            _has_enough_past_and_future_observations,
            fn_kwargs=dict(
                timestamp_column=self.timestamp_column,
                horizon=self.horizon,
                cutoff=self.cutoff,
                min_context_length=self.min_context_length,
            ),
            num_proc=min(num_proc, len(dataset)),
            desc="Filtering short time series",
        )
        num_items_after = len(filtered_dataset)
        if num_items_after < num_items_before:
            logger.info(
                f"Dropped {num_items_before - num_items_after} out of {num_items_before} time series "
                f"because they had fewer than min_context_length ({self.min_context_length}) "
                f"observations before cutoff ({self.cutoff}) "
                f"or fewer than horizon ({self.horizon}) "
                f"observations after cutoff."
            )
        if len(filtered_dataset) == 0:
            raise ValueError(
                "All time series in the dataset are too short for the chosen cutoff, horizon and min_context_length"
            )
        return filtered_dataset

    def _prepare_dataset_dict(self, num_proc: int = DEFAULT_NUM_PROC) -> datasets.DatasetDict:
        dataset = self.full_dataset.select_columns(
            [self.id_column, self.timestamp_column]
            + self.target_columns
            + self.known_dynamic_columns
            + self.past_dynamic_columns
            + self.static_columns
        )
        dataset = self._filter_short_series(dataset, num_proc=num_proc)
        columns_to_slice = [col for col, feat in dataset.features.items() if isinstance(feat, datasets.Sequence)]
        past_data = dataset.map(
            _select_past,
            fn_kwargs=dict(
                columns_to_slice=columns_to_slice,
                timestamp_column=self.timestamp_column,
                cutoff=self.cutoff,
                max_context_length=self.max_context_length,
            ),
            num_proc=min(num_proc, len(dataset)),
            desc="Selecting past data",
        )

        future_data = dataset.map(
            _select_future,
            fn_kwargs=dict(
                columns_to_slice=columns_to_slice,
                timestamp_column=self.timestamp_column,
                cutoff=self.cutoff,
                horizon=self.horizon,
            ),
            num_proc=min(num_proc, len(dataset)),
            desc="Selecting future data",
        )
        future_known = future_data.remove_columns(self.target_columns + self.past_dynamic_columns)
        test = future_data.select_columns([self.id_column, self.timestamp_column] + self.target_columns)
        return datasets.DatasetDict({TRAIN: past_data, FUTURE: future_known, TEST: test})

Attributes

cutoff: int | str instance-attribute

horizon: int instance-attribute

min_context_length: int | None instance-attribute

max_context_length: int | None instance-attribute

id_column: str instance-attribute

timestamp_column: str instance-attribute

target_columns: list[str] instance-attribute

known_dynamic_columns: list[str] instance-attribute

past_dynamic_columns: list[str] instance-attribute

static_columns: list[str] instance-attribute

Functions

get_input_data(num_proc: int = DEFAULT_NUM_PROC) -> tuple[datasets.Dataset, datasets.Dataset]

Get data available to the model at prediction time for this evaluation window.

To convert the input data to a different format, use fev.convert_input_data.

Parameters:

num_proc : int, default DEFAULT_NUM_PROC
    Number of processes to use when splitting the dataset.

Returns:

past_data : datasets.Dataset
    Historical observations up to the cutoff point.
    Contains: id, timestamps, target values, static covariates, and all dynamic covariates.
    Columns corresponding to id_column, timestamp_column, target_columns, static_columns,
    past_dynamic_columns, known_dynamic_columns.
future_data : datasets.Dataset
    Known future information for the forecast horizon.
    Columns corresponding to id_column, timestamp_column, static_columns, known_dynamic_columns.
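A minimal usage sketch, assuming `task` is a configured fev.Task and that Task.get_window accepts a window index:

# Sketch: fetch the model inputs for a single evaluation window.
window = task.get_window(0)  # index-based access is an assumption; iter_windows() also works
past_data, future_data = window.get_input_data()

print(past_data.column_names)    # id, timestamp, targets, static and all dynamic covariates
print(future_data.column_names)  # id, timestamp, static and known dynamic covariates only

# To work in another format (e.g. pandas data frames), pass the window's input data
# through fev.convert_input_data; see its documentation for the supported formats.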

Source code in src/fev/task.py
def get_input_data(self, num_proc: int = DEFAULT_NUM_PROC) -> tuple[datasets.Dataset, datasets.Dataset]:
    """Get data available to the model at prediction time for this evaluation window.

    To convert the input data to a different format, use [`fev.convert_input_data`][fev.convert_input_data].

    Parameters
    ----------
    num_proc : int, default DEFAULT_NUM_PROC
        Number of processes to use when splitting the dataset.

    Returns
    -------
    past_data : datasets.Dataset
        Historical observations up to the cutoff point.
        Contains: id, timestamps, target values, static covariates, and all dynamic covariates.

        Columns corresponding to `id_column`, `timestamp_column`, `target_columns`, `static_columns`,
        `past_dynamic_columns`, `known_dynamic_columns`.
    future_data : datasets.Dataset
        Known future information for the forecast horizon.

        Columns corresponding to `id_column`, `timestamp_column`, `static_columns`, `known_dynamic_columns`.
    """
    if self._dataset_dict is None:
        self._dataset_dict = self._prepare_dataset_dict(num_proc=num_proc)
    return self._dataset_dict[TRAIN], self._dataset_dict[FUTURE]

get_ground_truth(num_proc: int = DEFAULT_NUM_PROC) -> datasets.Dataset

Get ground truth future test data.

This data should never be provided to the model!

This is a convenience method that exists for debugging and additional evaluation.

Parameters:

num_proc : int, default DEFAULT_NUM_PROC
    Number of processes to use when splitting the dataset.
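A minimal sketch, assuming `window` was obtained from Task.iter_windows() or Task.get_window():

# Inspect the held-out ground truth for one window.
ground_truth = window.get_ground_truth()
print(ground_truth.column_names)   # id, timestamp and target columns only
print(len(ground_truth))           # one record per time series, each holding `horizon` future values

# Never pass `ground_truth` to the model; it is meant for debugging and extra evaluation only.
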
Source code in src/fev/task.py
def get_ground_truth(self, num_proc: int = DEFAULT_NUM_PROC) -> datasets.Dataset:
    """Get ground truth future test data.

    **This data should never be provided to the model!**

    This is a convenience method that exists for debugging and additional evaluation.

    Parameters
    ----------
    num_proc : int, default DEFAULT_NUM_PROC
        Number of processes to use when splitting the dataset.
    """
    if self._dataset_dict is None:
        self._dataset_dict = self._prepare_dataset_dict(num_proc=num_proc)
    return self._dataset_dict[TEST]

compute_metrics(predictions: datasets.DatasetDict, metrics: list[Metric], seasonality: int, quantile_levels: list[float]) -> dict[str, float]

Compute accuracy metrics on the predictions made for this window.

To compute metrics on your predictions, use Task.evaluation_summary instead.

This is a convenience method that exists for debugging and additional evaluation.
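The sketch below shows how this method could be invoked directly for debugging, using a naive last-value forecast. The `predictions` structure (a datasets.DatasetDict keyed by target column, one record per time series) follows the checks in the source code below; the "predictions" column name and the `my_metrics` placeholder are assumptions, not part of the documented API.

import datasets

past_data, _ = window.get_input_data()
target_col = window.target_columns[0]

# Naive forecast: repeat each series' last observed target value over the horizon.
# Storing point forecasts under a "predictions" column is an assumption about the expected format.
naive_records = [
    {"predictions": [record[target_col][-1]] * window.horizon}
    for record in past_data
]
predictions = datasets.DatasetDict({target_col: datasets.Dataset.from_list(naive_records)})

scores = window.compute_metrics(
    predictions=predictions,
    metrics=my_metrics,        # hypothetical placeholder: a list of fev Metric objects
    seasonality=1,
    quantile_levels=[],        # no quantile forecasts in this sketch
)
print(scores)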

Source code in src/fev/task.py
def compute_metrics(
    self,
    predictions: datasets.DatasetDict,
    metrics: list[Metric],
    seasonality: int,
    quantile_levels: list[float],
) -> dict[str, float]:
    """Compute accuracy metrics on the predictions made for this window.

    To compute metrics on your predictions, use [`Task.evaluation_summary`][fev.Task.evaluation_summary] instead.

    This is a convenience method that exists for debugging and additional evaluation.
    """
    test_data = self.get_ground_truth().with_format("numpy")
    past_data = self.get_input_data()[0].with_format("numpy")

    for target_column, predictions_for_column in predictions.items():
        if len(predictions_for_column) != len(test_data):
            raise ValueError(
                f"Length of predictions for column {target_column} ({len(predictions)}) must "
                f"match the length of test data ({len(test_data)})"
            )

    test_scores: dict[str, float] = {}
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", category=RuntimeWarning)
        for metric in metrics:
            scores = []
            for col in self.target_columns:
                scores.append(
                    metric.compute(
                        test_data=test_data,
                        predictions=predictions[col],
                        past_data=past_data,
                        seasonality=seasonality,
                        quantile_levels=quantile_levels,
                        target_column=col,
                    )
                )
            test_scores[metric.name] = float(np.mean(scores))
    return test_scores