Source code for pysad.transform.preprocessing.modified_stl_residual
import numpy as np
from pysad.core.base_transformer import BaseTransformer
from pysad.utils import Window
[docs]
class ModifiedSTLResidualTransformer(BaseTransformer):
"""Modified STL residual transformer used by S-ESD and S-H-ESD.
The transformer follows the residual construction in
:cite:`hochenbaum2017automatic`: estimate the seasonal component with STL,
replace STL's trend component with the median of the raw series, and return
``X - seasonal - median(X)``. It owns only the paper's residual step; use
:class:`pysad.models.SeasonalESD` or
:class:`pysad.models.SeasonalHybridESD` for the full detector.
Args:
period (int): Number of observations in one seasonal period.
window_size (int): Number of recent observations used for partial
transforms.
robust (bool): Whether to use robust STL fitting. (Default=True).
**stl_kwargs: Additional keyword arguments passed to
``statsmodels.tsa.seasonal.STL``.
"""
def __init__(self, period, window_size=None, robust=True, **stl_kwargs):
super().__init__(-1)
if period < 2:
raise ValueError("period must be greater than 1.")
if window_size is not None and window_size < 2 * period:
raise ValueError("window_size must be at least 2 * period.")
self.period = period
self.window_size = window_size
self.robust = robust
self.stl_kwargs = stl_kwargs
self.window = Window(window_size) if window_size is not None else None
def _normalize_input(self, X):
if isinstance(X, tuple):
X = X[0]
return X
def _as_univariate(self, X):
X = self._normalize_input(X)
X = np.asarray(X, dtype=np.float64)
if X.ndim == 2 and X.shape[1] == 1:
X = X.ravel()
elif X.ndim != 1:
raise ValueError("ModifiedSTLResidualTransformer supports univariate inputs.")
return X
[docs]
def transform_window(self, X):
"""Transforms a full window into modified STL residuals.
Args:
X (np.float64 array of shape (num_instances,) or
(num_instances, 1)): Window of univariate values.
Returns:
residual_X (np.float64 array of shape (num_instances,)): Modified STL residuals.
"""
from statsmodels.tsa.seasonal import STL
X = self._as_univariate(X)
if X.shape[0] < 2 * self.period:
raise ValueError("At least 2 * period observations are required for STL.")
seasonal = STL(
X,
period=self.period,
robust=self.robust,
**self.stl_kwargs
).fit().seasonal
return X - seasonal - np.median(X)
[docs]
def fit_partial(self, X):
"""Fits the next timestep by adding it to the residual window."""
if self.window is None:
raise ValueError("window_size is required for partial fitting.")
X = self._as_univariate(X)
if X.shape[0] != 1:
raise ValueError("Partial fitting requires a single univariate value.")
self.window.update(X[0])
return self
def _as_partial_value(self, X):
X = self._as_univariate(X)
if X.shape[0] != 1:
raise ValueError("Partial operations require a single univariate value.")
return X[0]
def _current_values(self):
return np.asarray(self.window.get(), dtype=np.float64)
def _candidate_values(self, X):
value = self._as_partial_value(X)
values = self.window.get() + [value]
values = values[-self.window_size:]
return np.asarray(values, dtype=np.float64)
def _latest_residual(self, values):
if values.shape[0] < 2 * self.period:
return 0.0
residuals = self.transform_window(values)
return residuals[-1]
[docs]
def transform_partial(self, X):
"""Returns the latest residual for the next candidate window."""
if self.window is None:
raise ValueError("window_size is required for partial transforms.")
return self._latest_residual(self._candidate_values(X))
[docs]
def fit_transform_partial(self, X):
"""Adds and transforms the next timestep without adding it twice."""
self.fit_partial(X)
return self._latest_residual(self._current_values())