Source code for pysad.models.integrations.reference_window_model

from .pyod_model import PYODModel
import numpy as np


[docs]class ReferenceWindowModel(PYODModel): """This PyOD model wrapper wraps the batch anomaly detectors. This wrapper keeps track of the reference window of size `window_length`. For every `sliding_size` instnaces, it resets the model by training new `model_cls` instance with the reference window. This implementation is based on the reference windowing described in :cite:`xstream`. Args: model_cls (class): The model class to be instantiated. window_size (int): The size of each window. sliding_size (int): The sliding length of the windows. initial_X (np.float64 array of shape (num_initial_instances, num_features)): Initial instances to fit. initial_y (np.int32 array of shape (num_initial_instances,)): Initial window's ground truth labels. Used if not None. Needs to be `None` for the unsupervised `model_cls` models. (Default=None). **kwargs (Keyword arguments): Keyword arguments that is passed to the `model_cls`. """ def __init__( self, model_cls, window_size, sliding_size, initial_window_X=None, initial_window_y=None, **kwargs): """ Args: model_cls: window_size: sliding_size: initial_window_X: initial_window_y: """ super().__init__(model_cls, **kwargs) self.window_size = window_size self.sliding_size = sliding_size self.cur_window_X = [] self.cur_window_y = [] self.reference_window_X = initial_window_X self.reference_window_y = initial_window_y if self.reference_window_X is not None: self._fit_model() self.initial_ref_window = True else: self.initial_ref_window = False
[docs] def fit_partial(self, X, y=None): """Fits the model to next instance. Args: X (np.float64 array of shape (num_features,)): The instance to fit. y (int): The label of the instance (Optional for unsupervised models, default=None). Returns: object: self. """ self.cur_window_X.append(X) if y is not None: self.cur_window_y.append(y) if not self.initial_ref_window and len( self.cur_window_X) < self.window_size: self.reference_window_X = self.cur_window_X self.reference_window_y = self.cur_window_y if y is not None else None self._fit_model() elif len(self.cur_window_X) % self.sliding_size == 0: self.reference_window_X = np.concatenate( [self.reference_window_X, self.cur_window_X], axis=0) self.reference_window_X = self.reference_window_X[max( 0, len(self.reference_window_X) - self.window_size):] if y is not None: self.reference_window_y = self.reference_window_y[max( 0, len(self.reference_window_y) - self.window_size):] self.reference_window_y = np.concatenate( [self.reference_window_y, self.cur_window_y], axis=0) self.cur_window = [] self._fit_model() return self
def _fit_model(self): self.reset_model() if self.reference_window_y is None: self.model.fit(self.reference_window_X) else: self.model.fit(self.reference_window_X, self.reference_window_y)
[docs] def score_partial(self, X): """Scores the anomalousness of the next instance. Args: X (np.float64 array of shape (num_features,)): The instance to score. Higher scores represent more anomalous instances whereas lower scores correspond to more normal instances. Returns: float: The anomalousness score of the input instance. """ score = self.model.decision_function([X])[0] return score