Source code for pysad.models.robust_random_cut_forest

from pysad.core.base_model import BaseModel


class RobustRandomCutForest(BaseModel):
    """Robust Random Cut Forest model :cite:`guha2016robust`.

    The implementation uses `rrcf library <https://github.com/kLabUM/rrcf>`_
    :cite:`bartos_2019_rrcf`.

    Every tree of the forest receives each fitted instance, keyed by a
    running integer index, and keeps only the ``tree_size`` most recent
    instances (first-in, first-out).

    Args:
        num_trees (int): The number of trees (Default=4).
        shingle_size (int): The shingle size (Default=4). The value is only
            stored on the instance; this class does not build shingles itself.
        tree_size (int): The tree size, i.e. the maximum number of instances
            kept in each tree (Default=256).
    """

    def __init__(self, num_trees=4, shingle_size=4, tree_size=256):
        # Function-scope import: rrcf is only needed once a model is created.
        from rrcf import rrcf

        self.tree_size = tree_size
        self.shingle_size = shingle_size
        self.num_trees = num_trees

        # One independent random cut tree per ensemble member.
        self.forest = [rrcf.RCTree() for _ in range(self.num_trees)]

        # Index assigned to the next fitted instance; doubles as a counter of
        # the instances seen so far.
        self.index = 0

    def fit_partial(self, X, y=None):
        """Fits the model to next instance.

        Args:
            X (np.float64 array of shape (num_features,)): The instance to fit.
            y (int): Ignored since the model is unsupervised (Default=None).

        Returns:
            object: Returns the self.
        """
        for tree in self.forest:
            # Evict the oldest instance *before* inserting once the tree is
            # full. Testing with ``>=`` makes the first eviction happen at
            # index == tree_size and remove index 0; a strict ``>`` would
            # start one step late at index 1, so index 0 would never be
            # forgotten and the tree would hold tree_size + 1 instances.
            if len(tree.leaves) >= self.tree_size:
                tree.forget_point(self.index - self.tree_size)

            tree.insert_point(X, index=self.index)

        self.index += 1

        return self

    def score_partial(self, X):
        """Scores the anomalousness of the next instance.

        The score is the collusive displacement (``codisp``) of the instance
        averaged over all trees. If an identical instance is already stored in
        a tree, its leaf is scored directly; otherwise the instance is
        inserted temporarily, scored and removed again, so the fitted model is
        left unchanged.

        Args:
            X (np.float64 array of shape (num_features,)): The instance to
                score. Higher scores represent more anomalous instances
                whereas lower scores correspond to more normal instances.

        Returns:
            float: The anomalousness score of the input instance.
        """
        probe_index = "test_point"  # temporary key for the scored instance

        score = 0.0
        for tree in self.forest:
            # An empty tree holds nothing to compare against, so skip the
            # duplicate lookup there (it descends from the tree root, which
            # does not exist before the first insertion).
            leaf = tree.find_duplicate(X) if tree.leaves else None

            if leaf is None:
                tree.insert_point(X, index=probe_index)
                try:
                    codisp = tree.codisp(probe_index)
                finally:
                    # Always remove the probe, even if scoring fails, so it
                    # can never leak into the fitted model.
                    tree.forget_point(probe_index)
            else:
                codisp = tree.codisp(leaf)

            score += codisp / self.num_trees

        return score