"""
SwarmSort Cost Computation Module
This module contains functions for computing cost matrices and distance metrics
used in the tracking assignment problem. It includes standard and probabilistic
cost computation strategies with embedding-based matching.
Functions:
cosine_similarity_normalized: Normalized cosine similarity for embeddings
compute_embedding_distances_with_method: Multi-method embedding distance computation
compute_cost_matrix_vectorized: Optimized cost matrix computation with squared-distance gating
compute_probabilistic_cost_matrix_vectorized: Probabilistic cost with Mahalanobis distance
compute_freeze_flags_vectorized: Collision detection for embedding freezing
compute_deduplication_mask: Detection deduplication based on proximity
"""
# ============================================================================
# STANDARD IMPORTS
# ============================================================================
import numpy as np
import numba as nb
from typing import Optional, Tuple
import math
# ============================================================================
# NAMED CONSTANTS
# These constants are used throughout the cost computation module.
# For production use, prefer passing values from SwarmSortConfig where applicable.
# ============================================================================
SINGULAR_COV_THRESHOLD: float = 1e-6
"""Threshold for detecting singular covariance matrices.
If determinant < this value, fall back to Euclidean distance."""
COSINE_DISTANCE_SCALE: float = 2.0
"""Scale factor for cosine distance: distance = (1.0 - dot_product) / SCALE.
Results in distance range [0, 1] for normalized embeddings."""
DEFAULT_PREDICTION_MISS_THRESHOLD: int = 3
"""Number of misses before using last position instead of prediction.
Tracks with >= this many misses use last known position for matching."""
DEFAULT_MAHALANOBIS_NORMALIZATION: float = 5.0
"""Normalization factor for Mahalanobis distance.
Mahalanobis distance is a dimensionless statistical measure (chi-squared distributed).
This factor scales it to pixel-space for comparison with max_distance.
Empirical tuning: With base_variance=25.0 and typical velocities:
- mahal_dist=1.0 (1 std) → 5 pixels penalty
- mahal_dist=2.0 (2 std) → 10 pixels penalty
- mahal_dist=3.0 (3 std) → 15 pixels penalty
NOTE: Must match config.py default (5.0)."""
DEFAULT_PROBABILISTIC_GATING_MULTIPLIER: float = 1.5
"""Multiplier for max_distance in probabilistic gating.
Euclidean pre-filter: max_distance * this value."""
DEFAULT_TIME_COVARIANCE_INFLATION: float = 0.2
"""Rate at which covariance inflates per missed frame.
Covariance *= (1 + this * misses).
NOTE: Must match config.py default (0.2)."""
DEFAULT_BASE_POSITION_VARIANCE: float = 25.0
"""Base position variance for covariance estimation.
NOTE: Must match config.py default (25.0)."""
DEFAULT_VELOCITY_VARIANCE_SCALE: float = 2.0
"""Scale factor for velocity contribution to covariance."""
DEFAULT_VELOCITY_ISOTROPIC_THRESHOLD: float = 0.1
"""Velocity threshold below which covariance is isotropic (circular).
NOTE: Must match config.py default (0.1)."""
# ============================================================================
# COVARIANCE ESTIMATION FUNCTIONS
# ============================================================================
[docs]
@nb.njit(fastmath=True, cache=True)
def estimate_track_covariances(
track_velocities: np.ndarray,
base_variance: float = 25.0,
velocity_scale: float = 2.0,
velocity_isotropic_threshold: float = 0.1
) -> np.ndarray:
"""
Estimate base track covariance matrices from velocity.
This function computes base uncertainty for each track based on:
- Base position variance (isotropic component)
- Velocity magnitude (higher velocity = more uncertainty)
- Velocity direction (more uncertainty in direction of motion)
Note: Time-dependent inflation (based on misses) is applied separately
in compute_probabilistic_cost_matrix_vectorized to avoid double inflation.
Args:
track_velocities: Track velocities [N_track, 2] as [vx, vy]
base_variance: Base position variance (default: 15.0)
velocity_scale: Scale for velocity contribution (default: 2.0)
velocity_isotropic_threshold: Velocity below which covariance is isotropic (default: 0.1)
Returns:
Covariance matrices [N_track, 2, 2]
"""
n_tracks = track_velocities.shape[0]
covariances = np.zeros((n_tracks, 2, 2), dtype=np.float32)
for i in range(n_tracks):
vx = track_velocities[i, 0]
vy = track_velocities[i, 1]
# Velocity magnitude
vel_magnitude = np.sqrt(vx * vx + vy * vy)
if vel_magnitude < velocity_isotropic_threshold:
# Low velocity: use isotropic covariance
covariances[i, 0, 0] = base_variance
covariances[i, 1, 1] = base_variance
covariances[i, 0, 1] = 0.0
covariances[i, 1, 0] = 0.0
else:
# High velocity: anisotropic covariance (more uncertainty in motion direction)
# Normalize velocity to get direction
nx = vx / vel_magnitude
ny = vy / vel_magnitude
# Variance along velocity direction (higher)
var_along = base_variance + velocity_scale * vel_magnitude
# Variance perpendicular (lower)
var_perp = base_variance
# Build covariance: C = R * D * R^T where D is diagonal variances
# R = [[nx, -ny], [ny, nx]] is rotation matrix
# This gives: C = var_along * v*v^T + var_perp * (I - v*v^T)
covariances[i, 0, 0] = var_along * nx * nx + var_perp * ny * ny
covariances[i, 1, 1] = var_along * ny * ny + var_perp * nx * nx
covariances[i, 0, 1] = (var_along - var_perp) * nx * ny
covariances[i, 1, 0] = covariances[i, 0, 1]
return covariances
# ============================================================================
# EMBEDDING DISTANCE FUNCTIONS
# ============================================================================
[docs]
def cosine_similarity_normalized(emb1: np.ndarray, emb2: np.ndarray) -> float:
"""
Fast cosine similarity normalized to [0, 1] distance.
Args:
emb1: First embedding vector
emb2: Second embedding vector
Returns:
Distance in range [0, 1] where 0 is identical and 1 is opposite
"""
norm1 = np.sqrt(np.sum(emb1 * emb1))
norm2 = np.sqrt(np.sum(emb2 * emb2))
if norm1 == 0 or norm2 == 0:
return 1.0
cos_sim = np.sum(emb1 * emb2) / (norm1 * norm2)
return (1.0 - cos_sim) / 2.0
[docs]
@nb.njit(fastmath=True, cache=True)
def fast_mahalanobis_distance(diff: np.ndarray, cov_inv: np.ndarray) -> float:
"""
Fast 2D Mahalanobis distance computation.
Args:
diff: Difference vector [2]
cov_inv: Inverse covariance matrix [2, 2]
Returns:
Mahalanobis distance
"""
return np.sqrt(
diff[0] * (cov_inv[0, 0] * diff[0] + cov_inv[0, 1] * diff[1])
+ diff[1] * (cov_inv[1, 0] * diff[0] + cov_inv[1, 1] * diff[1])
)
@nb.njit(fastmath=True, cache=True, inline='always')
def _compute_dot_product(vec1: np.ndarray, vec2_flat: np.ndarray, start: int, length: int) -> float:
"""Compute dot product between vec1 and a slice of vec2_flat. Inlined for performance."""
result = 0.0
for k in range(length):
result += vec1[k] * vec2_flat[start + k]
return result
[docs]
def compute_embedding_distances_matmul(
det_embeddings: np.ndarray,
track_embeddings_flat: np.ndarray,
track_counts: np.ndarray
) -> np.ndarray:
"""
Fast matrix multiplication path for embedding distance (method=0, last embedding).
Uses NumPy matmul instead of Numba loops for 2-3x speedup on large matrices.
Only works for method=0 (last embedding) since it extracts just the last embedding per track.
Args:
det_embeddings: Detection embeddings [N_det, emb_dim] (must be L2-normalized)
track_embeddings_flat: Flattened track embedding histories (L2-normalized)
track_counts: Number of embeddings per track
Returns:
Distance matrix [N_det, N_track] with cosine distances in [0, 1]
"""
n_dets = det_embeddings.shape[0]
n_tracks = len(track_counts)
emb_dim = det_embeddings.shape[1]
# Extract last embedding per track into a dense matrix
last_embeddings = np.zeros((n_tracks, emb_dim), dtype=np.float32)
# Compute track offsets (cumulative sum of counts)
offset = 0
for j in range(n_tracks):
if track_counts[j] > 0:
# Get last embedding for this track
last_start = (offset + track_counts[j] - 1) * emb_dim
last_embeddings[j] = track_embeddings_flat[last_start:last_start + emb_dim]
# else: leave as zeros (will give distance 1.0 after computation)
offset += track_counts[j] if track_counts[j] > 0 else 1
# Matrix multiply: [n_dets, emb_dim] @ [emb_dim, n_tracks] = [n_dets, n_tracks]
# This computes all cosine similarities at once
cos_similarities = det_embeddings @ last_embeddings.T
# Convert to distances: distance = (1 - similarity) * 0.5, range [0, 1]
distances = (1.0 - cos_similarities) * 0.5
# Handle tracks with no embeddings (track_counts == 0)
for j in range(n_tracks):
if track_counts[j] == 0:
distances[:, j] = 1.0
return distances.astype(np.float32)
[docs]
def compute_embedding_distances_sparse(
det_embeddings: np.ndarray,
track_embeddings_flat: np.ndarray,
track_counts: np.ndarray,
sparse_det_indices: np.ndarray,
sparse_track_indices: np.ndarray,
method: int, # 0=last, 1=average, 2=weighted_average, 3=best_match, 4=median
n_dets: int,
n_tracks: int
) -> np.ndarray:
"""
FAST sparse embedding distance computation.
Computes embedding distances ONLY for sparse candidate pairs using Numba,
avoiding the full NxM matmul. This is faster because we only compute
dot products for the ~33K sparse pairs, not all 900K pairs.
Args:
det_embeddings: Detection embeddings [N_det, emb_dim] (must be L2-normalized)
track_embeddings_flat: Flattened track embedding histories (L2-normalized)
track_counts: Number of embeddings per track [N_track]
sparse_det_indices: Detection indices for sparse pairs [N_pairs]
sparse_track_indices: Track indices for sparse pairs [N_pairs]
method: Aggregation method (0=last, 1=average, 2=weighted, 3=best_match, 4=median)
n_dets: Total number of detections
n_tracks: Total number of tracks
Returns:
Distance matrix [N_det, N_track] with distances only for sparse pairs,
all other entries are 1.0 (max distance)
"""
# Initialize output with max distance (non-sparse pairs won't be matched)
distances = np.ones((n_dets, n_tracks), dtype=np.float32)
if len(sparse_det_indices) == 0:
return distances
# Ensure correct types for Numba
det_emb_f32 = np.ascontiguousarray(det_embeddings, dtype=np.float32)
track_emb_f32 = np.ascontiguousarray(track_embeddings_flat, dtype=np.float32)
track_counts_i32 = track_counts.astype(np.int32)
sparse_det_idx = sparse_det_indices.astype(np.int32)
sparse_track_idx = sparse_track_indices.astype(np.int32)
emb_dim = det_embeddings.shape[1]
# Compute track offsets for indexing into flat array
track_offsets = np.zeros(n_tracks + 1, dtype=np.int32)
track_offsets[1:] = np.cumsum(track_counts_i32)
# Compute distances ONLY for sparse pairs (Numba-accelerated)
_compute_sparse_embedding_distances_numba(
det_emb_f32, track_emb_f32, track_counts_i32, track_offsets,
sparse_det_idx, sparse_track_idx, method, emb_dim, distances
)
return distances
@nb.njit(fastmath=True, parallel=True, cache=True)
def _compute_sparse_embedding_distances_numba(
det_embeddings: np.ndarray, # [n_dets, emb_dim]
track_embeddings_flat: np.ndarray, # [total_embs * emb_dim]
track_counts: np.ndarray, # [n_tracks]
track_offsets: np.ndarray, # [n_tracks + 1]
sparse_det_indices: np.ndarray, # [n_pairs]
sparse_track_indices: np.ndarray, # [n_pairs]
method: int,
emb_dim: int,
distances: np.ndarray # Output: [n_dets, n_tracks]
) -> None:
"""
Numba-accelerated sparse embedding distance computation.
Computes dot products and aggregates ONLY for sparse pairs.
This avoids the O(n_dets * total_track_embs) matmul.
"""
n_pairs = len(sparse_det_indices)
# Parallel over sparse pairs
for p in nb.prange(n_pairs):
i = sparse_det_indices[p]
j = sparse_track_indices[p]
n_embs = track_counts[j]
if n_embs == 0:
continue # distances[i,j] stays at 1.0
emb_offset = track_offsets[j] * emb_dim
if method == 0: # last
# Only compute dot product with last embedding
last_offset = emb_offset + (n_embs - 1) * emb_dim
dot = 0.0
for d in range(emb_dim):
dot += det_embeddings[i, d] * track_embeddings_flat[last_offset + d]
distances[i, j] = (1.0 - dot) * 0.5
elif method == 1: # average
total = 0.0
for k in range(n_embs):
offset = emb_offset + k * emb_dim
dot = 0.0
for d in range(emb_dim):
dot += det_embeddings[i, d] * track_embeddings_flat[offset + d]
total += (1.0 - dot) * 0.5
distances[i, j] = total / n_embs
elif method == 2: # weighted_average
total = 0.0
weight_sum = 0.0
for k in range(n_embs):
offset = emb_offset + k * emb_dim
dot = 0.0
for d in range(emb_dim):
dot += det_embeddings[i, d] * track_embeddings_flat[offset + d]
w = float(k + 1)
total += w * (1.0 - dot) * 0.5
weight_sum += w
distances[i, j] = total / weight_sum
elif method == 3: # best_match (min)
min_dist = 1.0
for k in range(n_embs):
offset = emb_offset + k * emb_dim
dot = 0.0
for d in range(emb_dim):
dot += det_embeddings[i, d] * track_embeddings_flat[offset + d]
dist = (1.0 - dot) * 0.5
if dist < min_dist:
min_dist = dist
distances[i, j] = min_dist
else: # method == 4: median
dist_buffer = np.empty(n_embs, dtype=np.float32)
for k in range(n_embs):
offset = emb_offset + k * emb_dim
dot = 0.0
for d in range(emb_dim):
dot += det_embeddings[i, d] * track_embeddings_flat[offset + d]
dist_buffer[k] = (1.0 - dot) * 0.5
# Inline median for small arrays
distances[i, j] = _numba_median_sparse(dist_buffer, n_embs)
@nb.njit(fastmath=True, cache=True)
def _numba_median_sparse(arr: np.ndarray, n: int) -> float:
"""Median computation for sparse embedding distances."""
if n == 0:
return 1.0
if n == 1:
return arr[0]
# Insertion sort for small arrays (typically n <= 15)
sorted_arr = np.empty(n, dtype=np.float32)
for i in range(n):
sorted_arr[i] = arr[i]
for i in range(1, n):
key = sorted_arr[i]
j = i - 1
while j >= 0 and sorted_arr[j] > key:
sorted_arr[j + 1] = sorted_arr[j]
j -= 1
sorted_arr[j + 1] = key
if n % 2 == 1:
return sorted_arr[n // 2]
else:
return (sorted_arr[n // 2 - 1] + sorted_arr[n // 2]) * 0.5
@nb.njit(fastmath=True, cache=True)
def _numba_median(arr: np.ndarray, n: int) -> float:
"""Compute median of first n elements using insertion sort (efficient for small n)."""
if n == 0:
return 1.0
if n == 1:
return arr[0]
# Simple insertion sort for small arrays (max ~15 embeddings typically)
sorted_arr = np.empty(n, dtype=np.float32)
for i in range(n):
sorted_arr[i] = arr[i]
for i in range(1, n):
key = sorted_arr[i]
j = i - 1
while j >= 0 and sorted_arr[j] > key:
sorted_arr[j + 1] = sorted_arr[j]
j -= 1
sorted_arr[j + 1] = key
# Return median
if n % 2 == 1:
return sorted_arr[n // 2]
else:
return (sorted_arr[n // 2 - 1] + sorted_arr[n // 2]) * 0.5
[docs]
@nb.njit(fastmath=True, cache=True, parallel=True)
def compute_embedding_distances_with_method(
det_embeddings: np.ndarray,
track_embeddings_flat: np.ndarray,
track_counts: np.ndarray,
method: int # 0=last, 1=average, 2=weighted_average, 3=best_match, 4=median
) -> np.ndarray:
"""
Compute embedding distances using various methods for historical embeddings.
OPTIMIZED: Uses parallel processing over detections and inlined dot product.
Args:
det_embeddings: Detection embeddings [N_det, emb_dim] (must be L2-normalized)
track_embeddings_flat: Flattened track embedding histories (L2-normalized)
track_counts: Number of embeddings per track
method: Method for aggregating embedding distances
0: Use last embedding only (fastest)
1: Average all distances
2: Weighted average (recent embeddings have more weight)
3: Best match (minimum distance)
4: Median distance (robust to outliers)
Returns:
Distance matrix [N_det, N_track] with cosine distances in [0, 1]
"""
n_dets = det_embeddings.shape[0]
n_tracks = len(track_counts)
emb_dim = det_embeddings.shape[1]
distances = np.zeros((n_dets, n_tracks), dtype=np.float32)
# Compute track offsets for flat array (cumulative sum)
track_offsets = np.zeros(n_tracks + 1, dtype=np.int32)
for t in range(n_tracks):
track_offsets[t + 1] = track_offsets[t] + track_counts[t]
# Parallel over detections (independent computations)
for i in nb.prange(n_dets):
det_emb = det_embeddings[i]
for j in range(n_tracks):
if track_counts[j] == 0:
distances[i, j] = 1.0
continue
start_offset = track_offsets[j] * emb_dim
n_track_embs = track_counts[j]
if method == 0: # last - use only the last embedding
last_start = start_offset + (n_track_embs - 1) * emb_dim
dot_product = _compute_dot_product(det_emb, track_embeddings_flat, last_start, emb_dim)
distances[i, j] = (1.0 - dot_product) * 0.5
elif method == 1: # average - mean distance to all embeddings
total_dist = 0.0
for emb_idx in range(n_track_embs):
emb_start = start_offset + emb_idx * emb_dim
dot_product = _compute_dot_product(det_emb, track_embeddings_flat, emb_start, emb_dim)
total_dist += (1.0 - dot_product) * 0.5
distances[i, j] = total_dist / n_track_embs
elif method == 2: # weighted_average - recent embeddings weighted higher
total_dist = 0.0
total_weight = 0.0
for emb_idx in range(n_track_embs):
weight = float(emb_idx + 1)
emb_start = start_offset + emb_idx * emb_dim
dot_product = _compute_dot_product(det_emb, track_embeddings_flat, emb_start, emb_dim)
total_dist += weight * (1.0 - dot_product) * 0.5
total_weight += weight
distances[i, j] = total_dist / total_weight if total_weight > 0.0 else 1.0
elif method == 3: # best_match - minimum distance
min_dist = 1.0
for emb_idx in range(n_track_embs):
emb_start = start_offset + emb_idx * emb_dim
dot_product = _compute_dot_product(det_emb, track_embeddings_flat, emb_start, emb_dim)
dist = (1.0 - dot_product) * 0.5
if dist < min_dist:
min_dist = dist
distances[i, j] = min_dist
else: # method == 4: median - robust to outliers
# Collect all distances for this track
dist_buffer = np.empty(n_track_embs, dtype=np.float32)
for emb_idx in range(n_track_embs):
emb_start = start_offset + emb_idx * emb_dim
dot_product = _compute_dot_product(det_emb, track_embeddings_flat, emb_start, emb_dim)
dist_buffer[emb_idx] = (1.0 - dot_product) * 0.5
distances[i, j] = _numba_median(dist_buffer, n_track_embs)
return distances
# ============================================================================
# COST MATRIX COMPUTATION FUNCTIONS
# ============================================================================
[docs]
@nb.njit(fastmath=True, cache=True, parallel=True)
def compute_cost_matrix_parallel(
det_positions: np.ndarray,
track_predicted_positions: np.ndarray,
track_last_positions: np.ndarray,
track_misses: np.ndarray,
scaled_embedding_distances: np.ndarray,
embedding_weight: float,
max_distance: float,
do_embeddings: bool,
miss_threshold: int = 3,
embedding_threshold_adjustment: float = 1.0,
track_uncertainty_ratios: np.ndarray = np.empty(0, dtype=np.float32),
uncertainty_weight: float = 0.0
) -> np.ndarray:
"""
Parallel version of cost matrix computation for maximum performance.
WARNING: May produce slightly non-deterministic results due to floating-point
accumulation order. Use compute_cost_matrix_vectorized for deterministic behavior.
See compute_cost_matrix_vectorized for detailed documentation.
"""
n_dets = det_positions.shape[0]
n_tracks = track_predicted_positions.shape[0]
cost_matrix = np.full((n_dets, n_tracks), np.inf, dtype=np.float32)
max_distance_sq = max_distance * max_distance
# Effective max distance for assignment gating (accounts for embedding contribution)
effective_max = max_distance * (1.0 + embedding_weight * embedding_threshold_adjustment) if do_embeddings else max_distance
# Check if uncertainty penalty is enabled (avoid overhead when disabled)
# Note: check weight only, array is always passed (empty if disabled)
use_uncertainty = uncertainty_weight > 0.0
# Parallel over detections (independent computations)
for i in nb.prange(n_dets):
for j in range(n_tracks):
dx_pred = det_positions[i, 0] - track_predicted_positions[j, 0]
dy_pred = det_positions[i, 1] - track_predicted_positions[j, 1]
dist_sq_pred = dx_pred * dx_pred + dy_pred * dy_pred
dx_last = det_positions[i, 0] - track_last_positions[j, 0]
dy_last = det_positions[i, 1] - track_last_positions[j, 1]
dist_sq_last = dx_last * dx_last + dy_last * dy_last
pos_distance_sq = min(dist_sq_pred, dist_sq_last)
# Position gating: skip if position alone exceeds max_distance
if pos_distance_sq > max_distance_sq:
continue
pos_distance = np.sqrt(pos_distance_sq)
if do_embeddings:
# Additive cost: position is primary, embedding adds penalty
# C_i,j = pos_distance + w_e × emb_distance × max_distance
emb_dist = scaled_embedding_distances[i, j]
cost = pos_distance + embedding_weight * emb_dist * max_distance
# Apply uncertainty penalty if enabled
if use_uncertainty:
cost = cost * (1.0 + uncertainty_weight * track_uncertainty_ratios[j])
# Assignment gating: only accept if total cost within effective max
if cost <= effective_max:
cost_matrix[i, j] = cost
else:
cost = pos_distance
# Apply uncertainty penalty if enabled
if use_uncertainty:
cost = cost * (1.0 + uncertainty_weight * track_uncertainty_ratios[j])
cost_matrix[i, j] = cost
return cost_matrix
[docs]
@nb.njit(fastmath=True, cache=True)
def compute_cost_matrix_vectorized(
det_positions: np.ndarray,
track_predicted_positions: np.ndarray,
track_last_positions: np.ndarray,
track_misses: np.ndarray,
scaled_embedding_distances: np.ndarray,
embedding_weight: float,
max_distance: float,
do_embeddings: bool,
miss_threshold: int = 3,
embedding_threshold_adjustment: float = 1.0,
track_uncertainty_ratios: np.ndarray = np.empty(0, dtype=np.float32),
uncertainty_weight: float = 0.0
) -> np.ndarray:
"""
Compute optimized cost matrix with position-centric additive embedding costs.
ALWAYS uses MINIMUM of predicted and last position for matching.
This handles all cases:
1. Object moving predictably -> predicted position is closer
2. Object stopped or changed direction -> last position is closer
Cost Formula (Position-Centric Additive):
C_i,j = pos_distance + w_e × emb_distance × max_distance
With uncertainty penalty (when uncertainty_weight > 0):
C_i,j = base_cost × (1 + uncertainty_weight × miss_ratio_j)
Position is always the primary cost. Embedding adds an additional penalty
that increases the matching cost but never replaces position.
Args:
det_positions: Detection positions [N_det, 2]
track_predicted_positions: Predicted track positions [N_track, 2]
track_last_positions: Last observed track positions [N_track, 2]
track_misses: Number of misses for each track (unused, kept for API compatibility)
scaled_embedding_distances: Pre-computed embedding distances [0, 1]
embedding_weight: Weight for embedding penalty term (0+)
max_distance: Maximum position distance for gating
do_embeddings: Whether to include embedding costs
miss_threshold: Unused (kept for API compatibility)
embedding_threshold_adjustment: Threshold adjustment factor for gating
track_uncertainty_ratios: Recent miss ratios per track [N_track] (optional)
uncertainty_weight: Weight for uncertainty penalty (0 = disabled, no overhead)
Returns:
Cost matrix [N_det, N_track]
"""
n_dets = det_positions.shape[0]
n_tracks = track_predicted_positions.shape[0]
cost_matrix = np.full((n_dets, n_tracks), np.inf, dtype=np.float32)
# Precompute squared max_distance for gating optimization
max_distance_sq = max_distance * max_distance
# Effective max distance for assignment gating (accounts for embedding contribution)
effective_max = max_distance * (1.0 + embedding_weight * embedding_threshold_adjustment) if do_embeddings else max_distance
# Check if uncertainty penalty is enabled (avoid overhead when disabled)
# Note: check weight only, array is always passed (empty if disabled)
use_uncertainty = uncertainty_weight > 0.0
for i in range(n_dets):
for j in range(n_tracks):
# ALWAYS use MINIMUM of predicted and last position
# This handles all cases:
# 1. Object moving predictably → predicted position is closer
# 2. Object stopped or changed direction → last position is closer
# Using MIN ensures the best match is found regardless of motion pattern
dx_pred = det_positions[i, 0] - track_predicted_positions[j, 0]
dy_pred = det_positions[i, 1] - track_predicted_positions[j, 1]
dist_sq_pred = dx_pred * dx_pred + dy_pred * dy_pred
dx_last = det_positions[i, 0] - track_last_positions[j, 0]
dy_last = det_positions[i, 1] - track_last_positions[j, 1]
dist_sq_last = dx_last * dx_last + dy_last * dy_last
# Take the minimum distance
pos_distance_sq = min(dist_sq_pred, dist_sq_last)
# Position gating: skip if position alone exceeds max_distance
if pos_distance_sq > max_distance_sq:
continue
# Only compute sqrt when we know it's within range
pos_distance = np.sqrt(pos_distance_sq)
if do_embeddings:
# Position-centric additive cost formula:
# C_i,j = pos_distance + w_e × emb_distance × max_distance
#
# Position is always the primary cost component.
# Embedding adds a penalty scaled by max_distance to be comparable.
# This ensures position is never ignored, and embedding only increases cost.
emb_dist = scaled_embedding_distances[i, j]
cost = pos_distance + embedding_weight * emb_dist * max_distance
# Apply uncertainty penalty if enabled
if use_uncertainty:
cost = cost * (1.0 + uncertainty_weight * track_uncertainty_ratios[j])
# Assignment gating: only accept if total cost within effective max
if cost <= effective_max:
cost_matrix[i, j] = cost
else:
cost = pos_distance
# Apply uncertainty penalty if enabled
if use_uncertainty:
cost = cost * (1.0 + uncertainty_weight * track_uncertainty_ratios[j])
cost_matrix[i, j] = cost
return cost_matrix
[docs]
@nb.njit(fastmath=True, cache=True)
def compute_probabilistic_cost_matrix_vectorized(
det_positions: np.ndarray,
track_predicted_positions: np.ndarray,
track_last_positions: np.ndarray,
track_misses: np.ndarray,
track_covariances: np.ndarray,
scaled_embedding_distances: np.ndarray,
embedding_weight: float,
max_distance: float,
do_embeddings: bool,
miss_threshold: int = 3,
gating_multiplier: float = 1.5,
mahal_normalization: float = 5.0,
cov_inflation_rate: float = 0.1,
embedding_threshold_adjustment: float = 1.0,
singular_cov_threshold: float = 1e-6,
track_uncertainty_ratios: np.ndarray = np.empty(0, dtype=np.float32),
uncertainty_weight: float = 0.0
) -> np.ndarray:
"""
Compute probabilistic cost matrix using Mahalanobis distance with additive embedding cost.
ALWAYS uses MINIMUM of predicted and last position for matching.
This handles all cases:
1. Object moving predictably -> predicted position is closer
2. Object stopped or changed direction -> last position is closer
Cost Formula (Position-Centric Additive):
C_i,j = normalized_mahal + w_e × emb_distance × max_distance
With uncertainty penalty (when uncertainty_weight > 0):
C_i,j = base_cost × (1 + uncertainty_weight × miss_ratio_j)
Args:
det_positions: Detection positions [N_det, 2]
track_predicted_positions: Predicted track positions [N_track, 2]
track_last_positions: Last observed track positions [N_track, 2]
track_misses: Number of misses for each track
track_covariances: Track covariance matrices [N_track, 2, 2]
scaled_embedding_distances: Pre-computed embedding distances [0, 1]
embedding_weight: Weight for embedding penalty term
max_distance: Maximum allowed distance
do_embeddings: Whether to include embedding costs
miss_threshold: Unused (kept for API compatibility)
gating_multiplier: Multiplier for max_distance in Euclidean pre-filter
mahal_normalization: Normalization factor for Mahalanobis distance (default: 5.0)
cov_inflation_rate: Rate of covariance inflation per missed frame
embedding_threshold_adjustment: Threshold adjustment factor for gating
singular_cov_threshold: Threshold for detecting singular covariance (default: 1e-6)
track_uncertainty_ratios: Recent miss ratios per track [N_track] (optional)
uncertainty_weight: Weight for uncertainty penalty (0 = disabled, no overhead)
Returns:
Cost matrix [N_det, N_track]
"""
n_dets = det_positions.shape[0]
n_tracks = track_predicted_positions.shape[0]
cost_matrix = np.full((n_dets, n_tracks), np.inf, dtype=np.float32)
# Precompute gating threshold
gating_threshold = max_distance * gating_multiplier
gating_threshold_sq = gating_threshold * gating_threshold
# Effective max distance for assignment gating (accounts for embedding contribution)
effective_max = max_distance * (1.0 + embedding_weight * embedding_threshold_adjustment) if do_embeddings else max_distance
# Check if uncertainty penalty is enabled (avoid overhead when disabled)
# Note: check weight only, array is always passed (empty if disabled)
use_uncertainty = uncertainty_weight > 0.0
for i in range(n_dets):
for j in range(n_tracks):
# Get covariance for this track
cov = track_covariances[j]
# Time-dependent covariance inflation
time_factor = 1.0 + cov_inflation_rate * track_misses[j]
inflated_cov = cov * time_factor
# Compute inverse covariance
det_cov = inflated_cov[0, 0] * inflated_cov[1, 1] - inflated_cov[0, 1] * inflated_cov[1, 0]
use_euclidean = abs(det_cov) < singular_cov_threshold
if not use_euclidean:
cov_inv = np.zeros((2, 2), dtype=np.float32)
cov_inv[0, 0] = inflated_cov[1, 1] / det_cov
cov_inv[0, 1] = -inflated_cov[0, 1] / det_cov
cov_inv[1, 0] = -inflated_cov[1, 0] / det_cov
cov_inv[1, 1] = inflated_cov[0, 0] / det_cov
# ALWAYS use MINIMUM of predicted and last position
# This handles all cases:
# 1. Object moving predictably → predicted position is closer
# 2. Object stopped or changed direction → last position is closer
diff_pred = det_positions[i] - track_predicted_positions[j]
diff_last = det_positions[i] - track_last_positions[j]
# Euclidean distances for gating
dist_sq_pred = diff_pred[0] * diff_pred[0] + diff_pred[1] * diff_pred[1]
dist_sq_last = diff_last[0] * diff_last[0] + diff_last[1] * diff_last[1]
# Check if at least one is within gating threshold
min_dist_sq = min(dist_sq_pred, dist_sq_last)
if min_dist_sq > gating_threshold_sq:
continue
# Compute Mahalanobis distances for both positions
if use_euclidean:
mahal_pred = np.sqrt(dist_sq_pred)
mahal_last = np.sqrt(dist_sq_last)
else:
mahal_pred = fast_mahalanobis_distance(diff_pred, cov_inv)
mahal_last = fast_mahalanobis_distance(diff_last, cov_inv)
# Use minimum Mahalanobis distance
mahal_dist = min(mahal_pred, mahal_last)
# Normalize Mahalanobis to be comparable with max_distance
normalized_mahal = mahal_dist * mahal_normalization
# Position gating: skip if normalized Mahalanobis exceeds max_distance
if normalized_mahal > max_distance:
continue
if do_embeddings:
# Position-centric additive cost formula:
# C_i,j = normalized_mahal + w_e × emb_distance × max_distance
emb_dist = scaled_embedding_distances[i, j]
cost = normalized_mahal + embedding_weight * emb_dist * max_distance
# Apply uncertainty penalty if enabled
if use_uncertainty:
cost = cost * (1.0 + uncertainty_weight * track_uncertainty_ratios[j])
# Assignment gating: only accept if total cost within effective max
if cost <= effective_max:
cost_matrix[i, j] = cost
else:
cost = normalized_mahal
# Apply uncertainty penalty if enabled
if use_uncertainty:
cost = cost * (1.0 + uncertainty_weight * track_uncertainty_ratios[j])
cost_matrix[i, j] = cost
return cost_matrix
# ============================================================================
# COLLISION AND FREEZE DETECTION
# ============================================================================
[docs]
@nb.njit(fastmath=True, cache=True)
def compute_neighbor_counts_vectorized(
positions: np.ndarray,
radius: float
) -> np.ndarray:
"""
Count the number of neighbors within radius for each track.
Used to implement embedding_freeze_density - tracks freeze when they have
too many neighbors, indicating a crowded/collision scenario.
Args:
positions: Track positions [N_tracks, 2]
radius: Distance threshold for counting neighbors
Returns:
Integer array with neighbor count for each track (excludes self)
"""
n_tracks = positions.shape[0]
neighbor_counts = np.zeros(n_tracks, dtype=np.int32)
radius_sq = radius * radius
for i in range(n_tracks):
for j in range(i + 1, n_tracks):
dx = positions[i, 0] - positions[j, 0]
dy = positions[i, 1] - positions[j, 1]
distance_sq = dx * dx + dy * dy
if distance_sq < radius_sq:
neighbor_counts[i] += 1
neighbor_counts[j] += 1
return neighbor_counts
[docs]
@nb.njit(fastmath=True, cache=True)
def compute_freeze_flags_vectorized(
positions: np.ndarray,
safety_distance: float
) -> np.ndarray:
"""
Compute freeze flags for tracks based on proximity (collision detection).
DEPRECATED: Use compute_neighbor_counts_vectorized() with embedding_freeze_density
for proper density-based freezing with hysteresis.
Args:
positions: Track positions [N_tracks, 2]
safety_distance: Distance threshold for freezing embeddings
Returns:
Boolean array where True means track should be frozen
"""
n_tracks = positions.shape[0]
freeze_flags = np.zeros(n_tracks, dtype=nb.boolean)
# Use vectorized distance computation
for i in range(n_tracks):
for j in range(i + 1, n_tracks):
# Compute squared distance to avoid sqrt
dx = positions[i, 0] - positions[j, 0]
dy = positions[i, 1] - positions[j, 1]
distance_sq = dx * dx + dy * dy
if distance_sq < safety_distance * safety_distance:
freeze_flags[i] = True
freeze_flags[j] = True
# Early termination for i once frozen
break
return freeze_flags
@nb.njit(fastmath=True, cache=True)
def _compute_pairwise_dist_sq(positions: np.ndarray) -> np.ndarray:
"""
Compute pairwise squared distances between all positions.
Returns upper triangular matrix (i < j only) to save memory.
Uses vectorized operations within Numba for efficiency.
"""
n = positions.shape[0]
# Only store upper triangle (i < j) as flat array
# Number of pairs = n*(n-1)/2
n_pairs = (n * (n - 1)) // 2
dist_sq = np.empty(n_pairs, dtype=np.float32)
idx = 0
for i in range(n):
xi, yi = positions[i, 0], positions[i, 1]
for j in range(i + 1, n):
dx = xi - positions[j, 0]
dy = yi - positions[j, 1]
dist_sq[idx] = dx * dx + dy * dy
idx += 1
return dist_sq
@nb.njit(fastmath=True, cache=True, inline='always')
def _get_pair_index(i: int, j: int, n: int) -> int:
"""Get flat index for pair (i, j) where i < j in upper triangular storage."""
# Formula: sum of (n-1) + (n-2) + ... + (n-i) + (j - i - 1)
# = i*n - i*(i+1)/2 + (j - i - 1)
return i * n - (i * (i + 1)) // 2 + j - i - 1
[docs]
@nb.njit(fastmath=True, cache=True)
def compute_deduplication_mask(
positions: np.ndarray,
confidences: np.ndarray,
dedup_distance: float
) -> np.ndarray:
"""
Compute mask for detection deduplication based on proximity (NMS-style).
OPTIMIZED: Pre-computes pairwise distances for O(1) lookups during NMS.
Args:
positions: Detection positions [N_det, 2]
confidences: Detection confidences [N_det]
dedup_distance: Distance threshold for deduplication
Returns:
Boolean mask where True means keep detection
"""
n_dets = positions.shape[0]
if n_dets <= 1:
return np.ones(n_dets, dtype=nb.boolean)
keep = np.ones(n_dets, dtype=nb.boolean)
dedup_distance_sq = dedup_distance * dedup_distance
# Sort by confidence (higher confidence first)
confidences_1d = confidences.flatten()
sorted_indices = np.argsort(-confidences_1d)
# Pre-compute all pairwise squared distances (O(n²) but fast vectorized)
dist_sq_flat = _compute_pairwise_dist_sq(positions)
for i in range(n_dets):
idx_i = sorted_indices[i]
if not keep[idx_i]:
continue
# Check against all lower confidence detections
for j in range(i + 1, n_dets):
idx_j = sorted_indices[j]
if not keep[idx_j]:
continue
# Get pre-computed distance (ensure i < j for lookup)
if idx_i < idx_j:
pair_idx = _get_pair_index(idx_i, idx_j, n_dets)
else:
pair_idx = _get_pair_index(idx_j, idx_i, n_dets)
if dist_sq_flat[pair_idx] < dedup_distance_sq:
keep[idx_j] = False
return keep
# ============================================================================
# SPARSE COST MATRIX COMPUTATION
# ============================================================================
[docs]
@nb.njit(fastmath=True, cache=True)
def compute_sparse_cost_matrix(
det_positions: np.ndarray,
track_positions: np.ndarray,
track_last_positions: np.ndarray,
track_misses: np.ndarray,
scaled_embedding_distances: np.ndarray,
embedding_weight: float,
max_distance: float,
do_embeddings: bool,
miss_threshold: int,
embedding_threshold_adjustment: float,
sparse_det_indices: np.ndarray,
sparse_track_indices: np.ndarray,
track_uncertainty_ratios: np.ndarray,
uncertainty_weight: float
) -> np.ndarray:
"""Compute cost matrix only for sparse candidate pairs.
For large-scale scenarios, this computes costs only for detection-track pairs
that are spatially close (determined by sparse_det_indices and sparse_track_indices).
All other pairs are set to infinity.
This reduces complexity from O(n*m) to O(k) where k is the number of sparse pairs,
typically k << n*m when objects are spatially distributed.
Args:
det_positions: Detection positions [N_det, 2]
track_positions: Track predicted positions [N_track, 2]
track_last_positions: Track last detection positions [N_track, 2]
track_misses: Number of consecutive misses per track [N_track]
scaled_embedding_distances: Pre-computed embedding distances [N_det, N_track]
embedding_weight: Weight for embedding cost (0-1)
max_distance: Maximum association distance
do_embeddings: Whether to include embedding cost
miss_threshold: Misses before using last position
embedding_threshold_adjustment: Threshold expansion factor for embeddings
sparse_det_indices: Detection indices for sparse pairs [N_pairs]
sparse_track_indices: Track indices for sparse pairs [N_pairs]
track_uncertainty_ratios: Uncertainty ratio per track (for penalty, may be empty)
uncertainty_weight: Weight for uncertainty penalty
Returns:
Cost matrix [N_det, N_track] with inf for non-candidate pairs
"""
n_dets = det_positions.shape[0]
n_tracks = track_positions.shape[0]
n_pairs = len(sparse_det_indices)
# Initialize with infinity (non-candidate pairs are not viable)
cost_matrix = np.full((n_dets, n_tracks), np.inf, dtype=np.float32)
# Compute effective max distance for gating
if do_embeddings:
effective_max = max_distance * (1.0 + embedding_weight * embedding_threshold_adjustment)
else:
effective_max = max_distance
effective_max_sq = effective_max * effective_max
# Check if uncertainty is enabled
use_uncertainty = uncertainty_weight > 0.0 and len(track_uncertainty_ratios) == n_tracks
# Compute costs only for sparse pairs
for p in range(n_pairs):
i = sparse_det_indices[p]
j = sparse_track_indices[p]
# Skip if already computed (duplicate pair from overlapping grid cells)
if cost_matrix[i, j] < np.inf:
continue
# Choose position: predicted vs last based on miss count
if track_misses[j] >= miss_threshold:
tx = track_last_positions[j, 0]
ty = track_last_positions[j, 1]
else:
tx = track_positions[j, 0]
ty = track_positions[j, 1]
# Compute position distance
dx = det_positions[i, 0] - tx
dy = det_positions[i, 1] - ty
pos_distance_sq = dx * dx + dy * dy
# Gating check
if pos_distance_sq > effective_max_sq:
continue
pos_distance = math.sqrt(pos_distance_sq)
# Compute cost
# Use additive formula matching compute_cost_matrix_vectorized:
# Position distance is the base, embedding adds additional cost scaled by max_distance
if do_embeddings:
emb_dist = scaled_embedding_distances[i, j]
cost = pos_distance + embedding_weight * emb_dist * max_distance
else:
cost = pos_distance
# Add uncertainty penalty if enabled
if use_uncertainty:
cost += uncertainty_weight * track_uncertainty_ratios[j] * max_distance
cost_matrix[i, j] = cost
return cost_matrix
[docs]
@nb.njit(fastmath=True, parallel=True, cache=True)
def compute_sparse_embedding_distances(
det_embeddings: np.ndarray,
track_embeddings: np.ndarray,
sparse_det_indices: np.ndarray,
sparse_track_indices: np.ndarray,
n_dets: int,
n_tracks: int
) -> np.ndarray:
"""Compute embedding distances only for sparse candidate pairs.
This is a companion function for sparse mode that computes cosine distances
only for the candidate pairs, avoiding the full O(n*m) matrix computation.
Args:
det_embeddings: Detection embeddings [N_det, emb_dim]
track_embeddings: Track embeddings [N_track, emb_dim]
sparse_det_indices: Detection indices for sparse pairs [N_pairs]
sparse_track_indices: Track indices for sparse pairs [N_pairs]
n_dets: Total number of detections
n_tracks: Total number of tracks
Returns:
Embedding distance matrix [N_det, N_track] with 0 for non-candidate pairs
"""
n_pairs = len(sparse_det_indices)
emb_dim = det_embeddings.shape[1]
# Initialize with zeros (non-candidate pairs get 0 distance, will be gated by position)
distances = np.zeros((n_dets, n_tracks), dtype=np.float32)
# Compute cosine distances for sparse pairs
for p in nb.prange(n_pairs):
i = sparse_det_indices[p]
j = sparse_track_indices[p]
# Skip if already computed
if distances[i, j] > 0:
continue
# Compute cosine similarity
dot_product = 0.0
norm_det_sq = 0.0
norm_track_sq = 0.0
for k in range(emb_dim):
d_val = det_embeddings[i, k]
t_val = track_embeddings[j, k]
dot_product += d_val * t_val
norm_det_sq += d_val * d_val
norm_track_sq += t_val * t_val
norm_det = math.sqrt(norm_det_sq)
norm_track = math.sqrt(norm_track_sq)
if norm_det > 1e-8 and norm_track > 1e-8:
cos_sim = dot_product / (norm_det * norm_track)
# Cosine distance in [0, 1] range
distances[i, j] = (1.0 - cos_sim) * 0.5
else:
distances[i, j] = 1.0 # Max distance for zero embeddings
return distances