Source code for ts_benchmark.baselines.pathformer.pathformer
from typing import Optional, Tuple
import math
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from sklearn.preprocessing import StandardScaler
from torch import optim
from torch.optim import lr_scheduler
from ts_benchmark.baselines.pathformer.models.pathformer_model import PathformerModel
from ts_benchmark.baselines.utils import (
forecasting_data_provider,
train_val_split,
get_time_mark,
)
from ts_benchmark.utils.data_processing import split_before
from .utils.tools import EarlyStopping, adjust_learning_rate
from ...models.model_base import ModelBase, BatchMaker
DEFAULT_TRANSFORMER_BASED_HYPER_PARAMS = {
"k": 2,
"enc_in": 1,
"dec_in": 1,
"c_out": 1,
"e_layers": 1,
"d_layers": 1,
"d_model": 4,
"d_ff": 64,
"embed": "timeF",
"freq": "h",
"lradj": "TST",
"moving_avg": 25,
"num_kernels": 6,
"factor": 1,
"n_heads": 8,
"seg_len": 6,
"win_size": 2,
"activation": "gelu",
"output_attention": 0,
"patch_len": 16,
"stride": 8,
"dropout": 0.1,
"batch_size": 512,
"learning_rate": 0.0001,
"train_epochs": 30,
"num_workers": 0,
"loss": "MAE",
"itr": 1,
"distil": True,
"patience": 5,
"p_hidden_dims": [128, 128],
"p_hidden_layers": 2,
"mem_dim": 32,
"conv_kernel": [12, 16],
"individual": False,
"num_nodes": 21,
"layer_nums": 3,
"num_experts_list": [4, 4, 4],
"patch_size_list": [[56, 28, 12, 24], [42, 28, 16, 21], [56, 16, 28, 42]],
"revin": 1,
"drop": 0.1,
"pct_start": 0.4,
"residual_connection": 0,
"gpu": 0,
"seq_len": 336,
}
[docs]
class TransformerConfig:
def __init__(self, **kwargs):
for key, value in DEFAULT_TRANSFORMER_BASED_HYPER_PARAMS.items():
setattr(self, key, value)
for key, value in kwargs.items():
setattr(self, key, value)
@property
def pred_len(self):
return self.horizon
[docs]
class Pathformer(ModelBase):
def __init__(self, **kwargs):
super(Pathformer, self).__init__()
self.config = TransformerConfig(**kwargs)
self.scaler = StandardScaler()
self.seq_len = self.config.seq_len
self.win_size = self.config.seq_len
[docs]
@staticmethod
def required_hyper_params() -> dict:
"""
Return the hyperparameters required by model.
:return: An empty dictionary indicating that model does not require additional hyperparameters.
"""
return {
"seq_len": "input_chunk_length",
"horizon": "output_chunk_length",
"norm": "norm",
}
@property
def model_name(self):
return "pathformer"
[docs]
def multi_forecasting_hyper_param_tune(self, train_data: pd.DataFrame):
freq = pd.infer_freq(train_data.index)
if freq == None:
raise ValueError("Irregular time intervals")
elif freq[0].lower() not in ["m", "w", "b", "d", "h", "t", "s"]:
self.config.freq = "s"
else:
self.config.freq = freq[0].lower()
column_num = train_data.shape[1]
self.config.enc_in = column_num
self.config.dec_in = column_num
self.config.c_out = column_num
if self.model_name == "MICN":
setattr(self.config, "label_len", self.config.seq_len)
else:
setattr(self.config, "label_len", self.config.seq_len // 2)
[docs]
def single_forecasting_hyper_param_tune(self, train_data: pd.DataFrame):
freq = pd.infer_freq(train_data.index)
if freq == None:
raise ValueError("Irregular time intervals")
elif freq[0].lower() not in ["m", "w", "b", "d", "h", "t", "s"]:
self.config.freq = "s"
else:
self.config.freq = freq[0].lower()
column_num = train_data.shape[1]
self.config.enc_in = column_num
self.config.dec_in = column_num
self.config.c_out = column_num
setattr(self.config, "label_len", self.config.horizon)
[docs]
def detect_hyper_param_tune(self, train_data: pd.DataFrame):
freq = pd.infer_freq(train_data.index)
if freq == None:
raise ValueError("Irregular time intervals")
elif freq[0].lower() not in ["m", "w", "b", "d", "h", "t", "s"]:
self.config.freq = "s"
else:
self.config.freq = freq[0].lower()
column_num = train_data.shape[1]
self.config.enc_in = column_num
self.config.dec_in = column_num
self.config.c_out = column_num
self.config.label_len = 48
[docs]
def padding_data_for_forecast(self, test):
time_column_data = test.index
data_colums = test.columns
start = time_column_data[-1]
date = pd.date_range(
start=start, periods=self.config.horizon + 1, freq=self.config.freq.upper()
)
df = pd.DataFrame(columns=data_colums)
df.iloc[: self.config.horizon + 1, :] = 0
df["date"] = date
df = df.set_index("date")
new_df = df.iloc[1:]
test = pd.concat([test, new_df])
return test
def _padding_time_stamp_mark(
self, time_stamps_list: np.ndarray, padding_len: int
) -> np.ndarray:
"""
Padding time stamp mark for prediction.
:param time_stamps_list: A batch of time stamps.
:param padding_len: The len of time stamp need to be padded.
:return: The padded time stamp mark.
"""
padding_time_stamp = []
for time_stamps in time_stamps_list:
start = time_stamps[-1]
expand_time_stamp = pd.date_range(
start=start,
periods=padding_len + 1,
freq=self.config.freq.upper(),
)
padding_time_stamp.append(expand_time_stamp.to_numpy()[-padding_len:])
padding_time_stamp = np.stack(padding_time_stamp)
whole_time_stamp = np.concatenate(
(time_stamps_list, padding_time_stamp), axis=1
)
padding_mark = get_time_mark(whole_time_stamp, 1, self.config.freq)
return padding_mark
[docs]
def validate(self, valid_data_loader, criterion):
config = self.config
total_loss = []
self.model.eval()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
for input, target, input_mark, target_mark in valid_data_loader:
input, target, input_mark, target_mark = (
input.to(device),
target.to(device),
input_mark.to(device),
target_mark.to(device),
)
# decoder input
dec_input = torch.zeros_like(target[:, -config.horizon :, :]).float()
dec_input = (
torch.cat([target[:, : config.label_len, :], dec_input], dim=1)
.float()
.to(device)
)
output, balance_loss = self.model(input)
target = target[:, -config.horizon :, :]
output = output[:, -config.horizon :, :]
loss = criterion(output, target).detach().cpu().numpy()
total_loss.append(loss)
total_loss = np.mean(total_loss)
self.model.train()
return total_loss
[docs]
def forecast_fit(
self, train_valid_data: pd.DataFrame, train_ratio_in_tv: float
) -> "ModelBase":
"""
Train the model.
:param train_data: Time series data used for training.
:param train_ratio_in_tv: Represents the splitting ratio of the training set validation set. If it is equal to 1, it means that the validation set is not partitioned.
:return: The fitted model object.
"""
if train_valid_data.shape[1] == 1:
train_drop_last = False
self.single_forecasting_hyper_param_tune(train_valid_data)
else:
train_drop_last = True
self.multi_forecasting_hyper_param_tune(train_valid_data)
setattr(self.config, "task_name", "short_term_forecast")
self.model = PathformerModel(self.config)
print(
"----------------------------------------------------------",
self.model_name,
)
config = self.config
train_data, valid_data = train_val_split(
train_valid_data, train_ratio_in_tv, config.seq_len
)
self.scaler.fit(train_data.values)
if config.norm:
train_data = pd.DataFrame(
self.scaler.transform(train_data.values),
columns=train_data.columns,
index=train_data.index,
)
if train_ratio_in_tv != 1:
if config.norm:
valid_data = pd.DataFrame(
self.scaler.transform(valid_data.values),
columns=valid_data.columns,
index=valid_data.index,
)
valid_dataset, valid_data_loader = forecasting_data_provider(
valid_data,
config,
timeenc=1,
batch_size=config.batch_size,
shuffle=True,
drop_last=False,
)
train_dataset, train_data_loader = forecasting_data_provider(
train_data,
config,
timeenc=1,
batch_size=config.batch_size,
shuffle=True,
drop_last=train_drop_last,
)
# Define the loss function and optimizer
if config.loss == "MSE":
criterion = nn.MSELoss()
elif config.loss == "MAE":
criterion = nn.L1Loss()
else:
criterion = nn.HuberLoss(delta=0.5)
optimizer = optim.Adam(self.model.parameters(), lr=config.learning_rate)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.early_stopping = EarlyStopping(patience=config.patience)
self.model.to(device)
total_params = sum(
p.numel() for p in self.model.parameters() if p.requires_grad
)
print(f"Total trainable parameters: {total_params}")
train_steps = len(train_data_loader)
scheduler = lr_scheduler.OneCycleLR(
optimizer=optimizer,
steps_per_epoch=train_steps,
pct_start=config.pct_start,
epochs=config.train_epochs,
max_lr=config.learning_rate,
)
for epoch in range(config.train_epochs):
self.model.train()
for i, (input, target, input_mark, target_mark) in enumerate(
train_data_loader
):
optimizer.zero_grad()
input, target, input_mark, target_mark = (
input.to(device),
target.to(device),
input_mark.to(device),
target_mark.to(device),
)
# decoder input
dec_input = torch.zeros_like(target[:, -config.horizon :, :]).float()
dec_input = (
torch.cat([target[:, : config.label_len, :], dec_input], dim=1)
.float()
.to(device)
)
output, balance_loss = self.model(input)
target = target[:, -config.horizon :, :]
output = output[:, -config.horizon :, :]
loss = criterion(output, target)
loss.backward()
optimizer.step()
if config.lradj == "TST":
adjust_learning_rate(
optimizer, scheduler, epoch + 1, config, printout=False
)
scheduler.step()
if train_ratio_in_tv != 1:
valid_loss = self.validate(valid_data_loader, criterion)
self.early_stopping(valid_loss, self.model)
if self.early_stopping.early_stop:
break
if config.lradj != "TST":
adjust_learning_rate(optimizer, scheduler, epoch + 1, config)
[docs]
def forecast(self, horizon: int, train: pd.DataFrame) -> np.ndarray:
"""
Make predictions.
:param horizon: The predicted length.
:param testdata: Time series data used for prediction.
:return: An array of predicted results.
"""
if self.early_stopping.check_point is not None:
self.model.load_state_dict(self.early_stopping.check_point)
if self.config.norm:
train = pd.DataFrame(
self.scaler.transform(train.values),
columns=train.columns,
index=train.index,
)
if self.model is None:
raise ValueError("Model not trained. Call the fit() function first.")
config = self.config
train, test = split_before(train, len(train) - config.seq_len)
# Additional timestamp marks required to generate transformer class methods
test = self.padding_data_for_forecast(test)
test_data_set, test_data_loader = forecasting_data_provider(
test, config, timeenc=1, batch_size=1, shuffle=False, drop_last=False
)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.model.to(device)
self.model.eval()
with torch.no_grad():
answer = None
while answer is None or answer.shape[0] < horizon:
for input, target, input_mark, target_mark in test_data_loader:
input, target, input_mark, target_mark = (
input.to(device),
target.to(device),
input_mark.to(device),
target_mark.to(device),
)
dec_input = torch.zeros_like(
target[:, -config.horizon :, :]
).float()
dec_input = (
torch.cat([target[:, : config.label_len, :], dec_input], dim=1)
.float()
.to(device)
)
output, balance_loss = self.model(input)
column_num = output.shape[-1]
temp = output.cpu().numpy().reshape(-1, column_num)[-config.horizon :]
if answer is None:
answer = temp
else:
answer = np.concatenate([answer, temp], axis=0)
if answer.shape[0] >= horizon:
if self.config.norm:
answer[-horizon:] = self.scaler.inverse_transform(
answer[-horizon:]
)
return answer[-horizon:]
output = output.cpu().numpy()[:, -config.horizon :, :]
for i in range(config.horizon):
test.iloc[i + config.seq_len] = output[0, i, :]
test = test.iloc[config.horizon :]
test = self.padding_data_for_forecast(test)
test_data_set, test_data_loader = forecasting_data_provider(
test,
config,
timeenc=1,
batch_size=1,
shuffle=False,
drop_last=False,
)
[docs]
def batch_forecast(
self, horizon: int, batch_maker: BatchMaker, **kwargs
) -> np.ndarray:
"""
Make predictions by batch.
:param horizon: The length of each prediction.
:param batch_maker: Make batch data used for prediction.
:return: An array of predicted results.
"""
if self.early_stopping.check_point is not None:
self.model.load_state_dict(self.early_stopping.check_point)
if self.model is None:
raise ValueError("Model not trained. Call the fit() function first.")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.model.to(device)
self.model.eval()
input_data = batch_maker.make_batch(self.config.batch_size, self.config.seq_len)
input_np = input_data["input"]
if self.config.norm:
origin_shape = input_np.shape
flattened_data = input_np.reshape((-1, input_np.shape[-1]))
input_np = self.scaler.transform(flattened_data).reshape(origin_shape)
input_index = input_data["time_stamps"]
padding_len = (
math.ceil(horizon / self.config.horizon) + 1
) * self.config.horizon
all_mark = self._padding_time_stamp_mark(input_index, padding_len)
answers = self._perform_rolling_predictions(horizon, input_np, all_mark, device)
if self.config.norm:
flattened_data = answers.reshape((-1, answers.shape[-1]))
answers = self.scaler.inverse_transform(flattened_data).reshape(
answers.shape
)
return answers
def _perform_rolling_predictions(
self,
horizon: int,
input_np: np.ndarray,
all_mark: np.ndarray,
device: torch.device,
) -> list:
"""
Perform rolling predictions using the given input data and marks.
:param horizon: Length of predictions to be made.
:param input_np: Numpy array of input data.
:param all_mark: Numpy array of all marks (time stamps mark).
:param device: Device to run the model on.
:return: List of predicted results for each prediction batch.
"""
rolling_time = 0
input_np, target_np, input_mark_np, target_mark_np = self._get_rolling_data(
input_np, None, all_mark, rolling_time
)
with torch.no_grad():
answers = []
while not answers or sum(a.shape[1] for a in answers) < horizon:
input, dec_input, input_mark, target_mark = (
torch.tensor(input_np, dtype=torch.float32).to(device),
torch.tensor(target_np, dtype=torch.float32).to(device),
torch.tensor(input_mark_np, dtype=torch.float32).to(device),
torch.tensor(target_mark_np, dtype=torch.float32).to(device),
)
output, balance_loss = self.model(input)
column_num = output.shape[-1]
real_batch_size = output.shape[0]
answer = (
output.cpu()
.numpy()
.reshape(real_batch_size, -1, column_num)[
:, -self.config.horizon :, :
]
)
answers.append(answer)
if sum(a.shape[1] for a in answers) >= horizon:
break
rolling_time += 1
output = output.cpu().numpy()[:, -self.config.horizon :, :]
(
input_np,
target_np,
input_mark_np,
target_mark_np,
) = self._get_rolling_data(input_np, output, all_mark, rolling_time)
answers = np.concatenate(answers, axis=1)
return answers[:, -horizon:, :]
def _get_rolling_data(
self,
input_np: np.ndarray,
output: Optional[np.ndarray],
all_mark: np.ndarray,
rolling_time: int,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""
Prepare rolling data based on the current rolling time.
:param input_np: Current input data.
:param output: Output from the model prediction.
:param all_mark: Numpy array of all marks (time stamps mark).
:param rolling_time: Current rolling time step.
:return: Updated input data, target data, input marks, and target marks for rolling prediction.
"""
if rolling_time > 0:
input_np = np.concatenate((input_np, output), axis=1)
input_np = input_np[:, -self.config.seq_len :, :]
target_np = np.zeros(
(
input_np.shape[0],
self.config.label_len + self.config.horizon,
input_np.shape[2],
)
)
target_np[:, : self.config.label_len, :] = input_np[
:, -self.config.label_len :, :
]
advance_len = rolling_time * self.config.horizon
input_mark_np = all_mark[:, advance_len : self.config.seq_len + advance_len, :]
start = self.config.seq_len - self.config.label_len + advance_len
end = self.config.seq_len + self.config.horizon + advance_len
target_mark_np = all_mark[
:,
start:end,
:,
]
return input_np, target_np, input_mark_np, target_mark_np