Source code for swarmsort.core

"""
SwarmSort Core Implementation

This module contains the core SwarmSort multi-object tracking algorithm implementation.
SwarmSort combines Kalman filtering, Hungarian algorithm assignment, and deep learning
embeddings for robust real-time object tracking.

Classes:
    SwarmSortTracker: Main tracking class implementing the full algorithm
"""

# ============================================================================
# STANDARD IMPORTS
# ============================================================================
import numpy as np
from typing import List, Dict, Optional, Tuple, Any, Union
from collections import deque
from scipy.optimize import linear_sum_assignment
import time
import gc

# ============================================================================
# LOGGER
# ============================================================================
from loguru import logger

# ============================================================================
# INTERNAL IMPORTS
# ============================================================================
from .data_classes import Detection, TrackedObject
from .config import SwarmSortConfig
from .embedding_scaler import EmbeddingDistanceScaler
from .track_state import PendingDetection, FastTrackState

from .kalman_filters import (
    simple_kalman_update,
    oc_sort_predict,
    oc_sort_update,
    compute_oc_sort_cost_matrix
)

from .cost_computation import (
    cosine_similarity_normalized,
    compute_embedding_distances_with_method,
    compute_embedding_distances_matmul,
    compute_embedding_distances_sparse,
    compute_cost_matrix_vectorized,
    compute_cost_matrix_parallel,
    compute_probabilistic_cost_matrix_vectorized,
    compute_freeze_flags_vectorized,
    compute_neighbor_counts_vectorized,
    compute_deduplication_mask,
    estimate_track_covariances,
    compute_sparse_cost_matrix,
)

from .assignment import (
    numba_greedy_assignment,
    hungarian_assignment_wrapper,
    hybrid_assignment
)


# ============================================================================
# PERFORMANCE TIMING UTILITIES
# ============================================================================
[docs] class Timer: """Simple high-resolution timer for performance profiling.""" def __init__(self): self._start_times = {}
[docs] def start(self, key: str) -> None: """Start timing for the given key.""" self._start_times[key] = time.perf_counter()
[docs] def stop(self, key: str, store: dict) -> None: """Stop timing for the given key and accumulate the duration.""" if key in self._start_times: duration = time.perf_counter() - self._start_times[key] store[key] = store.get(key, 0.0) + duration
# ============================================================================ # MAIN TRACKER CLASS # ============================================================================
[docs] class SwarmSortTracker: """ Main SwarmSort multi-object tracker implementation. This class implements a sophisticated tracking algorithm that combines: - Kalman filtering for motion prediction - Hungarian/Greedy assignment for detection-track matching - Deep learning embeddings for appearance-based matching - Re-identification for recovering lost tracks """
[docs] def __init__( self, config: Optional[Union[SwarmSortConfig, dict]] = None, embedding_type: Optional[str] = None, use_gpu: Optional[bool] = None, **kwargs ): """Initialize the SwarmSort tracker with configuration. Args: config: Configuration object or dictionary embedding_type: Type of embedding extractor to use (overrides config) use_gpu: Whether to use GPU for embeddings (overrides config) **kwargs: Additional keyword arguments (ignored for compatibility) """ # Handle configuration if config is None: self.config = SwarmSortConfig() elif isinstance(config, dict): self.config = SwarmSortConfig.from_dict(config) else: self.config = config # Store embedding preferences for later initialization self.embedding_type_override = embedding_type self.use_gpu = use_gpu if use_gpu is not None else True self._initialize_from_config() self._setup_tracker_state() self._initialize_embeddings() self._precompile_numba()
def _initialize_from_config(self): """Initialize tracker parameters from configuration.""" # Core tracking parameters self.max_distance = self.config.max_distance self.max_track_age = self.config.max_track_age self.min_consecutive_detections = self.config.min_consecutive_detections self.max_detection_gap = self.config.max_detection_gap # Embedding parameters self.do_embeddings = self.config.do_embeddings self.embedding_weight = self.config.embedding_weight self.embedding_threshold_adjustment = getattr(self.config, 'embedding_threshold_adjustment', 1.0) self.max_embeddings_per_track = self.config.max_embeddings_per_track self.embedding_matching_method = self.config.embedding_matching_method # Assignment strategy self.assignment_strategy = self.config.assignment_strategy self.greedy_threshold = self.config.greedy_threshold self.hungarian_fallback_threshold = self.config.hungarian_fallback_threshold # Kalman filter type self.kalman_type = self.config.kalman_type self.kalman_velocity_damping = self.config.kalman_velocity_damping # ReID parameters self.reid_enabled = self.config.reid_enabled self.reid_max_distance = self.config.reid_max_distance self.reid_embedding_threshold = self.config.reid_embedding_threshold self.reid_min_frames_lost = self.config.reid_min_frames_lost # Advanced features self.collision_freeze_embeddings = self.config.collision_freeze_embeddings self.collision_safety_distance = self.config.collision_safety_distance self.deduplication_distance = self.config.deduplication_distance # Embedding freeze density parameters self.embedding_freeze_density = getattr(self.config, 'embedding_freeze_density', 1) # Use local_density_radius if set, otherwise fall back to collision_safety_distance local_density = getattr(self.config, 'local_density_radius', -1.0) self.local_density_radius = local_density if local_density > 0 else self.collision_safety_distance # Debug options self.debug_timings = self.config.debug_timings self.debug_embeddings = self.config.debug_embeddings # Cost computation self.use_probabilistic_costs = self.config.use_probabilistic_costs # Uncertainty penalty (0 = disabled, no overhead) self.uncertainty_weight = getattr(self.config, 'uncertainty_weight', 0.0) self.uncertainty_window = getattr(self.config, 'uncertainty_window', 10) # Performance options # parallel_cost_matrix: Use parallel cost matrix computation for better performance # Note: May produce slightly non-deterministic results due to floating-point order self.parallel_cost_matrix = getattr(self.config, 'parallel_cost_matrix', False) # Sparse computation threshold - use grid-based spatial indexing for large-scale scenarios self.sparse_computation_threshold = getattr(self.config, 'sparse_computation_threshold', 300) # Initialization threshold - minimum confidence to create pending detection self.init_conf_threshold = getattr(self.config, 'init_conf_threshold', 0.0) def _setup_tracker_state(self): """Initialize tracker state variables.""" self._tracks: Dict[int, FastTrackState] = {} self._pending_detections: Dict[int, PendingDetection] = {} self._next_id = 1 self._next_pending_id = 1 self._frame_count = 0 self.timings = {} # OPTIMIZATION: Pre-allocate array pools to avoid per-frame allocations # Pre-allocate with reasonable default size to avoid early reallocation spikes # These grow as needed if object count exceeds initial allocation initial_pool_size = getattr(self.config, 'initial_pool_size', 150) self._reusable_det_positions = np.empty((initial_pool_size, 2), dtype=np.float32) self._reusable_track_positions = np.empty((initial_pool_size, 2), dtype=np.float32) self._reusable_track_last_positions = np.empty((initial_pool_size, 2), dtype=np.float32) self._reusable_track_misses = np.empty(initial_pool_size, dtype=np.int32) self._max_dets_seen = initial_pool_size self._max_tracks_seen = initial_pool_size def _initialize_embeddings(self): """Initialize embedding-related components. Embedding modes: 1. Built-in extractor: Set embedding_function to 'cupytexture', etc. SwarmSort will compute embeddings from image crops. 2. External embeddings: Set embedding_function=None or 'external'. You must provide embeddings in Detection.embedding field. 3. No embeddings: Set do_embeddings=False. """ if self.do_embeddings: # Determine which embedding type to use # Priority: embedding_type_override > config.embedding_function > config.embedding_extractor embedding_type = ( self.embedding_type_override or getattr(self.config, 'embedding_function', None) or getattr(self.config, 'embedding_extractor', None) ) # Check if user wants external embeddings (provided in Detection objects) if embedding_type is None or embedding_type.lower() == 'external': logger.info("Using external embeddings mode - embeddings must be provided in Detection.embedding") self.embedding_extractor = None else: # Try to initialize built-in embedding extractor try: from .embeddings import get_embedding_extractor self.embedding_extractor = get_embedding_extractor( embedding_type, use_gpu=self.use_gpu ) logger.info(f"Using built-in embedding extractor: {embedding_type}") except Exception as e: # Check if this looks like an external embedding type (not a built-in) builtin_types = ['cupytexture', 'cupytexture_color', 'cupytexture_mega', 'cupyshape'] if embedding_type.lower() not in builtin_types: logger.warning( f"Embedding '{embedding_type}' is not a built-in type. " f"Available built-in: {builtin_types}. " f"Assuming external embeddings will be provided in Detection.embedding." ) self.embedding_extractor = None else: # Built-in type failed to load - this is an error raise RuntimeError( f"Failed to create built-in embedding extractor '{embedding_type}': {e}. " f"If providing external embeddings, set embedding_function='external' or None." ) # Initialize embedding scaler self.embedding_scaler = EmbeddingDistanceScaler( method=getattr(self.config, 'embedding_scaling_method', 'min_robustmax') ) # Warmup the scaler to avoid mode transition spike at min_samples if hasattr(self.embedding_scaler, 'warmup'): self.embedding_scaler.warmup() else: self.embedding_extractor = None self.embedding_scaler = None def _precompile_numba(self): """Pre-compile Numba functions for better performance. Uses realistic data sizes to ensure all code paths are compiled upfront, avoiding JIT compilation spikes during actual tracking. """ if not getattr(self, '_numba_compiled', False): try: # Use realistic data sizes to trigger full compilation # Small dummy data (1x1) doesn't exercise all code paths warmup_size = getattr(self.config, 'initial_pool_size', 150) n_warmup_dets = min(warmup_size, 100) n_warmup_tracks = min(warmup_size, 100) emb_dim = getattr(self.config, 'default_embedding_dimension', 128) # Create realistic-sized dummy data dummy_det = np.random.randn(n_warmup_dets, 2).astype(np.float32) * 100 dummy_track = np.random.randn(n_warmup_tracks, 2).astype(np.float32) * 100 dummy_track_last = np.random.randn(n_warmup_tracks, 2).astype(np.float32) * 100 dummy_misses = np.zeros(n_warmup_tracks, dtype=np.int32) dummy_emb = np.random.randn(n_warmup_tracks, emb_dim).astype(np.float32) dummy_cost = np.random.randn(n_warmup_dets, n_warmup_tracks).astype(np.float32) # Trigger compilation of cost matrix functions compute_cost_matrix_vectorized( dummy_det, dummy_track, dummy_track_last, dummy_misses, dummy_emb, 0.5, 100.0, False, 3, 1.0 ) # Trigger compilation of assignment functions numba_greedy_assignment(dummy_cost, 100.0) # Trigger compilation of freeze flag computation if available try: compute_freeze_flags_vectorized(dummy_track, 25.0, 1) except Exception: pass # Optional function # Trigger compilation of neighbor count if available try: compute_neighbor_counts_vectorized(dummy_track, 33.0) except Exception: pass # Optional function self._numba_compiled = True logger.debug(f"Numba functions compiled successfully (warmup size: {n_warmup_dets}x{n_warmup_tracks})") except Exception as e: logger.warning(f"Numba compilation warning: {e}")
[docs] def update(self, detections: List[Detection]) -> List[TrackedObject]: """ Main update function for the tracker. Args: detections: List of Detection objects for current frame Returns: List of TrackedObject instances representing current tracks Raises: TypeError: If detections is not a list ValueError: If any detection has invalid data """ # Input validation if detections is not None and not isinstance(detections, list): raise TypeError(f"detections must be a list, got {type(detections).__name__}") self._frame_count += 1 timer = Timer() if self.debug_timings else None # Handle empty detections if not detections: return self._handle_empty_frame() # Validate individual detections self._validate_detections(detections) # Filter low confidence detections if timer: timer.start("filter_conf") detections = self._filter_detections(detections) if timer: timer.stop("filter_conf", self.timings) # Deduplication and collision detection if timer: timer.start("dedup_and_collision") detections = self._deduplicate_detections(detections) self._update_collision_states() if timer: timer.stop("dedup_and_collision", self.timings) # Get active tracks tracks = list(self._tracks.values()) if not tracks: return self._handle_no_tracks(detections) # CRITICAL: Predict ALL track positions BEFORE assignment # This ensures cost matrix uses current-frame predictions if timer: timer.start("prediction") for track in tracks: track.predict_position(self._frame_count) if timer: timer.stop("prediction", self.timings) # Perform assignment if timer: timer.start("assignment") matches, unmatched_dets, unmatched_tracks = self._perform_assignment( detections, tracks, timer ) if timer: timer.stop("assignment", self.timings) # Update matched tracks if timer: timer.start("update_matched") self._update_matched_tracks(matches, detections, tracks) if timer: timer.stop("update_matched", self.timings) # Handle unmatched if timer: timer.start("handle_unmatched") self._handle_unmatched_tracks(unmatched_tracks, tracks) self._handle_unmatched_detections(unmatched_dets, detections) if timer: timer.stop("handle_unmatched", self.timings) # ReID attempt if self.reid_enabled and len(unmatched_dets) > 0: if timer: timer.start("reid") self._attempt_reid(detections, unmatched_dets) if timer: timer.stop("reid", self.timings) # Update pending detections if timer: timer.start("update_pending") self._update_pending_detections() if timer: timer.stop("update_pending", self.timings) # Cleanup if timer: timer.start("cleanup") self._cleanup_tracks() if timer: timer.stop("cleanup", self.timings) # Get results if timer: timer.start("get_results") results = self._get_results() if timer: timer.stop("get_results", self.timings) if self.debug_timings: self._log_timings() return results
def _validate_detections(self, detections: List[Detection]) -> None: """ Validate detection inputs. Args: detections: List of Detection objects to validate Raises: TypeError: If detection is not a Detection object ValueError: If detection has invalid position or contains non-finite values """ for i, det in enumerate(detections): # Duck typing check - allow any object with required attributes # This supports Detection objects from different modules (e.g., swarmtracker) if not hasattr(det, 'position') or not hasattr(det, 'confidence'): raise TypeError( f"Detection {i} must have 'position' and 'confidence' attributes, got {type(det).__name__}" ) # Position validation if det.position is None: raise ValueError(f"Detection {i}: position cannot be None") if not isinstance(det.position, np.ndarray): # Auto-convert list/tuple to numpy array try: det.position = np.asarray(det.position, dtype=np.float32) except (TypeError, ValueError): raise ValueError( f"Detection {i}: position must be array-like, got {type(det.position).__name__}" ) # Auto-convert position to shape (2,) if possible if det.position.shape != (2,): if det.position.size == 2: # Flatten compatible shapes like (1, 2), (2, 1), (2,) det.position = det.position.flatten().astype(np.float32) else: raise ValueError( f"Detection {i}: position must have 2 elements, got shape {det.position.shape}" ) if not np.isfinite(det.position).all(): raise ValueError( f"Detection {i}: position contains non-finite values: {det.position}" ) # Confidence validation if not 0 <= det.confidence <= 1: raise ValueError( f"Detection {i}: confidence must be in [0, 1], got {det.confidence}" ) def _filter_detections(self, detections: List[Detection]) -> List[Detection]: """Filter out low confidence detections.""" threshold = getattr(self.config, 'detection_conf_threshold', 0.0) if threshold > 0: return [d for d in detections if d.confidence >= threshold] return detections def _deduplicate_detections(self, detections: List[Detection]) -> List[Detection]: """Remove duplicate detections based on proximity.""" if not detections or self.deduplication_distance <= 0: return detections n_dets = len(detections) positions = np.array([d.position for d in detections], dtype=np.float32) confidences = np.array([d.confidence for d in detections], dtype=np.float32) # Use Numba function for efficient deduplication keep_mask = compute_deduplication_mask( positions, confidences, self.deduplication_distance ) return [d for i, d in enumerate(detections) if keep_mask[i]] def _update_collision_states(self): """Update embedding freeze states based on track proximity with hysteresis. Uses embedding_freeze_density to determine when to freeze: - Freeze when neighbor_count >= embedding_freeze_density - Unfreeze when neighbor_count < max(0, embedding_freeze_density - 1) The hysteresis (freeze_density - 1 for unfreeze) prevents oscillation when tracks are on the boundary of the collision zone. """ if not self.collision_freeze_embeddings or len(self._tracks) < 2: return positions = np.array( [t.position for t in self._tracks.values()], dtype=np.float32 ) # Count neighbors within local_density_radius neighbor_counts = compute_neighbor_counts_vectorized( positions, self.local_density_radius ) # Hysteresis thresholds freeze_threshold = self.embedding_freeze_density # Unfreeze requires fewer neighbors than freeze (hysteresis prevents oscillation) unfreeze_threshold = max(0, self.embedding_freeze_density - 1) for (track, count) in zip(self._tracks.values(), neighbor_counts): if count >= freeze_threshold: # Too many neighbors - freeze embeddings track.freeze_embeddings() elif count <= unfreeze_threshold and track.embedding_frozen: # Few enough neighbors and currently frozen - safe to unfreeze track.unfreeze_embeddings() def _handle_empty_frame(self) -> List[TrackedObject]: """Handle frame with no detections.""" for track in self._tracks.values(): track.predict_only(self._frame_count) self._cleanup_tracks() return self._get_results() def _handle_no_tracks(self, detections: List[Detection]) -> List[TrackedObject]: """Handle case when there are no active tracks.""" self._handle_unmatched_detections(list(range(len(detections))), detections) self._update_pending_detections() # Return any newly promoted tracks (don't always return empty!) return self._get_results() def _perform_assignment( self, detections: List[Detection], tracks: List[FastTrackState], timer: Optional[Timer] = None ) -> Tuple[List[Tuple[int, int]], List[int], List[int]]: """Perform detection-track assignment using configured strategy.""" detailed_timing = self.debug_timings # Compute cost matrix if detailed_timing: t0 = time.perf_counter() cost_matrix = self._compute_cost_matrix(detections, tracks) if detailed_timing: t_cost = time.perf_counter() - t0 t0 = time.perf_counter() # Apply assignment strategy if self.assignment_strategy == "greedy": result = numba_greedy_assignment(cost_matrix, self.max_distance) elif self.assignment_strategy == "hungarian": result = hungarian_assignment_wrapper(cost_matrix, self.max_distance) elif self.assignment_strategy == "hybrid": # hungarian_fallback_threshold is a multiplier, convert to absolute threshold hungarian_threshold = self.max_distance * self.hungarian_fallback_threshold result = hybrid_assignment( cost_matrix, self.max_distance, self.greedy_threshold, hungarian_threshold ) else: # Default to Hungarian result = hungarian_assignment_wrapper(cost_matrix, self.max_distance) if detailed_timing: t_assign = time.perf_counter() - t0 n_dets, n_tracks = cost_matrix.shape print(f" [Assignment] {self.assignment_strategy} | cost_matrix={t_cost*1000:.2f}ms | assign={t_assign*1000:.2f}ms | matches={len(result[0])}") return result def _compute_cost_matrix( self, detections: List[Detection], tracks: List[FastTrackState] ) -> np.ndarray: """Compute cost matrix for detection-track assignment.""" n_dets = len(detections) n_tracks = len(tracks) # Detailed timing for performance profiling detailed_timing = self.debug_timings if detailed_timing: t_start = time.perf_counter() cost_timings = {} if n_dets == 0 or n_tracks == 0: return np.full((n_dets, n_tracks), np.inf, dtype=np.float32) # OPTIMIZATION: Use reusable array pools to avoid per-frame allocations # Grow arrays if needed, but never shrink (avoids repeated allocations) if detailed_timing: t0 = time.perf_counter() if self._reusable_det_positions is None or self._reusable_det_positions.shape[0] < n_dets: new_size = max(n_dets, self._max_dets_seen, 50) # Minimum 50 to reduce early reallocations self._reusable_det_positions = np.empty((new_size, 2), dtype=np.float32) self._max_dets_seen = new_size if self._reusable_track_positions is None or self._reusable_track_positions.shape[0] < n_tracks: new_size = max(n_tracks, self._max_tracks_seen, 50) self._reusable_track_positions = np.empty((new_size, 2), dtype=np.float32) self._reusable_track_last_positions = np.empty((new_size, 2), dtype=np.float32) self._reusable_track_misses = np.empty(new_size, dtype=np.int32) self._max_tracks_seen = new_size if detailed_timing: cost_timings['array_pool'] = time.perf_counter() - t0 t0 = time.perf_counter() # Fill reusable arrays (no new allocation) for i, d in enumerate(detections): self._reusable_det_positions[i] = d.position for i, t in enumerate(tracks): self._reusable_track_positions[i] = t.predicted_position self._reusable_track_last_positions[i] = t.last_detection_pos self._reusable_track_misses[i] = t.misses if detailed_timing: cost_timings['fill_arrays'] = time.perf_counter() - t0 t0 = time.perf_counter() # Use views into reusable arrays det_positions = self._reusable_det_positions[:n_dets] track_positions = self._reusable_track_positions[:n_tracks] track_last_positions = self._reusable_track_last_positions[:n_tracks] track_misses = self._reusable_track_misses[:n_tracks] # Check embedding availability has_embedding_mask = np.array([ hasattr(d, 'embedding') and d.embedding is not None for d in detections ], dtype=bool) any_embeddings = self.do_embeddings and np.any(has_embedding_mask) if detailed_timing: cost_timings['embedding_mask'] = time.perf_counter() - t0 t0 = time.perf_counter() # Get configurable parameters with defaults miss_threshold = getattr(self.config, 'prediction_miss_threshold', 3) # Compute uncertainty ratios only if uncertainty_weight > 0 (no overhead when disabled) if self.uncertainty_weight > 0.0: track_uncertainty_ratios = np.array( [t.get_recent_miss_ratio() for t in tracks], dtype=np.float32 ) else: # Pass empty array when disabled (Numba requires consistent types) track_uncertainty_ratios = np.empty(0, dtype=np.float32) if detailed_timing: cost_timings['uncertainty_ratios'] = time.perf_counter() - t0 t0 = time.perf_counter() # ===================================================================== # SPARSE MODE: Compute sparse pairs FIRST, then sparse embedding distances # This is 20-30x faster for large-scale (1000+ objects) scenarios # ===================================================================== sparse_pairs = None if not self.use_probabilistic_costs: sparse_pairs = self._compute_sparse_pairs( det_positions, track_positions, self.max_distance * 1.5 ) if detailed_timing and sparse_pairs is not None: cost_timings['sparse_pairs'] = time.perf_counter() - t0 n_pairs = len(sparse_pairs[0]) t0 = time.perf_counter() # Use sparse cost matrix if candidate pairs were found if sparse_pairs is not None: sparse_det_indices, sparse_track_indices = sparse_pairs # Compute embedding distances ONLY for sparse pairs (huge speedup!) # For 1000 objects: 36K pairs vs 880K pairs = 24x fewer aggregations scaled_embedding_distances = np.zeros((n_dets, n_tracks), dtype=np.float32) if any_embeddings: scaled_embedding_distances = self._compute_embedding_distances_sparse( detections, tracks, has_embedding_mask, sparse_det_indices, sparse_track_indices, det_positions, track_positions ) if detailed_timing: cost_timings['embedding_distances'] = time.perf_counter() - t0 t0 = time.perf_counter() result = compute_sparse_cost_matrix( det_positions, track_positions, track_last_positions, track_misses, scaled_embedding_distances, self.embedding_weight, self.max_distance, any_embeddings, miss_threshold, self.embedding_threshold_adjustment, sparse_det_indices, sparse_track_indices, track_uncertainty_ratios, self.uncertainty_weight ) if detailed_timing: cost_timings['cost_func'] = time.perf_counter() - t0 cost_timings['total'] = time.perf_counter() - t_start self._print_cost_matrix_timings( cost_timings, n_dets, n_tracks, f'sparse({n_pairs}/{n_dets*n_tracks})' ) return result # ===================================================================== # NON-SPARSE MODE: Full embedding distance computation # ===================================================================== scaled_embedding_distances = np.zeros((n_dets, n_tracks), dtype=np.float32) if any_embeddings: scaled_embedding_distances = self._compute_embedding_distances( detections, tracks, has_embedding_mask, det_positions, track_positions ) if detailed_timing: cost_timings['embedding_distances'] = time.perf_counter() - t0 t0 = time.perf_counter() # do_embeddings flag for cost matrix - only True if we computed embeddings do_embeddings = any_embeddings # Use appropriate cost function if self.use_probabilistic_costs: # Extract track velocities for covariance estimation track_velocities = np.array([t.velocity for t in tracks], dtype=np.float32) # Get covariance estimation parameters (fallbacks match config defaults) base_variance = getattr(self.config, 'base_position_variance', 15.0) velocity_scale = getattr(self.config, 'velocity_variance_scale', 2.0) velocity_isotropic_threshold = getattr(self.config, 'velocity_isotropic_threshold', 0.1) cov_inflation_rate = getattr(self.config, 'time_covariance_inflation', 0.1) # Estimate base covariances from velocity (miss-based inflation applied in cost function) track_covariances = estimate_track_covariances( track_velocities, base_variance, velocity_scale, velocity_isotropic_threshold ) # Get probabilistic-specific config parameters (fallbacks match config defaults) gating_multiplier = getattr(self.config, 'probabilistic_gating_multiplier', 1.5) mahal_normalization = getattr(self.config, 'mahalanobis_normalization', 5.0) singular_cov_threshold = getattr(self.config, 'singular_covariance_threshold', 1e-6) if detailed_timing: cost_timings['prob_prep'] = time.perf_counter() - t0 t0 = time.perf_counter() result = compute_probabilistic_cost_matrix_vectorized( det_positions, track_positions, track_last_positions, track_misses, track_covariances, scaled_embedding_distances, self.embedding_weight, self.max_distance, do_embeddings, miss_threshold, gating_multiplier, mahal_normalization, cov_inflation_rate, self.embedding_threshold_adjustment, singular_cov_threshold, track_uncertainty_ratios, self.uncertainty_weight ) if detailed_timing: cost_timings['cost_func'] = time.perf_counter() - t0 cost_timings['total'] = time.perf_counter() - t_start self._print_cost_matrix_timings(cost_timings, n_dets, n_tracks, 'probabilistic') return result else: # Select cost function based on parallel mode setting cost_func = ( compute_cost_matrix_parallel if self.parallel_cost_matrix else compute_cost_matrix_vectorized ) func_name = 'parallel' if self.parallel_cost_matrix else 'vectorized' result = cost_func( det_positions, track_positions, track_last_positions, track_misses, scaled_embedding_distances, self.embedding_weight, self.max_distance, do_embeddings, miss_threshold, self.embedding_threshold_adjustment, track_uncertainty_ratios, self.uncertainty_weight ) if detailed_timing: cost_timings['cost_func'] = time.perf_counter() - t0 cost_timings['total'] = time.perf_counter() - t_start self._print_cost_matrix_timings(cost_timings, n_dets, n_tracks, func_name) return result def _print_cost_matrix_timings(self, timings: dict, n_dets: int, n_tracks: int, mode: str): """Print detailed cost matrix computation timings.""" total_ms = timings['total'] * 1000 breakdown = [] for key, val in timings.items(): if key != 'total': ms = val * 1000 pct = (val / timings['total']) * 100 if timings['total'] > 0 else 0 breakdown.append(f"{key}={ms:.2f}ms({pct:.0f}%)") print(f" [CostMatrix] {mode} | {n_dets}x{n_tracks} | total={total_ms:.2f}ms | {' '.join(breakdown)}") def _compute_sparse_pairs( self, det_positions: np.ndarray, track_positions: np.ndarray, max_search_radius: float ) -> Optional[Tuple[np.ndarray, np.ndarray]]: """Compute sparse candidate pairs using spatial grid indexing. For large-scale scenarios (many detections and tracks), this uses grid-based spatial indexing to find only nearby detection-track pairs, reducing complexity from O(n*m) to O(n*k) where k is the average number of nearby tracks. Args: det_positions: Detection positions [N_det, 2] track_positions: Track predicted positions [N_track, 2] max_search_radius: Maximum distance to consider (grid cell size) Returns: Tuple of (det_indices, track_indices) for candidate pairs, or None if sparse mode should not be used (too few objects or too many pairs) """ n_dets = len(det_positions) n_tracks = len(track_positions) # Only use spatial indexing when it's beneficial (overhead is worth it) if n_dets < self.sparse_computation_threshold or n_tracks < self.sparse_computation_threshold: return None # Build spatial grid index for tracks grid_size = max_search_radius if grid_size <= 0: return None track_grid = {} # Place tracks in grid cells for j in range(n_tracks): x, y = track_positions[j] grid_x = int(x / grid_size) grid_y = int(y / grid_size) key = (grid_x, grid_y) if key not in track_grid: track_grid[key] = [] track_grid[key].append(j) # Find candidate pairs by checking 3x3 neighborhood of each detection det_indices = [] track_indices = [] for i in range(n_dets): x, y = det_positions[i] grid_x = int(x / grid_size) grid_y = int(y / grid_size) # Check neighboring grid cells (3x3 neighborhood) for dx in [-1, 0, 1]: for dy in [-1, 0, 1]: key = (grid_x + dx, grid_y + dy) if key in track_grid: for j in track_grid[key]: det_indices.append(i) track_indices.append(j) if not det_indices: return None # Only use sparse mode if it reduces computation significantly (< 50% of full matrix) n_pairs = len(det_indices) if n_pairs >= n_dets * n_tracks * 0.5: return None return np.array(det_indices, dtype=np.int32), np.array(track_indices, dtype=np.int32) def _compute_embedding_distances( self, detections: List[Detection], tracks: List[FastTrackState], has_embedding_mask: np.ndarray = None, det_positions: np.ndarray = None, track_positions: np.ndarray = None ) -> np.ndarray: """Compute embedding distances between detections and tracks. OPTIMIZED VERSION with buffer reuse and vectorized operations. For detections without embeddings (has_embedding_mask[i] = False), the embedding distance is set to normalized position distance so that the combined cost equals position-only cost: cost = (1-w)*pos + w*(pos/max)*max = pos Args: detections: List of Detection objects tracks: List of FastTrackState objects has_embedding_mask: Boolean mask indicating which detections have embeddings det_positions: Detection positions [N_det, 2] (for fallback computation) track_positions: Track predicted positions [N_track, 2] (for fallback computation) Returns: Scaled embedding distance matrix [N_det, N_track] """ from scipy.spatial.distance import cdist n_dets = len(detections) n_tracks = len(tracks) # Handle backward compatibility - if mask not provided, assume all have embeddings if has_embedding_mask is None: has_embedding_mask = np.ones(n_dets, dtype=bool) # Get indices of detections with embeddings dets_with_emb = np.where(has_embedding_mask)[0] dets_without_emb = np.where(~has_embedding_mask)[0] # Get embedding dimension from first detection with embedding, or config default emb_dim = getattr(self.config, 'default_embedding_dimension', 128) if len(dets_with_emb) > 0: first_emb = detections[dets_with_emb[0]].embedding if first_emb is not None: emb_dim = len(first_emb) # === OPTIMIZATION: Reusable buffer for detection embeddings === if not hasattr(self, '_emb_dist_det_buffer') or self._emb_dist_det_buffer.shape[0] < n_dets or self._emb_dist_det_buffer.shape[1] != emb_dim: self._emb_dist_det_buffer = np.zeros((max(n_dets, 100), emb_dim), dtype=np.float32) det_embeddings = self._emb_dist_det_buffer[:n_dets, :emb_dim] det_embeddings[:] = 0.0 # Batch extract detection embeddings for i in dets_with_emb: emb = detections[i].embedding if emb is not None: det_embeddings[i] = np.asarray(emb, dtype=np.float32)[:emb_dim] # Vectorized normalization norms = np.linalg.norm(det_embeddings, axis=1, keepdims=True) mask = (norms > 1e-8).flatten() det_embeddings[mask] /= norms[mask] # === Track embedding extraction with buffer reuse === track_counts = np.array([len(t.embedding_history) for t in tracks], dtype=np.int32) total_embs = sum(max(c, 1) for c in track_counts) # Reusable buffer for track embeddings if not hasattr(self, '_emb_dist_track_buffer') or len(self._emb_dist_track_buffer) < total_embs * emb_dim: self._emb_dist_track_buffer = np.zeros(max(total_embs * emb_dim, 10000), dtype=np.float32) track_embeddings_flat = self._emb_dist_track_buffer[:total_embs * emb_dim] track_embeddings_flat[:] = 0.0 # Copy track embeddings - try vectorized stack first, fallback to loop offset = 0 for track in tracks: hist = track.embedding_history n_hist = len(hist) if n_hist > 0: try: # Vectorized: stack all embeddings at once stacked = np.vstack(list(hist)) flat_len = n_hist * emb_dim track_embeddings_flat[offset:offset + flat_len] = stacked.ravel()[:flat_len] offset += flat_len except Exception: # Fallback: copy one by one for emb in hist: emb_arr = np.asarray(emb, dtype=np.float32).ravel() track_embeddings_flat[offset:offset + emb_dim] = emb_arr[:emb_dim] offset += emb_dim else: offset += emb_dim # Method mapping method_map = { "last": 0, "average": 1, "weighted_average": 2, "best_match": 3, "median": 4 } method = method_map.get(self.embedding_matching_method, 1) # Compute distances # OPTIMIZATION: Use fast matrix multiplication path for method=0 (last embedding) if method == 0: distances = compute_embedding_distances_matmul( det_embeddings, track_embeddings_flat, track_counts ) else: distances = compute_embedding_distances_with_method( det_embeddings, track_embeddings_flat, track_counts, method ) # Scale distances if scaler is available if self.embedding_scaler: distances_flat = distances.flatten() scaled_flat = self.embedding_scaler.scale_distances(distances_flat) self.embedding_scaler.update_statistics(distances_flat) distances = scaled_flat.reshape(distances.shape) # === OPTIMIZATION: Vectorized fallback for dets without embeddings === if len(dets_without_emb) > 0 and det_positions is not None and track_positions is not None: pos_dists = cdist(det_positions[dets_without_emb], track_positions) distances[dets_without_emb, :] = np.clip(pos_dists / self.max_distance, 0, 1) return distances def _compute_embedding_distances_sparse( self, detections: List[Detection], tracks: List[FastTrackState], has_embedding_mask: np.ndarray, sparse_det_indices: np.ndarray, sparse_track_indices: np.ndarray, det_positions: np.ndarray = None, track_positions: np.ndarray = None ) -> np.ndarray: """Compute embedding distances ONLY for sparse candidate pairs. FAST SPARSE VERSION - computes embedding distances only for spatially close detection-track pairs, not the full N×M matrix. For 1000 objects with 36K sparse pairs out of 880K total: - Full computation: ~100ms (880K aggregations) - Sparse computation: ~4ms (36K aggregations) = 24x speedup! Args: detections: List of Detection objects tracks: List of FastTrackState objects has_embedding_mask: Boolean mask indicating which detections have embeddings sparse_det_indices: Detection indices for sparse pairs sparse_track_indices: Track indices for sparse pairs det_positions: Detection positions [N_det, 2] (for fallback) track_positions: Track predicted positions [N_track, 2] (for fallback) Returns: Sparse embedding distance matrix [N_det, N_track] Non-sparse entries are set to 1.0 (max distance) """ from scipy.spatial.distance import cdist n_dets = len(detections) n_tracks = len(tracks) # Get embedding dimension emb_dim = getattr(self.config, 'default_embedding_dimension', 128) dets_with_emb = np.where(has_embedding_mask)[0] if len(dets_with_emb) > 0: first_emb = detections[dets_with_emb[0]].embedding if first_emb is not None: emb_dim = len(first_emb) # === Extract detection embeddings (reuse buffer) === if not hasattr(self, '_emb_dist_det_buffer') or self._emb_dist_det_buffer.shape[0] < n_dets or self._emb_dist_det_buffer.shape[1] != emb_dim: self._emb_dist_det_buffer = np.zeros((max(n_dets, 100), emb_dim), dtype=np.float32) det_embeddings = self._emb_dist_det_buffer[:n_dets, :emb_dim] det_embeddings[:] = 0.0 for i in dets_with_emb: emb = detections[i].embedding if emb is not None: det_embeddings[i] = np.asarray(emb, dtype=np.float32)[:emb_dim] # Normalize norms = np.linalg.norm(det_embeddings, axis=1, keepdims=True) mask = (norms > 1e-8).flatten() det_embeddings[mask] /= norms[mask] # === Extract track embeddings (NO padding for empty tracks) === track_counts = np.array([len(t.embedding_history) for t in tracks], dtype=np.int32) total_embs = int(np.sum(track_counts)) # Exact count, no padding if total_embs == 0: return np.ones((n_dets, n_tracks), dtype=np.float32) # Reusable buffer if not hasattr(self, '_emb_dist_track_buffer') or len(self._emb_dist_track_buffer) < total_embs * emb_dim: self._emb_dist_track_buffer = np.zeros(max(total_embs * emb_dim, 10000), dtype=np.float32) track_embeddings_flat = self._emb_dist_track_buffer[:total_embs * emb_dim] track_embeddings_flat[:] = 0.0 # Copy track embeddings - NO padding for empty tracks offset = 0 for track in tracks: hist = track.embedding_history n_hist = len(hist) if n_hist > 0: try: stacked = np.vstack(list(hist)) flat_len = n_hist * emb_dim track_embeddings_flat[offset:offset + flat_len] = stacked.ravel()[:flat_len] offset += flat_len except Exception: for emb in hist: emb_arr = np.asarray(emb, dtype=np.float32).ravel() track_embeddings_flat[offset:offset + emb_dim] = emb_arr[:emb_dim] offset += emb_dim # NOTE: No "else: offset += emb_dim" - no padding for empty tracks # Method mapping method_map = { "last": 0, "average": 1, "weighted_average": 2, "best_match": 3, "median": 4 } method = method_map.get(self.embedding_matching_method, 1) # === SPARSE COMPUTATION: Only aggregate for sparse pairs === distances = compute_embedding_distances_sparse( det_embeddings, track_embeddings_flat, track_counts, sparse_det_indices, sparse_track_indices, method, n_dets, n_tracks ) # Scale distances if scaler is available (VECTORIZED - not per-pair loop) if self.embedding_scaler: # Extract all sparse distances at once sparse_distances = distances[sparse_det_indices, sparse_track_indices] # Scale in batch (single call, not 4000+ individual calls) scaled_sparse = self.embedding_scaler.scale_distances(sparse_distances) self.embedding_scaler.update_statistics(sparse_distances) # Put back distances[sparse_det_indices, sparse_track_indices] = scaled_sparse # Fallback for dets without embeddings (VECTORIZED) dets_without_emb = np.where(~has_embedding_mask)[0] if len(dets_without_emb) > 0 and det_positions is not None and track_positions is not None: # Find sparse pairs where detection doesn't have embedding (numpy vectorized) fallback_mask = np.isin(sparse_det_indices, dets_without_emb) if np.any(fallback_mask): fb_det_idx = sparse_det_indices[fallback_mask] fb_track_idx = sparse_track_indices[fallback_mask] # Compute position distances vectorized pos_dists = np.sqrt( (det_positions[fb_det_idx, 0] - track_positions[fb_track_idx, 0])**2 + (det_positions[fb_det_idx, 1] - track_positions[fb_track_idx, 1])**2 ) distances[fb_det_idx, fb_track_idx] = np.clip(pos_dists / self.max_distance, 0, 1) return distances def _update_matched_tracks( self, matches: List[Tuple[int, int]], detections: List[Detection], tracks: List[FastTrackState] ): """Update tracks with matched detections.""" store_scores = getattr(self.config, 'store_embedding_scores', False) for det_idx, track_idx in matches: if track_idx < len(tracks): track = tracks[track_idx] det = detections[det_idx] # Compute and store embedding similarity score if enabled if store_scores and self.do_embeddings: det_emb = getattr(det, 'embedding', None) if det_emb is not None and len(track.embedding_history) > 0: # Compute cosine similarity with track's representative embedding track_emb = track.get_representative_embedding() if track_emb is not None: # Normalize embeddings det_norm = det_emb / (np.linalg.norm(det_emb) + 1e-8) track_norm = track_emb / (np.linalg.norm(track_emb) + 1e-8) # Cosine similarity (1.0 = identical, 0.0 = orthogonal, -1.0 = opposite) similarity = float(np.dot(det_norm, track_norm)) track.embedding_score_history.append(similarity) track.update_with_detection( position=det.position, embedding=getattr(det, 'embedding', None), bbox=getattr(det, 'bbox', None), frame=self._frame_count, det_conf=det.confidence ) # Update confirmation status if not track.confirmed and track.hits >= self.min_consecutive_detections: track.confirmed = True def _handle_unmatched_tracks( self, unmatched_track_indices: List[int], tracks: List[FastTrackState] ): """Handle tracks that weren't matched to any detection. Note: predict_position() was already called for ALL tracks BEFORE assignment, so we only need to update counters here (no double prediction). """ for track_idx in unmatched_track_indices: if track_idx < len(tracks): track = tracks[track_idx] # Position already predicted before assignment - just update counters track.age += 1 track.misses += 1 track.lost_frames += 1 def _get_detection_confidence(self, detection: Detection) -> float: """Extract confidence from detection object.""" if hasattr(detection, 'confidence'): return float(detection.confidence) return 1.0 def _handle_unmatched_detections( self, unmatched_det_indices: List[int], detections: List[Detection] ): """Handle detections that weren't matched to any track. IMPORTANT: This function now includes safeguards against creating duplicate tracks by: 1. Filtering detections by init_conf_threshold 2. Skipping detections that are close to EXISTING CONFIRMED tracks (these should have matched but failed - don't create duplicates) """ if len(unmatched_det_indices) == 0: return # Get existing confirmed track positions for spatial filtering confirmed_tracks = [t for t in self._tracks.values() if t.confirmed] # OPTIMIZATION: Pre-compute confirmed track positions as arrays (once, outside loop) dedup_threshold = self.deduplication_distance has_confirmed_tracks = len(confirmed_tracks) > 0 if has_confirmed_tracks: confirmed_pred_positions = np.array( [t.predicted_position for t in confirmed_tracks], dtype=np.float32 ) confirmed_last_positions = np.array( [t.last_detection_pos for t in confirmed_tracks], dtype=np.float32 ) for det_idx in unmatched_det_indices: det = detections[det_idx] # Check confidence threshold for track initialization det_conf = self._get_detection_confidence(det) if det_conf < self.init_conf_threshold: continue # Skip low-confidence detections det_pos = det.position.flatten()[:2] if hasattr(det.position, 'flatten') else det.position det_pos_arr = np.asarray(det_pos, dtype=np.float32) # CRITICAL: Skip detections that are very close to existing confirmed tracks # These should have matched but failed - creating pending would cause duplicates # OPTIMIZATION: Vectorized distance computation to all confirmed tracks at once near_existing_track = False if has_confirmed_tracks: # Compute distances to all confirmed tracks in one operation dists_to_pred = np.linalg.norm(confirmed_pred_positions - det_pos_arr, axis=1) dists_to_last = np.linalg.norm(confirmed_last_positions - det_pos_arr, axis=1) min_dists = np.minimum(dists_to_pred, dists_to_last) # Check if any track is within threshold if np.any(min_dists < dedup_threshold): near_existing_track = True if near_existing_track: continue # Skip - this detection is very close to an existing track # Try to match with pending detections # OPTIMIZATION: Vectorized distance computation to all pending at once matched_pending = False pending_distance = self.config.pending_detection_distance if self._pending_detections: # Pre-compute pending positions array (only when there are pending detections) pending_ids = list(self._pending_detections.keys()) pending_positions = np.array( [self._pending_detections[pid].position for pid in pending_ids], dtype=np.float32 ) # Vectorized distance to all pending dists_to_pending = np.linalg.norm(pending_positions - det_pos_arr, axis=1) # Find closest within threshold within_threshold_mask = dists_to_pending < pending_distance if np.any(within_threshold_mask): # Get the closest pending within threshold closest_idx = np.argmin(dists_to_pending) if dists_to_pending[closest_idx] < pending_distance: pending_id = pending_ids[closest_idx] pending = self._pending_detections[pending_id] # Update existing pending detection pending.update( det_pos, getattr(det, 'embedding', None), getattr(det, 'bbox', None), det_conf ) pending.last_seen_frame = self._frame_count matched_pending = True if not matched_pending: # Create new pending detection pending = PendingDetection( position=det_pos.astype(np.float32) if hasattr(det_pos, 'astype') else np.array(det_pos, dtype=np.float32), embedding=getattr(det, 'embedding', None), bbox=getattr(det, 'bbox', None), confidence=det_conf, first_seen_frame=self._frame_count, last_seen_frame=self._frame_count ) self._pending_detections[self._next_pending_id] = pending self._next_pending_id += 1 def _update_pending_detections(self): """Promote pending detections to tracks if they meet criteria.""" to_remove = [] for pending_id, pending in list(self._pending_detections.items()): if pending.is_ready_for_promotion( self.min_consecutive_detections, self.max_detection_gap, self._frame_count ): # Create new track track = FastTrackState( id=self._next_id, position=pending.average_position, kalman_type=self.kalman_type, velocity_damping=self.kalman_velocity_damping ) # Set embedding parameters score_history_len = getattr(self.config, 'embedding_score_history_length', 5) track.set_embedding_params( self.max_embeddings_per_track, self.embedding_matching_method, score_history_len ) # Set uncertainty window if enabled if self.uncertainty_weight > 0.0: track.set_uncertainty_window(self.uncertainty_window) # Add initial embedding if available if pending.embedding is not None: track.add_embedding(pending.embedding) # Initialize track track.update_with_detection( pending.average_position, pending.embedding, pending.bbox, self._frame_count, pending.confidence ) track.hits = pending.total_detections track.age = pending.consecutive_frames if track.hits >= self.min_consecutive_detections: track.confirmed = True self._tracks[self._next_id] = track self._next_id += 1 to_remove.append(pending_id) elif (self._frame_count - pending.last_seen_frame) > self.max_detection_gap: # Remove stale pending detection to_remove.append(pending_id) for pending_id in to_remove: del self._pending_detections[pending_id] def _attempt_reid( self, detections: List[Detection], unmatched_det_indices: List[int] ): """Attempt to re-identify lost tracks using embeddings. OPTIMIZED VERSION: - Uses representative embeddings (single per track) instead of full history - Uses greedy assignment O(n²) instead of Hungarian O(n³) - Uses matrix multiplication for fast cosine similarity IMPORTANT: ReID only matches detections that are NOT close to any active track. This prevents "stealing" detections that likely belong to active tracks but temporarily failed to match due to noise or occlusion. """ if not self.do_embeddings or len(unmatched_det_indices) == 0: return # Get lost tracks eligible for ReID lost_tracks_with_ids = [ (tid, track) for tid, track in self._tracks.items() if track.misses >= self.reid_min_frames_lost and track.misses < self.max_track_age and len(track.embedding_history) > 0 ] if not lost_tracks_with_ids: return # Get unmatched detections with embeddings unmatched_dets_with_emb = [ (idx, detections[idx]) for idx in unmatched_det_indices if hasattr(detections[idx], 'embedding') and detections[idx].embedding is not None ] if not unmatched_dets_with_emb: return # --- CRITICAL: Filter out detections close to ACTIVE tracks --- active_tracks = [ track for track in self._tracks.values() if track.misses < self.reid_min_frames_lost ] if active_tracks: active_positions = np.array( [t.predicted_position for t in active_tracks], dtype=np.float32 ) det_positions_filter = np.array( [det.position for _, det in unmatched_dets_with_emb], dtype=np.float32 ) # Vectorized distance computation all_distances = np.linalg.norm( det_positions_filter[:, None, :] - active_positions[None, :, :], axis=2 ) min_distances = np.min(all_distances, axis=1) safe_mask = min_distances > self.max_distance unmatched_dets_with_emb = [ item for item, is_safe in zip(unmatched_dets_with_emb, safe_mask) if is_safe ] if not unmatched_dets_with_emb: return n_dets = len(unmatched_dets_with_emb) n_tracks = len(lost_tracks_with_ids) if n_dets == 0 or n_tracks == 0: return # --- Use same embedding matching method as main assignment --- # Extract detection embeddings det_embeddings = np.array([ det.embedding for _, det in unmatched_dets_with_emb ], dtype=np.float32) # Normalize detection embeddings det_norms = np.linalg.norm(det_embeddings, axis=1, keepdims=True) det_norms = np.where(det_norms > 1e-8, det_norms, 1.0) det_embeddings = det_embeddings / det_norms # Extract ALL track embeddings (full history, same as main assignment) emb_dim = det_embeddings.shape[1] track_counts = np.array([len(track.embedding_history) for _, track in lost_tracks_with_ids], dtype=np.int32) total_embs = int(np.sum(track_counts)) track_embeddings_flat = np.zeros(total_embs * emb_dim, dtype=np.float32) offset = 0 for _, track in lost_tracks_with_ids: hist = track.embedding_history n_hist = len(hist) if n_hist > 0: stacked = np.vstack(list(hist)) # Normalize each embedding norms = np.linalg.norm(stacked, axis=1, keepdims=True) norms = np.where(norms > 1e-8, norms, 1.0) stacked = stacked / norms flat_len = n_hist * emb_dim track_embeddings_flat[offset:offset + flat_len] = stacked.ravel()[:flat_len] offset += flat_len # Use same matching method as main assignment (best_match, average, etc.) method_map = {"last": 0, "average": 1, "weighted_average": 2, "best_match": 3, "median": 4} method = method_map.get(self.embedding_matching_method, 3) # default to best_match raw_embedding_distances = compute_embedding_distances_with_method( det_embeddings, track_embeddings_flat, track_counts, method ) # Scale embedding distances if self.embedding_scaler: scaled_flat = self.embedding_scaler.scale_distances(raw_embedding_distances.flatten()) scaled_embedding_distances = scaled_flat.reshape(n_dets, n_tracks) else: scaled_embedding_distances = raw_embedding_distances # Get position data det_positions = np.array([det.position for _, det in unmatched_dets_with_emb], dtype=np.float32) track_positions = np.array([t.predicted_position for _, t in lost_tracks_with_ids], dtype=np.float32) track_last_positions = np.array([t.last_detection_pos for _, t in lost_tracks_with_ids], dtype=np.float32) track_misses = np.array([t.misses for _, t in lost_tracks_with_ids], dtype=np.int32) # Compute ReID cost matrix reid_boost = getattr(self.config, 'reid_embedding_weight_boost', 1.5) reid_embedding_weight = min(self.embedding_weight * reid_boost, 0.95) reid_cost_matrix = compute_cost_matrix_vectorized( det_positions, track_positions, track_last_positions, track_misses, scaled_embedding_distances, reid_embedding_weight, self.reid_max_distance, do_embeddings=True, miss_threshold=3, embedding_threshold_adjustment=self.embedding_threshold_adjustment ) # --- OPTIMIZATION: Use greedy assignment O(n²) instead of Hungarian O(n³) --- # ReID typically has few candidates, greedy is sufficient and much faster reid_matches, _, _ = numba_greedy_assignment( reid_cost_matrix, self.reid_max_distance ) # Apply ReID matches for det_match_idx, track_match_idx in reid_matches: det_idx, det = unmatched_dets_with_emb[det_match_idx] track_id, track = lost_tracks_with_ids[track_match_idx] track = self._tracks[track_id] track.update_with_detection( position=det.position, embedding=det.embedding, bbox=getattr(det, 'bbox', None), frame=self._frame_count, det_conf=det.confidence, is_reid=True ) def _cleanup_tracks(self): """Remove old or unconfirmed tracks.""" to_remove = [] for track_id, track in self._tracks.items(): if track.misses > self.max_track_age: to_remove.append(track_id) elif not track.confirmed and track.misses > 3: to_remove.append(track_id) for track_id in to_remove: del self._tracks[track_id] def _get_results(self) -> List[TrackedObject]: """Get current tracking results.""" results = [] store_scores = getattr(self.config, 'store_embedding_scores', False) for track in self._tracks.values(): if track.confirmed and track.misses == 0: # Compute average embedding score if available embedding_score = None if store_scores and len(track.embedding_score_history) > 0: embedding_score = float(np.mean(list(track.embedding_score_history))) results.append(TrackedObject( id=track.id, position=track.position, velocity=track.velocity, confidence=track.detection_confidence, age=track.age, hits=track.hits, time_since_update=track.misses, state=1 if track.confirmed else 0, # 1: Confirmed, 0: Tentative bbox=track.bbox, predicted_position=track.predicted_position.copy() if track.predicted_position is not None else (track.position + track.velocity), embedding_score=embedding_score )) return results def _log_timings(self): """Log timing information for debugging. Prints to stdout every frame when debug_timings is enabled. Uses print() instead of logger to ensure visibility regardless of logging config. """ formatted = {k: f"{v*1000:.2f} ms" for k, v in self.timings.items()} print(f"[Frame {self._frame_count}] Timings: {formatted}") self.timings.clear()
[docs] def reset(self): """Reset the tracker to initial state. This clears all tracks, pending detections, and resets the embedding scaler statistics. Use when starting a new video or scene. """ self._tracks.clear() self._pending_detections.clear() self._next_id = 1 self._next_pending_id = 1 self._frame_count = 0 self.timings.clear() # Re-initialize array pools to initial size (preserves warmup allocation) # This avoids re-triggering allocation spikes on new videos initial_pool_size = getattr(self.config, 'initial_pool_size', 150) self._reusable_det_positions = np.empty((initial_pool_size, 2), dtype=np.float32) self._reusable_track_positions = np.empty((initial_pool_size, 2), dtype=np.float32) self._reusable_track_last_positions = np.empty((initial_pool_size, 2), dtype=np.float32) self._reusable_track_misses = np.empty(initial_pool_size, dtype=np.int32) self._max_dets_seen = initial_pool_size self._max_tracks_seen = initial_pool_size # Reset embedding scaler statistics if self.embedding_scaler: self.embedding_scaler.reset() gc.collect()
@property def frame_count(self): """Get current frame count.""" return self._frame_count
[docs] def get_state(self) -> dict: """Get current tracker state for debugging.""" return { "frame_count": self._frame_count, "n_tracks": len(self._tracks), "n_pending": len(self._pending_detections), "track_ids": list(self._tracks.keys()) }
[docs] def get_statistics(self) -> dict: """Get tracker statistics for benchmarking and analysis. Returns: Dictionary containing: - next_id: Next track ID to be assigned (= total tracks created) - active_tracks: Number of currently active tracks - confirmed_tracks: Number of confirmed tracks - pending_detections: Number of pending detections - frame_count: Total frames processed """ confirmed = sum(1 for t in self._tracks.values() if t.confirmed) return { "next_id": self._next_id, "active_tracks": len(self._tracks), "confirmed_tracks": confirmed, "pending_detections": len(self._pending_detections), "frame_count": self._frame_count }