Task

A univariate or multivariate time series forecasting task.

A Task stores all information uniquely identifying the task, such as path to the dataset, forecast horizon, evaluation metric and names of the target & covariate columns.

This object handles dataset loading, train/test splitting, and prediction evaluation for time series forecasting tasks.

A single Task consists of one or more EvaluationWindow objects that can be accessed using iter_windows() or get_window() methods. After making predictions for each evaluation window, you can evaluate their accuracy using evaluation_summary().

Typical workflow:

task = fev.Task(dataset_path="...", num_windows=3, horizon=24)

predictions_per_window = []
for window in task.iter_windows():
    past_data, future_data = window.get_input_data()
    predictions = model.predict(past_data, future_data)
    predictions_per_window.append(predictions)

summary = task.evaluation_summary(predictions_per_window, model_name="my_model")

Parameters:

dataset_path : str, required
    Path to the time series dataset stored locally, on S3, or on Hugging Face Hub. See the Examples section below for information on how to load datasets from different sources.
dataset_config : str | None, default None
    Name of the configuration used when loading datasets from Hugging Face Hub. If dataset_config is provided, the dataset will be loaded from the HF Hub. If dataset_config=None, the dataset will be loaded from a local or S3 path.
horizon : int, default 1
    Length of the forecast horizon (in time steps).
num_windows : int, default 1
    Number of rolling evaluation windows included in the task.
initial_cutoff : int | str | None, default None
    Starting position for the first evaluation window that separates past from future data.

    Can be specified as:

      • Integer: Index position using Python indexing. y[:initial_cutoff] becomes the past/training data, and y[initial_cutoff:initial_cutoff+horizon] becomes the first forecast horizon to predict. Negative values are interpreted as steps from the end of the series. See the sketch after this parameter list for an illustration.
      • Timestamp string: Date or datetime (e.g., "2024-02-01"). Data up to and including this timestamp becomes past data, and the next horizon observations become the first forecast horizon.

    If None, defaults to -horizon - (num_windows - 1) * window_step_size.

    Note: Time series that are too short for any evaluation window (i.e., have fewer than min_context_length observations before a cutoff or fewer than horizon observations after a cutoff) will be filtered out during data loading.
window_step_size : int | str | None, default horizon
    Step size between consecutive evaluation windows. Must be an integer if initial_cutoff is an integer. Can be an integer or a pandas offset string (e.g., 'D', '15min') if initial_cutoff is a timestamp. Defaults to horizon.
min_context_length : int, default 1
    Time series with fewer than min_context_length observations before a cutoff will be ignored during evaluation.
max_context_length : int | None, default None
    If provided, the past time series will be shortened to at most this many observations.
seasonality : int, default 1
    Seasonal period of the dataset (e.g., 24 for hourly data, 12 for monthly data). This parameter is used when computing metrics like Mean Absolute Scaled Error (MASE).
eval_metric : str | dict[str, Any], default 'MASE'
    Evaluation metric used for ultimate evaluation on the test set. Can be specified as either a single string with the metric's name or a dictionary containing a "name" key and extra hyperparameters for the metric. For example, MASE can also be specified as {"name": "MASE", "epsilon": 0.0001} to prevent zero denominators when scaling errors.
extra_metrics : list[str] | list[dict[str, Any]], default []
    Additional metrics to be included in the results. Can be specified as a list of strings with the metric names or a list of dictionaries. See the documentation for eval_metric for more details.
quantile_levels : list[float], default []
    Quantiles that must be predicted. List of floats between 0 and 1 (for example, [0.1, 0.5, 0.9]).
id_column : str, default 'id'
    Name of the column with the unique identifier of each time series. This column will be cast to string dtype and the dataset will be sorted according to it.
timestamp_column : str, default 'timestamp'
    Name of the column with the timestamps of the observations.
target : str | list[str], default 'target'
    Name of the column that must be predicted. If a string is provided, a univariate forecasting task is created. If a list of strings is provided, a multivariate forecasting task is created.
generate_univariate_targets_from : list[str] | Literal['__ALL__'] | None, default None
    If provided, a separate univariate time series will be created from each of the generate_univariate_targets_from columns. Only valid for univariate tasks.

    If set to "__ALL__", then a separate univariate instance will be created from each column of type Sequence.

    For example, if generate_univariate_targets_from = ["X", "Y"], then the raw multivariate time series {"id": "A", "timestamp": [...], "X": [...], "Y": [...]} will be split into two univariate time series {"id": "A_X", "timestamp": [...], "target": [...]} and {"id": "A_Y", "timestamp": [...], "target": [...]}.
past_dynamic_columns : list[str], default []
    Names of covariate columns that are known only in the past. These will be available in the past data, but not in the future data. An error will be raised if these columns are missing from the dataset.
known_dynamic_columns : list[str], default []
    Names of covariate columns that are known in both the past and the future. These will be available in both past and future data. An error will be raised if these columns are missing from the dataset.
static_columns : list[str], default []
    Names of columns containing static covariates that don't change over time. An error will be raised if these columns are missing from the dataset.
task_name : str | None, default None
    Human-readable name for the task. Defaults to dataset_config for datasets stored on the HF Hub, and to the names of up to 2 parent directories for local or S3-based datasets.

    This field is only here for convenience and is not used for any validation when computing the results.
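
As referenced in the initial_cutoff description above, the integer case follows plain Python slicing semantics. The sketch below illustrates the split on a made-up list of observations; it is an illustration only, not the library's internal implementation.

y = [10, 12, 11, 13, 14, 15, 16, 18, 17, 19]  # hypothetical series with 10 observations
horizon = 3
initial_cutoff = -5  # negative values count from the end of the series

past = y[:initial_cutoff]                            # [10, 12, 11, 13, 14] -> past/training data
future = y[initial_cutoff:initial_cutoff + horizon]  # [15, 16, 18] -> first forecast horizon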

Examples:

Dataset stored on the Hugging Face Hub

>>> Task(dataset_path="autogluon/chronos_datasets", dataset_config="m4_hourly", ...)

Dataset stored as a parquet file (local or S3)

>>> Task(dataset_path="s3://my-bucket/m4_hourly/data.parquet", ...)

Dataset consisting of multiple parquet files (local or S3)

>>> Task(dataset_path="s3://my-bucket/m4_hourly/*.parquet", ...)
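
For a fuller picture, the following sketch combines several of the parameters described above: rolling windows, quantile forecasts and covariate columns. The dataset path and column names are hypothetical placeholders; a real dataset must actually contain the listed columns.

>>> Task(
...     dataset_path="s3://my-bucket/electricity/*.parquet",
...     horizon=24,
...     num_windows=3,
...     quantile_levels=[0.1, 0.5, 0.9],
...     known_dynamic_columns=["temperature"],
...     static_columns=["location"],
... )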
Source code in src/fev/task.py
@pydantic.dataclasses.dataclass(config={"extra": "forbid"})
class Task:
    """A univariate or multivariate time series forecasting task.

    A `Task` stores all information uniquely identifying the task, such as path to the dataset, forecast horizon,
    evaluation metric and names of the target & covariate columns.

    This object handles dataset loading, train/test splitting, and prediction evaluation for time series forecasting tasks.

    A single `Task` consists of one or more [`EvaluationWindow`][fev.task.EvaluationWindow] objects that can be
    accessed using [`iter_windows()`][fev.Task.iter_windows] or [`get_window()`][fev.Task.get_window] methods.
    After making predictions for each evaluation window, you can evaluate their accuracy using [`evaluation_summary()`][fev.Task.evaluation_summary].

    Typical workflow:
    ```python
    task = fev.Task(dataset_path="...", num_windows=3, horizon=24)

    predictions_per_window = []
    for window in task.iter_windows():
        past_data, future_data = window.get_input_data()
        predictions = model.predict(past_data, future_data)
        predictions_per_window.append(predictions)

    summary = task.evaluation_summary(predictions_per_window, model_name="my_model")
    ```

    Parameters
    ----------
    dataset_path : str
        Path to the time series dataset stored locally, on S3, or on Hugging Face Hub. See the Examples section below
        for information on how to load datasets from different sources.
    dataset_config : str | None, default None
        Name of the configuration used when loading datasets from Hugging Face Hub. If `dataset_config` is provided,
        the datasets will be loaded from HF Hub. If `dataset_config=None`, the dataset will be loaded from a local or
        S3 path.
    horizon : int, default 1
        Length of the forecast horizon (in time steps).
    num_windows : int, default 1
        Number of rolling evaluation windows included in the task.
    initial_cutoff : int | str | None, default None
        Starting position for the first evaluation window that separates past from future data.

        Can be specified as:

        - *Integer*: Index position using Python indexing. `y[:initial_cutoff]` becomes the past/training data,
            and `y[initial_cutoff:initial_cutoff+horizon]` becomes the first forecast horizon to predict.
            Negative values are interpreted as steps from the end of the series.
        - *Timestamp string*: Date or datetime (e.g., `"2024-02-01"`). Data up to and including this timestamp becomes
            past data, and the next `horizon` observations become the first forecast horizon.

        If `None`, defaults to `-horizon - (num_windows - 1) * window_step_size`.

        **Note**: Time series that are too short for any evaluation window (i.e., have fewer than `min_context_length`
        observations before a cutoff or fewer than `horizon` observations after a cutoff) will be filtered out during
        data loading.
    window_step_size : int | str | None, default horizon
        Step size between consecutive evaluation windows. Must be an integer if `initial_cutoff` is an integer.
        Can be an integer or pandas offset string (e.g., `'D'`, `'15min'`) if `initial_cutoff` is a timestamp.
        Defaults to `horizon`.
    min_context_length : int, default 1
        Time series with fewer than `min_context_length` observations before a cutoff will be ignored during evaluation.
    max_context_length : int | None, default None
        If provided, the past time series will be shortened to at most this many observations.
    seasonality : int, default 1
        Seasonal period of the dataset (e.g., 24 for hourly data, 12 for monthly data). This parameter is used when
        computing metrics like Mean Absolute Scaled Error (MASE).
    eval_metric : str | dict[str, Any], default 'MASE'
        Evaluation metric used for ultimate evaluation on the test set. Can be specified as either a single string
        with the metric's name or a dictionary containing a "name" key and extra hyperparameters for the metric.
        For example, MASE can also be specified as `{"name": "MASE", "epsilon": 0.0001}` to prevent zero
        denominators when scaling errors.
    extra_metrics : list[str] | list[dict[str, Any]], default []
        Additional metrics to be included in the results. Can be specified as a list of strings with the metric's
        name or a list of dictionaries. See documentation for `eval_metric` for more details.
    quantile_levels : list[float], default []
        Quantiles that must be predicted. List of floats between 0 and 1 (for example, `[0.1, 0.5, 0.9]`).
    id_column : str, default 'id'
        Name of the column with the unique identifier of each time series.
        This column will be cast to `string` dtype and the dataset will be sorted according to it.
    timestamp_column : str, default 'timestamp'
        Name of the column with the timestamps of the observations.
    target : str | list[str], default 'target'
        Name of the column that must be predicted. If a string is provided, a univariate forecasting task is created.
        If a list of strings is provided, a multivariate forecasting task is created.
    generate_univariate_targets_from : list[str] | Literal["__ALL__"] | None, default None
        If provided, a separate univariate time series will be created from each of the
        `generate_univariate_targets_from` columns. Only valid for univariate tasks.

        If set to `"__ALL__"`, then a separate univariate instance will be created from each column of type `Sequence`.

        For example, if `generate_univariate_targets_from = ["X", "Y"]` then the raw multivariate time series
        `{"id": "A", "timestamp": [...], "X": [...], "Y": [...]}` will be split into two univariate time series
        `{"id": "A_X", "timestamp": [...], "target": [...]}` and `{"id": "A_Y", "timestamp": [...], "target": [...]}`.
    past_dynamic_columns : list[str], default []
        Names of covariate columns that are known only in the past. These will be available in the past data, but not
        in the future data. An error will be raised if these columns are missing from the dataset.
    known_dynamic_columns : list[str], default []
        Names of covariate columns that are known in both past and future. These will be available in past data
        and future data. An error will be raised if these columns are missing from the dataset.
    static_columns : list[str], default []
        Names of columns containing static covariates that don't change over time. An error will be raised if these
        columns are missing from the dataset.
    task_name : str | None, default None
        Human-readable name for the task. Defaults to `dataset_config` for datasets stored on the HF Hub, and to the
        names of up to 2 parent directories for local or S3-based datasets.

        This field is only here for convenience and is not used for any validation when computing the results.

    Examples
    --------
    Dataset stored on the Hugging Face Hub

    >>> Task(dataset_path="autogluon/chronos_datasets", dataset_config="m4_hourly", ...)

    Dataset stored as a parquet file (local or S3)

    >>> Task(dataset_path="s3://my-bucket/m4_hourly/data.parquet", ...)

    Dataset consisting of multiple parquet files (local or S3)

    >>> Task(dataset_path="s3://my-bucket/m4_hourly/*.parquet", ...)
    """

    dataset_path: str
    dataset_config: str | None = None
    # Forecast horizon parameters
    horizon: int = 1
    num_windows: int = 1
    initial_cutoff: int | str | None = None
    window_step_size: int | str | None = None
    min_context_length: int = 1
    max_context_length: int | None = None
    # Evaluation parameters
    seasonality: int = 1
    eval_metric: str | dict[str, Any] = "MASE"
    extra_metrics: list[str | dict[str, Any]] = dataclasses.field(default_factory=list)
    quantile_levels: list[float] = dataclasses.field(default_factory=list)
    # Feature information
    id_column: str = "id"
    timestamp_column: str = "timestamp"
    target: str | list[str] = "target"
    generate_univariate_targets_from: list[str] | Literal["__ALL__"] | None = None
    known_dynamic_columns: list[str] = dataclasses.field(default_factory=list)
    past_dynamic_columns: list[str] = dataclasses.field(default_factory=list)
    static_columns: list[str] = dataclasses.field(default_factory=list)
    task_name: str | None = None

    def __post_init__(self):
        if self.task_name is None:
            if self.dataset_config is not None:
                # HF Hub dataset -> name of dataset_config
                self.task_name = self.dataset_config
            else:
                # File dataset -> names of up to 2 parent directories
                # e.g. /home/foo/bar/data.parquet -> foo/bar
                self.task_name = "/".join(Path(self.dataset_path).parts[-3:-1])

        assert self.num_windows >= 1, "`num_windows` must satisfy >= 1"
        if self.window_step_size is None:
            self.window_step_size = self.horizon
        if isinstance(self.window_step_size, int):
            assert self.window_step_size >= 1, "`window_step_size` must satisfy >= 1"
        else:
            offset = pd.tseries.frequencies.to_offset(self.window_step_size)
            assert offset.n >= 1, "If `window_step_size` is a string, it must correspond to a positive timedelta"
            self.window_step_size = offset.freqstr

        if self.initial_cutoff is None:
            assert isinstance(self.window_step_size, int), (
                "If `initial_cutoff` is None, `window_step_size` must be an int"
            )
            self.initial_cutoff = -self.horizon - (self.num_windows - 1) * self.window_step_size

        if isinstance(self.initial_cutoff, int):
            if not isinstance(self.window_step_size, int):
                raise ValueError("`window_step_size` must be an int if `initial_cutoff` is an int")
            assert self.window_step_size >= 1
            max_allowed_cutoff = -self.horizon - (self.num_windows - 1) * self.window_step_size
            if self.initial_cutoff < 0 and self.initial_cutoff > max_allowed_cutoff:
                raise ValueError(
                    "Negative `initial_cutoff` must be <= `-horizon - (num_windows - 1) * window_step_size`"
                )
        else:
            self.initial_cutoff = pd.Timestamp(self.initial_cutoff).isoformat()

        assert all(0 < q < 1 for q in self.quantile_levels), "All quantile_levels must satisfy 0 < q < 1"
        self.quantile_levels = sorted(self.quantile_levels)

        metrics = [get_metric(m) for m in [self.eval_metric] + self.extra_metrics]

        metric_names = [m.name for m in metrics]
        duplicate_metric_names = {x for x in metric_names if metric_names.count(x) > 1}
        if duplicate_metric_names:
            raise ValueError(
                f"Duplicate metric names found: {duplicate_metric_names}. Please configure "
                "only one instance for each metric."
            )

        if len(self.quantile_levels) == 0:
            for m in metrics:
                if m.needs_quantiles:
                    raise ValueError(f"Please provide quantile_levels when using a quantile metric '{m.name}'")

        if self.min_context_length < 1:
            raise ValueError("`min_context_length` must satisfy >= 1")
        if self.max_context_length is not None:
            if self.max_context_length < 1:
                raise ValueError("If provided, `max_context_length` must satisfy >= 1")

        if isinstance(self.target, list):
            if len(self.target) < 1:
                raise ValueError("For multivariate tasks `target` must contain at least one entry")
            # Ensure that column names are sorted alphabetically so that univariate adapters return sorted data
            self.target = sorted(self.target)

        # Ensure that column names are sorted alphabetically for deterministic task comparison
        self.known_dynamic_columns = sorted(self.known_dynamic_columns)
        self.past_dynamic_columns = sorted(self.past_dynamic_columns)
        self.static_columns = sorted(self.static_columns)

        if self.generate_univariate_targets_from is not None and self.is_multivariate:
            raise ValueError(
                "`generate_univariate_targets_from` cannot be used for multivariate tasks (when `target` is a list)"
            )

        # Attributes computed after the dataset is loaded
        self._full_dataset: datasets.Dataset | None = None
        self._freq: str | None = None
        self._dataset_fingerprint: str | None = None

    @property
    def cutoffs(self) -> list[int] | list[str]:
        """Cutoffs corresponding to each `EvaluationWindow` in the task.

        Computed based on `num_windows`, `initial_cutoff` and `window_step_size` attributes of the task.
        """
        if isinstance(self.initial_cutoff, int):
            assert isinstance(self.window_step_size, int)
            return [self.initial_cutoff + window_idx * self.window_step_size for window_idx in range(self.num_windows)]
        else:
            assert isinstance(self.initial_cutoff, str)
            if isinstance(self.window_step_size, str):
                offset = pd.tseries.frequencies.to_offset(self.window_step_size)
            else:
                assert isinstance(self.window_step_size, int)
                offset = pd.tseries.frequencies.to_offset(self.freq) * self.window_step_size

            cutoffs = []
            for window_idx in range(self.num_windows):
                cutoff = pd.Timestamp(self.initial_cutoff)
                # We don't add the offset for window_idx=0 to avoid applying an "anchored" offset
                # (e.g. `Timestamp("2020-01-01") + i * to_offset("ME")` returns "2020-01-31" for i=0 and i=1)
                if window_idx != 0:
                    cutoff += window_idx * offset
                cutoffs.append(cutoff.isoformat())
            return cutoffs

    def to_dict(self) -> dict:
        """Convert task definition to a dictionary."""
        return dataclasses.asdict(self)

    def load_full_dataset(
        self,
        storage_options: dict | None = None,
        trust_remote_code: bool | None = None,
        num_proc: int = DEFAULT_NUM_PROC,
    ) -> datasets.Dataset:
        """Load the full raw dataset with preprocessing applied.

        This method validates the data, loads and preprocesses the dataset according to the task configuration,
        including generating univariate targets if `generate_univariate_targets_from` is provided.

        **Note:** This method is only provided for information and debugging purposes. For model evaluation, use
        [`iter_windows()`][fev.Task.iter_windows] instead to get properly split train/test data.

        Parameters
        ----------
        storage_options : dict, optional
            Passed to `datasets.load_dataset()` for accessing remote datasets (e.g., S3 credentials).
        trust_remote_code : bool, optional
            Passed to `datasets.load_dataset()` for trusting remote code from Hugging Face Hub.
        num_proc : int, default DEFAULT_NUM_PROC
            Number of processes to use for dataset preprocessing.

        Returns
        -------
        datasets.Dataset
            The preprocessed dataset with all time series.
        """
        if self._full_dataset is None:
            self._full_dataset = self._load_dataset(
                storage_options=storage_options, trust_remote_code=trust_remote_code, num_proc=num_proc
            )
        return self._full_dataset

    def iter_windows(
        self,
        storage_options: dict | None = None,
        trust_remote_code: bool | None = None,
        num_proc: int = DEFAULT_NUM_PROC,
    ) -> Iterable[EvaluationWindow]:
        """Iterate over the rolling evaluation windows in the task.

        Each window contains train/test splits at different cutoff points for time series
        cross-validation. Use this method for model evaluation and benchmarking.

        Parameters
        ----------
        storage_options : dict, optional
            Passed to `datasets.load_dataset()` for accessing remote datasets (e.g., S3 credentials).
        trust_remote_code : bool, optional
            Passed to `datasets.load_dataset()` for trusting remote code from Hugging Face Hub.
        num_proc : int, default DEFAULT_NUM_PROC
            Number of processes to use for dataset preprocessing.

        Yields
        ------
        EvaluationWindow
            A single evaluation window at a specific cutoff containing the data needed to make and evaluate forecasts.

        Examples
        --------
        >>> for window in task.iter_windows():
        ...     past_data, future_data = window.get_input_data()
        ...     # Make predictions using past_data and future_data
        """
        for window_idx in range(self.num_windows):
            yield self.get_window(
                window_idx, storage_options=storage_options, trust_remote_code=trust_remote_code, num_proc=num_proc
            )

    def get_window(
        self,
        window_idx: int,
        storage_options: dict | None = None,
        trust_remote_code: bool | None = None,
        num_proc: int = DEFAULT_NUM_PROC,
    ) -> EvaluationWindow:
        """Get a single evaluation window by index.

        Parameters
        ----------
        window_idx : int
            Index of the evaluation window in [0, 1, ..., num_windows - 1].
        storage_options : dict, optional
            Passed to `datasets.load_dataset()` for accessing remote datasets (e.g., S3 credentials).
        trust_remote_code : bool, optional
            Passed to `datasets.load_dataset()` for trusting remote code from Hugging Face Hub.
        num_proc : int, default DEFAULT_NUM_PROC
            Number of processes to use for dataset preprocessing.

        Returns
        -------
        EvaluationWindow
            A single evaluation window at a specific cutoff containing the data needed to make and evaluate forecasts.
        """
        full_dataset = self.load_full_dataset(
            storage_options=storage_options, trust_remote_code=trust_remote_code, num_proc=num_proc
        )
        if window_idx >= self.num_windows:
            raise ValueError(f"Window index {window_idx} is out of range (num_windows={self.num_windows})")
        return EvaluationWindow(
            full_dataset=full_dataset,
            cutoff=self.cutoffs[window_idx],
            horizon=self.horizon,
            min_context_length=self.min_context_length,
            max_context_length=self.max_context_length,
            id_column=self.id_column,
            timestamp_column=self.timestamp_column,
            target_columns=self.target_columns,
            known_dynamic_columns=self.known_dynamic_columns,
            past_dynamic_columns=self.past_dynamic_columns,
            static_columns=self.static_columns,
        )

    @pydantic.model_validator(mode="before")
    @classmethod
    def handle_deprecated_fields(cls, data: ArgsKwargs) -> ArgsKwargs:
        def _warn_and_rename_kwarg(data: ArgsKwargs, old_name: str, new_name: str) -> ArgsKwargs:
            assert data.kwargs is not None
            if old_name in data.kwargs:
                warnings.warn(
                    f"Field '{old_name}' is deprecated and will be removed in a future release. "
                    f"Please use '{new_name}' instead",
                    category=FutureWarning,
                    stacklevel=4,
                )
                data.kwargs[new_name] = data.kwargs.pop(old_name)
            return data

        if data.kwargs is not None:
            if "lead_time" in data.kwargs:
                # 'lead_time' was never used before, quietly ignore
                data.kwargs.pop("lead_time")
            for old_name, new_name in DEPRECATED_TASK_FIELDS.items():
                data = _warn_and_rename_kwarg(data, old_name, new_name)
        return data

    @property
    def freq(self) -> str:
        """Pandas string corresponding to the frequency of the time series in the dataset.

        See [pandas documentation](https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases)
        for the list of possible values.

        This attribute is available after the dataset is loaded with `load_full_dataset`, `iter_windows` or `get_window`.
        """
        if self._freq is None:
            raise ValueError("Please load dataset first with `task.load_full_dataset()`")
        return self._freq

    @property
    def dynamic_columns(self) -> list[str]:
        """List of dynamic covariates available in the task. Does not include the target columns."""
        return self.known_dynamic_columns + self.past_dynamic_columns

    def _load_dataset(
        self,
        storage_options: dict | None = None,
        trust_remote_code: bool | None = None,
        num_proc: int = DEFAULT_NUM_PROC,
    ) -> datasets.Dataset:
        """Load the raw dataset and apply initial preprocessing based on the Task definition."""
        if self.dataset_config is not None:
            # Load dataset from HF Hub
            path = self.dataset_path
            name = self.dataset_config
            data_files = None
        else:
            # Load dataset from a local or remote file
            dataset_format = Path(self.dataset_path).suffix.lstrip(".")
            allowed_formats = ["parquet", "arrow"]
            if dataset_format not in allowed_formats:
                raise ValueError(f"When loading dataset from file, path must end in one of {allowed_formats}.")
            path = dataset_format
            name = None
            data_files = self.dataset_path

        if storage_options is None:
            storage_options = {}

        load_dataset_kwargs = dict(
            path=path,
            name=name,
            data_files=data_files,
            split=TRAIN,
            storage_options=copy.deepcopy(storage_options),
            trust_remote_code=trust_remote_code,
        )
        try:
            ds = datasets.load_dataset(
                **load_dataset_kwargs,
                # PatchedDownloadConfig fixes https://github.com/huggingface/datasets/issues/6598
                download_config=utils.PatchedDownloadConfig(storage_options=copy.deepcopy(storage_options)),
            )
        except Exception:
            raise RuntimeError(
                "Failed to load the dataset when calling `datasets.load_dataset` with arguments\n"
                f"{pprint.pformat(load_dataset_kwargs)}"
            )
        # Since we loaded with split=TRAIN and streaming=False, ds is a datasets.Dataset object
        assert isinstance(ds, datasets.Dataset)
        ds.set_format("numpy")

        required_columns = self.known_dynamic_columns + self.past_dynamic_columns + self.static_columns
        if self.generate_univariate_targets_from is None:
            required_columns += self.target_columns
        elif self.generate_univariate_targets_from == ALL_AVAILABLE_COLUMNS:
            pass
        else:
            required_columns += self.generate_univariate_targets_from

        utils.validate_time_series_dataset(
            ds,
            id_column=self.id_column,
            timestamp_column=self.timestamp_column,
            required_columns=required_columns,
            num_proc=num_proc,
        )

        # Create separate instances from columns listed in `generate_univariate_targets_from`
        if self.generate_univariate_targets_from is not None:
            if self.generate_univariate_targets_from == ALL_AVAILABLE_COLUMNS:
                generate_univariate_targets_from = [
                    col
                    for col, feat in ds.features.items()
                    if isinstance(feat, datasets.Sequence) and col != self.timestamp_column
                ]
            else:
                generate_univariate_targets_from = self.generate_univariate_targets_from
            assert isinstance(self.target, str)
            ds = utils.generate_univariate_targets_from_multivariate(
                ds,
                id_column=self.id_column,
                new_target_column=self.target,
                generate_univariate_targets_from=generate_univariate_targets_from,
                num_proc=num_proc,
            )

        # Ensure that IDs are sorted alphabetically for consistent ordering
        if ds.features[self.id_column].dtype != "string":
            ds = ds.cast_column(self.id_column, datasets.Value("string"))
        ds = ds.sort(self.id_column)
        self._freq = pd.infer_freq(ds[0][self.timestamp_column])
        if self._freq is None:
            raise ValueError("Dataset contains irregular timestamps")

        available_dynamic_columns, available_static_columns = utils.infer_column_types(
            ds, id_column=self.id_column, timestamp_column=self.timestamp_column
        )
        missing_dynamic = set(self.dynamic_columns) - set(available_dynamic_columns)
        if len(missing_dynamic) > 0:
            raise ValueError(f"Dynamic columns not found in dataset: {sorted(missing_dynamic)}")
        missing_static = set(self.static_columns) - set(available_static_columns)
        if len(missing_static) > 0:
            raise ValueError(f"Static columns not found in dataset: {sorted(missing_static)}")
        self._dataset_fingerprint = utils.generate_fingerprint(ds)
        return ds

    @property
    def dataset_info(self) -> dict:
        return {
            "id_column": self.id_column,
            "timestamp_column": self.timestamp_column,
            "target": self.target,
            "static_columns": self.static_columns,
            "dynamic_columns": self.dynamic_columns,
            "known_dynamic_columns": self.known_dynamic_columns,
            "past_dynamic_columns": self.past_dynamic_columns,
        }

    @property
    def predictions_schema(self) -> datasets.Features:
        """Describes the format that the predictions must follow.

        Forecast must always include the key `"predictions"` corresponding to the point forecast.

        The predictions must also include a key for each of the `quantile_levels`.
        For example, if `quantile_levels = [0.1, 0.9]`, then keys `"0.1"` and `"0.9"` must be included in the forecast.
        """
        predictions_length = self.horizon
        predictions_schema = {
            PREDICTIONS: datasets.Sequence(datasets.Value("float64"), length=predictions_length),
        }
        for q in sorted(self.quantile_levels):
            predictions_schema[str(q)] = datasets.Sequence(datasets.Value("float64"), length=predictions_length)
        return datasets.Features(predictions_schema)

    def clean_and_validate_predictions(
        self, predictions: datasets.DatasetDict | dict[str, list[dict]] | datasets.Dataset | list[dict]
    ) -> datasets.DatasetDict:
        """Convert predictions for a single window into the format needed for computing the metrics.

        The following formats are supported for both multivariate and univariate tasks:

        - `DatasetDict`: Must contain a single key for each target in `task.target_columns`. Each value in
            the dict must be a `datasets.Dataset` with schema compatible with `task.predictions_schema`. This is the
            recommended format for providing predictions.
        - `dict[str, list[dict]]`: A dictionary with one key for each target in `task.target_columns`. Each value in
            the dict must be a list of dictionaries, each dict following the schema in `task.predictions_schema`.

        Additionally for univariate tasks, the following formats are supported:

        - `datasets.Dataset`: A single `datasets.Dataset` with schema compatible with `task.predictions_schema`.
        - `list[dict]`: A list of dictionaries, where each dict follows the schema in `task.predictions_schema`.

        Returns
        -------
        predictions :
            A `DatasetDict` where each key is the name of the target column and the corresponding value is a
            `datasets.Dataset` with the predictions.
        """

        def _to_dataset(preds: datasets.Dataset | list[dict]) -> datasets.Dataset:
            if isinstance(preds, list):
                try:
                    preds = datasets.Dataset.from_list(list(preds))
                except Exception:
                    raise ValueError(
                        "`datasets.Dataset.from_list(predictions)` failed. Please convert predictions to `datasets.Dataset` format."
                    )
            if not isinstance(preds, datasets.Dataset):
                raise ValueError(f"predictions must be of type `datasets.Dataset` (received {type(preds)})")
            return preds

        if isinstance(predictions, datasets.DatasetDict):
            pass
        elif isinstance(predictions, dict):
            predictions = datasets.DatasetDict({col: _to_dataset(preds) for col, preds in predictions.items()})
        elif isinstance(predictions, (list, datasets.Dataset)):
            predictions = datasets.DatasetDict({self.target_columns[0]: _to_dataset(predictions)})
        else:
            raise ValueError(
                f"Expected predictions to be a `DatasetDict`, `Dataset`, `list` or `dict` (got {type(predictions)})"
            )
        if missing_columns := set(self.target_columns) - set(predictions.keys()):
            raise ValueError(f"Missing predictions for columns {missing_columns} (got {sorted(predictions.keys())})")
        predictions = predictions.cast(self.predictions_schema).with_format("numpy")
        for target_column, predictions_for_column in predictions.items():
            self._assert_all_columns_finite(predictions_for_column)
        return predictions

    @staticmethod
    def _assert_all_columns_finite(predictions: datasets.Dataset) -> None:
        for col in predictions.column_names:
            nan_row_idx, _ = np.where(~np.isfinite(np.array(predictions[col])))
            if len(nan_row_idx) > 0:
                raise ValueError(
                    "Predictions contain NaN or Inf values. "
                    f"First invalid value encountered in column {col} for item {nan_row_idx[0]}:\n"
                    f"{predictions[int(nan_row_idx[0])]}"
                )

    def evaluation_summary(
        self,
        predictions_per_window: Iterable[datasets.Dataset | list[dict] | datasets.DatasetDict | dict[str, list[dict]]],
        model_name: str,
        training_time_s: float | None = None,
        inference_time_s: float | None = None,
        trained_on_this_dataset: bool = False,
        extra_info: dict | None = None,
    ) -> dict[str, Any]:
        """Get a summary of the model performance for the given forecasting task.

        Parameters
        ----------
        predictions_per_window : Iterable[datasets.Dataset | list[dict] | datasets.DatasetDict | dict[str, list[dict]]]
            Predictions generated by the model for each evaluation window in the task.

            The length of `predictions_per_window` must be equal to `task.num_windows`.

            The predictions for each window must be formatted as described in [clean_and_validate_predictions][fev.Task.clean_and_validate_predictions].
        model_name : str
            Name of the model that generated the predictions.
        training_time_s : float | None
            Training time of the model for this task (in seconds).
        inference_time_s : float | None
            Total inference time to generate all predictions (in seconds).
        trained_on_this_dataset : bool
            Was the model trained on the dataset associated with this task? Set to False if the model is used in
            zero-shot mode.
        extra_info : dict | None
            Optional dictionary with additional information that will be appended to the evaluation summary.

        Returns
        -------
        summary : dict
            Dictionary that summarizes the model performance on this task. Includes the following keys:

            - `model_name` - name of the model
            - `test_error` - value of the `task.eval_metric` averaged over all evaluation windows (lower is better)
            - all `Task` attributes obtained via `task.to_dict()`.
            - values of `task.extra_metrics` averaged over all evaluation windows
            - `dataset_fingerprint` - fingerprint of the dataset generated by the HF `datasets` library
            - `trained_on_this_dataset` - whether the model was trained on the dataset used in the task
            - `fev_version` - version of the `fev` package used to obtain the summary
        """
        summary: dict[str, Any] = {"model_name": model_name}
        summary.update(self.to_dict())
        metrics = [get_metric(m) for m in [self.eval_metric] + self.extra_metrics]
        eval_metric = metrics[0]

        metrics_per_window = {metric.name: [] for metric in metrics}
        if isinstance(predictions_per_window, (datasets.Dataset, datasets.DatasetDict, dict)):
            raise ValueError(
                f"predictions_per_window must be iterable (e.g., a list) but got {type(predictions_per_window)}"
            )
        # Use strict=True to raise error if num_predictions does not match num_windows
        for predictions, window in zip(predictions_per_window, self.iter_windows(), strict=True):
            metric_scores = window.compute_metrics(
                self.clean_and_validate_predictions(predictions),
                metrics=metrics,
                seasonality=self.seasonality,
                quantile_levels=self.quantile_levels,
            )
            for metric, value in metric_scores.items():
                metrics_per_window[metric].append(value)
        metrics_averaged = {metric_name: float(np.mean(values)) for metric_name, values in metrics_per_window.items()}
        summary.update(
            {
                "test_error": metrics_averaged[eval_metric.name],
                "training_time_s": training_time_s,
                "inference_time_s": inference_time_s,
                "dataset_fingerprint": self._dataset_fingerprint,
                "trained_on_this_dataset": trained_on_this_dataset,
                "fev_version": FEV_VERSION,
                **metrics_averaged,
            }
        )
        if extra_info is not None:
            summary.update(extra_info)
        return summary

    @property
    def is_multivariate(self) -> bool:
        """Returns `True` if `task.target` is a `list`, `False` otherwise."""
        return isinstance(self.target, list)

    @property
    def target_columns(self) -> list[str]:
        """A list including names of all target columns for this task.

        Unlike `task.target` that can be a string or a list, `task.target_columns` is always a list of strings.
        """
        if isinstance(self.target, list):
            return self.target
        else:
            return [self.target]

Attributes

predictions_schema: datasets.Features property

Describes the format that the predictions must follow.

Forecast must always include the key "predictions" corresponding to the point forecast.

The predictions must also include a key for each of the quantile_levels. For example, if quantile_levels = [0.1, 0.9], then keys "0.1" and "0.9" must be included in the forecast.
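
For illustration, with horizon=2 and quantile_levels=[0.1, 0.9], a single prediction record compatible with this schema could look as follows (the numbers are made up):

{
    "predictions": [105.3, 107.1],  # point forecast, one value per horizon step
    "0.1": [98.0, 99.5],            # 10% quantile forecast
    "0.9": [112.4, 114.0],          # 90% quantile forecast
}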

freq: str property

Pandas string corresponding to the frequency of the time series in the dataset.

See pandas documentation for the list of possible values.

This attribute is available after the dataset is loaded with load_full_dataset, iter_windows or get_window.

cutoffs: list[int] | list[str] property

Cutoffs corresponding to each EvaluationWindow in the task.

Computed based on num_windows, initial_cutoff and window_step_size attributes of the task.
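
For example, with horizon=24, num_windows=3 and the defaults window_step_size=horizon and initial_cutoff=None, the integer cutoffs work out to:

initial_cutoff = -horizon - (num_windows - 1) * window_step_size = -24 - 2 * 24 = -72
cutoffs = [-72, -48, -24]  # one cutoff per evaluation window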

target_columns: list[str] property

A list including names of all target columns for this task.

Unlike task.target that can be a string or a list, task.target_columns is always a list of strings.

is_multivariate: bool property

Returns True if task.target is a list, False otherwise.

dynamic_columns: list[str] property

List of dynamic covariates available in the task. Does not include the target columns.

Functions

get_window(window_idx: int, storage_options: dict | None = None, trust_remote_code: bool | None = None, num_proc: int = DEFAULT_NUM_PROC) -> EvaluationWindow

Get a single evaluation window by index.

Parameters:

window_idx : int, required
    Index of the evaluation window in [0, 1, ..., num_windows - 1].
storage_options : dict | None, default None
    Passed to datasets.load_dataset() for accessing remote datasets (e.g., S3 credentials).
trust_remote_code : bool | None, default None
    Passed to datasets.load_dataset() for trusting remote code from Hugging Face Hub.
num_proc : int, default DEFAULT_NUM_PROC
    Number of processes to use for dataset preprocessing.

Returns:

EvaluationWindow
    A single evaluation window at a specific cutoff containing the data needed to make and evaluate forecasts.
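
A minimal usage sketch, assuming task is an already constructed Task:

>>> window = task.get_window(0)  # first evaluation window
>>> past_data, future_data = window.get_input_data()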

Source code in src/fev/task.py
def get_window(
    self,
    window_idx: int,
    storage_options: dict | None = None,
    trust_remote_code: bool | None = None,
    num_proc: int = DEFAULT_NUM_PROC,
) -> EvaluationWindow:
    """Get a single evaluation window by index.

    Parameters
    ----------
    window_idx : int
        Index of the evaluation window in [0, 1, ..., num_windows - 1].
    storage_options : dict, optional
        Passed to `datasets.load_dataset()` for accessing remote datasets (e.g., S3 credentials).
    trust_remote_code : bool, optional
        Passed to `datasets.load_dataset()` for trusting remote code from Hugging Face Hub.
    num_proc : int, default DEFAULT_NUM_PROC
        Number of processes to use for dataset preprocessing.

    Returns
    -------
    EvaluationWindow
        A single evaluation window at a specific cutoff containing the data needed to make and evaluate forecasts.
    """
    full_dataset = self.load_full_dataset(
        storage_options=storage_options, trust_remote_code=trust_remote_code, num_proc=num_proc
    )
    if window_idx >= self.num_windows:
        raise ValueError(f"Window index {window_idx} is out of range (num_windows={self.num_windows})")
    return EvaluationWindow(
        full_dataset=full_dataset,
        cutoff=self.cutoffs[window_idx],
        horizon=self.horizon,
        min_context_length=self.min_context_length,
        max_context_length=self.max_context_length,
        id_column=self.id_column,
        timestamp_column=self.timestamp_column,
        target_columns=self.target_columns,
        known_dynamic_columns=self.known_dynamic_columns,
        past_dynamic_columns=self.past_dynamic_columns,
        static_columns=self.static_columns,
    )

iter_windows(storage_options: dict | None = None, trust_remote_code: bool | None = None, num_proc: int = DEFAULT_NUM_PROC) -> Iterable[EvaluationWindow]

Iterate over the rolling evaluation windows in the task.

Each window contains train/test splits at different cutoff points for time series cross-validation. Use this method for model evaluation and benchmarking.

Parameters:

storage_options : dict | None, default None
    Passed to datasets.load_dataset() for accessing remote datasets (e.g., S3 credentials).
trust_remote_code : bool | None, default None
    Passed to datasets.load_dataset() for trusting remote code from Hugging Face Hub.
num_proc : int, default DEFAULT_NUM_PROC
    Number of processes to use for dataset preprocessing.

Yields:

EvaluationWindow
    A single evaluation window at a specific cutoff containing the data needed to make and evaluate forecasts.

Examples:

>>> for window in task.iter_windows():
...     past_data, future_data = window.get_input_data()
...     # Make predictions using past_data and future_data
Source code in src/fev/task.py
def iter_windows(
    self,
    storage_options: dict | None = None,
    trust_remote_code: bool | None = None,
    num_proc: int = DEFAULT_NUM_PROC,
) -> Iterable[EvaluationWindow]:
    """Iterate over the rolling evaluation windows in the task.

    Each window contains train/test splits at different cutoff points for time series
    cross-validation. Use this method for model evaluation and benchmarking.

    Parameters
    ----------
    storage_options : dict, optional
        Passed to `datasets.load_dataset()` for accessing remote datasets (e.g., S3 credentials).
    trust_remote_code : bool, optional
        Passed to `datasets.load_dataset()` for trusting remote code from Hugging Face Hub.
    num_proc : int, default DEFAULT_NUM_PROC
        Number of processes to use for dataset preprocessing.

    Yields
    ------
    EvaluationWindow
        A single evaluation window at a specific cutoff containing the data needed to make and evaluate forecasts.

    Examples
    --------
    >>> for window in task.iter_windows():
    ...     past_data, future_data = window.get_input_data()
    ...     # Make predictions using past_data and future_data
    """
    for window_idx in range(self.num_windows):
        yield self.get_window(
            window_idx, storage_options=storage_options, trust_remote_code=trust_remote_code, num_proc=num_proc
        )

evaluation_summary(predictions_per_window: Iterable[datasets.Dataset | list[dict] | datasets.DatasetDict | dict[str, list[dict]]], model_name: str, training_time_s: float | None = None, inference_time_s: float | None = None, trained_on_this_dataset: bool = False, extra_info: dict | None = None) -> dict[str, Any]

Get a summary of the model performance for the given forecasting task.

Parameters:

predictions_per_window : Iterable[Dataset | list[dict] | DatasetDict | dict[str, list[dict]]], required
    Predictions generated by the model for each evaluation window in the task.

    The length of predictions_per_window must be equal to task.num_windows.

    The predictions for each window must be formatted as described in clean_and_validate_predictions.
model_name : str, required
    Name of the model that generated the predictions.
training_time_s : float | None, default None
    Training time of the model for this task (in seconds).
inference_time_s : float | None, default None
    Total inference time to generate all predictions (in seconds).
trained_on_this_dataset : bool, default False
    Was the model trained on the dataset associated with this task? Set to False if the model is used in zero-shot mode.
extra_info : dict | None, default None
    Optional dictionary with additional information that will be appended to the evaluation summary.

Returns:

summary : dict
    Dictionary that summarizes the model performance on this task. Includes the following keys:

      • model_name - name of the model
      • test_error - value of the task.eval_metric averaged over all evaluation windows (lower is better)
      • all Task attributes obtained via task.to_dict()
      • values of task.extra_metrics averaged over all evaluation windows
      • dataset_fingerprint - fingerprint of the dataset generated by the HF datasets library
      • trained_on_this_dataset - whether the model was trained on the dataset used in the task
      • fev_version - version of the fev package used to obtain the summary
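
A minimal usage sketch, assuming predictions_per_window was produced by the loop shown in the typical workflow above (the timing value is a placeholder):

>>> summary = task.evaluation_summary(
...     predictions_per_window,
...     model_name="my_model",
...     inference_time_s=12.3,
... )
>>> summary["test_error"]  # task.eval_metric averaged over all evaluation windows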
Source code in src/fev/task.py
def evaluation_summary(
    self,
    predictions_per_window: Iterable[datasets.Dataset | list[dict] | datasets.DatasetDict | dict[str, list[dict]]],
    model_name: str,
    training_time_s: float | None = None,
    inference_time_s: float | None = None,
    trained_on_this_dataset: bool = False,
    extra_info: dict | None = None,
) -> dict[str, Any]:
    """Get a summary of the model performance for the given forecasting task.

    Parameters
    ----------
    predictions_per_window : Iterable[datasets.Dataset | list[dict] | datasets.DatasetDict | dict[str, list[dict]]]
        Predictions generated by the model for each evaluation window in the task.

        The length of `predictions_per_window` must be equal to `task.num_windows`.

        The predictions for each window must be formatted as described in [clean_and_validate_predictions][fev.Task.clean_and_validate_predictions].
    model_name : str
        Name of the model that generated the predictions.
    training_time_s : float | None
        Training time of the model for this task (in seconds).
    inference_time_s : float | None
        Total inference time to generate all predictions (in seconds).
    trained_on_this_dataset : bool
        Was the model trained on the dataset associated with this task? Set to False if the model is used in
        zero-shot mode.
    extra_info : dict | None
        Optional dictionary with additional information that will be appended to the evaluation summary.

    Returns
    -------
    summary : dict
        Dictionary that summarizes the model performance on this task. Includes the following keys:

        - `model_name` - name of the model
        - `test_error` - value of the `task.eval_metric` averaged over all evaluation windows (lower is better)
        - all `Task` attributes obtained via `task.to_dict()`.
        - values of `task.extra_metrics` averaged over all evaluation windows
        - `dataset_fingerprint` - fingerprint of the dataset generated by the HF `datasets` library
        - `trained_on_this_dataset` - whether the model was trained on the dataset used in the task
        - `fev_version` - version of the `fev` package used to obtain the summary
    """
    summary: dict[str, Any] = {"model_name": model_name}
    summary.update(self.to_dict())
    metrics = [get_metric(m) for m in [self.eval_metric] + self.extra_metrics]
    eval_metric = metrics[0]

    metrics_per_window = {metric.name: [] for metric in metrics}
    if isinstance(predictions_per_window, (datasets.Dataset, datasets.DatasetDict, dict)):
        raise ValueError(
            f"predictions_per_window must be iterable (e.g., a list) but got {type(predictions_per_window)}"
        )
    # Use strict=True to raise error if num_predictions does not match num_windows
    for predictions, window in zip(predictions_per_window, self.iter_windows(), strict=True):
        metric_scores = window.compute_metrics(
            self.clean_and_validate_predictions(predictions),
            metrics=metrics,
            seasonality=self.seasonality,
            quantile_levels=self.quantile_levels,
        )
        for metric, value in metric_scores.items():
            metrics_per_window[metric].append(value)
    metrics_averaged = {metric_name: float(np.mean(values)) for metric_name, values in metrics_per_window.items()}
    summary.update(
        {
            "test_error": metrics_averaged[eval_metric.name],
            "training_time_s": training_time_s,
            "inference_time_s": inference_time_s,
            "dataset_fingerprint": self._dataset_fingerprint,
            "trained_on_this_dataset": trained_on_this_dataset,
            "fev_version": FEV_VERSION,
            **metrics_averaged,
        }
    )
    if extra_info is not None:
        summary.update(extra_info)
    return summary
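
For orientation, a minimal sketch of how a summary might be produced and inspected. This is illustrative only: `predictions_per_window` is assumed to be a list with one prediction set per evaluation window, and the timing and `extra_info` values are placeholders.

import time

start = time.perf_counter()
# ... generate predictions for every evaluation window here ...
inference_time = time.perf_counter() - start

summary = task.evaluation_summary(
    predictions_per_window,
    model_name="my_model",
    inference_time_s=inference_time,
    trained_on_this_dataset=False,      # model used in zero-shot mode
    extra_info={"model_size": "200M"},  # hypothetical extra field appended to the summary
)
print(summary["test_error"])            # task.eval_metric averaged over all evaluation windows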

clean_and_validate_predictions(predictions: datasets.DatasetDict | dict[str, list[dict]] | datasets.Dataset | list[dict]) -> datasets.DatasetDict

Convert predictions for a single window into the format needed for computing the metrics.

The following formats are supported for both multivariate and univariate tasks:

  • DatasetDict: Must contain a single key for each target in task.target_columns. Each value in the dict must be a datasets.Dataset with schema compatible with task.predictions_schema. This is the recommended format for providing predictions.
  • dict[str, list[dict]]: A dictionary with one key for each target in task.target_columns. Each value in the dict must be a list of dictionaries, each dict following the schema in task.predictions_schema.

Additionally, for univariate tasks, the following formats are supported:

  • datasets.Dataset: A single datasets.Dataset with schema compatible with task.predictions_schema.
  • list[dict]: A list of dictionaries, where each dict follows the schema in task.predictions_schema.

Returns:

Name Type Description
predictions DatasetDict

A DatasetDict where each key is the name of the target column and the corresponding value is a datasets.Dataset with the predictions.
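
A minimal sketch of the two univariate input formats, assuming `task` is a univariate `fev.Task` and `raw_predictions` is a hypothetical list of dicts that follows `task.predictions_schema`:

import datasets

print(task.predictions_schema)  # inspect the expected columns for each prediction record

# A plain list of dicts is accepted directly for univariate tasks ...
validated = task.clean_and_validate_predictions(raw_predictions)

# ... and is equivalent to the recommended DatasetDict format keyed by the target column
predictions_dict = datasets.DatasetDict(
    {task.target_columns[0]: datasets.Dataset.from_list(raw_predictions)}
)
validated = task.clean_and_validate_predictions(predictions_dict)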

Source code in src/fev/task.py
def clean_and_validate_predictions(
    self, predictions: datasets.DatasetDict | dict[str, list[dict]] | datasets.Dataset | list[dict]
) -> datasets.DatasetDict:
    """Convert predictions for a single window into the format needed for computing the metrics.

    The following formats are supported for both multivariate and univariate tasks:

    - `DatasetDict`: Must contain a single key for each target in `task.target_columns`. Each value in
        the dict must be a `datasets.Dataset` with schema compatible with `task.predictions_schema`. This is the
        recommended format for providing predictions.
    - `dict[str, list[dict]]`: A dictionary with one key for each target in `task.target_columns`. Each value in
        the dict must be a list of dictionaries, each dict following the schema in `task.predictions_schema`.

    Additionally, for univariate tasks, the following formats are supported:

    - `datasets.Dataset`: A single `datasets.Dataset` with schema compatible with `task.predictions_schema`.
    - `list[dict]`: A list of dictionaries, where each dict follows the schema in `task.predictions_schema`.

    Returns
    -------
    predictions :
        A `DatasetDict` where each key is the name of the target column and the corresponding value is a
        `datasets.Dataset` with the predictions.
    """

    def _to_dataset(preds: datasets.Dataset | list[dict]) -> datasets.Dataset:
        if isinstance(preds, list):
            try:
                preds = datasets.Dataset.from_list(list(preds))
            except Exception:
                raise ValueError(
                    "`datasets.Dataset.from_list(predictions)` failed. Please convert predictions to `datasets.Dataset` format."
                )
        if not isinstance(preds, datasets.Dataset):
            raise ValueError(f"predictions must be of type `datasets.Dataset` (received {type(preds)})")
        return preds

    if isinstance(predictions, datasets.DatasetDict):
        pass
    elif isinstance(predictions, dict):
        predictions = datasets.DatasetDict({col: _to_dataset(preds) for col, preds in predictions.items()})
    elif isinstance(predictions, (list, datasets.Dataset)):
        predictions = datasets.DatasetDict({self.target_columns[0]: _to_dataset(predictions)})
    else:
        raise ValueError(
            f"Expected predictions to be a `DatasetDict`, `Dataset`, `list` or `dict` (got {type(predictions)})"
        )
    if missing_columns := set(self.target_columns) - set(predictions.keys()):
        raise ValueError(f"Missing predictions for columns {missing_columns} (got {sorted(predictions.keys())})")
    predictions = predictions.cast(self.predictions_schema).with_format("numpy")
    for target_column, predictions_for_column in predictions.items():
        self._assert_all_columns_finite(predictions_for_column)
    return predictions

load_full_dataset(storage_options: dict | None = None, trust_remote_code: bool | None = None, num_proc: int = DEFAULT_NUM_PROC) -> datasets.Dataset

Load the full raw dataset with preprocessing applied.

This method validates the data, loads and preprocesses the dataset according to the task configuration, including generating univariate targets if generate_univariate_targets_from is provided.

Note: This method is provided only for inspection and debugging purposes. For model evaluation, use iter_windows() instead to get properly split train/test data.

Parameters:

Name Type Description Default
storage_options dict

Passed to datasets.load_dataset() for accessing remote datasets (e.g., S3 credentials).

None
trust_remote_code bool

Passed to datasets.load_dataset() for trusting remote code from Hugging Face Hub.

None
num_proc int

Number of processes to use for dataset preprocessing.

DEFAULT_NUM_PROC

Returns:

Type Description
Dataset

The preprocessed dataset with all time series.
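
A short sketch of using this method for data inspection (assuming `task` is an already constructed `fev.Task`; evaluation itself should still go through `iter_windows()`):

full_dataset = task.load_full_dataset()
print(full_dataset)           # preprocessed datasets.Dataset with all time series
print(full_dataset.features)  # column names and feature types after preprocessing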

Source code in src/fev/task.py
def load_full_dataset(
    self,
    storage_options: dict | None = None,
    trust_remote_code: bool | None = None,
    num_proc: int = DEFAULT_NUM_PROC,
) -> datasets.Dataset:
    """Load the full raw dataset with preprocessing applied.

    This method validates the data, loads and preprocesses the dataset according to the task configuration,
    including generating univariate targets if `generate_univariate_targets_from` is provided.

    **Note:** This method is provided only for inspection and debugging purposes. For model evaluation, use
    [`iter_windows()`][fev.Task.iter_windows] instead to get properly split train/test data.

    Parameters
    ----------
    storage_options : dict, optional
        Passed to `datasets.load_dataset()` for accessing remote datasets (e.g., S3 credentials).
    trust_remote_code : bool, optional
        Passed to `datasets.load_dataset()` for trusting remote code from Hugging Face Hub.
    num_proc : int, default DEFAULT_NUM_PROC
        Number of processes to use for dataset preprocessing.

    Returns
    -------
    datasets.Dataset
        The preprocessed dataset with all time series.
    """
    if self._full_dataset is None:
        self._full_dataset = self._load_dataset(
            storage_options=storage_options, trust_remote_code=trust_remote_code, num_proc=num_proc
        )
    return self._full_dataset

to_dict() -> dict

Convert task definition to a dictionary.

Source code in src/fev/task.py
def to_dict(self) -> dict:
    """Convert task definition to a dictionary."""
    return dataclasses.asdict(self)
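
A small illustrative example (assuming `task` is an already constructed `fev.Task`; the keys follow the Task parameters, e.g. `horizon` and `num_windows`):

config = task.to_dict()
print(config["horizon"], config["num_windows"])  # same attributes that evaluation_summary() includes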