Source code for pysad.models.half_space_trees

import copy
import numpy as np
from pysad.core.base_model import BaseModel


class HalfSpaceTrees(BaseModel):
    """Half-Space Trees method :cite:`tan2011fast`.

    An ensemble of ``num_trees`` full binary trees of depth ``max_depth`` is
    built once at construction time. Every internal node halves the current
    range of one randomly chosen feature. Each node keeps two counters:
    ``r_mass`` (the mass used for scoring) and ``l_mass`` (the mass collected
    in the current window). Every ``window_size`` fitted instances,
    ``l_mass`` is copied into ``r_mass`` and reset to zero.

    Args:
        feature_mins (np.float64 array of shape (num_features,)): Minimum
            boundary of the features.
        feature_maxes (np.float64 array of shape (num_features,)): Maximum
            boundary of the features.
        window_size (int): The size of the window (Default=100).
        num_trees (int): The number of trees (Default=25).
        max_depth (int): Maximum depth of the trees (Default=15).
        initial_window_X (np.float64 array of shape
            (num_initial_instances, num_features)): The initial window to fit
            for initial calibration period. If not `None`, we simply apply
            fit to these instances (Default=None).
    """

    def __init__(
            self,
            feature_mins,
            feature_maxes,
            window_size=100,
            num_trees=25,
            max_depth=15,
            initial_window_X=None):
        self.window_size = window_size
        self.max_depth = max_depth
        self.num_trees = num_trees
        self.feature_maxes = feature_maxes
        self.feature_mins = feature_mins
        self.num_dimensions = len(self.feature_maxes)

        # Each tree gets its own copy of the boundaries so the caller's
        # arrays are never shared with (or altered by) the tree builder.
        self.roots = [
            self._build_single_hs_tree(
                copy.deepcopy(self.feature_mins),
                copy.deepcopy(self.feature_maxes),
                0)
            for _ in range(self.num_trees)
        ]

        self.is_first_window = True
        self.step = 0

        # Compare against None explicitly: the truth value of a NumPy array
        # with more than one element is ambiguous and raises ValueError.
        if initial_window_X is not None:
            self.fit(initial_window_X)

    def _build_single_hs_tree(self, mins, maxes, current_depth):
        """Recursively builds one half-space tree.

        Args:
            mins: Lower boundary of every feature for the current sub-space.
            maxes: Upper boundary of every feature for the current sub-space.
            current_depth (int): Depth of the node being created.

        Returns:
            _Node: The root of the (sub)tree covering the given sub-space.
        """
        if current_depth == self.max_depth:
            # Leaf: no children; the split fields are placeholders.
            return self._Node(
                left=None,
                right=None,
                r_mass=0,
                l_mass=0,
                split_att=0,
                split_value=0.0,
                k=current_depth)

        # Pick a random feature and split its current range at the midpoint.
        q = np.random.randint(self.num_dimensions)
        p = (maxes[q] + mins[q]) / 2.0

        # The left child covers [mins[q], p] on feature q. It is built before
        # the right child so the order of random draws is unchanged.
        left_maxes = copy.deepcopy(maxes)
        left_maxes[q] = p
        left = self._build_single_hs_tree(mins, left_maxes, current_depth + 1)

        # The right child covers [p, maxes[q]] on feature q.
        right_mins = copy.deepcopy(mins)
        right_mins[q] = p
        right = self._build_single_hs_tree(
            right_mins, maxes, current_depth + 1)

        return self._Node(
            left=left,
            right=right,
            r_mass=0,
            l_mass=0,
            split_att=q,
            split_value=p,
            k=current_depth)

    def _update_mass(self, x, node, ref_window):
        """Increments the mass counters along the path that `x` follows.

        Args:
            x: The instance; indexed by feature to choose a branch.
            node (_Node): The node to start from (normally a tree root).
            ref_window (bool): If True, `r_mass` is incremented in addition
                to `l_mass`.
        """
        while True:
            if ref_window:
                node.r_mass += 1
            # l_mass is updated even while ref_window is set. The upstream
            # code notes that this differs from the original algorithm and is
            # done so the model can predict while building the first window.
            node.l_mass += 1

            if node.k >= self.max_depth:
                return
            if x[node.split_att] > node.split_value:
                node = node.right
            else:
                node = node.left

    def _update_model(self, node):
        """Ends a window for the subtree rooted at `node`.

        For every node in the subtree, `l_mass` is moved into `r_mass` and
        then reset to zero. Also marks the first window as finished.

        Args:
            node (_Node): Root of the subtree to update; `None` is a no-op.
        """
        if node is None:
            return

        self.is_first_window = False

        # Explicit stack instead of recursion; the visiting order does not
        # matter because every node is updated independently.
        pending = [node]
        while pending:
            current = pending.pop()
            current.r_mass = current.l_mass
            current.l_mass = 0
            if current.left is not None:
                pending.append(current.left)
            if current.right is not None:
                pending.append(current.right)

    def fit_partial(self, X, y=None):
        """Fits the model to next instance.

        Args:
            X (np.float64 array of shape (num_features,)): The instance to fit.
            y (int): Ignored since the model is unsupervised (Default=None).

        Returns:
            object: Returns the self.
        """
        self.step += 1

        for root in self.roots:
            self._update_mass(X, root, self.is_first_window)

        # A window has been completed: roll the masses over in every tree.
        if self.step % self.window_size == 0:
            for root in self.roots:
                self._update_model(root)

        return self

    def _score_tree(self, X, node):
        """Accumulates the depth-weighted `r_mass` along the path of `X`.

        Args:
            X: The instance; indexed by feature to choose a branch.
            node (_Node): The node to start from; `None` ends the recursion.

        Returns:
            float: Sum of ``r_mass * 2**k`` over the visited nodes.
        """
        if node is None:
            return 0.0

        if X[node.split_att] > node.split_value:
            target_node = node.right
        else:
            target_node = node.left

        return node.r_mass * (2 ** node.k) + self._score_tree(X, target_node)

    def score_partial(self, X):
        """Scores the anomalousness of the next instance.

        Args:
            X (np.float64 array of shape (num_features,)): The instance to
                score. Higher scores represent more anomalous instances
                whereas lower scores correspond to more normal instances.

        Returns:
            float: The anomalousness score of the input instance.
        """
        s = 0.0
        for root in self.roots:
            s += self._score_tree(X, root)

        # The accumulated mass is negated so that a smaller mass gives a
        # higher score.
        return -s

    class _Node:
        """A single node of a half-space tree."""

        # Trees are full binary trees, so many nodes are created; slots avoid
        # a per-instance __dict__.
        __slots__ = (
            "left",
            "right",
            "r_mass",
            "l_mass",
            "split_att",
            "split_value",
            "k",
        )

        def __init__(
                self,
                left,
                right,
                r_mass,
                l_mass,
                split_att,
                split_value,
                k):
            self.left = left  # Child for values <= split_value (None at leaf).
            self.right = right  # Child for values > split_value (None at leaf).
            self.r_mass = r_mass  # Mass read by _score_tree.
            self.l_mass = l_mass  # Mass collected in the current window.
            self.split_att = split_att  # Index of the feature split on.
            self.split_value = split_value  # Threshold on that feature.
            self.k = k  # Depth of this node in the tree.