Source code for ts_benchmark.evaluation.strategy.rolling_forecast

# -*- coding: utf-8 -*-
import itertools
import time
from typing import List, Optional, Tuple

import numpy as np
import pandas as pd
from numpy.lib.stride_tricks import sliding_window_view

from ts_benchmark.evaluation.metrics import regression_metrics
from ts_benchmark.evaluation.strategy.constants import FieldNames
from ts_benchmark.evaluation.strategy.forecasting import ForecastingStrategy
from ts_benchmark.models import ModelFactory
from ts_benchmark.models.model_base import BatchMaker, ModelBase
from ts_benchmark.utils.data_processing import split_before


[docs] class RollingForecastEvalBatchMaker: def __init__( self, series: pd.DataFrame, index_list: List[int], ): self.series = series self.index_list = index_list self.current_sample_count = 0
[docs] def make_batch_predict(self, batch_size: int, win_size: int) -> dict: """ Return a batch of data with index and column to be used for batch prediction. :param batch_size: The size of batch. :param win_size: The length of data used for prediction. :return: a batch of data and its time stamps. """ index_list = self.index_list[ self.current_sample_count : self.current_sample_count + batch_size ] series = self.series.values windows = sliding_window_view(series, window_shape=(win_size, series.shape[1])) predict_batch = windows[np.array(index_list) - win_size] predict_batch = np.squeeze(predict_batch, axis=1) indexes = self.series.index windows_time_stamps = sliding_window_view(indexes, window_shape=win_size) time_stamps_batch = windows_time_stamps[np.array(index_list) - win_size] self.current_sample_count += len(index_list) return {"input": predict_batch, "time_stamps": time_stamps_batch}
[docs] def make_batch_eval(self, horizon: int) -> dict: """ Return all data to be used for batch evaluation. :param horizon: The size of horizon. :return: All data to be used for batch evaluation. """ series = self.series.values horizons = sliding_window_view(series, window_shape=(horizon, series.shape[1])) test_batch = horizons[np.array(self.index_list)] return { "target": np.squeeze(test_batch, axis=1), }
[docs] def has_more_batches(self) -> bool: """ Check if there are more batches to process. :return: True if there are more batches, False otherwise. """ return self.current_sample_count < len(self.index_list)
[docs] class RollingForecastPredictBatchMaker(BatchMaker): def __init__(self, batch_maker: RollingForecastEvalBatchMaker): self._batch_maker = batch_maker
[docs] def make_batch(self, batch_size: int, win_size: int) -> dict: """ Return a batch of data to be used for batch prediction. :param batch_size: The size of batch. :param win_size: The length of data used for prediction. :return: A batch of data. """ return self._batch_maker.make_batch_predict(batch_size, win_size)
[docs] def has_more_batches(self) -> bool: """ Check if there are more batches to process. :return: True if there are more batches, False otherwise. """ return self._batch_maker.has_more_batches()
[docs] class RollingForecast(ForecastingStrategy): """ Rolling forecast strategy class This strategy defines a forecasting task that fits once on the training set and forecasts on the testing set in a rolling window style. The required strategy configs include: - horizon (int): The length of each prediction; - tv_ratio (float): The ratio of the train-validation series when performing train-test split; - train_ratio_in_tv (float): The ratio of the training series when performing train-validation split; - stride (int): Rolling stride, i.e. the interval between two windows; - num_rollings (int): The maximum number of steps to forecast; The accepted metrics include all regression metrics. The return fields other than the specified metrics are (in order): - FieldNames.FILE_NAME: The name of the series; - FieldNames.FIT_TIME: The training time; - FieldNames.INFERENCE_TIME: The inference time; - FieldNames.ACTUAL_DATA: The true test data, encoded as a string. - FieldNames.INFERENCE_DATA: The predicted data, encoded as a string. - FieldNames.LOG_INFO: Any log returned by the evaluator. """ REQUIRED_CONFIGS = [ "horizon", "tv_ratio", "train_ratio_in_tv", "stride", "num_rollings", ] @staticmethod def _get_index( train_length: int, test_length: int, horizon: int, stride: int ) -> List[int]: """ Get the index list of the rolling windows. :param train_length: Training data length. :param test_length: Test data length. :param horizon: Prediction length. :param stride: Rolling stride. :return: Index list of the rolling windows. """ data_len = train_length + test_length index_list = list(range(train_length, data_len - horizon + 1, stride)) + ( [data_len - horizon] if (test_length - horizon) % stride != 0 else [] ) return index_list def _get_split_lens( self, series: pd.DataFrame, meta_info: Optional[pd.Series], tv_ratio: float, ) -> Tuple[int, int]: """ Gets the size of the train-validation series and the test series :param series: Target series. :param meta_info: Meta-information of the target series. :param tv_ratio: The ratio of the train-validation series when performing train-test split; :return: The length of the train-validation series, and the length of the test series. """ data_len = int(self._get_meta_info(meta_info, "length", len(series))) train_length = int(tv_ratio * data_len) test_length = data_len - train_length if train_length <= 0 or test_length <= 0: raise ValueError( "The length of training or testing data is less than or equal to 0" ) return train_length, test_length def _execute( self, series: pd.DataFrame, meta_info: Optional[pd.Series], model_factory: ModelFactory, series_name: str, ) -> List: """ The entry function of execution pipeline of forecasting tasks :param series: Target series to evaluate. :param meta_info: The corresponding meta-info. :param model_factory: The factory to create models. :param series_name: the name of the target series. :return: The evaluation results. """ model = model_factory() if model.batch_forecast.__annotations__.get("not_implemented_batch"): return self._eval_sample(series, meta_info, model, series_name) else: return self._eval_batch(series, meta_info, model, series_name) def _eval_sample( self, series: pd.DataFrame, meta_info: Optional[pd.Series], model: ModelBase, series_name: str, ) -> List: """ The sample execution pipeline of forecasting tasks. :param series: Target series to evaluate. :param meta_info: The corresponding meta-info. :param model: The model used for prediction. :param series_name: the name of the target series. :return: The evaluation results. """ stride = self._get_scalar_config_value("stride", series_name) horizon = self._get_scalar_config_value("horizon", series_name) num_rollings = self._get_scalar_config_value("num_rollings", series_name) train_ratio_in_tv = self._get_scalar_config_value( "train_ratio_in_tv", series_name ) tv_ratio = self._get_scalar_config_value("tv_ratio", series_name) train_length, test_length = self._get_split_lens(series, meta_info, tv_ratio) train_valid_data, test_data = split_before(series, train_length) start_fit_time = time.time() fit_method = model.forecast_fit if hasattr(model, "forecast_fit") else model.fit fit_method(train_valid_data, train_ratio_in_tv=train_ratio_in_tv) end_fit_time = time.time() eval_scaler = self._get_eval_scaler(train_valid_data, train_ratio_in_tv) index_list = self._get_index(train_length, test_length, horizon, stride) total_inference_time = 0 all_test_results = [] all_rolling_actual = [] all_rolling_predict = [] for i, index in itertools.islice(enumerate(index_list), num_rollings): train, rest = split_before(series, index) test, _ = split_before(rest, horizon) start_inference_time = time.time() predict = model.forecast(horizon, train) end_inference_time = time.time() total_inference_time += end_inference_time - start_inference_time single_series_result = self.evaluator.evaluate( test.to_numpy(), predict, eval_scaler, train_valid_data.values ) inference_data = pd.DataFrame( predict, columns=test.columns, index=test.index ) all_rolling_actual.append(test) all_rolling_predict.append(inference_data) all_test_results.append(single_series_result) average_inference_time = float(total_inference_time) / min( len(index_list), num_rollings ) single_series_results = np.mean(np.stack(all_test_results), axis=0).tolist() # we do not save rolling results by default because it is often too large single_series_results += [ series_name, end_fit_time - start_fit_time, average_inference_time, np.nan, np.nan, "", ] return single_series_results def _eval_batch( self, series: pd.DataFrame, meta_info: Optional[pd.Series], model: ModelBase, series_name: str, ) -> List: """ The batch execution pipeline of forecasting tasks. :param series: Target series to evaluate. :param meta_info: The corresponding meta-info. :param model: The model used for prediction. :param series_name: The name of the target series. :return: The evaluation results. """ stride = self._get_scalar_config_value("stride", series_name) horizon = self._get_scalar_config_value("horizon", series_name) num_rollings = self._get_scalar_config_value("num_rollings", series_name) train_ratio_in_tv = self._get_scalar_config_value( "train_ratio_in_tv", series_name ) tv_ratio = self._get_scalar_config_value("tv_ratio", series_name) train_length, test_length = self._get_split_lens(series, meta_info, tv_ratio) train_valid_data, test_data = split_before(series, train_length) start_fit_time = time.time() fit_method = model.forecast_fit if hasattr(model, "forecast_fit") else model.fit fit_method(train_valid_data, train_ratio_in_tv=train_ratio_in_tv) end_fit_time = time.time() eval_scaler = self._get_eval_scaler(train_valid_data, train_ratio_in_tv) index_list = self._get_index(train_length, test_length, horizon, stride) index_list = index_list[:num_rollings] batch_maker = RollingForecastEvalBatchMaker( series, index_list, ) all_predicts = [] total_inference_time = 0 predict_batch_maker = RollingForecastPredictBatchMaker(batch_maker) while predict_batch_maker.has_more_batches(): start_inference_time = time.time() predicts = model.batch_forecast(horizon, predict_batch_maker) end_inference_time = time.time() total_inference_time += end_inference_time - start_inference_time all_predicts.append(predicts) all_predicts = np.concatenate(all_predicts, axis=0) targets = batch_maker.make_batch_eval(horizon)["target"] if len(targets) != len(all_predicts): raise RuntimeError("Predictions' len don't equal targets' len!") all_test_results = [] for predicts, target in zip(all_predicts, targets): single_series_results = self.evaluator.evaluate( target, predicts, eval_scaler, train_valid_data.values, ) all_test_results.append(single_series_results) single_series_results = np.mean(np.stack(all_test_results), axis=0).tolist() average_inference_time = float(total_inference_time) / min( len(index_list), num_rollings ) single_series_results += [ series_name, end_fit_time - start_fit_time, average_inference_time, np.nan, np.nan, "", ] return single_series_results
[docs] @staticmethod def accepted_metrics() -> List[str]: return regression_metrics.__all__
@property def field_names(self) -> List[str]: return self.evaluator.metric_names + [ FieldNames.FILE_NAME, FieldNames.FIT_TIME, FieldNames.INFERENCE_TIME, FieldNames.ACTUAL_DATA, FieldNames.INFERENCE_DATA, FieldNames.LOG_INFO, ]