# Source code for pysad.models.xstream

import numpy as np
from pysad.core.base_model import BaseModel
from pysad.transform.projection.streamhash_projector import StreamhashProjector
from pysad.utils import get_minmax_array


class xStream(BaseModel):
    """The xStream model for row-streaming data :cite:`xstream`.

    It first projects the data via streamhash projection. It then fits
    half-space chains by reference windowing. It scores the instances using
    the chains fitted to the reference window.

    Args:
        num_components (int): The number of components for streamhash
            projection (Default=100).
        n_chains (int): The number of half-space chains (Default=100).
        depth (int): The maximum depth for the chains (Default=25).
        window_size (int): The size (and the sliding length) of the
            reference window (Default=25).
    """

    def __init__(
            self,
            num_components=100,
            n_chains=100,
            depth=25,
            window_size=25):
        self.streamhash = StreamhashProjector(num_components=num_components)

        # Initial per-component half-range of 0.5; it is replaced by a
        # data-driven estimate every time a window completes
        # (see fit_partial / _compute_deltamax).
        deltamax = np.full(num_components, 0.5)

        self.window_size = window_size
        self.hs_chains = _HSChains(
            deltamax=deltamax, n_chains=n_chains, depth=depth)

        self.step = 0  # number of instances fitted so far
        self.cur_window = []  # projected instances of the window in progress
        self.ref_window = None  # projected instances of the last full window

    def fit_partial(self, X, y=None):
        """Fits the model to next instance.

        Args:
            X (np.float64 array of shape (num_features,)): The instance to
                fit.
            y (int): Ignored since the model is unsupervised (Default=None).

        Returns:
            object: Returns the self.
        """
        self.step += 1

        X = self.streamhash.fit_transform_partial(X)
        X = X.reshape(1, -1)
        self.cur_window.append(X)

        self.hs_chains.fit(X)

        # Every `window_size` instances the current window becomes the
        # reference window: feature ranges are re-estimated from it and the
        # chains switch to the counts collected during it.
        if self.step % self.window_size == 0:
            self.ref_window = self.cur_window
            self.cur_window = []
            deltamax = self._compute_deltamax()
            self.hs_chains.set_deltamax(deltamax)
            self.hs_chains.next_window()

        return self

    def score_partial(self, X):
        """Scores the anomalousness of the next instance.

        Higher scores represent more anomalous instances whereas lower
        scores correspond to more normal instances.

        Args:
            X (np.float64 array of shape (num_features,)): The instance to
                score.

        Returns:
            np.ndarray of shape (1,): The anomalousness score of the input
            instance.
        """
        X = self.streamhash.fit_transform_partial(X)
        X = X.reshape(1, -1)
        score = self.hs_chains.score(X).flatten()

        return score

    def _compute_deltamax(self):
        """Estimates the half-range of each projected feature.

        The estimate is taken over the instances stored in
        ``self.ref_window``.

        Returns:
            np.ndarray: ``(mx - mn) / 2`` per projected feature, where
            entries whose magnitude is at most 1e-4 are replaced by 1.0 so
            that they can safely be used as divisors.
        """
        # NOTE(review): get_minmax_array is assumed to return the
        # column-wise (min, max) of the stacked window -- confirm against
        # pysad.utils.
        mn, mx = get_minmax_array(np.concatenate(self.ref_window, axis=0))

        deltamax = (mx - mn) / 2.0
        deltamax[np.abs(deltamax) <= 0.0001] = 1.0

        return deltamax
class _Chain: def __init__(self, deltamax, depth): k = len(deltamax) self.depth = depth self.fs = [np.random.randint(0, k) for d in range(depth)] self.cmsketches = [{} for i in range(depth)] * depth self.cmsketches_cur = [{} for i in range(depth)] * depth self.deltamax = deltamax # feature ranges self.rand_arr = np.random.rand(k) self.shift = self.rand_arr * deltamax self.is_first_window = True def fit(self, X): prebins = np.zeros(X.shape, dtype=np.float64) depthcount = np.zeros(len(self.deltamax), dtype=np.int32) for depth in range(self.depth): f = self.fs[depth] depthcount[f] += 1 if depthcount[f] == 1: prebins[:, f] = (X[:, f] + self.shift[f]) / self.deltamax[f] else: prebins[:, f] = 2.0 * prebins[:, f] - \ self.shift[f] / self.deltamax[f] if self.is_first_window: cmsketch = self.cmsketches[depth] for prebin in prebins: l_index = tuple(np.floor(prebin).astype(np.int32)) if l_index not in cmsketch: cmsketch[l_index] = 0 cmsketch[l_index] += 1 self.cmsketches[depth] = cmsketch self.cmsketches_cur[depth] = cmsketch else: cmsketch = self.cmsketches_cur[depth] for prebin in prebins: l_index = tuple(np.floor(prebin).astype(np.int32)) if l_index not in cmsketch: cmsketch[l_index] = 0 cmsketch[l_index] += 1 self.cmsketches_cur[depth] = cmsketch return self def bincount(self, X): scores = np.zeros((X.shape[0], self.depth)) prebins = np.zeros(X.shape, dtype=np.float64) depthcount = np.zeros(len(self.deltamax), dtype=np.int32) for depth in range(self.depth): f = self.fs[depth] depthcount[f] += 1 if depthcount[f] == 1: prebins[:, f] = (X[:, f] + self.shift[f]) / self.deltamax[f] else: prebins[:, f] = 2.0 * prebins[:, f] - \ self.shift[f] / self.deltamax[f] cmsketch = self.cmsketches[depth] for i, prebin in enumerate(prebins): l_index = tuple(np.floor(prebin).astype(np.int32)) if l_index not in cmsketch: scores[i, depth] = 0.0 else: scores[i, depth] = cmsketch[l_index] return scores def score(self, X): # scale score logarithmically to avoid overflow: # score = min_d [ 
log2(bincount x 2^d) = log2(bincount) + d ] scores = self.bincount(X) depths = np.array([d for d in range(1, self.depth + 1)]) scores = np.log2(1.0 + scores) + depths # add 1 to avoid log(0) return -np.min(scores, axis=1) def next_window(self): self.is_first_window = False self.cmsketches = self.cmsketches_cur self.cmsketches_cur = [{} for _ in range(self.depth)] * self.depth class _HSChains: def __init__(self, deltamax, n_chains=100, depth=25): self.nchains = n_chains self.depth = depth self.chains = [] for i in range(self.nchains): c = _Chain(deltamax=deltamax, depth=self.depth) self.chains.append(c) def score(self, X): scores = np.zeros(X.shape[0]) for ch in self.chains: scores += ch.score(X) scores /= float(self.nchains) return scores def fit(self, X): for ch in self.chains: ch.fit(X) def next_window(self): for ch in self.chains: ch.next_window() def set_deltamax(self, deltamax): for ch in self.chains: ch.deltamax = deltamax ch.shift = ch.rand_arr * deltamax