Source code for swarmsort.embeddings
# ----------------------------------------------------------------------------------------------------------------------
# CUPY GPU-ACCELERATED EMBEDDINGS FOR SWARMSORT STANDALONE
# ----------------------------------------------------------------------------------------------------------------------
"""
GPU-accelerated embedding extractors for SwarmSort standalone package.
Provides cupytexture and cupytexture_mega embeddings with automatic CPU fallback.
"""
import numpy as np
import cv2
from typing import Optional, List
import numba as nb
from loguru import logger
# Optional CuPy import with graceful fallback
try:
import cupy as cp
from cupyx.scipy.ndimage import uniform_filter
CUPY_AVAILABLE = True
logger.debug("CuPy detected - GPU acceleration available")
except ImportError:
CUPY_AVAILABLE = False
cp = None
logger.info("CuPy not available - falling back to CPU-only mode")
[docs]
class EmbeddingExtractor:
"""Base class for embedding extractors in standalone package."""
def __init__(self, use_tensor=False):
self.use_tensor = use_tensor
[docs]
def extract(self, frame: np.ndarray, bbox: np.ndarray) -> np.ndarray:
"""Extract single embedding."""
raise NotImplementedError
[docs]
def extract_batch(self, frame: np.ndarray, bboxes: np.ndarray) -> List[np.ndarray]:
"""Extract batch of embeddings."""
return [self.extract(frame, bbox) for bbox in bboxes]
@property
def embedding_dim(self) -> int:
"""Return embedding dimensionality."""
raise NotImplementedError
[docs]
@nb.njit(fastmath=True, parallel=True)
def extract_features_batch_jit(patches: np.ndarray, features_out: np.ndarray):
"""
JIT-optimized batch feature extraction for microorganism patches.
patches: (N, H, W, 3) - batch of BGR patches
features_out: (N, feature_dim) - output features
"""
N, H, W = patches.shape[:3]
for i in nb.prange(N):
patch = patches[i]
# Convert to grayscale manually (BGR to Gray: 0.114*B + 0.587*G + 0.299*R)
gray = np.zeros((H, W), dtype=np.float32)
for y in range(H):
for x in range(W):
gray[y, x] = (
0.114 * patch[y, x, 0] + 0.587 * patch[y, x, 1] + 0.299 * patch[y, x, 2]
)
# Basic statistics (4 features)
features_out[i, 0] = np.mean(gray)
features_out[i, 1] = np.std(gray)
features_out[i, 2] = np.min(gray)
features_out[i, 3] = np.max(gray)
# Gradient features (4 features)
grad_x = np.zeros_like(gray)
grad_y = np.zeros_like(gray)
for y in range(1, H - 1):
for x in range(1, W - 1):
grad_x[y, x] = gray[y, x + 1] - gray[y, x - 1]
grad_y[y, x] = gray[y + 1, x] - gray[y - 1, x]
features_out[i, 4] = np.mean(np.abs(grad_x))
features_out[i, 5] = np.mean(np.abs(grad_y))
features_out[i, 6] = np.std(grad_x)
features_out[i, 7] = np.std(grad_y)
# Central moments and shape features (8 features)
cy, cx = H // 2, W // 2
# Weighted center of mass
total_intensity = 0.0
weighted_x = 0.0
weighted_y = 0.0
for y in range(H):
for x in range(W):
w = gray[y, x]
total_intensity += w
weighted_x += w * x
weighted_y += w * y
if total_intensity > 0:
cm_x = weighted_x / total_intensity
cm_y = weighted_y / total_intensity
else:
cm_x = cx
cm_y = cy
features_out[i, 8] = cm_x - cx # offset from geometric center
features_out[i, 9] = cm_y - cy
# Moments of inertia
m20 = m02 = m11 = 0.0
for y in range(H):
for x in range(W):
w = gray[y, x]
dx = x - cm_x
dy = y - cm_y
m20 += w * dx * dx
m02 += w * dy * dy
m11 += w * dx * dy
if total_intensity > 0:
m20 /= total_intensity
m02 /= total_intensity
m11 /= total_intensity
features_out[i, 10] = m20
features_out[i, 11] = m02
features_out[i, 12] = m11
# Eccentricity and orientation
if m20 + m02 > 0:
lambda1 = 0.5 * (m20 + m02 + np.sqrt((m20 - m02) ** 2 + 4 * m11**2))
lambda2 = 0.5 * (m20 + m02 - np.sqrt((m20 - m02) ** 2 + 4 * m11**2))
eccentricity = np.sqrt(1 - lambda2 / lambda1) if lambda1 > 0 else 0
orientation = 0.5 * np.arctan2(2 * m11, m20 - m02)
else:
eccentricity = 0
orientation = 0
features_out[i, 13] = eccentricity
features_out[i, 14] = np.sin(orientation) # sin for rotation invariance
features_out[i, 15] = np.cos(orientation) # cos for rotation invariance
# Radial features (8 features) - intensity at different radii
max_radius = min(H, W) // 2
for r_idx in range(8):
radius = (r_idx + 1) * max_radius / 8.0
radial_sum = 0.0
radial_count = 0
for y in range(H):
for x in range(W):
dist = np.sqrt((x - cm_x) ** 2 + (y - cm_y) ** 2)
if abs(dist - radius) < 0.5:
radial_sum += gray[y, x]
radial_count += 1
if radial_count > 0:
features_out[i, 16 + r_idx] = radial_sum / radial_count
else:
features_out[i, 16 + r_idx] = 0.0
# HSV color features (8 features)
hsv_means = np.zeros(3, dtype=np.float32)
hsv_stds = np.zeros(3, dtype=np.float32)
# Convert BGR to HSV manually (simplified approximation)
for y in range(H):
for x in range(W):
b, g, r = patch[y, x, 0] / 255.0, patch[y, x, 1] / 255.0, patch[y, x, 2] / 255.0
max_val = max(r, g, b)
min_val = min(r, g, b)
diff = max_val - min_val
# Hue
if diff == 0:
h = 0
elif max_val == r:
h = (60 * ((g - b) / diff) + 360) % 360
elif max_val == g:
h = (60 * ((b - r) / diff) + 120) % 360
else:
h = (60 * ((r - g) / diff) + 240) % 360
# Saturation
s = 0 if max_val == 0 else diff / max_val
# Value
v = max_val
hsv_means[0] += h
hsv_means[1] += s
hsv_means[2] += v
pixel_count = H * W
hsv_means /= pixel_count
# Calculate HSV standard deviations
for y in range(H):
for x in range(W):
b, g, r = patch[y, x, 0] / 255.0, patch[y, x, 1] / 255.0, patch[y, x, 2] / 255.0
max_val = max(r, g, b)
min_val = min(r, g, b)
diff = max_val - min_val
if diff == 0:
h = 0
elif max_val == r:
h = (60 * ((g - b) / diff) + 360) % 360
elif max_val == g:
h = (60 * ((b - r) / diff) + 120) % 360
else:
h = (60 * ((r - g) / diff) + 240) % 360
s = 0 if max_val == 0 else diff / max_val
v = max_val
hsv_stds[0] += (h - hsv_means[0]) ** 2
hsv_stds[1] += (s - hsv_means[1]) ** 2
hsv_stds[2] += (v - hsv_means[2]) ** 2
hsv_stds = np.sqrt(hsv_stds / pixel_count)
features_out[i, 24] = hsv_means[0] / 360.0 # Normalized hue
features_out[i, 25] = hsv_means[1]
features_out[i, 26] = hsv_means[2]
features_out[i, 27] = hsv_stds[0] / 360.0
features_out[i, 28] = hsv_stds[1]
features_out[i, 29] = hsv_stds[2]
# Contrast and texture features (6 features)
# Local Binary Pattern approximation
lbp_hist = np.zeros(8, dtype=np.float32)
for y in range(1, H - 1):
for x in range(1, W - 1):
center = gray[y, x]
code = 0
if gray[y - 1, x - 1] > center:
code |= 1
if gray[y - 1, x] > center:
code |= 2
if gray[y - 1, x + 1] > center:
code |= 4
if gray[y, x + 1] > center:
code |= 8
if gray[y + 1, x + 1] > center:
code |= 16
if gray[y + 1, x] > center:
code |= 32
if gray[y + 1, x - 1] > center:
code |= 64
if gray[y, x - 1] > center:
code |= 128
lbp_hist[code % 8] += 1
lbp_sum = np.sum(lbp_hist)
if lbp_sum > 0:
lbp_hist /= lbp_sum
# Use first 6 LBP features
for j in range(6):
features_out[i, 30 + j] = lbp_hist[j]
[docs]
@nb.njit(fastmath=True, parallel=True)
def extract_mega_features_batch_jit(patches: np.ndarray, features_out: np.ndarray):
"""
JIT-optimized batch feature extraction combining shape, color, and texture.
- Shape: Moments, eccentricity, orientation
- Color: HSV statistics
- Texture: Rotation-invariant LBP, multi-scale wavelet, entropy
"""
N, H, W = patches.shape[:3]
FEATURE_DIM = 64
for i in nb.prange(N):
patch = patches[i]
feature_idx = 0
# ------------------- GRAYSCALE & BASIC STATS (4 features) -------------------
gray = np.zeros((H, W), dtype=np.float32)
for y in range(H):
for x in range(W):
gray[y, x] = (
0.114 * patch[y, x, 0] + 0.587 * patch[y, x, 1] + 0.299 * patch[y, x, 2]
)
features_out[i, feature_idx] = np.mean(gray)
features_out[i, feature_idx + 1] = np.std(gray)
features_out[i, feature_idx + 2] = np.min(gray)
features_out[i, feature_idx + 3] = np.max(gray)
feature_idx += 4
# ------------------- GRADIENT FEATURES (4 features) -------------------
grad_x = np.zeros_like(gray)
grad_y = np.zeros_like(gray)
for y in range(1, H - 1):
for x in range(1, W - 1):
grad_x[y, x] = gray[y, x + 1] - gray[y, x - 1]
grad_y[y, x] = gray[y + 1, x] - gray[y - 1, x]
features_out[i, feature_idx] = np.mean(np.abs(grad_x))
features_out[i, feature_idx + 1] = np.mean(np.abs(grad_y))
features_out[i, feature_idx + 2] = np.std(grad_x)
features_out[i, feature_idx + 3] = np.std(grad_y)
feature_idx += 4
# ------------------- SHAPE & MOMENTS (10 features) -------------------
# Weighted Center of Mass
total_intensity = 0.0
weighted_x = 0.0
weighted_y = 0.0
for y in range(H):
for x in range(W):
w = gray[y, x]
total_intensity += w
weighted_x += w * x
weighted_y += w * y
if total_intensity > 1e-6:
cm_x = weighted_x / total_intensity
cm_y = weighted_y / total_intensity
else:
cm_x = W / 2
cm_y = H / 2
features_out[i, feature_idx] = cm_x - W / 2
features_out[i, feature_idx + 1] = cm_y - H / 2
# Central Moments
m20 = m02 = m11 = 0.0
for y in range(H):
for x in range(W):
w = gray[y, x]
dx, dy = x - cm_x, y - cm_y
m20 += w * dx * dx
m02 += w * dy * dy
m11 += w * dx * dy
if total_intensity > 1e-6:
m20 /= total_intensity
m02 /= total_intensity
m11 /= total_intensity
features_out[i, feature_idx + 2] = m20
features_out[i, feature_idx + 3] = m02
features_out[i, feature_idx + 4] = m11
# Eccentricity and Orientation
denom = m20 - m02
# Add small epsilon to avoid division by zero in arctan2
if abs(denom) < 1e-6:
denom += 1e-6
discriminant = np.sqrt(denom**2 + 4 * m11**2)
lambda1 = 0.5 * (m20 + m02 + discriminant)
lambda2 = 0.5 * (m20 + m02 - discriminant)
eccentricity = np.sqrt(1 - lambda2 / lambda1) if lambda1 > 1e-6 else 0
orientation = 0.5 * np.arctan2(2 * m11, denom)
features_out[i, feature_idx + 5] = eccentricity
features_out[i, feature_idx + 6] = np.sin(
2 * orientation
) # Use 2*orientation for rotational stability
features_out[i, feature_idx + 7] = np.cos(2 * orientation)
# Hu Moments (2 rotation-invariant moments)
I1 = m20 + m02
I2 = (m20 - m02) ** 2 + 4 * m11**2
features_out[i, feature_idx + 8] = I1
features_out[i, feature_idx + 9] = I2
feature_idx += 10
# ------------------- COLOR FEATURES (6 features) -------------------
hsv_means = np.zeros(3, dtype=np.float32)
hsv_stds = np.zeros(3, dtype=np.float32)
h_vals, s_vals, v_vals = [], [], []
for y in range(H):
for x in range(W):
b, g, r = patch[y, x, 0] / 255.0, patch[y, x, 1] / 255.0, patch[y, x, 2] / 255.0
max_val, min_val = max(r, g, b), min(r, g, b)
diff = max_val - min_val
h, s, v = 0.0, 0.0, max_val
if diff > 1e-6:
if max_val == r:
h = (60 * ((g - b) / diff) + 360) % 360
elif max_val == g:
h = (60 * ((b - r) / diff) + 120) % 360
else:
h = (60 * ((r - g) / diff) + 240) % 360
if max_val > 1e-6:
s = diff / max_val
h_vals.append(h)
s_vals.append(s)
v_vals.append(v)
h_rad = np.array([v * np.pi / 180.0 for v in h_vals])
features_out[i, feature_idx] = np.mean(np.array(s_vals))
features_out[i, feature_idx + 1] = np.mean(np.array(v_vals))
features_out[i, feature_idx + 2] = np.std(np.array(s_vals))
features_out[i, feature_idx + 3] = np.std(np.array(v_vals))
# Use circular mean for hue
mean_sin, mean_cos = np.mean(np.sin(h_rad)), np.mean(np.cos(h_rad))
features_out[i, feature_idx + 4] = np.arctan2(mean_sin, mean_cos)
# Use circular std for hue
features_out[i, feature_idx + 5] = np.sqrt(
-2 * np.log(np.sqrt(mean_sin**2 + mean_cos**2))
)
feature_idx += 6
# ------------------- TEXTURE: LBP (10 features) -------------------
lbp_hist = np.zeros(36, dtype=np.float32) # Uniform LBP has 36 patterns for 8 neighbors
for y in range(1, H - 1):
for x in range(1, W - 1):
center = gray[y, x]
code = 0
if gray[y - 1, x - 1] > center:
code |= 1
if gray[y - 1, x] > center:
code |= 2
if gray[y - 1, x + 1] > center:
code |= 4
if gray[y, x + 1] > center:
code |= 8
if gray[y + 1, x + 1] > center:
code |= 16
if gray[y + 1, x] > center:
code |= 32
if gray[y + 1, x - 1] > center:
code |= 64
if gray[y, x - 1] > center:
code |= 128
min_code = code
for rot in range(1, 8):
rotated = ((code >> rot) | (code << (8 - rot))) & 0xFF
if rotated < min_code:
min_code = rotated
lbp_hist[min_code % 36] += 1
lbp_sum = np.sum(lbp_hist)
if lbp_sum > 0:
lbp_hist /= lbp_sum
for j in range(10):
features_out[i, feature_idx + j] = lbp_hist[j]
feature_idx += 10
# ------------------- TEXTURE: RADIAL & POLAR (16 features) -------------------
max_radius = min(W, H) / 2.0
for r_idx in range(4): # 4 radii
radius = (r_idx + 1) * max_radius / 4.0
radial_values = []
for angle_step in range(16):
angle = 2 * np.pi * angle_step / 16.0
x = cm_x + radius * np.cos(angle)
y = cm_y + radius * np.sin(angle)
x1, y1 = int(x), int(y)
if 0 <= x1 < W - 1 and 0 <= y1 < H - 1:
fx, fy = x - x1, y - y1
val = (
gray[y1, x1] * (1 - fx) * (1 - fy)
+ gray[y1, x1 + 1] * fx * (1 - fy)
+ gray[y1 + 1, x1] * (1 - fx) * fy
+ gray[y1 + 1, x1 + 1] * fx * fy
)
radial_values.append(val)
if len(radial_values) > 1:
radial_arr = np.array(radial_values)
features_out[i, feature_idx] = np.mean(radial_arr)
features_out[i, feature_idx + 1] = np.std(radial_arr)
# DFT 1st harmonic
n_samples = len(radial_arr)
real_part, imag_part = 0.0, 0.0
for n in range(n_samples):
angle = 2 * np.pi * n / n_samples
real_part += radial_arr[n] * np.cos(angle)
imag_part -= radial_arr[n] * np.sin(angle)
features_out[i, feature_idx + 2] = (
np.sqrt(real_part**2 + imag_part**2) / n_samples
)
else:
features_out[i, feature_idx : feature_idx + 3] = 0.0
feature_idx += 4 # Note: one feature is left empty per radius for now
# ------------------- TEXTURE: ENTROPY & WAVELET (14 features) -------------------
# Wavelet-like features
for scale_exp in range(4):
block_size = 2 ** (scale_exp + 1)
std_sum = 0.0
count = 0
for y in range(0, H - block_size, block_size):
for x in range(0, W - block_size, block_size):
block = gray[y : y + block_size, x : x + block_size]
if block.size > 0:
std_sum += np.std(block)
count += 1
if count > 0:
features_out[i, feature_idx + scale_exp] = std_sum / count
feature_idx += 4
# Entropy features
hist = np.zeros(16, dtype=np.float32)
for y in range(H):
for x in range(W):
bin_idx = min(15, int(gray[y, x] / 16))
hist[bin_idx] += 1
if H * W > 0:
hist /= H * W
entropy = 0.0
for p in hist:
if p > 0:
entropy -= p * np.log2(p)
features_out[i, feature_idx] = entropy
features_out[i, feature_idx + 1] = np.sum(hist**2) # Uniformity
feature_idx += 2
# Fill any remaining features
while feature_idx < FEATURE_DIM:
features_out[i, feature_idx] = 0.0
feature_idx += 1
[docs]
@nb.njit(fastmath=True)
def cosine_distance_jit(emb1: np.ndarray, emb2: np.ndarray) -> float:
"""
JIT-optimized cosine distance computation.
Cosine distance = 1 - cosine_similarity
Returns value in [0, 1] where 0 = identical, 1 = opposite.
This is the standard distance metric for L2-normalized embeddings.
"""
emb1_f64 = emb1.astype(np.float64)
emb2_f64 = emb2.astype(np.float64)
# Dot product
dot = 0.0
norm1_sq = 0.0
norm2_sq = 0.0
for i in range(len(emb1_f64)):
dot += emb1_f64[i] * emb2_f64[i]
norm1_sq += emb1_f64[i] * emb1_f64[i]
norm2_sq += emb2_f64[i] * emb2_f64[i]
norm1 = np.sqrt(norm1_sq)
norm2 = np.sqrt(norm2_sq)
if norm1 < 1e-8 or norm2 < 1e-8:
# If either vector is zero, check if both are zero
if norm1 < 1e-8 and norm2 < 1e-8:
return 0.0 # Both zero vectors considered identical
return 1.0 # One zero, one non-zero = maximally different
cosine_sim = dot / (norm1 * norm2)
# Clamp to [-1, 1] to handle numerical errors
cosine_sim = max(-1.0, min(1.0, cosine_sim))
# Convert to distance in [0, 1]
# cosine_sim = 1 -> distance = 0 (identical)
# cosine_sim = -1 -> distance = 1 (opposite)
distance = (1.0 - cosine_sim) / 2.0
return distance
[docs]
@nb.njit(fastmath=True)
def correlation_distance_jit(emb1: np.ndarray, emb2: np.ndarray) -> float:
"""
JIT-optimized correlation (Pearson) distance computation.
Correlation distance = (1 - Pearson_correlation) / 2
Returns value in [0, 1] where 0 = identical pattern, 1 = opposite pattern.
More robust to offset differences than cosine, but may discard
useful mean information from well-normalized embeddings.
"""
emb1_f64 = emb1.astype(np.float64)
emb2_f64 = emb2.astype(np.float64)
mean1 = np.mean(emb1_f64)
mean2 = np.mean(emb2_f64)
numerator = np.sum((emb1_f64 - mean1) * (emb2_f64 - mean2))
var1 = np.sum((emb1_f64 - mean1) ** 2)
var2 = np.sum((emb2_f64 - mean2) ** 2)
if var1 == 0.0 or var2 == 0.0:
# Check if vectors are identical
is_identical = True
for i in range(len(emb1_f64)):
if abs(emb1_f64[i] - emb2_f64[i]) > 1e-10:
is_identical = False
break
return 0.0 if is_identical else 1.0
correlation = numerator / np.sqrt(var1 * var2)
distance = (1.0 - correlation) / 2.0
return max(0.0, min(1.0, distance))
# Default distance metric
DEFAULT_DISTANCE_METRIC = "cosine"
[docs]
class CupyTextureEmbedding(EmbeddingExtractor):
"""
Fast GPU-accelerated embedding for microorganism tracking.
Features: 36 dimensions
- Basic statistics (4): mean, std, min, max
- Gradient features (4): gradient magnitudes and variations
- Shape features (8): center of mass, moments, eccentricity, orientation
- Radial features (8): intensity at different radii from center
- Color features (6): HSV statistics
- Texture features (6): Local Binary Pattern approximation
"""
def __init__(self, patch_size: int = 32, use_gpu: bool = True):
super().__init__(use_tensor=False)
self._dim = 36
self.patch_size = patch_size
self.use_gpu = use_gpu and CUPY_AVAILABLE
if self.use_gpu:
logger.info("CupyTextureEmbedding initialized with GPU acceleration")
else:
logger.info("CupyTextureEmbedding initialized with CPU-only mode")
def _prepare_patch(self, frame: np.ndarray, bbox: np.ndarray) -> Optional[np.ndarray]:
"""Extract and resize patch from frame using bbox."""
x1, y1, x2, y2 = map(int, bbox[:4])
h_frame, w_frame = frame.shape[:2]
x1, y1 = max(0, x1), max(0, y1)
x2, y2 = min(w_frame, x2), min(h_frame, y2)
if x1 >= x2 or y1 >= y2:
return None
roi = frame[y1:y2, x1:x2]
if roi.size == 0:
return None
# Resize to standard patch size for consistency
patch = cv2.resize(roi, (self.patch_size, self.patch_size), interpolation=cv2.INTER_AREA)
return patch
[docs]
def extract(self, frame: np.ndarray, bbox: np.ndarray) -> np.ndarray:
"""Extract features for a single patch."""
patch = self._prepare_patch(frame, bbox)
if patch is None:
return np.zeros(self._dim, dtype=np.float32)
# Use batch processing even for single patch for consistency
patches = np.expand_dims(patch, axis=0)
features = np.zeros((1, self._dim), dtype=np.float32)
try:
extract_features_batch_jit(patches, features)
except Exception as e:
logger.warning(f"Numba JIT compilation failed: {e}. Using basic CPU fallback for single extraction.")
# Basic CPU fallback for single patch
if len(patch.shape) == 3:
gray = cv2.cvtColor(patch, cv2.COLOR_BGR2GRAY).astype(np.float32)
else:
gray = patch.astype(np.float32)
features[0, :min(8, self._dim)] = [
np.mean(gray), np.std(gray), np.min(gray), np.max(gray),
np.mean(np.abs(cv2.Sobel(gray, cv2.CV_32F, 1, 0, ksize=3))),
np.mean(np.abs(cv2.Sobel(gray, cv2.CV_32F, 0, 1, ksize=3))),
np.var(gray), np.max(gray) - np.min(gray)
][:min(8, self._dim)]
return features[0]
[docs]
def extract_batch_cpu(self, frame: np.ndarray, bboxes: np.ndarray) -> List[np.ndarray]:
"""CPU batch extraction."""
if len(bboxes) == 0:
return []
# Prepare all patches
patches = []
valid_indices = []
for i, bbox in enumerate(bboxes):
patch = self._prepare_patch(frame, bbox)
if patch is not None:
patches.append(patch)
valid_indices.append(i)
if not patches:
return [np.zeros(self._dim, dtype=np.float32) for _ in bboxes]
# Convert to batch array
patches_array = np.array(patches)
features = np.zeros((len(patches), self._dim), dtype=np.float32)
# JIT-compiled batch processing with fallback for Python 3.9 compatibility
try:
extract_features_batch_jit(patches_array, features)
except Exception as e:
logger.warning(f"Numba JIT compilation failed: {e}. Using basic CPU fallback.")
# Simple CPU fallback - basic texture features
for i, patch in enumerate(patches):
if len(patch.shape) == 3:
gray = cv2.cvtColor(patch, cv2.COLOR_BGR2GRAY).astype(np.float32)
else:
gray = patch.astype(np.float32)
# Fill with basic statistics
features[i, :min(8, self._dim)] = [
np.mean(gray), np.std(gray), np.min(gray), np.max(gray),
np.mean(np.abs(cv2.Sobel(gray, cv2.CV_32F, 1, 0, ksize=3))),
np.mean(np.abs(cv2.Sobel(gray, cv2.CV_32F, 0, 1, ksize=3))),
np.var(gray), np.max(gray) - np.min(gray)
][:min(8, self._dim)]
# Map back to original indices
result = []
valid_idx = 0
for i in range(len(bboxes)):
if i in valid_indices:
result.append(features[valid_idx])
valid_idx += 1
else:
result.append(np.zeros(self._dim, dtype=np.float32))
return result
[docs]
def extract_batch_gpu(self, frame: np.ndarray, bboxes: np.ndarray) -> List[np.ndarray]:
"""GPU batch extraction using CuPy."""
if not self.use_gpu:
logger.warning("GPU extraction requested but CuPy not available, falling back to CPU")
return self.extract_batch_cpu(frame, bboxes)
if len(bboxes) == 0:
return []
try:
# Prepare patches on CPU first (ROI extraction is sequential)
patches = []
valid_indices = []
for i, bbox in enumerate(bboxes):
patch = self._prepare_patch(frame, bbox)
if patch is not None:
patches.append(patch)
valid_indices.append(i)
if not patches:
return [np.zeros(self._dim, dtype=np.float32) for _ in bboxes]
# Move to GPU
patches_array = np.array(patches)
patches_gpu = cp.asarray(patches_array)
# GPU feature extraction
features_gpu = self._extract_features_gpu(patches_gpu)
# Move back to CPU
features = cp.asnumpy(features_gpu)
# Map back to original indices
result = []
valid_idx = 0
for i in range(len(bboxes)):
if i in valid_indices:
result.append(features[valid_idx])
valid_idx += 1
else:
result.append(np.zeros(self._dim, dtype=np.float32))
return result
except Exception as e:
logger.error(f"GPU extraction failed: {e}, falling back to CPU")
return self.extract_batch_cpu(frame, bboxes)
def _extract_features_gpu(self, patches_gpu) -> "cp.ndarray":
"""GPU-accelerated feature extraction using CuPy."""
N, H, W, C = patches_gpu.shape
features = cp.zeros((N, self._dim), dtype=cp.float32)
# Convert to grayscale
gray_gpu = cp.sum(patches_gpu * cp.array([0.114, 0.587, 0.299]), axis=3)
# Basic statistics
features[:, 0] = cp.mean(gray_gpu, axis=(1, 2))
features[:, 1] = cp.std(gray_gpu, axis=(1, 2))
features[:, 2] = cp.min(gray_gpu, axis=(1, 2))
features[:, 3] = cp.max(gray_gpu, axis=(1, 2))
# Gradient features
grad_x = cp.gradient(gray_gpu, axis=2)
grad_y = cp.gradient(gray_gpu, axis=1)
features[:, 4] = cp.mean(cp.abs(grad_x), axis=(1, 2))
features[:, 5] = cp.mean(cp.abs(grad_y), axis=(1, 2))
features[:, 6] = cp.std(grad_x, axis=(1, 2))
features[:, 7] = cp.std(grad_y, axis=(1, 2))
# Shape and color features (simplified for GPU)
# Center of mass
y_coords, x_coords = cp.meshgrid(cp.arange(H), cp.arange(W), indexing="ij")
total_intensity = cp.sum(gray_gpu, axis=(1, 2), keepdims=True)
mask = total_intensity.squeeze() > 0
cm_x = cp.zeros(N)
cm_y = cp.zeros(N)
if cp.any(mask):
cm_x[mask] = (
cp.sum(gray_gpu[mask] * x_coords, axis=(1, 2)) / total_intensity[mask].squeeze()
)
cm_y[mask] = (
cp.sum(gray_gpu[mask] * y_coords, axis=(1, 2)) / total_intensity[mask].squeeze()
)
features[:, 8] = cm_x - W // 2
features[:, 9] = cm_y - H // 2
# HSV features
max_vals = cp.max(patches_gpu, axis=3)
min_vals = cp.min(patches_gpu, axis=3)
features[:, 24] = cp.mean(max_vals, axis=(1, 2)) # Simplified V channel
features[:, 25] = cp.mean(max_vals - min_vals, axis=(1, 2)) # Simplified S
# Fill remaining features with texture approximations
for i in range(10, 24):
if i < 16:
# Moment-based features (simplified)
features[:, i] = cp.std(gray_gpu, axis=(1, 2)) * (i - 10) / 6
else:
# Radial features (simplified)
radius_idx = i - 16
features[:, i] = cp.mean(gray_gpu, axis=(1, 2)) * (0.8 + 0.2 * radius_idx / 8)
# Texture features (simplified)
for i in range(26, self._dim):
features[:, i] = cp.std(gray_gpu, axis=(1, 2)) * (i - 26) / (self._dim - 26)
return features
[docs]
def extract_batch(self, frame: np.ndarray, bboxes: np.ndarray) -> List[np.ndarray]:
"""Extract batch of embeddings with GPU/CPU selection."""
if self.use_gpu:
return self.extract_batch_gpu(frame, bboxes)
else:
return self.extract_batch_cpu(frame, bboxes)
@property
def embedding_dim(self) -> int:
return self._dim
[docs]
class CupyTextureColorEmbedding(EmbeddingExtractor):
"""
CuPy-accelerated texture embedding with rich color features.
Combines the robust texture analysis of CupyTextureEmbedding with comprehensive
color-based features for improved tracking and re-identification performance.
Feature composition (84 total features):
- Grayscale texture features (32) - from base CupyTextureEmbedding
- RGB color histograms (24) - 8 bins per channel
- HSV color histograms (24) - 8 bins per channel
- Color moments (4) - mean and std of RGB channels
This embedding is particularly effective for scenarios where both texture
and color information are important for distinguishing between objects.
"""
def __init__(self, patch_size: int = 32, use_gpu: bool = True):
self.patch_size = patch_size
self.use_gpu = use_gpu and CUPY_AVAILABLE
self._dim = 84 # 32 texture + 24 RGB hist + 24 HSV hist + 4 color moments
if self.use_gpu:
logger.info(f"CupyTextureColorEmbedding initialized with GPU acceleration. Dim={self._dim}")
else:
logger.info(f"CupyTextureColorEmbedding initialized with CPU-only mode. Dim={self._dim}")
def _prepare_patch(self, frame: np.ndarray, bbox: np.ndarray) -> Optional[np.ndarray]:
"""Extract and resize patch from frame using bbox."""
x1, y1, x2, y2 = map(int, bbox[:4])
h_frame, w_frame = frame.shape[:2]
x1, y1 = max(0, x1), max(0, y1)
x2, y2 = min(w_frame, x2), min(h_frame, y2)
if x1 >= x2 or y1 >= y2:
return None
roi = frame[y1:y2, x1:x2]
if roi.size == 0:
return None
# Resize to standard patch size for consistency
patch = cv2.resize(roi, (self.patch_size, self.patch_size), interpolation=cv2.INTER_AREA)
return patch
[docs]
def extract(self, frame: np.ndarray, bbox: np.ndarray) -> np.ndarray:
"""Extract features for a single patch."""
patch = self._prepare_patch(frame, bbox)
if patch is None:
return np.zeros(self._dim, dtype=np.float32)
# Use batch processing even for single patch for consistency
patches = np.expand_dims(patch, axis=0)
if self.use_gpu:
try:
features = self._extract_features_gpu(patches)
return features[0]
except Exception as e:
logger.error(f"GPU extraction failed: {e}. Falling back to CPU.")
self.use_gpu = False
features = self._extract_features_cpu(patches)
return features[0]
def _extract_features_cpu(self, patches: np.ndarray) -> np.ndarray:
"""CPU-based feature extraction with color features."""
n_patches = patches.shape[0]
features = np.zeros((n_patches, self._dim), dtype=np.float32)
for i in range(n_patches):
patch = patches[i] # Shape: (H, W, 3)
H, W = patch.shape[:2]
feature_idx = 0
# === GRAYSCALE TEXTURE FEATURES (32 features) ===
# Convert to grayscale
gray = np.zeros((H, W), dtype=np.float32)
for y in range(H):
for x in range(W):
gray[y, x] = (
0.114 * patch[y, x, 0] + # Blue
0.587 * patch[y, x, 1] + # Green
0.299 * patch[y, x, 2] # Red
)
# Basic grayscale stats (4 features)
features[i, feature_idx] = np.mean(gray)
features[i, feature_idx + 1] = np.std(gray)
features[i, feature_idx + 2] = np.min(gray)
features[i, feature_idx + 3] = np.max(gray)
feature_idx += 4
# Gradient features (8 features)
grad_x = np.zeros_like(gray)
grad_y = np.zeros_like(gray)
for y in range(1, H - 1):
for x in range(1, W - 1):
grad_x[y, x] = gray[y, x + 1] - gray[y, x - 1]
grad_y[y, x] = gray[y + 1, x] - gray[y - 1, x]
grad_magnitude = np.sqrt(grad_x**2 + grad_y**2)
features[i, feature_idx] = np.mean(grad_magnitude)
features[i, feature_idx + 1] = np.std(grad_magnitude)
features[i, feature_idx + 2] = np.max(grad_magnitude)
features[i, feature_idx + 3] = np.mean(np.abs(grad_x))
features[i, feature_idx + 4] = np.mean(np.abs(grad_y))
features[i, feature_idx + 5] = np.std(grad_x)
features[i, feature_idx + 6] = np.std(grad_y)
features[i, feature_idx + 7] = np.mean(np.arctan2(grad_y, grad_x + 1e-8))
feature_idx += 8
# Center of mass and moments (8 features)
total_intensity = np.sum(gray)
if total_intensity > 0:
y_coords, x_coords = np.meshgrid(range(H), range(W), indexing='ij')
cm_x = np.sum(gray * x_coords) / total_intensity
cm_y = np.sum(gray * y_coords) / total_intensity
dx = x_coords - cm_x
dy = y_coords - cm_y
m20 = np.sum(gray * dx * dx) / total_intensity
m02 = np.sum(gray * dy * dy) / total_intensity
m11 = np.sum(gray * dx * dy) / total_intensity
features[i, feature_idx] = cm_x / W
features[i, feature_idx + 1] = cm_y / H
features[i, feature_idx + 2] = m20
features[i, feature_idx + 3] = m02
features[i, feature_idx + 4] = m11
features[i, feature_idx + 5] = np.sqrt(m20)
features[i, feature_idx + 6] = np.sqrt(m02)
features[i, feature_idx + 7] = m20 + m02 # Total moment
feature_idx += 8
# LBP-like texture features (8 features)
lbp_hist = np.zeros(8)
for y in range(1, H - 1):
for x in range(1, W - 1):
center = gray[y, x]
pattern = 0
if gray[y - 1, x - 1] > center: pattern |= 1
if gray[y - 1, x] > center: pattern |= 2
if gray[y - 1, x + 1] > center: pattern |= 4
if gray[y, x + 1] > center: pattern |= 8
if gray[y + 1, x + 1] > center: pattern |= 16
if gray[y + 1, x] > center: pattern |= 32
if gray[y + 1, x - 1] > center: pattern |= 64
if gray[y, x - 1] > center: pattern |= 128
# Simplified uniform patterns
bin_idx = bin(pattern).count('1') % 8
lbp_hist[bin_idx] += 1
# Normalize histogram
if np.sum(lbp_hist) > 0:
lbp_hist = lbp_hist / np.sum(lbp_hist)
features[i, feature_idx:feature_idx + 8] = lbp_hist
feature_idx += 8
# Edge and corner features (4 features)
# Simple edge detection
edge_horizontal = np.abs(np.diff(gray, axis=1)).mean()
edge_vertical = np.abs(np.diff(gray, axis=0)).mean()
corner_strength = 0.0
variance = np.var(gray)
features[i, feature_idx] = edge_horizontal
features[i, feature_idx + 1] = edge_vertical
features[i, feature_idx + 2] = corner_strength
features[i, feature_idx + 3] = variance
feature_idx += 4
# === COLOR FEATURES ===
# RGB Color Histogram (24 features - 8 bins per channel)
rgb_hist = np.zeros(24)
for c in range(3): # B, G, R channels
channel = patch[:, :, c]
hist, _ = np.histogram(channel, bins=8, range=(0, 256))
if np.sum(hist) > 0:
hist = hist.astype(np.float32) / np.sum(hist)
rgb_hist[c * 8:(c + 1) * 8] = hist
features[i, feature_idx:feature_idx + 24] = rgb_hist
feature_idx += 24
# HSV Color Histogram (24 features - 8 bins per channel)
# Convert BGR to HSV
hsv_patch = cv2.cvtColor(patch.astype(np.uint8), cv2.COLOR_BGR2HSV)
hsv_hist = np.zeros(24)
# H channel (0-179)
hist_h, _ = np.histogram(hsv_patch[:, :, 0], bins=8, range=(0, 180))
if np.sum(hist_h) > 0:
hist_h = hist_h.astype(np.float32) / np.sum(hist_h)
hsv_hist[0:8] = hist_h
# S channel (0-255)
hist_s, _ = np.histogram(hsv_patch[:, :, 1], bins=8, range=(0, 256))
if np.sum(hist_s) > 0:
hist_s = hist_s.astype(np.float32) / np.sum(hist_s)
hsv_hist[8:16] = hist_s
# V channel (0-255)
hist_v, _ = np.histogram(hsv_patch[:, :, 2], bins=8, range=(0, 256))
if np.sum(hist_v) > 0:
hist_v = hist_v.astype(np.float32) / np.sum(hist_v)
hsv_hist[16:24] = hist_v
features[i, feature_idx:feature_idx + 24] = hsv_hist
feature_idx += 24
# Color Moments (4 features - mean and std for each RGB channel, normalized)
rgb_means = np.mean(patch.astype(np.float32), axis=(0, 1)) / 255.0
rgb_stds = np.std(patch.astype(np.float32), axis=(0, 1)) / 255.0
features[i, feature_idx] = np.mean(rgb_means) # Overall color brightness
features[i, feature_idx + 1] = np.std(rgb_means) # Color balance variation
features[i, feature_idx + 2] = np.mean(rgb_stds) # Overall color consistency
features[i, feature_idx + 3] = np.std(rgb_stds) # Color channel variation
feature_idx += 4
return features
def _extract_features_gpu(self, patches: np.ndarray) -> np.ndarray:
"""GPU-accelerated feature extraction with color features."""
if not CUPY_AVAILABLE:
raise RuntimeError("CuPy not available for GPU computation")
from cupyx.scipy import ndimage
n_patches = patches.shape[0]
patches_gpu = cp.asarray(patches, dtype=cp.float32)
H, W = patches.shape[1:3]
features = cp.zeros((n_patches, self._dim), dtype=cp.float32)
feature_idx = 0
# === GRAYSCALE TEXTURE FEATURES (32 features) ===
# Convert to grayscale
gray_gpu = cp.sum(patches_gpu * cp.array([0.114, 0.587, 0.299]), axis=3)
# Basic grayscale stats (4 features)
features[:, feature_idx] = cp.mean(gray_gpu, axis=(1, 2))
features[:, feature_idx + 1] = cp.std(gray_gpu, axis=(1, 2))
features[:, feature_idx + 2] = cp.min(gray_gpu, axis=(1, 2))
features[:, feature_idx + 3] = cp.max(gray_gpu, axis=(1, 2))
feature_idx += 4
# Gradient features (8 features)
grad_y, grad_x = cp.gradient(gray_gpu, axis=(1, 2))
grad_magnitude = cp.sqrt(grad_x**2 + grad_y**2)
features[:, feature_idx] = cp.mean(grad_magnitude, axis=(1, 2))
features[:, feature_idx + 1] = cp.std(grad_magnitude, axis=(1, 2))
features[:, feature_idx + 2] = cp.max(grad_magnitude, axis=(1, 2))
features[:, feature_idx + 3] = cp.mean(cp.abs(grad_x), axis=(1, 2))
features[:, feature_idx + 4] = cp.mean(cp.abs(grad_y), axis=(1, 2))
features[:, feature_idx + 5] = cp.std(grad_x, axis=(1, 2))
features[:, feature_idx + 6] = cp.std(grad_y, axis=(1, 2))
features[:, feature_idx + 7] = cp.mean(cp.arctan2(grad_y, grad_x + 1e-8), axis=(1, 2))
feature_idx += 8
# Center of mass and moments (8 features)
total_intensity = cp.sum(gray_gpu, axis=(1, 2))
y_coords = cp.arange(H, dtype=cp.float32).reshape(H, 1)
x_coords = cp.arange(W, dtype=cp.float32).reshape(1, W)
safe_total = cp.maximum(total_intensity, 1e-8)
cm_y = cp.sum(gray_gpu * y_coords, axis=(1, 2)) / safe_total
cm_x = cp.sum(gray_gpu * x_coords, axis=(1, 2)) / safe_total
# Compute moments
dy = y_coords - cm_y.reshape(-1, 1, 1)
dx = x_coords - cm_x.reshape(-1, 1, 1)
m20 = cp.sum(gray_gpu * dx * dx, axis=(1, 2)) / safe_total
m02 = cp.sum(gray_gpu * dy * dy, axis=(1, 2)) / safe_total
m11 = cp.sum(gray_gpu * dx * dy, axis=(1, 2)) / safe_total
features[:, feature_idx] = cm_x / W
features[:, feature_idx + 1] = cm_y / H
features[:, feature_idx + 2] = m20
features[:, feature_idx + 3] = m02
features[:, feature_idx + 4] = m11
features[:, feature_idx + 5] = cp.sqrt(cp.maximum(m20, 0))
features[:, feature_idx + 6] = cp.sqrt(cp.maximum(m02, 0))
features[:, feature_idx + 7] = m20 + m02
feature_idx += 8
# LBP-like simplified features (8 features)
# Use multi-scale standard deviation as texture proxy
for scale in range(8):
kernel_size = 2 + scale
if kernel_size < min(H, W):
smoothed = ndimage.uniform_filter(gray_gpu, size=kernel_size)
features[:, feature_idx + scale] = cp.std(smoothed, axis=(1, 2))
else:
features[:, feature_idx + scale] = cp.std(gray_gpu, axis=(1, 2))
feature_idx += 8
# Edge and variance features (4 features)
edge_h = cp.mean(cp.abs(cp.diff(gray_gpu, axis=2)), axis=(1, 2))
edge_v = cp.mean(cp.abs(cp.diff(gray_gpu, axis=1)), axis=(1, 2))
variance = cp.var(gray_gpu, axis=(1, 2))
features[:, feature_idx] = edge_h
features[:, feature_idx + 1] = edge_v
features[:, feature_idx + 2] = variance # Using variance instead of corner strength
features[:, feature_idx + 3] = variance
feature_idx += 4
# === COLOR FEATURES ===
# RGB Color Histogram (24 features)
for c in range(3): # B, G, R channels
channel = patches_gpu[:, :, :, c]
for b in range(8):
bin_min = b * 32.0 # 256/8 = 32
bin_max = (b + 1) * 32.0 if b < 7 else 256.0
mask = (channel >= bin_min) & (channel < bin_max)
features[:, feature_idx + c * 8 + b] = cp.mean(mask.astype(cp.float32), axis=(1, 2))
feature_idx += 24
# HSV Color Features (simplified, 24 features)
# Use RGB-to-HSV approximation for GPU efficiency
r_norm = patches_gpu[:, :, :, 2] / 255.0
g_norm = patches_gpu[:, :, :, 1] / 255.0
b_norm = patches_gpu[:, :, :, 0] / 255.0
max_rgb = cp.maximum(cp.maximum(r_norm, g_norm), b_norm)
min_rgb = cp.minimum(cp.minimum(r_norm, g_norm), b_norm)
delta = max_rgb - min_rgb
# Hue approximation
hue_approx = cp.where(delta > 1e-8,
cp.where(max_rgb == r_norm, (g_norm - b_norm) / delta,
cp.where(max_rgb == g_norm, 2.0 + (b_norm - r_norm) / delta,
4.0 + (r_norm - g_norm) / delta)), 0.0)
hue_approx = (hue_approx * 60.0) % 360.0
# Saturation
saturation = cp.where(max_rgb > 1e-8, delta / max_rgb, 0.0)
# Value (brightness)
value = max_rgb
# Create histograms for HSV
for b in range(8):
# H bins (0-360)
h_min, h_max = b * 45.0, (b + 1) * 45.0
features[:, feature_idx + b] = cp.mean((hue_approx >= h_min) & (hue_approx < h_max), axis=(1, 2))
# S bins (0-1)
s_min, s_max = b * 0.125, (b + 1) * 0.125
features[:, feature_idx + 8 + b] = cp.mean((saturation >= s_min) & (saturation < s_max), axis=(1, 2))
# V bins (0-1)
v_min, v_max = b * 0.125, (b + 1) * 0.125
features[:, feature_idx + 16 + b] = cp.mean((value >= v_min) & (value < v_max), axis=(1, 2))
feature_idx += 24
# Color Moments (4 features)
rgb_means = cp.mean(patches_gpu, axis=(1, 2)) / 255.0
rgb_stds = cp.std(patches_gpu, axis=(1, 2)) / 255.0
features[:, feature_idx] = cp.mean(rgb_means, axis=1)
features[:, feature_idx + 1] = cp.std(rgb_means, axis=1)
features[:, feature_idx + 2] = cp.mean(rgb_stds, axis=1)
features[:, feature_idx + 3] = cp.std(rgb_stds, axis=1)
feature_idx += 4
return cp.asnumpy(features)
[docs]
def extract_batch_cpu(self, frame: np.ndarray, bboxes: np.ndarray) -> List[np.ndarray]:
"""CPU batch extraction."""
if len(bboxes) == 0:
return []
patches = []
valid_indices = []
for i, bbox in enumerate(bboxes):
patch = self._prepare_patch(frame, bbox)
if patch is not None:
patches.append(patch)
valid_indices.append(i)
if not patches:
return [np.zeros(self._dim, dtype=np.float32) for _ in bboxes]
patches = np.array(patches)
features = self._extract_features_cpu(patches)
result = [np.zeros(self._dim, dtype=np.float32) for _ in bboxes]
for i, valid_idx in enumerate(valid_indices):
result[valid_idx] = features[i]
return result
[docs]
def extract_batch_gpu(self, frame: np.ndarray, bboxes: np.ndarray) -> List[np.ndarray]:
"""GPU batch extraction."""
if not CUPY_AVAILABLE:
raise RuntimeError("CuPy not available for GPU computation")
if len(bboxes) == 0:
return []
patches = []
valid_indices = []
for i, bbox in enumerate(bboxes):
patch = self._prepare_patch(frame, bbox)
if patch is not None:
patches.append(patch)
valid_indices.append(i)
if not patches:
return [np.zeros(self._dim, dtype=np.float32) for _ in bboxes]
patches = np.array(patches)
features = self._extract_features_gpu(patches)
result = [np.zeros(self._dim, dtype=np.float32) for _ in bboxes]
for i, valid_idx in enumerate(valid_indices):
result[valid_idx] = features[i]
return result
[docs]
def extract_batch(self, frame: np.ndarray, bboxes: np.ndarray) -> List[np.ndarray]:
"""Extract batch of embeddings with GPU/CPU selection."""
if self.use_gpu:
try:
return self.extract_batch_gpu(frame, bboxes)
except Exception as e:
logger.error(f"GPU extraction failed: {e}. Falling back to CPU.")
self.use_gpu = False
return self.extract_batch_cpu(frame, bboxes)
return self.extract_batch_cpu(frame, bboxes)
@property
def embedding_dim(self) -> int:
return self._dim
[docs]
class MegaCupyTextureEmbedding(EmbeddingExtractor):
"""
Mega Microbe Embedding: Combines the best of shape, color, and texture analysis.
- Dims 0-7: Basic stats and gradients
- Dims 8-17: Advanced shape features (moments, eccentricity, Hu)
- Dims 18-23: HSV color statistics (including circular stats for hue)
- Dims 24-33: Rotation-invariant Local Binary Patterns (LBP)
- Dims 34-49: Robust radial & polar features (intensity, variation, DFT)
- Dims 50-55: Multi-scale wavelet and entropy texture features
- Dims 56-63: Reserved / Other advanced features
"""
def __init__(self, patch_size: int = 32, use_gpu: bool = True):
super().__init__(use_tensor=False)
self._dim = 64
self.patch_size = patch_size
self.use_gpu = use_gpu and CUPY_AVAILABLE
if self.use_gpu:
logger.info(
f"MegaCupyTextureEmbedding loaded. Dim={self._dim}. GPU acceleration ENABLED."
)
else:
logger.info(
f"MegaCupyTextureEmbedding loaded. Dim={self._dim}. Running on CPU with Numba."
)
def _prepare_patch(self, frame: np.ndarray, bbox: np.ndarray) -> Optional[np.ndarray]:
x1, y1, x2, y2 = map(int, bbox[:4])
h_frame, w_frame = frame.shape[:2]
x1, y1, x2, y2 = max(0, x1), max(0, y1), min(w_frame, x2), min(h_frame, y2)
if x1 >= x2 or y1 >= y2:
return None
patch = cv2.resize(
frame[y1:y2, x1:x2], (self.patch_size, self.patch_size), interpolation=cv2.INTER_AREA
)
return patch
def _extract_features_cpu_fallback(self, patch: np.ndarray) -> np.ndarray:
"""CPU fallback for feature extraction without Numba JIT (Python 3.9 compatibility)."""
H, W = patch.shape[:2]
features = np.zeros(self._dim, dtype=np.float32)
feature_idx = 0
# Convert to grayscale
if len(patch.shape) == 3:
gray = cv2.cvtColor(patch, cv2.COLOR_BGR2GRAY).astype(np.float32)
else:
gray = patch.astype(np.float32)
# Basic stats (4 features)
features[feature_idx:feature_idx+4] = [
np.mean(gray), np.std(gray), np.min(gray), np.max(gray)
]
feature_idx += 4
# Gradients (4 features)
grad_x = cv2.Sobel(gray, cv2.CV_32F, 1, 0, ksize=3)
grad_y = cv2.Sobel(gray, cv2.CV_32F, 0, 1, ksize=3)
features[feature_idx:feature_idx+4] = [
np.mean(np.abs(grad_x)), np.std(grad_x),
np.mean(np.abs(grad_y)), np.std(grad_y)
]
feature_idx += 4
# Simple shape moments (10 features)
moments = cv2.moments(gray)
m00 = moments['m00'] + 1e-6
features[feature_idx:feature_idx+10] = [
moments['m10']/m00, moments['m01']/m00, # centroid
moments['m20']/m00, moments['m11']/m00, moments['m02']/m00, # second moments
moments['mu20']/m00, moments['mu11']/m00, moments['mu02']/m00, # central moments
0.0, 0.0 # eccentricity and orientation placeholders
]
feature_idx += 10
# Simple HSV color features (6 features)
if len(patch.shape) == 3:
hsv = cv2.cvtColor(patch, cv2.COLOR_BGR2HSV)
for i in range(3):
channel = hsv[:,:,i].astype(np.float32)
features[feature_idx + 2*i] = np.mean(channel)
features[feature_idx + 2*i + 1] = np.std(channel)
feature_idx += 6
# Simple texture features - fill remaining with basic texture stats
laplacian = cv2.Laplacian(gray, cv2.CV_32F)
remaining_features = self._dim - feature_idx
if remaining_features > 0:
# Fill remaining with Laplacian stats and zeros
basic_texture = [
np.mean(np.abs(laplacian)), np.std(laplacian),
np.var(gray), np.max(gray) - np.min(gray)
]
n_basic = min(len(basic_texture), remaining_features)
features[feature_idx:feature_idx+n_basic] = basic_texture[:n_basic]
return features
[docs]
def extract_batch_cpu(self, frame: np.ndarray, bboxes: np.ndarray) -> List[np.ndarray]:
if len(bboxes) == 0:
return []
patches, valid_indices = [], []
for i, bbox in enumerate(bboxes):
patch = self._prepare_patch(frame, bbox)
if patch is not None:
patches.append(patch)
valid_indices.append(i)
if not patches:
return [np.zeros(self._dim, dtype=np.float32) for _ in bboxes]
patches_array = np.array(patches)
features = np.zeros((len(patches), self._dim), dtype=np.float32)
try:
extract_mega_features_batch_jit(patches_array, features)
except Exception as e:
logger.warning(f"Numba JIT compilation failed: {e}. Falling back to CPU implementation.")
# Fallback to CPU implementation without JIT for Python 3.9 compatibility
for i, patch in enumerate(patches):
features[i] = self._extract_features_cpu_fallback(patch)
result = [np.zeros(self._dim, dtype=np.float32) for _ in bboxes]
for i, valid_idx in enumerate(valid_indices):
result[valid_idx] = features[i]
return result
def _extract_features_gpu(self, patches_gpu) -> "cp.ndarray":
"""GPU-accelerated feature extraction with improved CPU-GPU consistency."""
N, H, W, C = patches_gpu.shape
features = cp.zeros((N, self._dim), dtype=cp.float32)
idx = 0
# GRAYSCALE & BASIC STATS (4)
gray_gpu = cp.sum(patches_gpu * cp.array([0.114, 0.587, 0.299]), axis=3)
features[:, idx : idx + 4] = cp.stack(
[
cp.mean(gray_gpu, axis=(1, 2)),
cp.std(gray_gpu, axis=(1, 2)),
cp.min(gray_gpu, axis=(1, 2)),
cp.max(gray_gpu, axis=(1, 2)),
],
axis=1,
)
idx += 4
# GRADIENTS (4)
grad_y, grad_x = cp.gradient(gray_gpu, axis=(1, 2))
features[:, idx : idx + 4] = cp.stack(
[
cp.mean(cp.abs(grad_x), axis=(1, 2)),
cp.mean(cp.abs(grad_y), axis=(1, 2)),
cp.std(grad_x, axis=(1, 2)),
cp.std(grad_y, axis=(1, 2)),
],
axis=1,
)
idx += 4
# SHAPE & MOMENTS (10) - Vectorized implementation
y_coords, x_coords = cp.meshgrid(
cp.arange(H, dtype=cp.float32), cp.arange(W, dtype=cp.float32), indexing="ij"
)
total_intensity = cp.sum(gray_gpu, axis=(1, 2))
safe_total = total_intensity + 1e-7
cm_y = cp.sum(gray_gpu * y_coords, axis=(1, 2)) / safe_total
cm_x = cp.sum(gray_gpu * x_coords, axis=(1, 2)) / safe_total
features[:, idx] = cm_x - W / 2
features[:, idx + 1] = cm_y - H / 2
dx = x_coords - cm_x[:, None, None]
dy = y_coords - cm_y[:, None, None]
m20 = cp.sum(gray_gpu * dx * dx, axis=(1, 2)) / safe_total
m02 = cp.sum(gray_gpu * dy * dy, axis=(1, 2)) / safe_total
m11 = cp.sum(gray_gpu * dx * dy, axis=(1, 2)) / safe_total
features[:, idx + 2 : idx + 5] = cp.stack([m20, m02, m11], axis=1)
denom = m20 - m02
discriminant = cp.sqrt(denom**2 + 4 * m11**2)
lambda1 = 0.5 * (m20 + m02 + discriminant)
lambda2 = 0.5 * (m20 + m02 - discriminant)
features[:, idx + 5] = cp.sqrt(1 - lambda2 / (lambda1 + 1e-7))
orientation = 0.5 * cp.arctan2(2 * m11, denom + 1e-7)
features[:, idx + 6] = cp.sin(2 * orientation)
features[:, idx + 7] = cp.cos(2 * orientation)
features[:, idx + 8] = m20 + m02 # Hu Moment 1
features[:, idx + 9] = (m20 - m02) ** 2 + 4 * m11**2 # Hu Moment 2
idx += 10
# COLOR (6) - Simplified GPU version
bgr_norm = patches_gpu / 255.0
r, g, b = bgr_norm[:, :, :, 2], bgr_norm[:, :, :, 1], bgr_norm[:, :, :, 0]
max_val = cp.max(bgr_norm, axis=3)
min_val = cp.min(bgr_norm, axis=3)
s = (max_val - min_val) / (max_val + 1e-7)
v = max_val
features[:, idx : idx + 6] = cp.stack(
[
cp.mean(s, axis=(1, 2)),
cp.mean(v, axis=(1, 2)),
cp.std(s, axis=(1, 2)),
cp.std(v, axis=(1, 2)),
cp.mean(r, axis=(1, 2)),
cp.std(r, axis=(1, 2)), # Use R channel as proxy for hue stats
],
axis=1,
)
idx += 6
# TEXTURE (approximations) (20)
# Multi-scale Wavelet approximation
for scale_exp in range(4):
size = 2 ** (scale_exp + 1)
# Convolve with a uniform filter and get the std dev of the result
filtered = uniform_filter(gray_gpu, size=size)
features[:, idx + scale_exp] = cp.std(filtered, axis=(1, 2))
idx += 4
# Placeholder for remaining texture features
while idx < self._dim:
# Use scaled versions of std for variety
features[:, idx] = features[:, 1] * ((idx % 10 + 1) / 10.0)
idx += 1
return features
[docs]
def extract(self, frame: np.ndarray, bbox: np.ndarray) -> np.ndarray:
return self.extract_batch(frame, np.array([bbox]))[0]
[docs]
def extract_batch_gpu(self, frame: np.ndarray, bboxes: np.ndarray) -> List[np.ndarray]:
if len(bboxes) == 0:
return []
patches, valid_indices = [], []
for i, bbox in enumerate(bboxes):
patch = self._prepare_patch(frame, bbox)
if patch is not None:
patches.append(patch)
valid_indices.append(i)
if not patches:
return [np.zeros(self._dim, dtype=np.float32) for _ in bboxes]
patches_gpu = cp.asarray(np.array(patches))
features_gpu = self._extract_features_gpu(patches_gpu)
features = cp.asnumpy(features_gpu)
result = [np.zeros(self._dim, dtype=np.float32) for _ in bboxes]
for i, valid_idx in enumerate(valid_indices):
result[valid_idx] = features[i]
return result
[docs]
def extract_batch(self, frame: np.ndarray, bboxes: np.ndarray) -> List[np.ndarray]:
"""Extract batch of embeddings with GPU/CPU selection."""
if self.use_gpu:
try:
return self.extract_batch_gpu(frame, bboxes)
except Exception as e:
logger.error(f"GPU extraction failed: {e}. Falling back to CPU.")
self.use_gpu = False # Prevent further GPU attempts
return self.extract_batch_cpu(frame, bboxes)
return self.extract_batch_cpu(frame, bboxes)
@property
def embedding_dim(self) -> int:
return self._dim
[docs]
def compute_embedding_distance(
emb1: np.ndarray,
emb2: np.ndarray,
metric: str = "cosine"
) -> float:
"""
Compute distance between two embeddings.
Args:
emb1: First embedding vector
emb2: Second embedding vector
metric: Distance metric to use
- "cosine" (default): Cosine distance, standard for L2-normalized embeddings
- "correlation": Pearson correlation distance, robust to offsets
Returns:
Distance in [0, 1] where 0 = identical, 1 = maximally different
"""
if emb1 is None or emb2 is None:
return float("inf")
if emb1.shape != emb2.shape:
logger.warning(f"Embeddings have different shapes: {emb1.shape} vs {emb2.shape}")
return float("inf")
if emb1.size == 0:
return float("inf")
try:
if metric == "cosine":
return cosine_distance_jit(emb1, emb2)
elif metric == "correlation":
return correlation_distance_jit(emb1, emb2)
else:
logger.warning(f"Unknown metric '{metric}', using cosine")
return cosine_distance_jit(emb1, emb2)
except Exception as e:
logger.error(f"Error in embedding distance computation: {e}")
return float("inf")
[docs]
def compute_embedding_distances_batch(
emb: np.ndarray,
embs: List[np.ndarray],
metric: str = "cosine"
) -> np.ndarray:
"""
Compute distances from one embedding to multiple embeddings.
Args:
emb: Query embedding vector
embs: List of embedding vectors to compare against
metric: Distance metric ("cosine" or "correlation")
Returns:
Array of distances
"""
if emb is None or not embs:
return np.array([])
distances = np.full(len(embs), float("inf"))
# Select distance function
if metric == "cosine":
dist_fn = cosine_distance_jit
elif metric == "correlation":
dist_fn = correlation_distance_jit
else:
logger.warning(f"Unknown metric '{metric}', using cosine")
dist_fn = cosine_distance_jit
for i, emb2 in enumerate(embs):
if emb2 is not None and emb.shape == emb2.shape and emb.size > 0:
try:
distances[i] = dist_fn(emb, emb2)
except Exception as e:
logger.error(f"Error in batch distance computation for item {i}: {e}")
return distances
[docs]
class CupyShapeEmbedding(EmbeddingExtractor):
"""
Shape-focused embedding optimized for microscopy organism tracking.
Key design principles (from embedding_recommendation.md):
- NO RESIZE: Processes original patch resolution to preserve fine details
- SHAPE-FOCUSED: Prioritizes contour/boundary features over texture
- ROTATION-INVARIANT: Handles organisms at any angle
Feature composition (96 dimensions):
- HOG features (36 dims): Histogram of Oriented Gradients for local shape
- Fourier descriptors (16 dims): Contour shape in frequency domain
- Radial signature (32 dims): Distance from centroid to boundary
- Hu moments (7 dims): Classic rotation/scale invariant moments
- Shape stats (5 dims): Area, perimeter, circularity, aspect ratio, solidity
This embedding is specifically designed for grayscale microscopy images
where organisms have similar internal textures but distinct boundary shapes.
"""
def __init__(self, use_gpu: bool = True, hog_cells: int = 3, hog_bins: int = 9):
super().__init__(use_tensor=False)
self._dim = 96 # 36 HOG + 16 Fourier + 32 radial + 7 Hu + 5 shape stats
self.use_gpu = use_gpu and CUPY_AVAILABLE
self.hog_cells = hog_cells # Grid of cells for HOG
self.hog_bins = hog_bins # Orientation bins
if self.use_gpu:
logger.info(f"CupyShapeEmbedding initialized with GPU. Dim={self._dim}")
else:
logger.info(f"CupyShapeEmbedding initialized (CPU). Dim={self._dim}")
def _prepare_patch(self, frame: np.ndarray, bbox: np.ndarray) -> Optional[np.ndarray]:
"""Extract patch WITHOUT resizing to preserve fine morphological details."""
x1, y1, x2, y2 = map(int, bbox[:4])
h_frame, w_frame = frame.shape[:2]
x1, y1 = max(0, x1), max(0, y1)
x2, y2 = min(w_frame, x2), min(h_frame, y2)
if x1 >= x2 or y1 >= y2:
return None
roi = frame[y1:y2, x1:x2]
if roi.size == 0:
return None
# NO RESIZE - return original patch
return roi
def _compute_hog_features(self, gray: np.ndarray) -> np.ndarray:
"""
Compute HOG (Histogram of Oriented Gradients) features.
Adaptive cell size based on patch dimensions.
Returns 36 features (hog_cells x hog_cells x hog_bins normalized).
"""
H, W = gray.shape
n_cells = self.hog_cells
n_bins = self.hog_bins
# Compute gradients
grad_x = cv2.Sobel(gray, cv2.CV_32F, 1, 0, ksize=3)
grad_y = cv2.Sobel(gray, cv2.CV_32F, 0, 1, ksize=3)
# Magnitude and orientation
magnitude = np.sqrt(grad_x**2 + grad_y**2)
orientation = np.arctan2(grad_y, grad_x) # -pi to pi
orientation = (orientation + np.pi) * (n_bins / (2 * np.pi)) # 0 to n_bins
orientation = orientation.astype(np.int32) % n_bins
# Adaptive cell size
cell_h = max(1, H // n_cells)
cell_w = max(1, W // n_cells)
hog_features = np.zeros((n_cells, n_cells, n_bins), dtype=np.float32)
for cy in range(n_cells):
for cx in range(n_cells):
y_start = cy * cell_h
y_end = min((cy + 1) * cell_h, H)
x_start = cx * cell_w
x_end = min((cx + 1) * cell_w, W)
cell_mag = magnitude[y_start:y_end, x_start:x_end]
cell_ori = orientation[y_start:y_end, x_start:x_end]
# Build histogram weighted by magnitude
for b in range(n_bins):
mask = cell_ori == b
hog_features[cy, cx, b] = np.sum(cell_mag[mask])
# L2 normalize per block (2x2 cells)
hog_normalized = np.zeros_like(hog_features)
for by in range(n_cells - 1):
for bx in range(n_cells - 1):
block = hog_features[by:by+2, bx:bx+2, :].flatten()
norm = np.linalg.norm(block) + 1e-6
block_normalized = block / norm
# Distribute back (simplified - just use the normalization factor)
hog_normalized[by:by+2, bx:bx+2, :] /= (norm / (4 * n_bins) + 1e-6)
# Flatten to 36 features (or pad/truncate)
flat = hog_features.flatten()
result = np.zeros(36, dtype=np.float32)
n_copy = min(len(flat), 36)
result[:n_copy] = flat[:n_copy]
# Normalize final vector
norm = np.linalg.norm(result) + 1e-6
return result / norm
def _extract_contour(self, gray: np.ndarray) -> Optional[np.ndarray]:
"""Extract the largest contour from grayscale image."""
# Adaptive thresholding for better contour extraction
if gray.max() - gray.min() < 10:
return None
# Normalize to 0-255
gray_norm = ((gray - gray.min()) / (gray.max() - gray.min() + 1e-6) * 255).astype(np.uint8)
# Try Otsu thresholding
_, binary = cv2.threshold(gray_norm, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
# Find contours
contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)
if not contours:
# Try inverted
_, binary_inv = cv2.threshold(gray_norm, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
contours, _ = cv2.findContours(binary_inv, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)
if not contours:
return None
# Return largest contour
largest = max(contours, key=cv2.contourArea)
return largest.squeeze()
def _compute_fourier_descriptors(self, contour: np.ndarray, n_descriptors: int = 16) -> np.ndarray:
"""
Compute Fourier descriptors of contour for rotation/scale invariant shape.
Steps:
1. Sample N points uniformly along contour
2. Represent as complex numbers z[k] = x[k] + i*y[k]
3. Compute FFT
4. Normalize for scale/rotation invariance
5. Return magnitude of harmonics 2 to n_descriptors+1
"""
result = np.zeros(n_descriptors, dtype=np.float32)
if contour is None or len(contour) < 4:
return result
# Ensure 2D
if len(contour.shape) == 1:
return result
if contour.shape[1] != 2:
if len(contour.shape) == 3:
contour = contour.reshape(-1, 2)
else:
return result
# Sample 64 points uniformly
n_sample = 64
n_points = len(contour)
if n_points < n_sample:
# Upsample by repeating
indices = np.linspace(0, n_points - 1, n_sample).astype(int)
else:
indices = np.linspace(0, n_points - 1, n_sample).astype(int)
sampled = contour[indices].astype(np.float64)
# Represent as complex numbers
z = sampled[:, 0] + 1j * sampled[:, 1]
# Compute FFT
Z = np.fft.fft(z)
# Normalize by Z[1] for scale invariance (if non-zero)
if np.abs(Z[1]) > 1e-6:
Z = Z / Z[1]
# Take magnitude for rotation invariance
# Skip DC (Z[0]) and first harmonic (Z[1], now = 1)
# Use harmonics 2 to n_descriptors+1
for i in range(n_descriptors):
idx = i + 2
if idx < len(Z):
result[i] = np.abs(Z[idx])
# Normalize
norm = np.linalg.norm(result) + 1e-6
return (result / norm).astype(np.float32)
def _compute_radial_signature(self, gray: np.ndarray, contour: np.ndarray, n_angles: int = 32) -> np.ndarray:
"""
Compute radial signature: distance from centroid to contour at N angles.
This captures global shape variation and is rotation-invariant when
normalized by mean distance.
"""
result = np.zeros(n_angles, dtype=np.float32)
if contour is None or len(contour) < 4:
return result
# Ensure 2D contour
if len(contour.shape) != 2 or contour.shape[1] != 2:
return result
# Compute centroid
M = cv2.moments(gray)
if M['m00'] < 1e-6:
cx, cy = gray.shape[1] / 2, gray.shape[0] / 2
else:
cx = M['m10'] / M['m00']
cy = M['m01'] / M['m00']
# Cast rays at uniform angles
angles = np.linspace(0, 2 * np.pi, n_angles, endpoint=False)
for i, angle in enumerate(angles):
# Ray direction
dx, dy = np.cos(angle), np.sin(angle)
# Find intersection with contour (closest point along ray direction)
# Use signed distance along ray
contour_centered = contour - np.array([cx, cy])
# Project contour points onto ray
projections = contour_centered[:, 0] * dx + contour_centered[:, 1] * dy
# Only consider points in positive ray direction
positive_mask = projections > 0
if not np.any(positive_mask):
continue
# Find closest point in ray direction
positive_projections = projections[positive_mask]
positive_contour = contour_centered[positive_mask]
# Distance from ray (perpendicular)
perp_dist = np.abs(positive_contour[:, 0] * (-dy) + positive_contour[:, 1] * dx)
# Find point closest to ray
closest_idx = np.argmin(perp_dist)
result[i] = positive_projections[closest_idx]
# Normalize by mean distance for scale invariance
mean_dist = np.mean(result[result > 0]) if np.any(result > 0) else 1.0
result = result / (mean_dist + 1e-6)
return result
def _compute_hu_moments(self, gray: np.ndarray) -> np.ndarray:
"""
Compute 7 Hu moments (rotation, scale, translation invariant).
Log-transformed for numerical stability.
"""
moments = cv2.moments(gray)
hu = cv2.HuMoments(moments).flatten()
# Log transform with sign preservation
hu_log = np.zeros(7, dtype=np.float32)
for i in range(7):
if hu[i] != 0:
hu_log[i] = -np.sign(hu[i]) * np.log10(np.abs(hu[i]) + 1e-10)
return hu_log
def _compute_shape_stats(self, gray: np.ndarray, contour: np.ndarray) -> np.ndarray:
"""
Compute basic shape statistics:
- Area, Perimeter, Circularity, Aspect ratio, Solidity
"""
result = np.zeros(5, dtype=np.float32)
if contour is None or len(contour) < 4:
return result
# Ensure proper contour format for OpenCV
if len(contour.shape) == 2:
contour_cv = contour.reshape(-1, 1, 2).astype(np.int32)
else:
contour_cv = contour.astype(np.int32)
area = cv2.contourArea(contour_cv)
perimeter = cv2.arcLength(contour_cv, closed=True)
# Circularity: 4*pi*area / perimeter^2 (1.0 for perfect circle)
circularity = (4 * np.pi * area) / (perimeter**2 + 1e-6) if perimeter > 0 else 0
# Bounding rect aspect ratio
x, y, w, h = cv2.boundingRect(contour_cv)
aspect_ratio = float(w) / (h + 1e-6)
# Solidity: area / convex hull area
hull = cv2.convexHull(contour_cv)
hull_area = cv2.contourArea(hull)
solidity = area / (hull_area + 1e-6) if hull_area > 0 else 0
# Normalize
result[0] = np.sqrt(area) / 100.0 # Normalized area
result[1] = perimeter / 500.0 # Normalized perimeter
result[2] = circularity
result[3] = min(aspect_ratio, 1.0 / (aspect_ratio + 1e-6)) # Make symmetric
result[4] = solidity
return result
[docs]
def extract(self, frame: np.ndarray, bbox: np.ndarray) -> np.ndarray:
"""Extract shape-focused embedding for a single patch."""
patch = self._prepare_patch(frame, bbox)
if patch is None:
return np.zeros(self._dim, dtype=np.float32)
return self._extract_features_single(patch)
def _extract_features_single(self, patch: np.ndarray) -> np.ndarray:
"""Extract all shape features from a single patch."""
features = np.zeros(self._dim, dtype=np.float32)
idx = 0
# Convert to grayscale
if len(patch.shape) == 3:
gray = cv2.cvtColor(patch, cv2.COLOR_BGR2GRAY).astype(np.float32)
else:
gray = patch.astype(np.float32)
# Extract contour (used by multiple feature types)
contour = self._extract_contour(gray)
# 1. HOG features (36 dims)
hog = self._compute_hog_features(gray)
features[idx:idx+36] = hog
idx += 36
# 2. Fourier descriptors (16 dims)
fourier = self._compute_fourier_descriptors(contour, n_descriptors=16)
features[idx:idx+16] = fourier
idx += 16
# 3. Radial signature (32 dims)
radial = self._compute_radial_signature(gray, contour, n_angles=32)
features[idx:idx+32] = radial
idx += 32
# 4. Hu moments (7 dims)
hu = self._compute_hu_moments(gray)
features[idx:idx+7] = hu
idx += 7
# 5. Shape stats (5 dims)
shape_stats = self._compute_shape_stats(gray, contour)
features[idx:idx+5] = shape_stats
idx += 5
# Final L2 normalization
norm = np.linalg.norm(features) + 1e-6
return features / norm
[docs]
def extract_batch(self, frame: np.ndarray, bboxes: np.ndarray) -> List[np.ndarray]:
"""Extract batch of shape embeddings."""
if len(bboxes) == 0:
return []
results = []
for bbox in bboxes:
patch = self._prepare_patch(frame, bbox)
if patch is None:
results.append(np.zeros(self._dim, dtype=np.float32))
else:
results.append(self._extract_features_single(patch))
return results
@property
def embedding_dim(self) -> int:
return self._dim
# Available embedding extractors
AVAILABLE_EMBEDDINGS = {
"cupytexture": CupyTextureEmbedding,
"cupytexture_color": CupyTextureColorEmbedding,
"cupytexture_mega": MegaCupyTextureEmbedding,
"cupyshape": CupyShapeEmbedding,
}
[docs]
def get_embedding_extractor(name: str, **kwargs) -> EmbeddingExtractor:
"""Get an embedding extractor by name."""
if name not in AVAILABLE_EMBEDDINGS:
available = list(AVAILABLE_EMBEDDINGS.keys())
raise ValueError(f"Embedding '{name}' not found. Available: {available}")
return AVAILABLE_EMBEDDINGS[name](**kwargs)
[docs]
def list_available_embeddings() -> List[str]:
"""List all available embeddings."""
return list(AVAILABLE_EMBEDDINGS.keys())
[docs]
def is_gpu_available() -> bool:
"""Check if GPU acceleration is available."""
return CUPY_AVAILABLE