"""Two-step clustering with a given clusterer (divik.cluster._two_step)."""

import numpy as np
import pandas as pd
from sklearn.base import (
    BaseEstimator,
    ClusterMixin,
    clone,
)

from divik.core import Subsets, configurable

_DEFAULT = object()


def _get_first_attr(obj, prop_name_candidates, default=_DEFAULT):
    for prop_name in prop_name_candidates:
        try:
            return getattr(obj, prop_name)
        except AttributeError:
            pass  # purposeful silence
    if default is _DEFAULT:
        raise AttributeError(
            f"{prop_name_candidates} do not exist in {obj.__class__.__name__}"
        )
    return default


def _get_final_estimator(estimator):
    try:  # sklearn.Pipeline or similar
        return estimator[-1]
    except TypeError:
        return estimator


@configurable
class TwoStep(BaseEstimator, ClusterMixin):
    """Perform a two-step clustering with a given clusterer

    Separates a dataset into ``n_subsets``, processes each of them
    separately and then combines the results.

    Works with centroid-based clustering methods, as it requires
    cluster representatives to combine the result.

    Parameters
    ----------
    clusterer : Union[AutoKMeans, Pipeline, KMeans]
        A centroid-based estimator for the purpose of clustering.

    n_subsets : int, default 10
        The number of subsets into which the original dataset should be
        separated

    random_state : int, default 42
        Random state to use for seeding the random number generator.

    Examples
    --------
    >>> from sklearn.datasets import make_blobs
    >>> from divik.cluster import KMeans, TwoStep
    >>> X, _ = make_blobs(
    ...     n_samples=10_000, n_features=2, centers=3, random_state=42
    ... )
    >>> kmeans = KMeans(n_clusters=3)
    >>> ctr = TwoStep(kmeans).fit(X)
    """

    def __init__(self, clusterer, n_subsets: int = 10, random_state: int = 42):
        self.clusterer = clusterer
        self.n_subsets = n_subsets
        self.random_state = random_state

    def _label_in_subsets(self, X):
        """Cluster each random subset of ``X`` independently.

        Returns one string label per observation of ``X``, composed of the
        subset index and the within-subset cluster label, so labels coming
        from different subsets never collide.
        """
        subsets = Subsets(n_splits=self.n_subsets, random_state=self.random_state)
        X_sct = subsets.scatter(X)
        # A fresh clone per subset keeps the configured clusterer pristine.
        labels_part = [clone(self.clusterer).fit_predict(X_) for X_ in X_sct]
        # l * 0 + i builds, for each subset, an array of the subset index
        # with the same shape as its labels array.
        sct_groups = [l * 0 + i for i, l in enumerate(labels_part)]
        labels = subsets.combine(labels_part)
        groups = subsets.combine(sct_groups)
        cross_fold_labels = [f"g{g}_l{l}" for l, g in zip(labels, groups)]
        return cross_fold_labels

    def fit(self, X, y=None):
        """Compute the two-step clustering of ``X``.

        First clusters each subset separately, then clusters the resulting
        per-subset centroids; each observation receives the final label of
        its centroid. Sets ``estimator_``, ``labels_`` and ``n_clusters_``.

        Returns ``self`` for chaining.
        """
        initial_labels = self._label_in_subsets(X)
        # Cluster representatives: mean of each cross-fold group.
        centroids = pd.DataFrame(X).groupby(initial_labels).mean()
        self.estimator_ = clone(self.clusterer)
        centroids_labels = self.estimator_.fit_predict(centroids)
        # Map each cross-fold label to the final label of its centroid.
        to_final = dict(zip(centroids.index, centroids_labels))
        final_labels = np.array([to_final[l] for l in initial_labels])
        self.labels_ = final_labels
        # The final estimator may expose the cluster count as a parameter
        # (n_clusters) or as a fitted attribute (n_clusters_).
        self.n_clusters_ = _get_first_attr(
            _get_final_estimator(self.estimator_),
            ["n_clusters", "n_clusters_"],
        )
        return self

    def predict(self, X, y=None):
        """Assign each observation of ``X`` to the nearest final cluster."""
        return self.estimator_.predict(X)

    def fit_predict(self, X, y=None):
        """Fit the model to ``X`` and return the computed labels."""
        return self.fit(X, y).labels_