import numpy as np
import pandas as pd
from merlion.utils import TimeSeries
from merlion.models.anomaly.isolation_forest import (
IsolationForest,
IsolationForestConfig,
)
from merlion.models.anomaly.vae import VAE, VAEConfig
from merlion.models.anomaly.windstats import WindStats, WindStatsConfig
from merlion.models.anomaly.autoencoder import AutoEncoder, AutoEncoderConfig
from merlion.models.anomaly.dagmm import DAGMM, DAGMMConfig
from merlion.models.anomaly.dbl import DynamicBaseline, DynamicBaselineConfig
from merlion.models.anomaly.deep_point_anomaly_detector import (
DeepPointAnomalyDetector,
DeepPointAnomalyDetectorConfig,
)
from merlion.models.anomaly.lstm_ed import LSTMED, LSTMEDConfig
from merlion.models.anomaly.random_cut_forest import (
RandomCutForest,
RandomCutForestConfig,
)
from merlion.models.anomaly.spectral_residual import (
SpectralResidual,
SpectralResidualConfig,
)
from merlion.models.anomaly.stat_threshold import StatThreshold, StatThresholdConfig
from merlion.models.anomaly.zms import ZMS, ZMSConfig
from merlion.models.anomaly.change_point.bocpd import BOCPD, BOCPDConfig
from merlion.models.anomaly.forecast_based.arima import (
ArimaDetector,
ArimaDetectorConfig,
)
from merlion.models.anomaly.forecast_based.sarima import (
SarimaDetector,
SarimaDetectorConfig,
)
from merlion.models.anomaly.forecast_based.ets import ETSDetector, ETSDetectorConfig
from merlion.models.anomaly.forecast_based.mses import MSESDetector, MSESDetectorConfig
from sklearn.preprocessing import StandardScaler
class MerlionModelAdapter:
    """
    Merlion model adapter class, used to adapt models in the Merlion framework
    to meet the requirements of prediction strategies.

    The wrapped Merlion model is created inside ``detect_fit``; ``detect_score``
    and ``detect_label`` query that model, so ``detect_fit`` must be called
    before either of them.
    """

    def __init__(
        self,
        model_name: str,
        model_class: type,
        config_class: type,
        model_args: dict,
        allow_label_on_train: bool,
    ):
        """
        Initialize the Merlion model adapter object.

        :param model_name: Model name; also returned by ``repr`` of the adapter.
        :param model_class: Merlion model class.
        :param config_class: Merlion configuration class.
        :param model_args: Keyword arguments passed to ``config_class``.
        :param allow_label_on_train: Whether to use labels during training.
            Only stored on the instance; no method of this class reads it.
        """
        self.model = None  # instantiated by detect_fit
        self.model_class = model_class
        self.config_class = config_class
        self.model_args = model_args
        self.model_name = model_name
        self.scaler = StandardScaler()
        self.allow_label_on_train = allow_label_on_train

    def detect_fit(self, series: pd.DataFrame, label: pd.DataFrame) -> object:
        """
        Build a fresh Merlion model from the stored configuration and train it.

        :param series: Time series data.
        :param label: Label data. Accepted for interface compatibility only;
            it is not forwarded to the Merlion model.
        :return: The value returned by the Merlion model's ``train`` call.
        """
        config_obj = self.config_class(**self.model_args)
        self.model = self.model_class(config_obj)
        return self.model.train(TimeSeries.from_pd(series))

    def detect_score(self, train: pd.DataFrame) -> tuple:
        """
        Calculate anomaly scores using the adapted Merlion model.

        :param train: Data for which anomaly scores are calculated.
        :return: A 2-tuple holding the same flattened score array twice.
            Timestamps of ``train`` missing from the model output get score 0.
        """
        train_ts = TimeSeries.from_pd(train)
        scores = self.model.get_anomaly_score(train_ts)
        aligned = self._align_to_index(scores.to_pd(), train_ts.to_pd().index)
        flat_scores = aligned.values.flatten()
        return flat_scores, flat_scores

    def detect_label(self, train: pd.DataFrame) -> tuple:
        """
        Use the adapted Merlion model for anomaly detection and generate labels.

        :param train: Data on which anomaly detection is run.
        :return: A 2-tuple holding the same flattened 0/1 label array twice.
            Timestamps of ``train`` missing from the model output get label 0.
        """
        train_ts = TimeSeries.from_pd(train)
        labels = self.model.get_anomaly_label(train_ts)
        aligned = self._align_to_index(labels.to_pd(), train_ts.to_pd().index)
        flat_labels = self._binarize(aligned).values.flatten()
        return flat_labels, flat_labels

    @staticmethod
    def _align_to_index(result: pd.DataFrame, index: pd.Index) -> pd.DataFrame:
        """Reindex ``result`` onto ``index``, filling rows it lacks with 0."""
        return result.reindex(index, fill_value=0)

    @staticmethod
    def _binarize(frame: pd.DataFrame) -> pd.DataFrame:
        """Map every value that is not equal to 0 to 1 and every 0 to 0."""
        # Vectorised equivalent of applymap(lambda x: 1 if x != 0 else 0);
        # DataFrame.applymap is deprecated in recent pandas releases.
        return (frame != 0).astype(int)

    def __repr__(self):
        """
        Returns a string representation of the model name.
        """
        return self.model_name
def generate_model_factory(
    model_name: str,
    model_class: type,
    config_class: type,
    required_args: dict,
    allow_label_on_train: bool,
) -> dict:
    """
    Generate model factory information for creating Merlion model adapters.

    :param model_name: Model name.
    :param model_class: Merlion model class.
    :param config_class: Merlion configuration class.
    :param required_args: Required parameters for model initialization.
    :param allow_label_on_train: Whether to use labels during training.
    :return: A dictionary with two keys: ``"model_factory"`` (a callable that
        builds a ``MerlionModelAdapter``) and ``"required_hyper_params"``
        (the ``required_args`` object, passed through unchanged).
    """

    def model_factory(**kwargs) -> "MerlionModelAdapter":
        """
        Model factory, used to create Merlion model adapter objects.

        :param kwargs: Model initialization parameters; stored on the adapter
            as ``model_args``.
        :return: Merlion model adapter object
        """
        # Keyword arguments keep the five-parameter constructor call
        # unambiguous if its parameter order ever changes.
        return MerlionModelAdapter(
            model_name=model_name,
            model_class=model_class,
            config_class=config_class,
            model_args=kwargs,
            allow_label_on_train=allow_label_on_train,
        )

    return {"model_factory": model_factory, "required_hyper_params": required_args}
# Merlion detectors paired with their configuration classes. None of them
# declares required hyper-parameters, so every entry gets its own empty dict.
MERLION_MODELS = [
    (detector_cls, detector_config_cls, {})
    for detector_cls, detector_config_cls in (
        (IsolationForest, IsolationForestConfig),
        (WindStats, WindStatsConfig),
        (VAE, VAEConfig),
        (AutoEncoder, AutoEncoderConfig),
        (DAGMM, DAGMMConfig),
        (DynamicBaseline, DynamicBaselineConfig),
        (DeepPointAnomalyDetector, DeepPointAnomalyDetectorConfig),
        (LSTMED, LSTMEDConfig),
        (RandomCutForest, RandomCutForestConfig),
        (SpectralResidual, SpectralResidualConfig),
        (StatThreshold, StatThresholdConfig),
        (ZMS, ZMSConfig),
        (BOCPD, BOCPDConfig),
    )
]
# Forecast-based detectors. The training set does not require labels, and each
# entry lists "max_forecast_steps" in its required-parameter dict.
MERLION_STAT_MODELS = [
    (detector_cls, detector_config_cls, {"max_forecast_steps": "max_forecast_steps"})
    for detector_cls, detector_config_cls in (
        (ArimaDetector, ArimaDetectorConfig),
        (SarimaDetector, SarimaDetectorConfig),
        (ETSDetector, ETSDetectorConfig),
        (MSESDetector, MSESDetectorConfig),
    )
]
# For every (model class, configuration class, required parameters) entry in
# MERLION_MODELS, build the factory information and store it in this module's
# globals under the model class name. From here on that name refers to the
# factory dictionary rather than to the imported Merlion class.
for model_class, config_class, required_args in MERLION_MODELS:
    globals()[model_class.__name__] = generate_model_factory(
        model_name=model_class.__name__,
        model_class=model_class,
        config_class=config_class,
        required_args=required_args,
        allow_label_on_train=True,
    )
# Same registration for MERLION_STAT_MODELS: each model class name in this
# module's globals is re-pointed to the factory dictionary built for it.
# These models are registered with allow_label_on_train=False.
for model_class, config_class, required_args in MERLION_STAT_MODELS:
    globals()[model_class.__name__] = generate_model_factory(
        model_name=model_class.__name__,
        model_class=model_class,
        config_class=config_class,
        required_args=required_args,
        allow_label_on_train=False,
    )