Source code for dbscan.dbscan

import numpy as np
from .utils import euclidean_distance


[docs] class DBSCAN: """ Density-Based Spatial Clustering of Applications with Noise. """ def __init__(self, eps, min_samples): """ Args: eps (float): The maximum distance between two samples for one to be considered as in the neighborhood of the other. min_samples (int): The number of samples (or total weight) in a neighborhood for a point to be considered as a core point. """ self.eps = eps self.min_samples = min_samples self.labels_ = []
[docs] def fit(self, X): """ Perform DBSCAN clustering from vector array. Args: X (np.array): Input data (points). Returns: self """ n_samples = len(X) # Initialize all labels to -1 (Noise) by default self.labels_ = np.full(n_samples, -1) cluster_id = 0 visited = np.full(n_samples, False) for i in range(n_samples): if visited[i]: continue visited[i] = True neighbors = self._get_neighbors(X, i) if len(neighbors) < self.min_samples: # if not enough neighbors, label as Noise (-1). # Note: It might be revisited later and included in a cluster # if it's a border point of another cluster. self.labels_[i] = -1 else: # Found a core point -> Start a new cluster self._expand_cluster(X, i, neighbors, cluster_id, visited) cluster_id += 1 return self
def _get_neighbors(self, X, point_idx): """ Finds indices of neighbors within epsilon distance. Args: X (np.array): All data points. point_idx (int): Index of the point to find neighbors for. Returns: list: Indices of neighbors. """ neighbors = [] for i, point in enumerate(X): # Calculate distance using our utility function dist = euclidean_distance(X[point_idx], point) if dist <= self.eps: neighbors.append(i) return neighbors def _expand_cluster(self, X, point_idx, neighbors, cluster_id, visited): """ Recursively expands the cluster from a starting core point. Args: X (np.array): All data points. point_idx (int): Index of the seed point. neighbors (list): Neighbors of the seed point. cluster_id (int): ID of the current cluster. visited (np.array): Boolean array tracking visited points. """ # assign the seed point to the current cluster self.labels_[point_idx] = cluster_id # we use a while loop to iterate through neighbors. # since we might append to 'neighbors' list we act like a queue. i = 0 while i < len(neighbors): neighbor_point_idx = neighbors[i] if not visited[neighbor_point_idx]: visited[neighbor_point_idx] = True # find neighbors of this neighbor new_neighbors = self._get_neighbors(X, neighbor_point_idx) # if this neighbor is also a core point add its neighbors to the queue if len(new_neighbors) >= self.min_samples: neighbors = neighbors + new_neighbors # if the point was previously labeled as Noise (-1) or not labeled yet if self.labels_[neighbor_point_idx] == -1: self.labels_[neighbor_point_idx] = cluster_id # move to the next point in the queue i += 1