Source code for pysad.models.seasonal_esd

import numpy as np
from scipy.stats import t

from pysad.core.base_model import BaseModel
from pysad.transform.preprocessing import ModifiedSTLResidualTransformer
from pysad.utils import Window


[docs] class SeasonalESD(BaseModel): """Window-based Seasonal ESD model :cite:`hochenbaum2017automatic`. This is the paper's S-ESD method: compute the modified STL residual ``X - seasonal - median(X)`` over a fixed-size PySAD window, then apply generalized ESD with mean and standard deviation to the residuals. Args: period (int): Number of observations in one seasonal period. window_size (int): Number of recent observations used for STL and ESD. max_anomalies (int): Maximum number of anomalies tested by ESD. alpha (float): ESD significance level. (Default=0.05). robust (bool): Whether to use robust STL fitting. (Default=True). **stl_kwargs: Additional keyword arguments passed to STL. """ def __init__( self, period, window_size, max_anomalies, alpha=0.05, robust=True, **stl_kwargs): if max_anomalies < 1: raise ValueError("max_anomalies must be greater than 0.") if window_size < 2 * period: raise ValueError("window_size must be at least 2 * period.") if max_anomalies > int(window_size * 0.49): raise ValueError( "max_anomalies must be less than or equal to window_size * 0.49.") if not np.isfinite(alpha) or not (0 < alpha < 1): raise ValueError("alpha must be finite and between 0 and 1.") self.period = period self.window_size = window_size self.max_anomalies = max_anomalies self.alpha = alpha self.window = Window(window_size) self.residual_transformer = ModifiedSTLResidualTransformer( period=period, window_size=window_size, robust=robust, **stl_kwargs ) def _as_value(self, X): X = np.asarray(X, dtype=np.float64) if X.shape != (1,): raise ValueError("SeasonalESD supports univariate inputs.") return X[0] def _center_scale(self, values): return np.mean(values), np.std(values, ddof=1) def _candidate_window(self, X): value = self._as_value(X) values = self.window.get() + [value] values = values[-self.window_size:] return np.asarray(values, dtype=np.float64) def _current_window(self): return np.asarray(self.window.get(), dtype=np.float64) def _critical_value(self, n, i): p = 1.0 - self.alpha / (2.0 * (n - i + 1)) t_value = t.ppf(p, n - i - 1) return ((n - i) * t_value) / np.sqrt((n - i - 1 + t_value**2) * (n - i + 1)) def _esd(self, values): remaining_values = np.asarray(values, dtype=np.float64) remaining_indices = np.arange(remaining_values.shape[0]) candidates = [] for i in range(1, self.max_anomalies + 1): center, scale = self._center_scale(remaining_values) if scale <= 1e-10: break deviations = np.abs(remaining_values - center) / scale local_idx = int(np.argmax(deviations)) candidates.append(( remaining_indices[local_idx], deviations[local_idx], self._critical_value(values.shape[0], i) )) remaining_values = np.delete(remaining_values, local_idx) remaining_indices = np.delete(remaining_indices, local_idx) selected_count = 0 for i, (_, statistic, critical_value) in enumerate(candidates, start=1): if statistic > critical_value: selected_count = i return candidates[:selected_count]
[docs] def fit_partial(self, X, y=None): """Adds the next instance to the model window.""" self.window.update(self._as_value(X)) return self
def _score_window(self, values): if values.shape[0] < self.window_size: return 0.0 residuals = self.residual_transformer.transform_window(values) anomalies = self._esd(residuals) latest_idx = values.shape[0] - 1 for idx, statistic, _ in anomalies: if idx == latest_idx: return statistic return 0.0
[docs] def score_partial(self, X): """Scores whether the next instance is anomalous in a candidate window.""" return self._score_window(self._candidate_window(X))
[docs] def fit_score_partial(self, X, y=None): """Adds and scores the next instance without adding it twice.""" self.fit_partial(X, y) return self._score_window(self._current_window())
[docs] class SeasonalHybridESD(SeasonalESD): """Window-based Seasonal Hybrid ESD model :cite:`hochenbaum2017automatic`. This is the paper's S-H-ESD method: use the same modified STL residual as :class:`SeasonalESD`, then replace ESD's mean and standard deviation with median and MAD-based scale in the test statistic. """ def _center_scale(self, values): center = np.median(values) scale = 1.4826 * np.median(np.abs(values - center)) return center, scale