Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions v1/src/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ class Settings(BaseSettings):
pose_confidence_threshold: float = Field(default=0.5, description="Minimum confidence threshold")
pose_processing_batch_size: int = Field(default=32, description="Batch size for pose processing")
pose_max_persons: int = Field(default=10, description="Maximum persons to detect per frame")
pose_multi_person_mvp_energy_threshold: float = Field(
default=0.35,
description="Motion-energy threshold for MVP two-person promotion heuristic"
)

# Streaming settings
stream_fps: int = Field(default=30, description="Streaming frames per second")
Expand Down
67 changes: 67 additions & 0 deletions v1/src/services/multi_person_mvp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""Lightweight multi-person MVP helpers for single-stream CSI pipelines."""

from __future__ import annotations

import copy
from datetime import datetime
from typing import Any, Dict, List

import numpy as np


def estimate_motion_energy(csi_data: np.ndarray) -> float:
"""Estimate motion energy from CSI amplitude deltas."""
array = np.asarray(csi_data, dtype=float)
if array.size < 2:
return 0.0
flattened = array.reshape(-1)
deltas = np.diff(flattened)
return float(np.mean(np.abs(deltas)))


def synthesize_secondary_person(pose: Dict[str, Any]) -> Dict[str, Any]:
"""Generate a synthetic second person with a small spatial offset."""
synthetic = copy.deepcopy(pose)
synthetic["person_id"] = f"{pose.get('person_id', 0)}_mvp2"
synthetic["confidence"] = max(0.05, float(pose.get("confidence", 0.0)) * 0.85)

bbox = synthetic.get("bounding_box") or {"x": 0.0, "y": 0.0, "width": 0.0, "height": 0.0}
width = float(bbox.get("width", 0.0) or 0.0)
x = float(bbox.get("x", 0.0) or 0.0)
bbox["x"] = min(1.0, max(0.0, x + max(0.06, width * 0.35)))
synthetic["bounding_box"] = bbox

keypoints = synthetic.get("keypoints")
if isinstance(keypoints, list):
shifted = []
for kp in keypoints:
if not isinstance(kp, dict):
shifted.append(kp)
continue
shifted_kp = dict(kp)
if isinstance(shifted_kp.get("x"), (int, float)):
shifted_kp["x"] = min(1.0, max(0.0, float(shifted_kp["x"]) + 0.08))
shifted.append(shifted_kp)
synthetic["keypoints"] = shifted

synthetic["activity"] = pose.get("activity", "standing")
synthetic["timestamp"] = datetime.now().isoformat()
return synthetic


def apply_multi_person_mvp(
poses: List[Dict[str, Any]],
csi_data: np.ndarray,
*,
max_persons: int,
energy_threshold: float,
) -> List[Dict[str, Any]]:
"""Promote single-person result to two persons when motion energy is high."""
if len(poses) != 1 or max_persons < 2:
return poses

energy = estimate_motion_energy(csi_data)
if energy <= float(energy_threshold):
return poses

return [copy.deepcopy(poses[0]), synthesize_secondary_person(poses[0])]
56 changes: 49 additions & 7 deletions v1/src/services/pose_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
from src.core.phase_sanitizer import PhaseSanitizer
from src.models.densepose_head import DensePoseHead
from src.models.modality_translation import ModalityTranslationNetwork
from src.services.multi_person_mvp import (
apply_multi_person_mvp,
estimate_motion_energy,
synthesize_secondary_person,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -260,20 +265,57 @@ async def _estimate_poses(self, csi_data: np.ndarray, metadata: Dict[str, Any])
if pose.get("confidence", 0.0) >= self.settings.pose_confidence_threshold
]

# MVP multi-person fallback: when motion energy is high and model only
# yields one detection, synthesize a second nearby person hypothesis.
filtered_poses = self._apply_multi_person_mvp(filtered_poses, csi_data, metadata)

# Limit number of persons
if len(filtered_poses) > self.settings.pose_max_persons:
requested_max = int(metadata.get("max_persons", self.settings.pose_max_persons))
effective_max_persons = max(1, min(requested_max, self.settings.pose_max_persons))
if len(filtered_poses) > effective_max_persons:
filtered_poses = sorted(
filtered_poses,
key=lambda x: x.get("confidence", 0.0),
filtered_poses,
key=lambda x: x.get("confidence", 0.0),
reverse=True
)[:self.settings.pose_max_persons]
)[:effective_max_persons]

return filtered_poses

except Exception as e:
self.logger.error(f"Error in pose estimation: {e}")
return []


def _estimate_motion_energy(self, csi_data: np.ndarray) -> float:
"""Estimate frame motion energy from CSI amplitude deltas."""
return estimate_motion_energy(csi_data)

def _synthesize_secondary_person(self, pose: Dict[str, Any]) -> Dict[str, Any]:
"""Generate a second person hypothesis with a small spatial offset."""
return synthesize_secondary_person(pose)

def _apply_multi_person_mvp(
self,
poses: List[Dict[str, Any]],
csi_data: np.ndarray,
metadata: Dict[str, Any],
) -> List[Dict[str, Any]]:
"""MVP heuristic for issue #97: promote to 2 persons on high motion energy."""
requested_max = int(metadata.get("max_persons", self.settings.pose_max_persons))
threshold = float(getattr(self.settings, "pose_multi_person_mvp_energy_threshold", 0.35))
promoted = apply_multi_person_mvp(
poses,
csi_data,
max_persons=requested_max,
energy_threshold=threshold,
)
if len(promoted) > len(poses):
self.logger.debug(
"multi-person MVP promotion triggered: motion_energy=%.4f threshold=%.4f",
self._estimate_motion_energy(csi_data),
threshold,
)
return promoted

def _parse_pose_outputs(self, outputs: torch.Tensor) -> List[Dict[str, Any]]:
"""Parse neural network outputs into pose detections.

Expand Down
62 changes: 62 additions & 0 deletions v1/tests/unit/test_pose_service_multi_person_mvp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import importlib.util
from pathlib import Path

import numpy as np


MODULE_PATH = Path(__file__).resolve().parents[2] / "src" / "services" / "multi_person_mvp.py"
_spec = importlib.util.spec_from_file_location("multi_person_mvp", MODULE_PATH)
_module = importlib.util.module_from_spec(_spec)
assert _spec is not None and _spec.loader is not None
_spec.loader.exec_module(_module)
apply_multi_person_mvp = _module.apply_multi_person_mvp


def _single_pose() -> dict:
return {
"person_id": "0",
"confidence": 0.92,
"bounding_box": {"x": 0.20, "y": 0.25, "width": 0.30, "height": 0.45},
"keypoints": [
{"name": "nose", "x": 0.30, "y": 0.20, "confidence": 0.9},
{"name": "left_shoulder", "x": 0.28, "y": 0.33, "confidence": 0.88},
],
"activity": "walking",
"timestamp": "2026-03-03T00:00:00",
}


def test_mvp_promotes_to_two_persons_when_motion_energy_high():
poses = [_single_pose()]

# High motion energy: alternating values => large first-order deltas.
high_motion_csi = np.array([0.0, 1.0, 0.0, 1.0, 0.0, 1.0], dtype=float)

promoted = apply_multi_person_mvp(
poses=poses,
csi_data=high_motion_csi,
max_persons=3,
energy_threshold=0.35,
)

assert len(promoted) == 2
assert promoted[0]["person_id"] == "0"
assert promoted[1]["person_id"].endswith("_mvp2")
assert promoted[1]["confidence"] < promoted[0]["confidence"]
assert promoted[1]["bounding_box"]["x"] > promoted[0]["bounding_box"]["x"]


def test_mvp_keeps_single_person_when_motion_energy_low():
poses = [_single_pose()]

low_motion_csi = np.array([0.10, 0.11, 0.10, 0.11, 0.10], dtype=float)

result = apply_multi_person_mvp(
poses=poses,
csi_data=low_motion_csi,
max_persons=3,
energy_threshold=0.35,
)

assert len(result) == 1
assert result[0]["person_id"] == "0"