swarmsort.cost_computation module
SwarmSort Cost Computation Module
This module contains functions for computing cost matrices and distance metrics used in the tracking assignment problem. It includes standard and probabilistic cost computation strategies with embedding-based matching.
- Functions:
cosine_similarity_normalized: Normalized cosine similarity for embeddings
compute_embedding_distances_with_method: Multi-method embedding distance computation
compute_cost_matrix_vectorized: Optimized cost matrix computation with squared-distance gating
compute_probabilistic_cost_matrix_vectorized: Probabilistic cost with Mahalanobis distance
compute_freeze_flags_vectorized: Collision detection for embedding freezing
compute_deduplication_mask: Detection deduplication based on proximity
- swarmsort.cost_computation.COSINE_DISTANCE_SCALE: float = 2.0
Scale factor for cosine distance: distance = (1.0 - dot_product) / SCALE. This yields distances in the range [0, 1] for L2-normalized embeddings.
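The formula above can be illustrated with a minimal sketch. The helper name `cosine_distance` is hypothetical and is not part of the module; only the formula and the scale value of 2.0 come from the documentation above.

```python
import numpy as np

COSINE_DISTANCE_SCALE = 2.0  # value documented above

def cosine_distance(emb1, emb2):
    """Cosine distance in [0, 1] for L2-normalized inputs (hypothetical helper)."""
    dot_product = float(np.dot(emb1, emb2))
    return (1.0 - dot_product) / COSINE_DISTANCE_SCALE

a = np.array([1.0, 0.0])
b = np.array([0.0, 1.0])

same = cosine_distance(a, a)        # identical direction
orthogonal = cosine_distance(a, b)  # orthogonal vectors
opposite = cosine_distance(a, -a)   # opposite direction
```

Because the dot product of unit vectors lies in [-1, 1], dividing `1 - dot_product` by 2 maps identical embeddings to 0 and opposite embeddings to 1.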
- swarmsort.cost_computation.DEFAULT_BASE_POSITION_VARIANCE: float = 25.0
Base position variance for covariance estimation. NOTE: Must match config.py default (25.0).
- swarmsort.cost_computation.DEFAULT_MAHALANOBIS_NORMALIZATION: float = 5.0
Normalization factor for Mahalanobis distance.
Mahalanobis distance is a dimensionless statistical measure (for Gaussian errors, its square is chi-squared distributed). This factor scales it to pixel space so it can be compared with max_distance.
Empirical tuning, with base_variance=25.0 and typical velocities:
  - mahal_dist=1.0 (1 std) → 5 pixels penalty
  - mahal_dist=2.0 (2 std) → 10 pixels penalty
  - mahal_dist=3.0 (3 std) → 15 pixels penalty
NOTE: Must match config.py default (5.0).
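The tuning table can be reproduced with a short sketch. It assumes the normalization factor is applied as a plain multiplication of the Mahalanobis distance, which is what the table suggests but is not stated explicitly; `mahalanobis_penalty` is a hypothetical helper, not a module function.

```python
import numpy as np

DEFAULT_BASE_POSITION_VARIANCE = 25.0
DEFAULT_MAHALANOBIS_NORMALIZATION = 5.0

def mahalanobis_penalty(diff, cov, normalization=DEFAULT_MAHALANOBIS_NORMALIZATION):
    """Mahalanobis distance of `diff` under `cov`, scaled to pixel units (hypothetical)."""
    mahal_dist = np.sqrt(diff @ np.linalg.inv(cov) @ diff)
    return float(normalization * mahal_dist)

# Isotropic covariance with variance 25, i.e. a standard deviation of 5 pixels.
cov = np.eye(2) * DEFAULT_BASE_POSITION_VARIANCE

# Offsets of 1, 2 and 3 standard deviations along x.
penalties = [mahalanobis_penalty(np.array([k * 5.0, 0.0]), cov) for k in (1, 2, 3)]
```

With an isotropic variance of 25, one standard deviation is 5 pixels, so the penalties come out at roughly 5, 10 and 15 pixels.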
- swarmsort.cost_computation.DEFAULT_PREDICTION_MISS_THRESHOLD: int = 3
Number of misses before using last position instead of prediction. Tracks with >= this many misses use last known position for matching.
- swarmsort.cost_computation.DEFAULT_PROBABILISTIC_GATING_MULTIPLIER: float = 1.5
Multiplier for max_distance in probabilistic gating. Euclidean pre-filter: max_distance * this value.
- swarmsort.cost_computation.DEFAULT_TIME_COVARIANCE_INFLATION: float = 0.2
Rate at which covariance inflates per missed frame. Covariance *= (1 + this * misses). NOTE: Must match config.py default (0.2).
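As a rough illustration of the inflation rule `Covariance *= (1 + rate * misses)`, the sketch below applies it to a batch of covariance matrices. The function name `inflate_covariances` is an assumption made for this example.

```python
import numpy as np

DEFAULT_TIME_COVARIANCE_INFLATION = 0.2

def inflate_covariances(covariances, misses, rate=DEFAULT_TIME_COVARIANCE_INFLATION):
    """Scale each [2, 2] covariance by (1 + rate * misses) (hypothetical helper)."""
    factors = 1.0 + rate * misses.astype(np.float64)
    # Broadcast one scalar factor per track over its 2x2 matrix.
    return covariances * factors[:, None, None]

covs = np.tile(np.eye(2) * 25.0, (2, 1, 1))  # two tracks, variance 25 each
misses = np.array([0, 3])                    # second track missed 3 frames
inflated = inflate_covariances(covs, misses)
```

A track with no misses keeps its covariance unchanged, while three misses at the default rate multiply it by 1.6.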
- swarmsort.cost_computation.DEFAULT_VELOCITY_ISOTROPIC_THRESHOLD: float = 0.1
Velocity threshold below which covariance is isotropic (circular). NOTE: Must match config.py default (0.1).
- swarmsort.cost_computation.DEFAULT_VELOCITY_VARIANCE_SCALE: float = 2.0
Scale factor for velocity contribution to covariance.
- swarmsort.cost_computation.SINGULAR_COV_THRESHOLD: float = 1e-06
Threshold for detecting singular covariance matrices. If determinant < this value, fall back to Euclidean distance.
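The fallback described here might look like the following sketch for a single 2x2 covariance. The helper `position_distance` and its closed-form 2x2 inverse are illustrative assumptions, not the module's actual code.

```python
import numpy as np

SINGULAR_COV_THRESHOLD = 1e-06

def position_distance(diff, cov, threshold=SINGULAR_COV_THRESHOLD):
    """Mahalanobis distance, or Euclidean if `cov` is near-singular (hypothetical)."""
    det = cov[0, 0] * cov[1, 1] - cov[0, 1] * cov[1, 0]
    if det < threshold:
        # Covariance cannot be inverted reliably: fall back to Euclidean distance.
        return float(np.hypot(diff[0], diff[1]))
    # Closed-form inverse of a 2x2 matrix.
    inv = np.array([[cov[1, 1], -cov[0, 1]],
                    [-cov[1, 0], cov[0, 0]]]) / det
    return float(np.sqrt(diff @ inv @ diff))

euclidean = position_distance(np.array([3.0, 4.0]), np.zeros((2, 2)))
mahalanobis = position_distance(np.array([5.0, 0.0]), np.eye(2) * 25.0)
```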
- swarmsort.cost_computation.compute_cost_matrix_parallel(det_positions, track_predicted_positions, track_last_positions, track_misses, scaled_embedding_distances, embedding_weight, max_distance, do_embeddings, miss_threshold=3, embedding_threshold_adjustment=1.0, track_uncertainty_ratios=array([], dtype=float32), uncertainty_weight=0.0)[source]
Parallel version of cost matrix computation for maximum performance.
WARNING: May produce slightly non-deterministic results due to floating-point accumulation order. Use compute_cost_matrix_vectorized for deterministic behavior.
See compute_cost_matrix_vectorized for detailed documentation.
- Return type:
- swarmsort.cost_computation.compute_cost_matrix_vectorized(det_positions, track_predicted_positions, track_last_positions, track_misses, scaled_embedding_distances, embedding_weight, max_distance, do_embeddings, miss_threshold=3, embedding_threshold_adjustment=1.0, track_uncertainty_ratios=array([], dtype=float32), uncertainty_weight=0.0)[source]
Compute optimized cost matrix with position-centric additive embedding costs.
Always matches against the smaller of two distances: the distance to the predicted position and the distance to the last observed position. This handles both cases:
  1. Object moving predictably -> predicted position is closer
  2. Object stopped or changed direction -> last position is closer
- Cost Formula (Position-Centric Additive):
C_i,j = pos_distance + w_e × emb_distance × max_distance
- With uncertainty penalty (when uncertainty_weight > 0):
C_i,j = base_cost × (1 + uncertainty_weight × miss_ratio_j)
Position is always the primary cost. Embedding adds an additional penalty that increases the matching cost but never replaces position.
- Parameters:
det_positions (ndarray) – Detection positions [N_det, 2]
track_predicted_positions (ndarray) – Predicted track positions [N_track, 2]
track_last_positions (ndarray) – Last observed track positions [N_track, 2]
track_misses (ndarray) – Number of misses for each track (unused, kept for API compatibility)
scaled_embedding_distances (ndarray) – Pre-computed embedding distances [0, 1]
embedding_weight (float) – Weight for embedding penalty term (0+)
max_distance (float) – Maximum position distance for gating
do_embeddings (bool) – Whether to include embedding costs
miss_threshold (int) – Unused (kept for API compatibility)
embedding_threshold_adjustment (float) – Threshold adjustment factor for gating
track_uncertainty_ratios (ndarray) – Recent miss ratios per track [N_track] (optional)
uncertainty_weight (float) – Weight for uncertainty penalty (0 = disabled, no overhead)
- Return type:
- Returns:
Cost matrix [N_det, N_track]
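The cost formula for compute_cost_matrix_vectorized can be sketched in plain NumPy as follows. This is a simplified illustration, not the library's implementation: the name `cost_matrix_sketch` is hypothetical, `embedding_threshold_adjustment` is left out, and setting gated pairs to infinity is an assumption (the documentation only says that max_distance is used for gating).

```python
import numpy as np

def cost_matrix_sketch(det_positions, track_predicted_positions, track_last_positions,
                       scaled_embedding_distances, embedding_weight, max_distance,
                       do_embeddings, track_uncertainty_ratios=None,
                       uncertainty_weight=0.0):
    def pairwise(a, b):
        # Euclidean distance between every row of a and every row of b.
        return np.linalg.norm(a[:, None, :] - b[None, :, :], axis=2)

    # Position term: smaller of the distances to predicted and last positions.
    pos = np.minimum(pairwise(det_positions, track_predicted_positions),
                     pairwise(det_positions, track_last_positions))
    cost = pos.copy()
    if do_embeddings:
        # Additive embedding penalty, scaled into pixel units by max_distance.
        cost = cost + embedding_weight * scaled_embedding_distances * max_distance
    if (uncertainty_weight > 0.0 and track_uncertainty_ratios is not None
            and len(track_uncertainty_ratios) > 0):
        cost = cost * (1.0 + uncertainty_weight * track_uncertainty_ratios[None, :])
    # Assumption: pairs whose position distance exceeds max_distance are gated out.
    cost[pos > max_distance] = np.inf
    return cost

dets = np.array([[0.0, 0.0], [10.0, 0.0]])
pred = np.array([[3.0, 4.0], [50.0, 0.0]])
last = np.array([[6.0, 8.0], [10.0, 0.0]])
emb = np.array([[0.1, 0.5], [0.5, 0.0]])

cost = cost_matrix_sketch(dets, pred, last, emb, 1.0, 20.0, True)
gated = cost_matrix_sketch(dets, pred, last, emb, 1.0, 8.0, True)
penalized = cost_matrix_sketch(dets, pred, last, emb, 1.0, 20.0, True,
                               np.array([0.5, 0.0]), 1.0)
```

In this example the first detection is 5 pixels from the first track's predicted position and 10 pixels from its last position, so the position term is 5 and the embedding term adds 0.1 × 20 = 2.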
- swarmsort.cost_computation.compute_deduplication_mask(positions, confidences, dedup_distance)[source]
Compute mask for detection deduplication based on proximity (NMS-style).
OPTIMIZED: Pre-computes pairwise distances for O(1) lookups during NMS.
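A minimal NMS-style sketch of this idea is shown below. It assumes the returned mask marks detections to keep and that "close" means strictly within dedup_distance; both conventions, and the name `dedup_mask_sketch`, are assumptions for illustration.

```python
import numpy as np

def dedup_mask_sketch(positions, confidences, dedup_distance):
    """Boolean mask, True for detections that survive deduplication (hypothetical)."""
    # Pre-compute all pairwise squared distances once.
    diff = positions[:, None, :] - positions[None, :, :]
    dist_sq = np.sum(diff * diff, axis=2)
    threshold_sq = dedup_distance * dedup_distance

    keep = np.ones(len(positions), dtype=bool)
    # Visit detections from most to least confident.
    for i in np.argsort(-confidences):
        if not keep[i]:
            continue
        close = dist_sq[i] < threshold_sq
        close[i] = False      # a detection never suppresses itself
        keep[close] = False   # suppress less confident neighbours
    return keep

positions = np.array([[0.0, 0.0], [1.0, 0.0], [10.0, 0.0]])
confidences = np.array([0.9, 0.5, 0.8])
mask = dedup_mask_sketch(positions, confidences, dedup_distance=2.0)
```

The second detection lies 1 pixel from a more confident one and is suppressed; the third is far from both and is kept.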
- swarmsort.cost_computation.compute_embedding_distances_matmul(det_embeddings, track_embeddings_flat, track_counts)[source]
Fast matrix multiplication path for embedding distance (method=0, last embedding).
Uses NumPy matmul instead of Numba loops for 2-3x speedup on large matrices. Only works for method=0 (last embedding) since it extracts just the last embedding per track.
- Parameters:
- Return type:
- Returns:
Distance matrix [N_det, N_track] with cosine distances in [0, 1]
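The matmul path might be sketched as below. The storage layout is an assumption: `track_embeddings_flat` is taken to be a `[sum(track_counts), emb_dim]` array with each track's history stored contiguously in chronological order, and every track is assumed to hold at least one embedding.

```python
import numpy as np

def last_embedding_distances_sketch(det_embeddings, track_embeddings_flat, track_counts):
    """Cosine distances to each track's most recent embedding (hypothetical sketch)."""
    # Index of the last embedding of each track in the flat array.
    last_idx = np.cumsum(track_counts) - 1
    last_embeddings = track_embeddings_flat[last_idx]
    # One matrix product gives all dot products at once.
    similarities = det_embeddings @ last_embeddings.T
    return np.clip((1.0 - similarities) / 2.0, 0.0, 1.0)

det_embeddings = np.array([[1.0, 0.0], [0.0, 1.0]])
# Track 0 has two embeddings (the last is [1, 0]); track 1 has one.
track_embeddings_flat = np.array([[0.0, 1.0], [1.0, 0.0], [0.0, 1.0]])
track_counts = np.array([2, 1])

distances = last_embedding_distances_sketch(det_embeddings, track_embeddings_flat, track_counts)
```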
- swarmsort.cost_computation.compute_embedding_distances_sparse(det_embeddings, track_embeddings_flat, track_counts, sparse_det_indices, sparse_track_indices, method, n_dets, n_tracks)[source]
FAST sparse embedding distance computation.
Computes embedding distances only for sparse candidate pairs using Numba, avoiding the full NxM matmul. This is faster because dot products are computed only for the candidate pairs (e.g. ~33K sparse pairs instead of all 900K pairs).
- Parameters:
det_embeddings (ndarray) – Detection embeddings [N_det, emb_dim] (must be L2-normalized)
track_embeddings_flat (ndarray) – Flattened track embedding histories (L2-normalized)
track_counts (ndarray) – Number of embeddings per track [N_track]
sparse_det_indices (ndarray) – Detection indices for sparse pairs [N_pairs]
sparse_track_indices (ndarray) – Track indices for sparse pairs [N_pairs]
method (int) – Aggregation method (0=last, 1=average, 2=weighted, 3=best_match, 4=median)
n_dets (int) – Total number of detections
n_tracks (int) – Total number of tracks
- Return type:
- Returns:
Distance matrix [N_det, N_track] with distances only for sparse pairs, all other entries are 1.0 (max distance)
- swarmsort.cost_computation.compute_embedding_distances_with_method(det_embeddings, track_embeddings_flat, track_counts, method)[source]
Compute embedding distances using various methods for historical embeddings.
OPTIMIZED: Uses parallel processing over detections and inlined dot product.
- Parameters:
det_embeddings (ndarray) – Detection embeddings [N_det, emb_dim] (must be L2-normalized)
track_embeddings_flat (ndarray) – Flattened track embedding histories (L2-normalized)
track_counts (ndarray) – Number of embeddings per track
method (int) – Method for aggregating embedding distances:
  0: Use last embedding only (fastest)
  1: Average all distances
  2: Weighted average (recent embeddings have more weight)
  3: Best match (minimum distance)
  4: Median distance (robust to outliers)
- Return type:
- Returns:
Distance matrix [N_det, N_track] with cosine distances in [0, 1]
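The five aggregation methods can be illustrated for a single detection-track pair. The sketch assumes `dists` holds the cosine distances from one detection to each embedding in a track's history, oldest first. The linear weights used for method 2 are an assumption; the documentation only says that recent embeddings weigh more.

```python
import numpy as np

def aggregate_distances(dists, method):
    """Combine per-embedding distances into one value (hypothetical helper)."""
    if method == 0:
        return float(dists[-1])        # last embedding only
    if method == 1:
        return float(np.mean(dists))   # plain average
    if method == 2:
        # Assumed weighting: linearly increasing, most recent embedding weighs most.
        weights = np.arange(1, len(dists) + 1, dtype=np.float64)
        return float(np.sum(weights * dists) / np.sum(weights))
    if method == 3:
        return float(np.min(dists))    # best match
    if method == 4:
        return float(np.median(dists)) # robust to outliers
    raise ValueError(f"unknown method: {method}")

history = np.array([0.4, 0.2, 0.1])  # oldest to newest
results = [aggregate_distances(history, m) for m in range(5)]
```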
- swarmsort.cost_computation.compute_freeze_flags_vectorized(positions, safety_distance)[source]
Compute freeze flags for tracks based on proximity (collision detection).
DEPRECATED: Use compute_neighbor_counts_vectorized() with embedding_freeze_density for proper density-based freezing with hysteresis.
- swarmsort.cost_computation.compute_neighbor_counts_vectorized(positions, radius)[source]
Count the number of neighbors within radius for each track.
Used to implement embedding_freeze_density - tracks freeze when they have too many neighbors, indicating a crowded/collision scenario.
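A brute-force sketch of the neighbor count is given below. It assumes a track does not count itself and that a neighbor is strictly closer than radius; the name `neighbor_counts_sketch` is hypothetical.

```python
import numpy as np

def neighbor_counts_sketch(positions, radius):
    """Number of other tracks within `radius` of each track (hypothetical sketch)."""
    diff = positions[:, None, :] - positions[None, :, :]
    dist_sq = np.sum(diff * diff, axis=2)
    within = dist_sq < radius * radius
    np.fill_diagonal(within, False)  # exclude each track's distance to itself
    return within.sum(axis=1)

positions = np.array([[0.0, 0.0], [1.0, 0.0], [2.0, 0.0], [50.0, 50.0]])
counts = neighbor_counts_sketch(positions, radius=1.5)
```

A caller could then compare these counts against a density threshold to decide which tracks should stop updating their embeddings.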
- swarmsort.cost_computation.compute_probabilistic_cost_matrix_vectorized(det_positions, track_predicted_positions, track_last_positions, track_misses, track_covariances, scaled_embedding_distances, embedding_weight, max_distance, do_embeddings, miss_threshold=3, gating_multiplier=1.5, mahal_normalization=5.0, cov_inflation_rate=0.1, embedding_threshold_adjustment=1.0, singular_cov_threshold=1e-06, track_uncertainty_ratios=array([], dtype=float32), uncertainty_weight=0.0)[source]
Compute probabilistic cost matrix using Mahalanobis distance with additive embedding cost.
Always matches against the smaller of two distances: the distance to the predicted position and the distance to the last observed position. This handles both cases:
  1. Object moving predictably -> predicted position is closer
  2. Object stopped or changed direction -> last position is closer
- Cost Formula (Position-Centric Additive):
C_i,j = normalized_mahal + w_e × emb_distance × max_distance
- With uncertainty penalty (when uncertainty_weight > 0):
C_i,j = base_cost × (1 + uncertainty_weight × miss_ratio_j)
- Parameters:
det_positions (ndarray) – Detection positions [N_det, 2]
track_predicted_positions (ndarray) – Predicted track positions [N_track, 2]
track_last_positions (ndarray) – Last observed track positions [N_track, 2]
track_misses (ndarray) – Number of misses for each track
track_covariances (ndarray) – Track covariance matrices [N_track, 2, 2]
scaled_embedding_distances (ndarray) – Pre-computed embedding distances [0, 1]
embedding_weight (float) – Weight for embedding penalty term
max_distance (float) – Maximum allowed distance
do_embeddings (bool) – Whether to include embedding costs
miss_threshold (int) – Unused (kept for API compatibility)
gating_multiplier (float) – Multiplier for max_distance in Euclidean pre-filter
mahal_normalization (float) – Normalization factor for Mahalanobis distance (default: 5.0)
cov_inflation_rate (float) – Rate of covariance inflation per missed frame
embedding_threshold_adjustment (float) – Threshold adjustment factor for gating
singular_cov_threshold (float) – Threshold for detecting singular covariance (default: 1e-6)
track_uncertainty_ratios (ndarray) – Recent miss ratios per track [N_track] (optional)
uncertainty_weight (float) – Weight for uncertainty penalty (0 = disabled, no overhead)
- Return type:
- Returns:
Cost matrix [N_det, N_track]
- swarmsort.cost_computation.compute_sparse_cost_matrix(det_positions, track_positions, track_last_positions, track_misses, scaled_embedding_distances, embedding_weight, max_distance, do_embeddings, miss_threshold, embedding_threshold_adjustment, sparse_det_indices, sparse_track_indices, track_uncertainty_ratios, uncertainty_weight)[source]
Compute cost matrix only for sparse candidate pairs.
For large-scale scenarios, this computes costs only for detection-track pairs that are spatially close (determined by sparse_det_indices and sparse_track_indices). All other pairs are set to infinity.
This reduces complexity from O(n*m) to O(k) where k is the number of sparse pairs, typically k << n*m when objects are spatially distributed.
- Parameters:
det_positions (ndarray) – Detection positions [N_det, 2]
track_positions (ndarray) – Track predicted positions [N_track, 2]
track_last_positions (ndarray) – Track last detection positions [N_track, 2]
track_misses (ndarray) – Number of consecutive misses per track [N_track]
scaled_embedding_distances (ndarray) – Pre-computed embedding distances [N_det, N_track]
embedding_weight (float) – Weight for embedding cost (0-1)
max_distance (float) – Maximum association distance
do_embeddings (bool) – Whether to include embedding cost
miss_threshold (int) – Misses before using last position
embedding_threshold_adjustment (float) – Threshold expansion factor for embeddings
sparse_det_indices (ndarray) – Detection indices for sparse pairs [N_pairs]
sparse_track_indices (ndarray) – Track indices for sparse pairs [N_pairs]
track_uncertainty_ratios (ndarray) – Uncertainty ratio per track (for penalty, may be empty)
uncertainty_weight (float) – Weight for uncertainty penalty
- Return type:
- Returns:
Cost matrix [N_det, N_track] with inf for non-candidate pairs
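The sparse fill pattern can be sketched as follows. Several details are assumptions made for illustration: the reference position is chosen from the miss_threshold parameter description (last position once a track has at least that many misses, predicted position otherwise), the embedding term reuses the additive form documented for compute_cost_matrix_vectorized, and gating and the uncertainty penalty are omitted.

```python
import numpy as np

def sparse_cost_sketch(det_positions, track_positions, track_last_positions,
                       track_misses, scaled_embedding_distances, embedding_weight,
                       max_distance, do_embeddings, miss_threshold,
                       sparse_det_indices, sparse_track_indices):
    # Every pair starts as infinite cost; only candidate pairs are filled in.
    cost = np.full((len(det_positions), len(track_positions)), np.inf)
    for i, j in zip(sparse_det_indices, sparse_track_indices):
        # Assumed rule: tracks with many misses are matched on their last position.
        if track_misses[j] >= miss_threshold:
            reference = track_last_positions[j]
        else:
            reference = track_positions[j]
        c = float(np.linalg.norm(det_positions[i] - reference))
        if do_embeddings:
            c += embedding_weight * scaled_embedding_distances[i, j] * max_distance
        cost[i, j] = c
    return cost

dets = np.array([[0.0, 0.0], [10.0, 0.0]])
pred = np.array([[1.0, 0.0], [10.0, 0.0]])
last = np.array([[0.0, 0.0], [12.0, 0.0]])
misses = np.array([0, 5])
emb = np.zeros((2, 2))

sparse_cost = sparse_cost_sketch(dets, pred, last, misses, emb, 1.0, 20.0, False, 3,
                                 np.array([0, 1]), np.array([0, 1]))
```

Only the two listed pairs receive finite costs, so the loop runs k times rather than n*m times.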
- swarmsort.cost_computation.compute_sparse_embedding_distances(det_embeddings, track_embeddings, sparse_det_indices, sparse_track_indices, n_dets, n_tracks)[source]
Compute embedding distances only for sparse candidate pairs.
This is a companion function for sparse mode that computes cosine distances only for the candidate pairs, avoiding the full O(n*m) matrix computation.
- Parameters:
det_embeddings (ndarray) – Detection embeddings [N_det, emb_dim]
track_embeddings (ndarray) – Track embeddings [N_track, emb_dim]
sparse_det_indices (ndarray) – Detection indices for sparse pairs [N_pairs]
sparse_track_indices (ndarray) – Track indices for sparse pairs [N_pairs]
n_dets (int) – Total number of detections
n_tracks (int) – Total number of tracks
- Return type:
- Returns:
Embedding distance matrix [N_det, N_track] with 0 for non-candidate pairs
- swarmsort.cost_computation.cosine_similarity_normalized(emb1, emb2)[source]
Fast cosine similarity normalized to [0, 1] distance.
- swarmsort.cost_computation.estimate_track_covariances(track_velocities, base_variance=25.0, velocity_scale=2.0, velocity_isotropic_threshold=0.1)[source]
Estimate base track covariance matrices from velocity.
This function computes base uncertainty for each track based on: - Base position variance (isotropic component) - Velocity magnitude (higher velocity = more uncertainty) - Velocity direction (more uncertainty in direction of motion)
Note: Time-dependent inflation (based on misses) is applied separately in compute_probabilistic_cost_matrix_vectorized to avoid double inflation.
- Parameters:
track_velocities (ndarray) – Track velocities [N_track, 2] as [vx, vy]
base_variance (float) – Base position variance (default: 25.0)
velocity_scale (float) – Scale for velocity contribution (default: 2.0)
velocity_isotropic_threshold (float) – Velocity below which covariance is isotropic (default: 0.1)
- Return type:
- Returns:
Covariance matrices [N_track, 2, 2]
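The exact formula is not given above, so the following is only one possible construction that matches the three listed properties. It assumes the velocity term adds `velocity_scale * speed` of variance along the direction of motion; the real function may combine these quantities differently.

```python
import numpy as np

def estimate_covariances_sketch(track_velocities, base_variance=25.0,
                                velocity_scale=2.0, velocity_isotropic_threshold=0.1):
    """Velocity-aligned covariances [N_track, 2, 2] (hypothetical construction)."""
    n_tracks = len(track_velocities)
    # Isotropic base component for every track.
    covs = np.tile(np.eye(2) * base_variance, (n_tracks, 1, 1))
    speeds = np.linalg.norm(track_velocities, axis=1)
    for j in range(n_tracks):
        if speeds[j] < velocity_isotropic_threshold:
            continue  # slow tracks keep a circular covariance
        direction = track_velocities[j] / speeds[j]
        # Assumed form: extra variance along the direction of motion only.
        covs[j] += velocity_scale * speeds[j] * np.outer(direction, direction)
    return covs

velocities = np.array([[0.0, 0.0], [3.0, 4.0]])
covs = estimate_covariances_sketch(velocities)

along = np.array([0.6, 0.8])    # unit vector along the second track's velocity
across = np.array([-0.8, 0.6])  # unit vector perpendicular to it
var_along = along @ covs[1] @ along
var_across = across @ covs[1] @ across
```

Under this construction the stationary track keeps the base variance of 25 in every direction, while the moving track (speed 5) has variance 35 along its motion and 25 across it.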