Source code for oineus

from __future__ import absolute_import

__version__ = "0.9.31"

import typing
from concurrent.futures import ThreadPoolExecutor
import numpy as np
import scipy.sparse

from . import _oineus

from ._oineus import ConflictStrategy, DenoiseStrategy, VREdge, FiltrationKind
from ._oineus import DiagramPlaneDomain, FrechetMeanInit
from ._oineus import CombinatorialProdSimplex, CombinatorialSimplex,Simplex, ProdSimplex
from ._oineus import Decomposition, IndexDiagramPoint, DiagramPoint, Diagrams
from ._oineus import reduce
from ._oineus import DecompositionManipStats
from ._oineus import ReductionParams, ReductionTimings, KICRParams, KerImCokReduced, KerImCokReducedProd
from ._oineus import ColumnRepr
from ._oineus import IndicesValues, IndicesValuesProd, TopologyOptimizerProd
from ._oineus import TopologyOptimizerCube_1D, TopologyOptimizerCube_2D, TopologyOptimizerCube_3D
from ._oineus import compute_relative_diagrams, get_boundary_matrix, get_denoise_target, get_induced_matching
from ._oineus import get_nth_persistence, get_permutation_dtv, get_permutation
from ._oineus import bottleneck_distance as _bottleneck_distance_cpp
from ._oineus import wasserstein_distance as _wasserstein_distance_cpp
from ._oineus import init_frechet_mean_first_diagram as _init_frechet_mean_first_diagram_cpp
from ._oineus import init_frechet_mean_random_diagram as _init_frechet_mean_random_diagram_cpp
from ._oineus import init_frechet_mean_medoid_diagram as _init_frechet_mean_medoid_diagram_cpp
from ._oineus import init_frechet_mean_diagonal_grid as _init_frechet_mean_diagonal_grid_cpp
from ._oineus import frechet_mean as _frechet_mean_cpp
from ._oineus import GridDomain_1D, Grid_1D, CombinatorialCube_1D, Cube_1D
from ._oineus import GridDomain_2D, Grid_2D, CombinatorialCube_2D, Cube_2D
from ._oineus import GridDomain_3D, Grid_3D, CombinatorialCube_3D, Cube_3D


# Maps the fat cell-with-value type a user hands to Filtration(...) to the concrete C++
# filtration class that consumes it. The per-encoding internal filtrations are distinct C++
# types (cube vs simplex vs product), but the user sees one Filtration.
_FIL_CLASS_BY_CELL_TYPE = {
    _oineus.Simplex:      _oineus._Filtration,        # fat Simplex (VR / alpha / user-built)
    _oineus.ProdSimplex:  _oineus._ProdFiltration,    # product cells (mapping cylinders)
    _oineus.Cube_1D:      _oineus._CubeFiltration_1D,  # fat cubes (hand-built cubical complexes)
    _oineus.Cube_2D:      _oineus._CubeFiltration_2D,
    _oineus.Cube_3D:      _oineus._CubeFiltration_3D,
}

# Every concrete filtration C++ type, for isinstance(x, oineus.Filtration). Includes the
# factory-produced slim Freudenthal / bit-packed ones, which a user never constructs by hand
# but should still recognize as filtrations.
_ALL_FILTRATION_TYPES = (
    _oineus._Filtration, _oineus._ProdFiltration,
    _oineus._CubeFiltration_1D, _oineus._CubeFiltration_2D, _oineus._CubeFiltration_3D,
    _oineus._FreudenthalFiltration_1D, _oineus._FreudenthalFiltration_2D, _oineus._FreudenthalFiltration_3D,
    _oineus._PackedSimplexFiltration_64, _oineus._PackedSimplexFiltration_128,
)


class _FiltrationMeta(type):
    # isinstance(x, oineus.Filtration) is True for any concrete filtration the library builds,
    # even though Filtration() returns the concrete C++ object (not a _FiltrationMeta instance).
    def __instancecheck__(cls, obj):
        return isinstance(obj, _ALL_FILTRATION_TYPES)

    def __subclasscheck__(cls, sub):
        return issubclass(sub, _ALL_FILTRATION_TYPES)


[docs] class Filtration(metaclass=_FiltrationMeta): """A filtration: an ordered list of cells, each with a filtration value. Construct one from a list of fat cells with values, dispatching on the cell type:: oineus.Filtration([oineus.Simplex([0], 0.0), oineus.Simplex([0, 1], 1.0), ...]) # simplicial oineus.Filtration([oineus.Cube_2D(...), ...]) # cubical oineus.Filtration([oineus.ProdSimplex(...), ...]) # product cells For the common constructions use the factory functions instead, which build the cells for you (and pick an efficient internal cell encoding): vr_filtration / alpha_filtration for point clouds, freudenthal_filtration / cube_filtration for functions on grids. isinstance(x, oineus.Filtration) is True for any filtration the library produces, including the factory-built ones whose concrete C++ type is an internal detail. """ def __new__(cls, cells, *args, **kwargs): try: first = cells[0] except IndexError: # empty list -> the universal fat Simplex filtration (historical default; there is # no cell to dispatch on) return _oineus._Filtration(cells, *args, **kwargs) except TypeError: raise ValueError( "Filtration(cells): cells must be a list of fat cells with values " "(Simplex / Cube_1D/2D/3D / ProdSimplex). For point clouds use vr_filtration or " "alpha_filtration; for functions on grids use freudenthal_filtration or " "cube_filtration.") fil_cls = _FIL_CLASS_BY_CELL_TYPE.get(type(first)) if fil_cls is None: if isinstance(first, tuple): # (vertices, value) pairs -> the universal simplicial constructor, which builds # the Simplex cells itself (used by the diode alpha / Cech-Delaunay paths) return _oineus._Filtration(cells, *args, **kwargs) raise TypeError( f"Filtration(cells): unsupported cell type {type(first).__name__}; expected one " f"of {[t.__name__ for t in _FIL_CLASS_BY_CELL_TYPE]} or (vertices, value) tuples.") return fil_cls(cells, *args, **kwargs)
# Visualization helpers require matplotlib, an optional extra # (`pip install oineus[vis]`). When it is absent, the plot_* helpers and # style constants are simply unavailable; the rest of oineus works normally. try: from .vis import ( plot_diagram, plot_diagram_gradient, plot_matching, plot_chain, default_point_style, default_diagram_a_point_style, default_diagram_b_point_style, default_matching_edge_style, default_longest_edge_style, default_diagonal_style, default_diagonal_projection_a_style, default_diagonal_projection_b_style, default_inf_line_style, default_inf_point_style, default_diagram_gradient_style, default_density_style, default_grid_style, default_chain_vertex_style, default_chain_edge_style, default_chain_triangle_style, default_chain_tetrahedron_style, default_point_cloud_style, DEFAULT_POINT_STYLE, DEFAULT_DIAGRAM_A_POINT_STYLE, DEFAULT_DIAGRAM_B_POINT_STYLE, DEFAULT_MATCHING_EDGE_STYLE, DEFAULT_LONGEST_EDGE_STYLE, DEFAULT_DIAGONAL_STYLE, DEFAULT_DIAGONAL_PROJECTION_A_STYLE, DEFAULT_DIAGONAL_PROJECTION_B_STYLE, DEFAULT_INF_LINE_STYLE, DEFAULT_INF_POINT_STYLE, DEFAULT_DIAGRAM_GRADIENT_STYLE, DEFAULT_DENSITY_STYLE, DEFAULT_DENSITY_THRESHOLD, DEFAULT_GRID_STYLE, DEFAULT_MATCHING_EDGE_QUANTILE, DEFAULT_GRADIENT_TOP_K_ARROWS, DEFAULT_CHAIN_VERTEX_STYLE, DEFAULT_CHAIN_EDGE_STYLE, DEFAULT_CHAIN_TRIANGLE_STYLE, DEFAULT_CHAIN_TETRAHEDRON_STYLE, DEFAULT_POINT_CLOUD_STYLE, OKABE_ITO_BLUE, OKABE_ITO_VERMILLION, ) # Keep vis_utils as a backward-compat alias. from . import vis_utils # noqa: F401 except ImportError: pass from .matching import ( DiagramMatching, BottleneckMatching, InfKind, EssentialMatches, EssentialLongestEdges, LongestEdges, FiniteLongestEdge, EssentialLongestEdge, point_to_diagonal, wasserstein_matching, bottleneck_matching, ) from .sliced_wasserstein import ( sliced_wasserstein_distance, sliced_wasserstein_distance_diag_corrected, ) from ._dtype import REAL_DTYPE, as_real_numpy # from ._oineus import Z2_Column, Z2_Matrix try: import diode _HAS_DIODE = True except: _HAS_DIODE = False # Newer diode builds add the structured-array exporters (combinatorics/values as # NumPy arrays instead of one Python tuple per simplex). Probed once at import; # when False, the code falls back to the list-of-(vertices, value) API. _HAS_DIODE_ARRAYS = _HAS_DIODE and hasattr(diode, "fill_delaunay_arrays") \ and hasattr(diode, "fill_alpha_shapes_arrays") # Maps each filtration cell encoding to its C++ TopologyOptimizer instantiation. The # reduction core is cell-agnostic, but the optimizer is templated on the cell type, so # there is one bound class per encoding (universal Simplex, product, slim cube, slim # Freudenthal, bit-packed VR/alpha). Single source of truth: oineus.diff reuses it. _OPT_CLASS_BY_FIL_TYPE = { _oineus._Filtration: _oineus.TopologyOptimizer, _oineus._ProdFiltration: _oineus.TopologyOptimizerProd, _oineus._CubeFiltration_1D: _oineus.TopologyOptimizerCube_1D, _oineus._CubeFiltration_2D: _oineus.TopologyOptimizerCube_2D, _oineus._CubeFiltration_3D: _oineus.TopologyOptimizerCube_3D, _oineus._FreudenthalFiltration_1D: _oineus.TopologyOptimizerFreudenthal_1D, _oineus._FreudenthalFiltration_2D: _oineus.TopologyOptimizerFreudenthal_2D, _oineus._FreudenthalFiltration_3D: _oineus.TopologyOptimizerFreudenthal_3D, _oineus._PackedSimplexFiltration_64: _oineus.TopologyOptimizerPacked_64, _oineus._PackedSimplexFiltration_128: _oineus.TopologyOptimizerPacked_128, } # Maps each filtration cell encoding to its C++ KerImCokReduced (kernel/image/cokernel) # instantiation. kernel.h is cell-agnostic, so KICR is wired for every encoding; the two # fat classes keep their public names, the rest are hidden underscore names. Keyed by the # filtration type (type(K)) -- NOT by K[0], whose materialized fat cell would misdispatch # slim/packed filtrations into the fat ctor. _KICR_CLASS_BY_FIL_TYPE = { _oineus._Filtration: _oineus.KerImCokReduced, _oineus._ProdFiltration: _oineus.KerImCokReducedProd, _oineus._CubeFiltration_1D: _oineus._KerImCokReduced_Cube_1D, _oineus._CubeFiltration_2D: _oineus._KerImCokReduced_Cube_2D, _oineus._CubeFiltration_3D: _oineus._KerImCokReduced_Cube_3D, _oineus._FreudenthalFiltration_1D: _oineus._KerImCokReduced_Fr_1D, _oineus._FreudenthalFiltration_2D: _oineus._KerImCokReduced_Fr_2D, _oineus._FreudenthalFiltration_3D: _oineus._KerImCokReduced_Fr_3D, _oineus._PackedSimplexFiltration_64: _oineus._KerImCokReduced_Packed_64, _oineus._PackedSimplexFiltration_128: _oineus._KerImCokReduced_Packed_128, }
[docs] class TopologyOptimizer: """Topology optimizer for a filtration of any cell encoding. Dispatches on the filtration's cell type -- universal Simplex (VR / alpha / user), product, slim cube, slim Freudenthal, or bit-packed VR/alpha -- and returns the matching C++ optimizer instance directly, so its full native API (reduce_all, compute_diagram, simplify, match, singletons, combine_loss, ...) is available unchanged. Constructor keywords (with_crit_sets, dims_to_restore_elz, n_threads, u_strategy) are forwarded verbatim. oineus.diff.TopologyOptimizer is the differentiable-pipeline wrapper built on the same dispatch; use this bare class for direct (non-autograd) topology optimization. """ def __new__(cls, fil, *args, **kwargs): opt_cls = _OPT_CLASS_BY_FIL_TYPE.get(type(fil)) if opt_cls is None: raise TypeError( f"TopologyOptimizer: unsupported filtration type " f"{type(fil).__name__}; expected one of " f"{[t.__name__ for t in _OPT_CLASS_BY_FIL_TYPE]}" ) return opt_cls(fil, *args, **kwargs)
def _check_numpy_diagram_shape(dgm): """If ``dgm`` is a numpy array, assert shape (n, 2). Otherwise pass through.""" if isinstance(dgm, np.ndarray) and (dgm.ndim != 2 or dgm.shape[1] != 2): raise ValueError("Expected NumPy array with shape (n_points, 2)") return dgm def _normalize_frechet_weights(n_diagrams: int, weights): if n_diagrams == 0: return np.empty((0,), dtype=REAL_DTYPE) if weights is None: return np.full(n_diagrams, 1.0 / n_diagrams, dtype=REAL_DTYPE) arr = np.asarray(weights) if arr.ndim != 1: raise ValueError("weights must be a 1D array") if arr.shape[0] != n_diagrams: raise ValueError("weights must have same length as diagrams") if np.any(arr < 0.0): raise ValueError("weights must be nonnegative") total = float(np.sum(arr)) if total <= 0.0: raise ValueError("weights must sum to a positive value") return arr / total def _diagram_persistences(dgm: np.ndarray) -> np.ndarray: if dgm.size == 0: return np.empty((0,), dtype=dgm.dtype) pers = np.empty(dgm.shape[0], dtype=dgm.dtype) finite_mask = np.isfinite(dgm[:, 0]) & np.isfinite(dgm[:, 1]) pers[finite_mask] = np.abs(dgm[finite_mask, 1] - dgm[finite_mask, 0]) pers[~finite_mask] = np.inf return pers def _threshold_diagram_by_persistence(dgm: np.ndarray, min_persistence: float, *, include_infinite_points: bool = True): if dgm.size == 0: return dgm.reshape((0, 2)) pers = _diagram_persistences(dgm) finite_mask = np.isfinite(pers) keep_mask = np.zeros(dgm.shape[0], dtype=bool) keep_mask[finite_mask] = pers[finite_mask] >= min_persistence if include_infinite_points: keep_mask |= ~finite_mask return np.ascontiguousarray(dgm[keep_mask]) def _newly_active_diagram_points(dgm: np.ndarray, previous_threshold: float, current_threshold: float): if dgm.size == 0: return dgm.reshape((0, 2)) pers = _diagram_persistences(dgm) finite_mask = np.isfinite(pers) keep_mask = finite_mask & (pers >= current_threshold) & (pers < previous_threshold) return np.ascontiguousarray(dgm[keep_mask]) def _diagrams_to_numpy_list(diagrams): """Convert each diagram in the sequence to a Real-dtype ``(n, 2)`` numpy array. Used by helpers that manipulate diagram points in Python (thresholding, persistence scheduling, pairwise distances, ...). For direct pass-through to the C++ Hera bindings, prefer :func:`as_real_numpy` instead — nanobind picks the correct overload based on input type. """ result = [] for dgm in diagrams: if isinstance(dgm, np.ndarray): result.append(as_real_numpy(_check_numpy_diagram_shape(dgm))) elif hasattr(dgm, "in_dimension"): # oineus.Diagrams if len(dgm) != 1: raise ValueError( "Cannot convert multi-dimensional oineus.Diagrams: specify dim=... " "or extract .in_dimension(d) before calling" ) result.append(dgm.in_dimension(0, as_numpy=True)) else: # list of DiagramPoint or similar if len(dgm) == 0: result.append(np.empty((0, 2), dtype=REAL_DTYPE)) else: result.append( np.array([[p[0], p[1]] for p in dgm], dtype=REAL_DTYPE).reshape((-1, 2)) ) return result def _resolve_multistart_seed(diagrams, seed, *, weights, domain, random_noise_scale, random_seed, grid_n_x_bins, grid_n_y_bins, wasserstein_delta, internal_p, n_threads: int = 1): if not isinstance(seed, str): return as_real_numpy(_check_numpy_diagram_shape(seed)) if seed == "first": return init_frechet_mean_first_diagram(diagrams) if seed == "medoid": return init_frechet_mean_medoid_diagram(diagrams, weights=weights, n_threads=n_threads) if seed == "grid": return init_frechet_mean_diagonal_grid( diagrams, weights=weights, domain=domain, grid_n_x_bins=grid_n_x_bins, grid_n_y_bins=grid_n_y_bins, ) if seed == "random": return init_frechet_mean_random_diagram( diagrams, domain=domain, random_noise_scale=random_noise_scale, random_seed=random_seed, ) # Need numpy arrays for .copy() in the "farthest_from_medoid"/"second_medoid" paths. diagrams = _diagrams_to_numpy_list(diagrams) normalized_weights = _normalize_frechet_weights(len(diagrams), weights) n = len(diagrams) d2 = np.zeros((n, n), dtype=REAL_DTYPE) pairs = [(i, j) for i in range(n) for j in range(i + 1, n)] def pair_dist(ij): i, j = ij return wasserstein_distance( diagrams[i], diagrams[j], q=2.0, delta=wasserstein_delta, internal_p=internal_p, ) if n_threads <= 1 or len(pairs) <= 1: dists = [pair_dist(p) for p in pairs] else: with ThreadPoolExecutor(max_workers=n_threads) as executor: dists = list(executor.map(pair_dist, pairs)) for (i, j), dist in zip(pairs, dists): d2[i, j] = dist * dist d2[j, i] = d2[i, j] medoid_idx = int(np.argmin(d2 @ normalized_weights)) if seed == "farthest_from_medoid": return diagrams[int(np.argmax(d2[:, medoid_idx]))].copy() if seed == "second_medoid": provisional = int(np.argmax(d2[:, medoid_idx])) c2 = np.where(d2[:, provisional] < d2[:, medoid_idx])[0] if c2.size == 0: return diagrams[provisional].copy() restricted_scores = np.array([ np.sum(normalized_weights[c2] * d2[idx, c2]) for idx in c2 ]) return diagrams[int(c2[int(np.argmin(restricted_scores))])].copy() raise ValueError(f"Unknown Fréchet-mean seed '{seed}'") def _prepare_distance_args(dgm_1, dgm_2): """Coerce two single-dimension diagram inputs for the C++ distance entry. Both inputs must be a numpy ``(n, 2)`` array or a ``list[DiagramPoint]``. A multi-dimensional ``oineus.Diagrams`` is rejected — the caller must extract a single dimension first via ``dgm.in_dimension(d)``. """ def coerce(dgm): if isinstance(dgm, Diagrams): raise TypeError( "Pass a single-dimension diagram: use `dgm.in_dimension(d)` " "instead of passing the multi-dimensional Diagrams object.") return as_real_numpy(_check_numpy_diagram_shape(dgm)) return (coerce(dgm_1), coerce(dgm_2))
[docs] def bottleneck_distance(dgm_1, dgm_2, delta: float=0.01): """Compute the bottleneck distance between two persistence diagrams. Args: dgm_1: Single-dimension persistence diagram: a NumPy array of shape ``(n_points, 2)`` or a ``list[DiagramPoint]``. To pass an Oineus ``Diagrams`` object, extract the dimension first via ``dgm.in_dimension(d)``. dgm_2: Same forms as ``dgm_1``. delta: Relative error requested from Hera. Set `delta=0.0` to request the exact bottleneck distance. Returns: The bottleneck distance as a Python float. """ return _bottleneck_distance_cpp(*_prepare_distance_args(dgm_1, dgm_2), delta=delta)
def _diagram_arrays_equal_for_zero_check(dgm_1, dgm_2): if isinstance(dgm_1, np.ndarray): arr_1 = dgm_1 else: arr_1 = np.array([[p[0], p[1]] for p in dgm_1], dtype=REAL_DTYPE).reshape((-1, 2)) if isinstance(dgm_2, np.ndarray): arr_2 = dgm_2 else: arr_2 = np.array([[p[0], p[1]] for p in dgm_2], dtype=REAL_DTYPE).reshape((-1, 2)) if arr_1.shape != arr_2.shape: return False if arr_1.size == 0: return True sort_idx_1 = np.lexsort((arr_1[:, 1], arr_1[:, 0])) sort_idx_2 = np.lexsort((arr_2[:, 1], arr_2[:, 0])) arr_1 = arr_1[sort_idx_1] arr_2 = arr_2[sort_idx_2] finite_mask = np.isfinite(arr_1) & np.isfinite(arr_2) matching_inf_mask = np.isinf(arr_1) & np.isinf(arr_2) & (np.signbit(arr_1) == np.signbit(arr_2)) if not np.all(finite_mask | matching_inf_mask): return False diff = np.zeros_like(arr_1) diff[finite_mask] = np.abs(arr_1[finite_mask] - arr_2[finite_mask]) return np.all(diff < np.finfo(arr_1.dtype).eps)
[docs] def wasserstein_distance(dgm_1, dgm_2, q: float=1.0, delta: float=0.01, internal_p: float=np.inf, wasserstein_q: typing.Optional[float]=None, check_for_zero: bool=True): """Compute the q-Wasserstein distance between two persistence diagrams. Args: dgm_1: Single-dimension persistence diagram: a NumPy array of shape ``(n_points, 2)`` or a ``list[DiagramPoint]``. To pass an Oineus ``Diagrams`` object, extract the dimension first via ``dgm.in_dimension(d)``. dgm_2: Same forms as ``dgm_1``. q: Wasserstein exponent. delta: Relative error requested from Hera. internal_p: Ground-metric norm in the plane. Use `np.inf` for the `L_infinity` norm. wasserstein_q: Alias for `q`, kept for API compatibility. check_for_zero: If `True`, skip Hera when the two inputs are numpy arrays of equal points. Returns: The Wasserstein distance as a Python float. """ if wasserstein_q is not None: q = wasserstein_q if np.isinf(internal_p): internal_p = -1.0 prepared = _prepare_distance_args(dgm_1, dgm_2) if check_for_zero and _diagram_arrays_equal_for_zero_check(*prepared): return 0.0 return _wasserstein_distance_cpp(*prepared, q=q, delta=delta, internal_p=internal_p)
[docs] def init_frechet_mean_first_diagram(diagrams): return _init_frechet_mean_first_diagram_cpp([as_real_numpy(d) for d in diagrams])
[docs] def init_frechet_mean_random_diagram(diagrams, *, domain=DiagramPlaneDomain.AboveDiagonal, random_noise_scale: float = 1.0, random_seed: int = 42): return _init_frechet_mean_random_diagram_cpp( [as_real_numpy(d) for d in diagrams], domain=domain, random_noise_scale=random_noise_scale, random_seed=random_seed, )
[docs] def init_frechet_mean_medoid_diagram(diagrams, *, weights=None, n_threads: int = 1): return _init_frechet_mean_medoid_diagram_cpp( [as_real_numpy(d) for d in diagrams], weights=weights, n_threads=n_threads )
[docs] def init_frechet_mean_diagonal_grid(diagrams, *, weights=None, domain=DiagramPlaneDomain.AboveDiagonal, grid_n_x_bins: int = 16, grid_n_y_bins: int = 16): return _init_frechet_mean_diagonal_grid_cpp( [as_real_numpy(d) for d in diagrams], weights=weights, domain=domain, grid_n_x_bins=grid_n_x_bins, grid_n_y_bins=grid_n_y_bins, )
[docs] def frechet_mean_objective(diagrams, barycenter, *, weights=None, wasserstein_delta: float = 0.01, internal_p: float = np.inf, n_threads: int = 1): normalized_weights = _normalize_frechet_weights(len(diagrams), weights) def term(i_diagram): i, diagram = i_diagram return normalized_weights[i] * wasserstein_distance( barycenter, diagram, q=2.0, delta=wasserstein_delta, internal_p=internal_p, ) ** 2 if n_threads <= 1 or len(diagrams) <= 1: return float(sum(term((i, d)) for i, d in enumerate(diagrams))) with ThreadPoolExecutor(max_workers=n_threads) as executor: terms = list(executor.map(term, enumerate(diagrams))) return float(sum(terms))
def make_frechet_mean_persistence_schedule(diagrams, *, initial_threshold_fraction: float = 0.5, max_active_growth: float = 0.10, min_persistence: float = 0.0): diagrams = _diagrams_to_numpy_list(diagrams) if initial_threshold_fraction <= 0.0: raise ValueError("initial_threshold_fraction must be positive") if max_active_growth < 0.0: raise ValueError("max_active_growth must be nonnegative") finite_persistences = [] for dgm in diagrams: pers = _diagram_persistences(dgm) finite_persistences.extend(pers[np.isfinite(pers)].tolist()) if not finite_persistences: return [float(min_persistence)] values = np.array(sorted(set(float(p) for p in finite_persistences if p >= min_persistence), reverse=True)) if values.size == 0: return [float(min_persistence)] start_target = max(float(min_persistence), float(initial_threshold_fraction) * float(values[0])) start_idx = int(np.where(values >= start_target)[0][-1]) schedule = [float(values[start_idx])] current_idx = start_idx counts = np.array([ sum(int(np.count_nonzero(np.isfinite(_diagram_persistences(dgm)) & (_diagram_persistences(dgm) >= thr))) for dgm in diagrams) for thr in values ], dtype=np.int64) while current_idx + 1 < values.size: current_count = max(int(counts[current_idx]), 1) max_count = int(np.floor((1.0 + max_active_growth) * current_count)) next_idx = current_idx + 1 valid = np.where(counts[current_idx + 1:] <= max_count)[0] if valid.size > 0: next_idx = current_idx + 1 + int(valid[-1]) schedule.append(float(values[next_idx])) current_idx = next_idx if schedule[-1] > float(min_persistence): schedule.append(float(min_persistence)) deduped = [] for threshold in schedule: if not deduped or threshold < deduped[-1]: deduped.append(threshold) return deduped def frechet_mean_newborn_points_from_newly_active(newly_active_diagrams, *, weights=None): newly_active_diagrams = _diagrams_to_numpy_list(newly_active_diagrams) normalized_weights = _normalize_frechet_weights(len(newly_active_diagrams), weights) new_points = [] for diagram_weight, dgm in zip(normalized_weights, newly_active_diagrams): if dgm.size == 0: continue finite_mask = np.isfinite(dgm[:, 0]) & np.isfinite(dgm[:, 1]) finite_points = dgm[finite_mask] if finite_points.size == 0: continue midpoints = 0.5 * (finite_points[:, 0] + finite_points[:, 1]) births = diagram_weight * finite_points[:, 0] + (1.0 - diagram_weight) * midpoints deaths = diagram_weight * finite_points[:, 1] + (1.0 - diagram_weight) * midpoints new_points.append(np.column_stack((births, deaths))) if not new_points: return np.empty((0, 2), dtype=REAL_DTYPE) return np.ascontiguousarray(np.vstack(new_points))
[docs] def frechet_mean_multistart(diagrams, *, weights=None, starts=("medoid", "second_medoid", "farthest_from_medoid"), return_details: bool = False, n_threads: int = 1, **kwargs): diagrams = [as_real_numpy(_check_numpy_diagram_shape(d)) for d in diagrams] normalized_weights = _normalize_frechet_weights(len(diagrams), weights) if not starts: raise ValueError("starts must be non-empty") results = [] for start_idx, start in enumerate(starts): if isinstance(start, dict): local_kwargs = dict(kwargs) local_kwargs.update(start) local_kwargs.pop("init_strategy", None) local_kwargs.pop("custom_initial_barycenter", None) seed = _resolve_multistart_seed( diagrams, local_kwargs.pop("seed", "medoid"), weights=normalized_weights, domain=local_kwargs.get("domain", DiagramPlaneDomain.AboveDiagonal), random_noise_scale=local_kwargs.get("random_noise_scale", 1.0), random_seed=local_kwargs.get("random_seed", 42 + start_idx), grid_n_x_bins=local_kwargs.get("grid_n_x_bins", 16), grid_n_y_bins=local_kwargs.get("grid_n_y_bins", 16), wasserstein_delta=local_kwargs.get("wasserstein_delta", 0.01), internal_p=local_kwargs.get("internal_p", np.inf), n_threads=n_threads, ) else: local_kwargs = dict(kwargs) local_kwargs.pop("init_strategy", None) local_kwargs.pop("custom_initial_barycenter", None) seed = _resolve_multistart_seed( diagrams, start, weights=normalized_weights, domain=local_kwargs.get("domain", DiagramPlaneDomain.AboveDiagonal), random_noise_scale=local_kwargs.get("random_noise_scale", 1.0), random_seed=local_kwargs.get("random_seed", 42 + start_idx), grid_n_x_bins=local_kwargs.get("grid_n_x_bins", 16), grid_n_y_bins=local_kwargs.get("grid_n_y_bins", 16), wasserstein_delta=local_kwargs.get("wasserstein_delta", 0.01), internal_p=local_kwargs.get("internal_p", np.inf), n_threads=n_threads, ) barycenter = frechet_mean( diagrams, weights=normalized_weights, init_strategy=FrechetMeanInit.Custom, custom_initial_barycenter=seed, n_threads=n_threads, **local_kwargs, ) objective = frechet_mean_objective( diagrams, barycenter, weights=normalized_weights, wasserstein_delta=local_kwargs.get("wasserstein_delta", 0.01), internal_p=local_kwargs.get("internal_p", np.inf), n_threads=n_threads, ) results.append({"start": start, "barycenter": barycenter, "objective": objective}) best = min(results, key=lambda item: item["objective"]) if return_details: return best["barycenter"], {"objective": best["objective"], "runs": results} return best["barycenter"]
[docs] def progressive_frechet_mean(diagrams, *, weights=None, thresholds=None, initial_threshold_fraction: float = 0.5, max_active_growth: float = 0.10, min_persistence: float = 0.0, initial_seed="medoid", support_update_predicate=None, support_update_fn=None, return_details: bool = False, n_threads: int = 1, **kwargs): diagrams = _diagrams_to_numpy_list(diagrams) normalized_weights = _normalize_frechet_weights(len(diagrams), weights) ignore_infinite_points = bool(kwargs.get("ignore_infinite_points", False)) if thresholds is None: thresholds = make_frechet_mean_persistence_schedule( diagrams, initial_threshold_fraction=initial_threshold_fraction, max_active_growth=max_active_growth, min_persistence=min_persistence, ) else: thresholds = [float(t) for t in thresholds] if not thresholds: raise ValueError("thresholds must be non-empty") local_kwargs = dict(kwargs) local_kwargs.pop("init_strategy", None) local_kwargs.pop("custom_initial_barycenter", None) barycenter = None history = [] previous_threshold = np.inf for stage_idx, threshold in enumerate(thresholds): active_diagrams = [ _threshold_diagram_by_persistence( dgm, threshold, include_infinite_points=not ignore_infinite_points, ) for dgm in diagrams ] if barycenter is None: seed = _resolve_multistart_seed( active_diagrams, initial_seed, weights=normalized_weights, domain=local_kwargs.get("domain", DiagramPlaneDomain.AboveDiagonal), random_noise_scale=local_kwargs.get("random_noise_scale", 1.0), random_seed=local_kwargs.get("random_seed", 42), grid_n_x_bins=local_kwargs.get("grid_n_x_bins", 16), grid_n_y_bins=local_kwargs.get("grid_n_y_bins", 16), wasserstein_delta=local_kwargs.get("wasserstein_delta", 0.01), internal_p=local_kwargs.get("internal_p", np.inf), n_threads=n_threads, ) else: seed = barycenter if support_update_predicate is not None and support_update_fn is not None: newly_active_diagrams = [ _newly_active_diagram_points(dgm, previous_threshold, threshold) for dgm in diagrams ] should_update = bool(support_update_predicate( stage_index=stage_idx, threshold=threshold, previous_threshold=previous_threshold, current_barycenter=seed, active_diagrams=active_diagrams, newly_active_diagrams=newly_active_diagrams, weights=normalized_weights, )) if should_update: new_points = support_update_fn( stage_index=stage_idx, threshold=threshold, previous_threshold=previous_threshold, current_barycenter=seed, active_diagrams=active_diagrams, newly_active_diagrams=newly_active_diagrams, weights=normalized_weights, ) if new_points is not None: new_points = as_real_numpy(_check_numpy_diagram_shape(new_points)) if new_points.size != 0: seed = np.ascontiguousarray(np.vstack([seed, new_points])) barycenter = frechet_mean( active_diagrams, weights=normalized_weights, init_strategy=FrechetMeanInit.Custom, custom_initial_barycenter=seed, n_threads=n_threads, **local_kwargs, ) objective = frechet_mean_objective( active_diagrams, barycenter, weights=normalized_weights, wasserstein_delta=local_kwargs.get("wasserstein_delta", 0.01), internal_p=local_kwargs.get("internal_p", np.inf), n_threads=n_threads, ) history.append({ "stage_index": stage_idx, "threshold": threshold, "n_active_points": int(sum(dgm.shape[0] for dgm in active_diagrams)), "barycenter": barycenter, "objective": objective, }) previous_threshold = threshold if return_details: return barycenter, {"thresholds": thresholds, "history": history} return barycenter
[docs] def progressive_frechet_mean_multistart(diagrams, *, weights=None, starts=("medoid", "second_medoid", "farthest_from_medoid"), return_details: bool = False, n_threads: int = 1, **kwargs): diagrams = [as_real_numpy(_check_numpy_diagram_shape(d)) for d in diagrams] normalized_weights = _normalize_frechet_weights(len(diagrams), weights) if not starts: raise ValueError("starts must be non-empty") results = [] for start_idx, start in enumerate(starts): local_kwargs = dict(kwargs) initial_seed = local_kwargs.pop("initial_seed", start) if isinstance(start, dict): local_kwargs.update(start) initial_seed = local_kwargs.pop("initial_seed", local_kwargs.pop("seed", "medoid")) barycenter, details = progressive_frechet_mean( diagrams, weights=normalized_weights, initial_seed=initial_seed, return_details=True, n_threads=n_threads, **local_kwargs, ) objective = frechet_mean_objective( diagrams, barycenter, weights=normalized_weights, wasserstein_delta=local_kwargs.get("wasserstein_delta", 0.01), internal_p=local_kwargs.get("internal_p", np.inf), n_threads=n_threads, ) results.append({ "start": start, "barycenter": barycenter, "objective": objective, "progressive_details": details, }) best = min(results, key=lambda item: item["objective"]) if return_details: return best["barycenter"], { "objective": best["objective"], "runs": results, "thresholds": best["progressive_details"]["thresholds"], "history": best["progressive_details"]["history"], } return best["barycenter"]
[docs] def frechet_mean(diagrams, *, weights=None, max_iter: int = 100, tol: float = 1e-7, wasserstein_delta: float = 0.01, internal_p: float = np.inf, init_strategy=FrechetMeanInit.Grid, domain=DiagramPlaneDomain.AboveDiagonal, ignore_infinite_points: bool = False, random_noise_scale: float = 1.0, random_seed: int = 42, grid_n_x_bins: int = 16, grid_n_y_bins: int = 16, custom_initial_barycenter=None, n_threads: int = 1): diagrams = [as_real_numpy(_check_numpy_diagram_shape(d)) for d in diagrams] if weights is not None: weights = np.asarray(weights) if weights.ndim != 1: raise ValueError("weights must be a 1D array") if weights.shape[0] != len(diagrams): raise ValueError("weights must have same length as diagrams") custom_initial_barycenter = ( None if custom_initial_barycenter is None else as_real_numpy(_check_numpy_diagram_shape(custom_initial_barycenter)) ) if np.isinf(internal_p): internal_p = -1.0 return _frechet_mean_cpp(diagrams, weights=weights, max_iter=max_iter, tol=tol, wasserstein_delta=wasserstein_delta, internal_p=internal_p, init_strategy=init_strategy, domain=domain, ignore_infinite_points=ignore_infinite_points, random_noise_scale=random_noise_scale, random_seed=random_seed, grid_n_x_bins=grid_n_x_bins, grid_n_y_bins=grid_n_y_bins, custom_initial_barycenter=custom_initial_barycenter, n_threads=n_threads)
[docs] def to_scipy_matrix(sparse_cols, shape=None): if shape is None: shape = (len(sparse_cols), len(sparse_cols)) row_ind = [j for i in range(len(sparse_cols)) for j in sparse_cols[i]] col_ind = [i for i in range(len(sparse_cols)) for _ in sparse_cols[i]] assert (len(row_ind) == len(col_ind)) data = [1 for _ in range(len(row_ind))] return scipy.sparse.csc_matrix((data, (row_ind, col_ind)), shape=shape)
[docs] def max_distance(data: np.ndarray, from_pwdists: bool=False): # 1.00001 is a small fudge factor so the returned bound is strictly # greater than every pairwise distance even after floating-point # rounding; callers use this as a max_diameter that must enclose all # edges if from_pwdists: return 1.00001 * np.min(np.max(data, axis=1)) else: if data.ndim != 2 or data.shape[0] < 2: raise ValueError("max_distance: data must be a 2D array with at least 2 rows") diff = data[:, np.newaxis, :] - data[np.newaxis, :, :] squared_distances = np.sum(diff**2, axis=2) return 1.00001 * np.sqrt(np.min(np.max(squared_distances, axis=1)))
_FR_GRID_CLASS_BY_NDIM = {1: Grid_1D, 2: Grid_2D, 3: Grid_3D}
[docs] def freudenthal_filtration(data: np.ndarray, negate: bool=False, wrap: bool=False, max_dim: int = 3, with_critical_vertices: bool=False, slim: bool=True, n_threads: int=1): max_dim = min(max_dim, data.ndim) # slim (the default) returns the compact (anchor,type) _FreudenthalFiltration_ND (one shared # FrGeometry, fat simplices materialized on access) for D=1,2,3 on non-wrap grids; it reduces, # produces diagrams, optimizes (oineus.TopologyOptimizer dispatches it) and supports KICR # identically to the fat path but with a far smaller boundary-build footprint. wrap grids and # D>=4 always fall back to the fat universal Filtration (FrGeometry rejects wrap; the slim # builder is bound only for D=1,2,3). Pass slim=False to force the fat path. use_slim = slim and (not wrap) and (1 <= data.ndim <= 3) if use_slim: grid = _FR_GRID_CLASS_BY_NDIM[data.ndim](data, wrap=wrap, values_on="vertices") if with_critical_vertices: fil, vertices = grid.freudenthal_filtration_and_critical_vertices_slim(max_dim=max_dim, negate=negate, n_threads=n_threads) vertices = np.array(vertices, dtype=np.int64) return fil, vertices return grid.freudenthal_filtration_slim(max_dim=max_dim, negate=negate, n_threads=n_threads) if with_critical_vertices: fil, vertices = _oineus.get_freudenthal_filtration_and_crit_vertices(data=data, negate=negate, wrap=wrap, max_dim=max_dim, n_threads=n_threads) vertices = np.array(vertices, dtype=np.int64) return fil, vertices return _oineus.get_freudenthal_filtration(data=data, negate=negate, wrap=wrap, max_dim=max_dim, n_threads=n_threads)
def _vr_packed_word_suffix(n_points, max_dim): # Smallest packed word that holds a (max_dim)-simplex over n_points vertices: # bits = ceil(log2(n_points)) per field (== C++ packed_vertex_bits), (max_dim+1) # fields. Returns "64", "128", or None (too wide -> fat fallback). bits = _packed_bits(n_points) width = (int(max_dim) + 1) * bits if width <= 64: return "64" if width <= 128: return "128" return None def _packed_bits(n_points): # bits per packed vertex field == C++ oin::packed_vertex_bits(n_points); # passed to the packed array builders so they skip a full max-id scan. return max(1, (int(n_points) - 1).bit_length())
[docs] def vr_filtration(data: np.ndarray, from_pwdists: bool = False, max_dim: int = -1, max_diameter: float = -1.0, with_critical_edges: bool = False, packed: bool = True, n_threads: int = 1): """Build a Vietoris-Rips filtration from points or pairwise distances. Construction uses the in-order generation (VRE) algorithm of Vejdemo-Johansson, Matuszewski & Bauer ("In-order generation of Vietoris-Rips Complexes", arXiv:2411.05495). Parameters ---------- data : np.ndarray (n, d) array of points, or (n, n) pairwise distance matrix. from_pwdists : bool Treat ``data`` as a pairwise distance matrix. max_dim : int Largest simplex dimension to enumerate. Default: data dimensionality. max_diameter : float Truncation threshold; only simplices with diameter <= this value are kept. Default: enclosing radius of the point cloud. with_critical_edges : bool Also return an array (one per simplex) of an edge whose length equals the simplex's filtration value. packed : bool Use the compact bit-packed cell encoding (the default) when the vertex ids fit a 64- or 128-bit word; falls back to the fat encoding otherwise. Pass packed=False to force the fat universal Simplex filtration. n_threads : int Threads used for the (parallel) sort inside the Filtration ctor. Enumeration itself is single-threaded. """ if data.ndim != 2: raise ValueError("data must be a 2D array") if from_pwdists and data.shape[0] != data.shape[1]: raise ValueError("from_pwdists=True requires a square pairwise-distance matrix") if max_diameter < 0: max_diameter = max_distance(data, from_pwdists) if max_dim < 0: if from_pwdists: raise RuntimeError("vr_filtration: if input is pairwise distance matrix, max_dim must be specified") else: max_dim = data.shape[1] # packed (the default) returns a bit-packed _PackedSimplexFiltration_64/128 (compact cells, # one shared PackedGeom) when the vertex ids fit a 64- or 128-bit word; if they do not fit # (very large/high-dim complex) it transparently falls back to the fat path. It reduces, # produces diagrams, optimizes (oineus.TopologyOptimizer dispatches it), supports KICR and # the uid-keyed accessors (via the combinatorial-uid translation) identically to fat but with # a smaller footprint. Pass packed=False to force the fat universal Simplex filtration. suffix = _vr_packed_word_suffix(data.shape[0], max_dim) if packed else None if from_pwdists: if suffix is not None: base = "get_vr_filtration_and_critical_edges_packed" if with_critical_edges else "get_vr_filtration_packed" func = getattr(_oineus, base + suffix + "_from_pwdists") elif with_critical_edges: func = _oineus.get_vr_filtration_and_critical_edges_from_pwdists else: func = _oineus.get_vr_filtration_from_pwdists else: if suffix is not None: base = "get_vr_filtration_and_critical_edges_packed" if with_critical_edges else "get_vr_filtration_packed" func = getattr(_oineus, base + suffix) elif with_critical_edges: func = _oineus.get_vr_filtration_and_critical_edges else: func = _oineus.get_vr_filtration result = func(data, max_dim=max_dim, max_diameter=max_diameter, n_threads=n_threads) if with_critical_edges: # convert list of VREdges to numpy array edges = [ [ e.x, e.y] for e in result[1] ] edges = np.array(edges, dtype=np.int64) return result[0], edges else: return result
[docs] def is_reduced(a): """Check whether a Z_2 boundary matrix is reduced. A column is treated as nonzero in row ``i`` iff ``a[i, col] % 2 == 1``, so any integer dtype with mod-2 semantics works (binary 0/1 matrices, or unreduced count matrices). Returns ``True`` iff every nonzero column has a distinct lowest-1 row index, which is the definition of a reduced matrix in the standard persistence reduction. Args: a: 2D array-like with ``.shape[1]`` columns and integer entries. Returns: bool: True if the matrix is reduced. """ lowest_ones = [] for col_idx in range(a.shape[1]): if np.any(a[:, col_idx] % 2 == 1): lowest_ones.append(np.max(np.where(a[:, col_idx] % 2 == 1))) return len(lowest_ones) == len(set(lowest_ones))
_SLIM_SIMPLEX_FIL_TYPES = ( _oineus._FreudenthalFiltration_1D, _oineus._FreudenthalFiltration_2D, _oineus._FreudenthalFiltration_3D, _oineus._PackedSimplexFiltration_64, _oineus._PackedSimplexFiltration_128, ) def _to_fat_simplex_filtration(fil): """Materialize a slim/packed simplicial filtration into a fat _Filtration. mapping_cylinder / multiply_filtration build fat ProductCell<Simplex, Simplex> filtrations, so they need fat Simplex factors. A slim Freudenthal / bit-packed filtration materializes fat Simplex cells on access, so rebuild a fat _Filtration from them. Fat Simplex / product filtrations pass through unchanged; cube (and anything else) is returned as-is so the C++ overload rejects it as before. """ if isinstance(fil, _SLIM_SIMPLEX_FIL_TYPES): return _oineus._Filtration(fil.cells(), fil.negate) return fil
[docs] def mapping_cylinder(fil_domain, fil_codomain, v_domain, v_codomain, v_domain_value=None, v_codomain_value=None, with_indices=False): """Build the mapping cylinder of the inclusion fil_domain -> fil_codomain. The auxiliary vertex values default to filtration-order -inf (``fil_domain.neg_infinity()`` / ``fil_codomain.neg_infinity()``), which keeps the cylinder's persistent homology equivalent to the inclusion's. Pass explicit values only if you intentionally want the auxiliary vertices to enter at a finite point in the filtration. Slim Freudenthal / bit-packed inputs are materialized to the fat encoding first (the cylinder is built over fat product cells); the cells keep their combinatorial uids, so uid-keyed lookups against the original filtrations still resolve. """ fil_domain = _to_fat_simplex_filtration(fil_domain) fil_codomain = _to_fat_simplex_filtration(fil_codomain) # Accept either valued (oin.Simplex / SimplexValue) or bare (CombinatorialSimplex) # auxiliary vertices. Strip the value -- we use the explicit *_value args # below, so any value baked into the simplex would only confuse readers. if isinstance(v_codomain, _oineus.Simplex) or isinstance(v_codomain, _oineus.ProdSimplex): v_codomain = v_codomain.combinatorial_cell if isinstance(v_domain, _oineus.Simplex) or isinstance(v_domain, _oineus.ProdSimplex): v_domain = v_domain.combinatorial_cell if v_domain_value is None: v_domain_value = fil_domain.neg_infinity() if v_codomain_value is None: v_codomain_value = fil_codomain.neg_infinity() if with_indices: return _oineus._mapping_cylinder_with_indices(fil_domain, fil_codomain, v_domain, v_codomain, v_domain_value, v_codomain_value) else: return _oineus._mapping_cylinder(fil_domain, fil_codomain, v_domain, v_codomain, v_domain_value, v_codomain_value)
[docs] def multiply_filtration(fil, sigma, sigma_value=None): """Multiply every cell in fil by the auxiliary simplex sigma. Each product cell receives value ``fil.fil_max(cell.value, sigma_value)``. sigma_value defaults to ``fil.neg_infinity()`` so each product cell inherits its primary factor's value unchanged. A slim Freudenthal / bit-packed fil is materialized to the fat encoding first (the product cells are fat). """ fil = _to_fat_simplex_filtration(fil) if isinstance(sigma, _oineus.Simplex): sigma = sigma.combinatorial_cell if sigma_value is None: sigma_value = fil.neg_infinity() return _oineus._multiply_filtration(fil, sigma, sigma_value)
[docs] def min_filtration(fil_1, fil_2, with_indices=False): if with_indices: return _oineus._min_filtration_with_indices(fil_1, fil_2) else: return _oineus._min_filtration(fil_1, fil_2)
def remove_simplices(fil, dcmp, seeds, *, close_star=True, stats=None, n_threads=1): """SiRUP: remove a coface-closed set of cells from a reduced decomposition. Updates ``dcmp`` in place to the reduced R = D V decomposition of the filtration with the requested cells removed, updating both the barcode and the representative cycles, instead of recomputing from scratch (Giunti and Lazovskis, "Pruning vineyards: updating barcodes and representative cycles by removing simplices"). Parameters ---------- fil The filtration ``dcmp`` was reduced from. dcmp A reduced Decomposition with V (reduce with compute_v = True), from the classic ``oin.Decomposition(fil); dcmp.reduce(params)`` path. Homology only. Mutated in place. seeds sorted_ids of the cells to remove. By default their coface up-closure (union of stars) is taken so that the result is a valid filtration; set ``close_star=False`` if ``seeds`` is already coface-closed. close_star Whether to expand ``seeds`` to ``fil.star_closure(seeds)`` first. stats Optional DecompositionManipStats collecting column-op counts and timings. n_threads Threads for the internal row-index / closure build. Returns ------- A new filtration on the surviving cells, in the same order as the updated decomposition's columns, so ``dcmp.diagram(new_fil)`` gives the updated diagram. """ seeds = [int(s) for s in seeds] cells = fil.star_closure(seeds, n_threads) if close_star else seeds dcmp.remove_simplices(cells, stats, n_threads) return fil.without_cells(cells)
[docs] def compute_diagrams_ls(data: np.ndarray, negate: bool=False, wrap: bool=False, max_dim: typing.Optional[int]=None, params: typing.Optional[ReductionParams]=None, include_inf_points: bool=True, dualize: bool=False): if max_dim is None: max_dim = data.ndim - 1 if params is None: params = _oineus.ReductionParams() # max_dim is maximal dimension of the _diagram_, we need simplices one dimension higher, hence +1 fil = freudenthal_filtration(data=data, negate=negate, wrap=wrap, max_dim=max_dim + 1, n_threads=params.n_threads) dcmp = _oineus.Decomposition(fil, dualize) dcmp.reduce(params) return dcmp.diagram(fil=fil, include_inf_points=include_inf_points, n_threads=params.n_threads)
[docs] def compute_diagrams_vr(data: np.ndarray, from_pwdists: bool=False, max_dim: int=-1, max_diameter: float = -1.0, params: typing.Optional[ReductionParams]=None, include_inf_points: bool=True, dualize: bool=True): if params is None: params = _oineus.ReductionParams() # max_dim is maximal dimension of the _diagram_, we need simplices one dimension higher, hence +1 fil = vr_filtration(data, from_pwdists, max_dim=max_dim, max_diameter=max_diameter, with_critical_edges=False, n_threads=params.n_threads) dcmp = _oineus.Decomposition(fil, dualize) dcmp.reduce(params) return dcmp.diagram(fil=fil, include_inf_points=include_inf_points, n_threads=params.n_threads)
[docs] def alpha_filtration(points: np.ndarray, weights: typing.Optional[np.ndarray]=None, exact: bool=False, periodic: bool=False, compute_bounding_box: bool=True, bbox_min=None, bbox_max=None, packed: bool=True, n_threads: int=1): """Build an alpha-shape filtration from a 2D or 3D point cloud. Combinatorics come from diode (CGAL Delaunay); filtration values are the alpha values returned by diode. For one-shot diagrams use :func:`compute_diagrams_alpha`; the differentiable Cech-Delaunay path reuses the same combinatorics with autograd-attached values. Args: points: NumPy array of shape (n, 2) or (n, 3). weights: Optional 1D array of length n. If provided, computes weighted (regular) alpha-shapes; currently 3D only. exact: Use CGAL's exact kernel. Slower but robust against numerical pathologies. periodic: Use periodic alpha-shapes on a torus. compute_bounding_box: If True, use the bounding box of the data. bbox_min, bbox_max: Bounding box if compute_bounding_box=False. packed: Use the compact bit-packed cell encoding (the default) on the fast unweighted/non-periodic array path when the vertex ids fit a 64/128-bit word. Pass packed=False to force the fat encoding. n_threads: Threads used inside the Filtration constructor. Returns: A Filtration over alpha-shape simplices. """ if points.ndim != 2: raise ValueError("points must be a 2D array of shape (n_points, dim)") if points.shape[1] not in (2, 3): raise ValueError("Alpha-shapes only support 2D and 3D point clouds") if not _HAS_DIODE: raise ImportError( "Alpha-shape construction requires the `diode` package " "(https://github.com/mrzv/diode). Install it via " "`pip install diode` or build from source." ) # Diode wants lists, we accept NumPy array for convenience if isinstance(bbox_min, np.ndarray): bbox_min = [ float(x) for x in bbox_min ] if isinstance(bbox_max, np.ndarray): bbox_max = [ float(x) for x in bbox_max ] if compute_bounding_box: bbox_min = [ float(np.min(points[:, d])) for d in range(points.shape[1]) ] bbox_max = [ float(np.max(points[:, d])) for d in range(points.shape[1]) ] else: if bbox_max is None: bbox_max = [ 1.0 for d in range(points.shape[1]) ] if bbox_min is None: bbox_min = [ 0.0 for d in range(points.shape[1]) ] if weights is not None: weights = np.asarray(weights) if weights.ndim != 1: raise ValueError("weights must be a 1D array") if weights.shape[0] != points.shape[0]: raise ValueError("weights must have same length as points") if points.shape[1] != 3: raise ValueError("Weighted alpha-shapes require 3D points") weighted_points = np.column_stack((points, weights)) if periodic: if not hasattr(diode, "fill_weighted_periodic_alpha_shapes"): raise RuntimeError("diode.fill_weighted_periodic_alpha_shapes is not available in this diode build") fil_diode = diode.fill_weighted_periodic_alpha_shapes(weighted_points, exact, bbox_min, bbox_max) else: fil_diode = diode.fill_weighted_alpha_shapes(weighted_points, exact=exact) else: if periodic: fil_diode = diode.fill_periodic_alpha_shapes(points, exact, bbox_min, bbox_max) elif _HAS_DIODE_ARRAYS: # Fast array path: simplices and alpha values come back as NumPy # arrays, avoiding one Python tuple per simplex. Same combinatorics # and values as fill_alpha_shapes. verts_by_dim, vals_by_dim = diode.fill_alpha_shapes_arrays(points, exact=exact) # packed (the default) returns a bit-packed PackedSimplexFiltration when the vertex # ids fit a 64/128-bit word (only this fast array path supports it; the # weighted/periodic/list fallbacks below stay fat). Reduces and produces # diagrams identically to fat, with a smaller footprint. suffix = _vr_packed_word_suffix(points.shape[0], points.shape[1]) if packed else None if suffix is not None: # bits passed directly (skips the C++ max-id scan); diode rows are # not vertex-sorted, so assume_sorted stays False. fil = getattr(_oineus, "_filtration_from_arrays_packed" + suffix)( verts_by_dim, vals_by_dim, n_threads=n_threads, bits=_packed_bits(points.shape[0])) else: fil = _oineus._filtration_from_arrays(verts_by_dim, vals_by_dim, n_threads=n_threads) fil.kind = _oineus.FiltrationKind.Alpha return fil else: fil_diode = diode.fill_alpha_shapes(points, exact=exact) fil = _oineus._Filtration( fil_diode, duplicates_possible=periodic, n_threads=n_threads, ) fil.kind = _oineus.FiltrationKind.Alpha return fil
def _delaunay_combinatorics(points: np.ndarray, exact: bool=False, packed: bool=False, n_threads: int=1): """Build the Delaunay complex as a Filtration, for its combinatorics only. For callers that recompute and set their own values (the differentiable Cech-Delaunay and weak-alpha filtrations), so the filtration values here are not meaningful and must be overwritten via set_values. The fast path (diode's fill_delaunay_arrays) leaves them at 0; the fallback (alpha_filtration, used when the array exporters are absent) carries alpha values instead. Either way the simplex set is identical for full-dimensional input. Args: points: NumPy array of shape (n, 2) or (n, 3). exact: Use CGAL's exact kernel. packed: Use the compact bit-packed cell encoding when the vertex ids fit a 64/128-bit word (only the fast diode-array path supports it; the alpha_filtration fallback honors packed too). n_threads: Threads used inside the Filtration constructor. Returns: A Filtration over the Delaunay simplices; its values are not meaningful and are expected to be overwritten by the caller. """ if _HAS_DIODE_ARRAYS: verts_by_dim = diode.fill_delaunay_arrays(points, exact=exact) suffix = _vr_packed_word_suffix(points.shape[0], points.shape[1]) if packed else None if suffix is not None: return getattr(_oineus, "_filtration_from_arrays_packed" + suffix)( verts_by_dim, None, n_threads=n_threads, bits=_packed_bits(points.shape[0])) return _oineus._filtration_from_arrays(verts_by_dim, None, n_threads=n_threads) return alpha_filtration(points, exact=exact, packed=packed, n_threads=n_threads)
[docs] def compute_diagrams_alpha(points: np.ndarray, weights: typing.Optional[np.ndarray]=None, params: typing.Optional[ReductionParams]=None, include_inf_points: bool=True, dualize: bool=False, exact: bool=False, periodic: bool=False, compute_bounding_box: bool=True, bbox_min: typing.Optional[typing.Union[np.ndarray, typing.List[float]]]=None, bbox_max: typing.Optional[typing.Union[np.ndarray, typing.List[float]]]=None, ): """Compute alpha-shape persistence diagrams. Args: points: NumPy array of shape (n, 2) or (n, 3). weights: Optional 1D array of length n. If provided, computes weighted alpha-shapes (currently 3D only). params: Reduction parameters. Defaults to ReductionParams(). include_inf_points: Include points at infinity in output diagrams. dualize: If True, compute cohomology; otherwise homology. exact: Passed to diode. If True, uses exact CGAL kernel. periodic: If True, uses periodic alpha-shapes. Duplicate simplices reported by diode are deduplicated before building the filtration. compute_bounding_box: If True, use the bounding box of the data for periodic otherwise diode defaults (unit box) will be used. bbox_min: lexicographically smallest point of the bounding box. NumPy array or list of floats. Ignored, if compute_bounding_box is True. Origin will be used, if compute_bounding_box is False and bbox_min is None. bbox_max: lexicographically largest point of the bounding box. NumPy array or list of floats. Ignored, if compute_bounding_box is True Returns: Diagrams object indexed by homology dimension. """ if params is None: params = _oineus.ReductionParams() fil = alpha_filtration( points, weights=weights, exact=exact, periodic=periodic, compute_bounding_box=compute_bounding_box, bbox_min=bbox_min, bbox_max=bbox_max, n_threads=params.n_threads, ) dcmp = _oineus.Decomposition(fil, dualize) dcmp.reduce(params) return dcmp.diagram(fil=fil, include_inf_points=include_inf_points, n_threads=params.n_threads)
[docs] def get_ls_wasserstein_matching_target_values(dgm, fil, rv, d: int, q: float, mip: bool, mdp: bool): func = _oineus.get_ls_wasserstein_matching_target_values if type(dgm) is np.ndarray: if len(dgm.shape) != 2 or dgm.shape[1] != 2: raise ValueError("dgm must be a 2D array of shape (n_points, 2)") dgm_1 = [] for p in dgm: dgm_1.append(DiagramPoint(p[0], p[1])) dgm = dgm_1 return func(dgm, fil, rv, d, q, mip, mdp)
[docs] def list_to_filtration(data: typing.List[typing.Tuple[int, typing.List[int], float]]): simplices = [ Simplex(id, vertices, val) for id, vertices, val in data ] return Filtration(simplices)
[docs] def compute_kernel_image_cokernel_reduction(K, L, params=None, reduction_params=None): # simplicial filtrations can be supplied as lists, # convert to Oineus filtrations if necessary if isinstance(K, list): K = list_to_filtration(K) if isinstance(L, list): L = list_to_filtration(L) # KICR is templated on the cell type in C++; each encoding has its own bound class. # Dispatch on the FILTRATION type, not K[0]: a slim/packed filtration materializes a # fat Simplex on K[0], which would misdispatch it into the fat ctor. type(K) is the # stable C++ class for every encoding (and is correct for empty filtrations too). KICR_Class = _KICR_CLASS_BY_FIL_TYPE.get(type(K)) if KICR_Class is None: raise TypeError( f"compute_kernel_image_cokernel_reduction: unsupported filtration type " f"{type(K).__name__}; expected one of " f"{[t.__name__ for t in _KICR_CLASS_BY_FIL_TYPE]}") if params is None: # compute all by default params = _oineus.KICRParams(kernel=True, image=True, cokernel=True) elif not isinstance(params, _oineus.KICRParams): raise TypeError("params must be a KICRParams instance") if reduction_params is not None: if not isinstance(reduction_params, _oineus.ReductionParams): raise TypeError("reduction_params must be a ReductionParams instance") params.params_f = reduction_params params.params_g = reduction_params params.params_ker = reduction_params params.params_im = reduction_params params.params_cok = reduction_params return KICR_Class(K, L, params)
[docs] def compute_ker_cok_reduction_cyl(fil_2, fil_3): fil_min = min_filtration(fil_2, fil_3) id_domain = fil_3.size() + fil_min.size() + 1 id_codomain = id_domain + 1 # id_domain: id of vertex at the top of the cylinder, # i.e., we multiply fil_3 with id_domain # id_codomain: id of vertex at the bottom of the cylinder # i.e, we multiply fil_min with id_codomain # The wrapper functions below default the auxiliary vertex values to # fil.neg_infinity(), so the value carried on these Simplex objects is # discarded. We still need a Simplex/CombinatorialSimplex to specify # the vertex labels. v0 = _oineus.CombinatorialSimplex(id_domain, [id_domain]) v1 = _oineus.CombinatorialSimplex(id_codomain, [id_codomain]) fil_cyl = mapping_cylinder(fil_3, fil_min, v0, v1) # to get a subcomplex, we multiply each fil_3 with id_domain fil_3_prod = multiply_filtration(fil_3, v0) params = _oineus.KICRParams() params.kernel = params.cokernel = True params.image = False kicr_reduction = _oineus.KerImCokReducedProd(fil_cyl, fil_3_prod, params) return kicr_reduction
[docs] def cube_filtration(data: np.ndarray, negate: bool=False, wrap: bool=False, max_dim: typing.Optional[int]=None, values_on: str="vertices", n_threads: int=1,): if wrap: raise RuntimeError("cube_filtration: wrap=True is not implemented yet") if max_dim is None: max_dim = data.ndim dim = data.ndim if dim == 1: grid = _oineus.Grid_1D(data, wrap=wrap, values_on=values_on) elif dim == 2: grid = _oineus.Grid_2D(data, wrap=wrap, values_on=values_on) elif dim == 3: grid = _oineus.Grid_3D(data, wrap=wrap, values_on=values_on) else: raise RuntimeError(f"cube_filtration: dim={data.ndim} not supported, recompile from sources") fil = grid.cube_filtration(max_dim=max_dim, n_threads=n_threads, negate=negate) return fil
_PUBLIC_API_NAMES = [ "ConflictStrategy", "DenoiseStrategy", "VREdge", "FiltrationKind", "DiagramPlaneDomain", "FrechetMeanInit", "CombinatorialProdSimplex", "CombinatorialSimplex", "Simplex", "ProdSimplex", "Filtration", "Decomposition", "IndexDiagramPoint", "DiagramPoint", "Diagrams", "reduce", "DecompositionManipStats", "ReductionParams", "ReductionTimings", "KICRParams", "KerImCokReduced", "KerImCokReducedProd", "ColumnRepr", "IndicesValues", "IndicesValuesProd", "TopologyOptimizer", "TopologyOptimizerProd", "TopologyOptimizerCube_1D", "TopologyOptimizerCube_2D", "TopologyOptimizerCube_3D", "compute_relative_diagrams", "get_boundary_matrix", "get_denoise_target", "get_induced_matching", "get_nth_persistence", "get_permutation_dtv", "get_permutation", "GridDomain_1D", "Grid_1D", "CombinatorialCube_1D", "Cube_1D", "GridDomain_2D", "Grid_2D", "CombinatorialCube_2D", "Cube_2D", "GridDomain_3D", "Grid_3D", "CombinatorialCube_3D", "Cube_3D", "DiagramMatching", "BottleneckMatching", "InfKind", "EssentialMatches", "EssentialLongestEdges", "LongestEdges", "FiniteLongestEdge", "EssentialLongestEdge", "point_to_diagonal", "wasserstein_matching", "bottleneck_matching", "sliced_wasserstein_distance", "sliced_wasserstein_distance_diag_corrected", "REAL_DTYPE", "as_real_numpy", "bottleneck_distance", "wasserstein_distance", "init_frechet_mean_first_diagram", "init_frechet_mean_random_diagram", "init_frechet_mean_medoid_diagram", "init_frechet_mean_diagonal_grid", "frechet_mean_objective", "make_frechet_mean_persistence_schedule", "frechet_mean_newborn_points_from_newly_active", "frechet_mean_multistart", "progressive_frechet_mean", "progressive_frechet_mean_multistart", "frechet_mean", "to_scipy_matrix", "max_distance", "freudenthal_filtration", "vr_filtration", "is_reduced", "mapping_cylinder", "multiply_filtration", "min_filtration", "compute_diagrams_ls", "compute_diagrams_vr", "alpha_filtration", "compute_diagrams_alpha", "get_ls_wasserstein_matching_target_values", "list_to_filtration", "compute_kernel_image_cokernel_reduction", "compute_ker_cok_reduction_cyl", "cube_filtration", "plot_diagram", "plot_diagram_gradient", "plot_matching", "plot_chain", "default_point_style", "default_diagram_a_point_style", "default_diagram_b_point_style", "default_matching_edge_style", "default_longest_edge_style", "default_diagonal_style", "default_diagonal_projection_a_style", "default_diagonal_projection_b_style", "default_inf_line_style", "default_inf_point_style", "default_diagram_gradient_style", "default_density_style", "default_grid_style", "default_chain_vertex_style", "default_chain_edge_style", "default_chain_triangle_style", "default_chain_tetrahedron_style", "default_point_cloud_style", "DEFAULT_POINT_STYLE", "DEFAULT_DIAGRAM_A_POINT_STYLE", "DEFAULT_DIAGRAM_B_POINT_STYLE", "DEFAULT_MATCHING_EDGE_STYLE", "DEFAULT_LONGEST_EDGE_STYLE", "DEFAULT_DIAGONAL_STYLE", "DEFAULT_DIAGONAL_PROJECTION_A_STYLE", "DEFAULT_DIAGONAL_PROJECTION_B_STYLE", "DEFAULT_INF_LINE_STYLE", "DEFAULT_INF_POINT_STYLE", "DEFAULT_DIAGRAM_GRADIENT_STYLE", "DEFAULT_DENSITY_STYLE", "DEFAULT_DENSITY_THRESHOLD", "DEFAULT_GRID_STYLE", "DEFAULT_MATCHING_EDGE_QUANTILE", "DEFAULT_GRADIENT_TOP_K_ARROWS", "DEFAULT_CHAIN_VERTEX_STYLE", "DEFAULT_CHAIN_EDGE_STYLE", "DEFAULT_CHAIN_TRIANGLE_STYLE", "DEFAULT_CHAIN_TETRAHEDRON_STYLE", "DEFAULT_POINT_CLOUD_STYLE", "OKABE_ITO_BLUE", "OKABE_ITO_VERMILLION", ] __all__ = [name for name in _PUBLIC_API_NAMES if name in globals()]