From 1f499467c5cb3773a84fdbf19c1df8c445fc4a8a Mon Sep 17 00:00:00 2001 From: Jeffrey Lin Date: Thu, 23 Apr 2026 19:06:22 -0400 Subject: [PATCH 1/5] =?UTF-8?q?feat(perception):=20OpticalFlowModule=20for?= =?UTF-8?q?=20=CF=84-based=20obstacle=20avoidance?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Monocular Time-to-Contact detection via FAST keypoints + Lucas-Kanade flow. Follows dimos Config pattern; rotation-gated via optional angular_velocity stream. Closes #1775. --- dimos/perception/optical_flow/__init__.py | 0 .../optical_flow/backends/__init__.py | 0 .../optical_flow/backends/lucas_kanade.py | 107 ++++++++++++ .../optical_flow/optical_flow_module.py | 153 ++++++++++++++++++ .../perception/optical_flow/tac_estimator.py | 11 ++ 5 files changed, 271 insertions(+) create mode 100644 dimos/perception/optical_flow/__init__.py create mode 100644 dimos/perception/optical_flow/backends/__init__.py create mode 100644 dimos/perception/optical_flow/backends/lucas_kanade.py create mode 100644 dimos/perception/optical_flow/optical_flow_module.py create mode 100644 dimos/perception/optical_flow/tac_estimator.py diff --git a/dimos/perception/optical_flow/__init__.py b/dimos/perception/optical_flow/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/dimos/perception/optical_flow/backends/__init__.py b/dimos/perception/optical_flow/backends/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/dimos/perception/optical_flow/backends/lucas_kanade.py b/dimos/perception/optical_flow/backends/lucas_kanade.py new file mode 100644 index 0000000000..54c074de30 --- /dev/null +++ b/dimos/perception/optical_flow/backends/lucas_kanade.py @@ -0,0 +1,107 @@ +import cv2 +import numpy as np +from typing import Optional + +from dimos.perception.optical_flow.tac_estimator import divergence_to_tac + + +class LucasKanadeBackend: + """ + FAST keypoints + Lucas-Kanade sparse optical flow. + Computes a Time-to-Contact proxy (tau) via flow divergence in an NxN grid. + Based on Lee (1976): a closer/faster-approaching obstacle produces larger + image-expansion and thus smaller tau. Scale-free; no depth calibration. + + Note: the tau value is monotonically related to physical TTC but is not + literally seconds — it depends on frame rate and cell geometry. Use + tau_threshold as an empirically tuned urgency parameter. The default 3.0 + was chosen from the PR-curve sweep on unitree_office_walk (P=0.939 at the + production operating point). + """ + + def __init__(self, max_keypoints=300, grid_size=5, + tau_threshold=3.0, refresh_interval=10, + fast_threshold=20): + self.max_keypoints = max_keypoints + self.grid_size = grid_size + self.tau_threshold = tau_threshold + self.refresh_interval = refresh_interval + self.lk_params = dict( + winSize=(21, 21), + maxLevel=3, + criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03), + ) + self.fast = cv2.FastFeatureDetector_create( + threshold=fast_threshold, nonmaxSuppression=True + ) + self.prev_gray: Optional[np.ndarray] = None + self.prev_pts: Optional[np.ndarray] = None + self.frame_count: int = 0 + + def compute(self, frame_bgr: np.ndarray) -> Optional[dict]: # type: ignore[type-arg] + """ + Args: + frame_bgr: HxWx3 uint8 BGR numpy array + Returns: + dict: {tac_grid, danger, flow_pts, min_tau} + None on first frame. 
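+
+        Usage sketch (hypothetical caller loop; the cv2.VideoCapture source
+        is an assumption, not part of this backend):
+            backend = LucasKanadeBackend()
+            ok, frame = cap.read()               # cap: cv2.VideoCapture
+            result = backend.compute(frame)      # None until tracking is primed
+            if result is not None and result["danger"]:
+                ...  # obstacle approaching: slow down or replan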
+ """ + gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY) + + if self.prev_gray is None or self.prev_pts is None or len(self.prev_pts) == 0: + self.prev_gray = gray + self._refresh(gray) + return None + + curr_pts, status, _ = cv2.calcOpticalFlowPyrLK( + self.prev_gray, gray, self.prev_pts, None, **self.lk_params + ) + + good_mask = status.ravel() == 1 + good_prev = self.prev_pts[good_mask].reshape(-1, 2) + good_curr = curr_pts[good_mask].reshape(-1, 2) + flow_vecs = good_curr - good_prev + + self.frame_count += 1 + if self.frame_count % self.refresh_interval == 0 or len(good_prev) < 50: + self._refresh(gray) + else: + self.prev_pts = good_curr.reshape(-1, 1, 2) + self.prev_gray = gray + + if len(good_prev) < 10: + return None + + h, w = gray.shape + tac_grid = self._tac_grid(good_prev, flow_vecs, h, w) + min_tau = float(np.nanmin(tac_grid)) if not np.all(np.isnan(tac_grid)) else np.inf + + return { + "tac_grid": tac_grid, + "danger": min_tau < self.tau_threshold, + "flow_pts": (good_prev, good_curr), + "min_tau": min_tau, + } + + def _refresh(self, gray: np.ndarray) -> None: + kps = sorted(self.fast.detect(gray, None), + key=lambda k: k.response, reverse=True)[:self.max_keypoints] + self.prev_pts = ( + np.array([[k.pt] for k in kps], dtype=np.float32) if kps else None + ) + + def _tac_grid(self, pts: np.ndarray, flows: np.ndarray, h: int, w: int) -> np.ndarray: + div_grid = np.full((self.grid_size, self.grid_size), np.nan, dtype=np.float32) + cell_h = h / self.grid_size + cell_w = w / self.grid_size + for i in range(self.grid_size): + for j in range(self.grid_size): + mask = ( + (pts[:, 0] >= j * cell_w) & (pts[:, 0] < (j+1) * cell_w) & + (pts[:, 1] >= i * cell_h) & (pts[:, 1] < (i+1) * cell_h) + ) + cf = flows[mask] + if len(cf) < 3: + continue + div_grid[i, j] = np.mean(cf[:, 0]) / cell_w + np.mean(cf[:, 1]) / cell_h + return divergence_to_tac(div_grid) diff --git a/dimos/perception/optical_flow/optical_flow_module.py b/dimos/perception/optical_flow/optical_flow_module.py new file mode 100644 index 0000000000..bfe9119b87 --- /dev/null +++ b/dimos/perception/optical_flow/optical_flow_module.py @@ -0,0 +1,153 @@ +import logging +from typing import Any + +import cv2 +import numpy as np +from reactivex.disposable import Disposable + +from dimos.core.core import rpc +from dimos.core.module import Module, ModuleConfig +from dimos.core.stream import In, Out +from dimos.msgs.sensor_msgs.Image import Image, ImageFormat +from dimos.msgs.std_msgs.Bool import Bool +from dimos.perception.optical_flow.backends.lucas_kanade import LucasKanadeBackend +from dimos.utils.logging_config import setup_logger + +logger = setup_logger(level=logging.INFO) + + +class OpticalFlowConfig(ModuleConfig): + tau_threshold: float = 3.0 # alarm when min_tau < tau_threshold + grid_size: int = 5 # NxN tac grid + omega_max: float = 0.3 # rad/s; danger suppressed above this yaw rate + + +class OpticalFlowModule(Module): + """ + Monocular obstacle avoidance via sparse optical flow + Time-to-Contact. + + Subscribes to a live RGB/BGR camera stream, detects FAST keypoints each + frame, tracks them with Lucas-Kanade, and flags grid cells where the + inverse of flow divergence (tau proxy) falls below tau_threshold. + + Note on units: the backend's tau is monotonically related to physical + time-to-contact but is not literally seconds — it depends on frame rate + and cell geometry. Treat tau_threshold as an empirically tuned urgency + parameter, not a calibrated timer. 
+ + The danger_signal is gated on angular rate: if the optional + angular_velocity stream is connected, danger output is suppressed + whenever |omega| > omega_max (rad/s) to prevent false alarms during turns. + """ + + config: OpticalFlowConfig + + color_image: In[Image] + angular_velocity: In[Any] # optional: yaw rate in rad/s from IMU or odom + + danger_signal: Out[Bool] + tac_grid: Out[Any] + flow_visualization: Out[Image] + + def __init__(self, **kwargs: Any) -> None: + super().__init__(**kwargs) + + self._backend = LucasKanadeBackend( + grid_size=self.config.grid_size, + tau_threshold=self.config.tau_threshold, + ) + self._last_omega: float = 0.0 + + @rpc + def start(self) -> None: + super().start() + unsub_img = self.color_image.subscribe(self._on_color_image) + unsub_omega = self.angular_velocity.subscribe(self._on_angular_velocity) + self.register_disposable(Disposable(unsub_img)) + self.register_disposable(Disposable(unsub_omega)) + logger.info("OpticalFlowModule started") + + def _on_angular_velocity(self, msg: Any) -> None: + """Update last known yaw rate. Accepts raw float or .data-wrapped type.""" + try: + self._last_omega = float(msg.data) if hasattr(msg, "data") else float(msg) + except (TypeError, ValueError): + pass + + @rpc + def stop(self) -> None: + super().stop() + + def _on_color_image(self, msg: Image) -> None: + frame = msg.data + if not isinstance(frame, np.ndarray): + frame = np.asarray(frame) + + # Backends expect BGR; convert if the incoming image is RGB + if msg.format == ImageFormat.RGB: + frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) + + result = self._backend.compute(frame) + if result is None: + return + + # Suppress danger during turns: rotation contaminates the divergence + # estimate, producing false alarms. Requires angular_velocity stream. 
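+        # If the angular_velocity stream is never connected, _last_omega stays
+        # at its initial 0.0 and the gate is transparent (never suppresses).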
+ is_turning = abs(self._last_omega) > self.config.omega_max + gated_danger = bool(result["danger"]) and not is_turning + + self.danger_signal.publish(Bool(data=gated_danger)) + self.tac_grid.publish(result["tac_grid"]) + + viz = self._draw_visualization(frame, result, is_turning=is_turning) + self.flow_visualization.publish(Image.from_numpy(viz, format=ImageFormat.BGR)) + + def _draw_visualization( + self, frame_bgr: np.ndarray, result: dict, # type: ignore[type-arg] + is_turning: bool = False, + ) -> np.ndarray: + """Draw flow vectors and tau-grid overlay onto a copy of the frame.""" + viz = frame_bgr.copy() + h, w = viz.shape[:2] + + good_prev, good_curr = result["flow_pts"] + for p, c in zip(good_prev, good_curr): + p0 = (int(p[0]), int(p[1])) + p1 = (int(c[0]), int(c[1])) + cv2.arrowedLine(viz, p0, p1, (0, 255, 0), 1, tipLength=0.4) + + tac_grid = result["tac_grid"] + cell_h = h / self.config.grid_size + cell_w = w / self.config.grid_size + for i in range(self.config.grid_size): + for j in range(self.config.grid_size): + tau = tac_grid[i, j] + if np.isnan(tau): + continue + danger = tau < self.config.tau_threshold + color = (0, 0, 255) if danger else (0, 200, 200) + x0 = int(j * cell_w) + y0 = int(i * cell_h) + x1 = int((j + 1) * cell_w) + y1 = int((i + 1) * cell_h) + cv2.rectangle(viz, (x0, y0), (x1, y1), color, 2) + cv2.putText( + viz, f"{tau:.1f}s", + (x0 + 4, y0 + 18), + cv2.FONT_HERSHEY_SIMPLEX, 0.45, color, 1, + ) + + if is_turning: + label = f"GATED (|ω|={abs(self._last_omega):.2f} rad/s)" + cv2.putText(viz, label, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 200, 255), 2) + else: + label = "DANGER" if result["danger"] else "CLEAR" + color = (0, 0, 255) if result["danger"] else (0, 255, 0) + cv2.putText(viz, label, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1.0, color, 2) + + cv2.putText( + viz, f"min_tau={result['min_tau']:.2f}s", + (10, 58), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1, + ) + + return viz diff --git a/dimos/perception/optical_flow/tac_estimator.py b/dimos/perception/optical_flow/tac_estimator.py new file mode 100644 index 0000000000..fb38b34b20 --- /dev/null +++ b/dimos/perception/optical_flow/tac_estimator.py @@ -0,0 +1,11 @@ +import numpy as np + + +def divergence_to_tac(divergence_grid: np.ndarray) -> np.ndarray: + """ + Convert divergence grid to time-to-contact grid. + tau = 1/divergence; NaN where divergence <= ~0 (not approaching). + Accepts NaN entries (e.g. unpopulated cells) and preserves them. + """ + with np.errstate(divide='ignore', invalid='ignore'): + return np.where(divergence_grid > 1e-6, 1.0 / divergence_grid, np.nan) From d9bbc0bebf650aa971b558b41fa8ad8ff4f2daf8 Mon Sep 17 00:00:00 2001 From: Jeffrey Lin Date: Thu, 23 Apr 2026 19:16:04 -0400 Subject: [PATCH 2/5] feat: odom-gated evaluation and Rerun visualization for optical flow MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Eval: P=0.939 at τ=3.0 on unitree_office_walk. Visualization streams annotated frames + τ timeline to Rerun. 
--- scripts/eval_optical_flow_gated.py | 412 +++++++++++++++++++++++++++++ scripts/visualize_optical_flow.py | 93 +++++++ 2 files changed, 505 insertions(+) create mode 100644 scripts/eval_optical_flow_gated.py create mode 100644 scripts/visualize_optical_flow.py diff --git a/scripts/eval_optical_flow_gated.py b/scripts/eval_optical_flow_gated.py new file mode 100644 index 0000000000..4715ed8c84 --- /dev/null +++ b/scripts/eval_optical_flow_gated.py @@ -0,0 +1,412 @@ +""" +Odom-gated Optical Flow vs LiDAR Correlation +Issue #1775 — Real-time obstacle avoidance from monocular optical flow + +Question +-------- +On frames where the robot is purely translating forward (no turning), +how well does optical flow τ-estimation correlate with LiDAR obstacle +proximity? + +Odom gate +--------- + v_fwd > 0.15 m/s (robot moving forward) + |ω_yaw| < 0.15 rad/s (robot not turning) + +This selects wall-approach segments where τ = 1/divergence is +theory-valid: pure forward translation generates a radially expanding +flow field from the focus of expansion, and divergence is proportional +to 1/τ. + +Metrics +------- +1. Correlation — Pearson r and Spearman r between of_tau and + lidar_ttc = min_fwd_dist / v_fwd +2. Binary P/R — τ < threshold vs lidar_dist < 1.5 m +3. PR curve — sweep τ ∈ [0.3, 6.0] s +4. Latency — mean per-frame compute time + +Backend +------- + FAST corner detector + Lucas-Kanade sparse flow. + ORB-SLAM uses the same FAST corners for map-point extraction; the + additional orientation step ORB computes (for BRIEF descriptor + matching) is irrelevant here — LK tracks image patches, not + descriptors. + +Run from repo root +------------------ + PYTHONPATH=.venv/lib/python3.12/site-packages/rerun_sdk \\ + uv run python3 scripts/eval_optical_flow_gated.py + +Output +------ + results/optical_flow_gated.json +""" + +import sys +import os +import json +import time +from pathlib import Path + +import numpy as np + +sys.path.insert(0, ".") +from dimos.memory.timeseries.legacy import LegacyPickleStore +from dimos.perception.optical_flow.backends.lucas_kanade import LucasKanadeBackend + +# ── Paths ───────────────────────────────────────────────────────────────────── +REPO_ROOT = Path(__file__).resolve().parent.parent +DATA = REPO_ROOT / "data" / "unitree_office_walk" + +# ── Odom gate thresholds ────────────────────────────────────────────────────── +V_FWD_MIN = 0.15 # m/s — must be moving forward +OMEGA_MAX = 0.15 # rad/s — must not be turning +MIN_DT = 0.02 # s — skip duplicate/near-duplicate odom timestamps + +# ── Sensor config ───────────────────────────────────────────────────────────── +FOV_DEG = 60.0 # forward FOV for lidar obstacle extraction +MAX_RANGE = 5.0 # m — max lidar range to consider +DANGER_M = 1.5 # m — lidar distance threshold → "danger" GT label + +# ── Optical flow config ──────────────────────────────────────────────────────── +GRID_SIZE = 5 +TAU_SWEEP = np.round(np.linspace(0.3, 6.0, 57), 2) # PR curve sweep + + +# ── Odom kinematics ─────────────────────────────────────────────────────────── + +def odom_kinematics(odom_items): + """ + Per-frame forward velocity (m/s) and yaw rate (rad/s). + Frames with dt < MIN_DT carry forward the last valid value. + Returns (v_fwd, omega, gate) arrays, length = len(odom_items). 
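+    Kinematics used below (yaw unwrapped from the odom quaternion):
+        v_fwd = ((dx, dy) · (cos(yaw_prev), sin(yaw_prev))) / dt
+        omega = |yaw_i - yaw_{i-1}| / dt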
+ """ + n = len(odom_items) + ts = np.array([t for t, _ in odom_items]) + xs = np.array([d["data"]["pose"]["position"]["x"] for _, d in odom_items]) + ys = np.array([d["data"]["pose"]["position"]["y"] for _, d in odom_items]) + + raw_yaws = [] + for _, d in odom_items: + ori = d["data"]["pose"]["orientation"] + qx, qy, qz, qw = ori["x"], ori["y"], ori["z"], ori["w"] + raw_yaws.append(np.arctan2(2*(qw*qz + qx*qy), 1 - 2*(qy**2 + qz**2))) + yaws = np.unwrap(raw_yaws) + + v_fwd = np.zeros(n) + omega = np.zeros(n) + + last_v, last_w = 0.0, 0.0 + for i in range(1, n): + dt = ts[i] - ts[i - 1] + if dt < MIN_DT: + v_fwd[i] = last_v + omega[i] = last_w + continue + dx, dy = xs[i] - xs[i - 1], ys[i] - ys[i - 1] + heading = np.array([np.cos(yaws[i - 1]), np.sin(yaws[i - 1])]) + last_v = np.dot([dx, dy], heading) / dt + last_w = abs(yaws[i] - yaws[i - 1]) / dt + v_fwd[i] = last_v + omega[i] = last_w + + v_fwd[0] = v_fwd[1] + omega[0] = omega[1] + + gate = (v_fwd > V_FWD_MIN) & (omega < OMEGA_MAX) + return v_fwd, omega, gate + + +# ── Frame sync ───────────────────────────────────────────────────────────────── + +def _nearest(arr, val): + return int(np.argmin(np.abs(arr - val))) + + +def sync_frames(lidar_s, video_s, odom_s, tol=0.1): + """ + Anchor on lidar timestamps; find nearest video and odom within tol. + Yields (ts, {lidar, video, odom, v_fwd, omega, is_gated}). + """ + video_items = list(video_s._iter_items()) + odom_items = list(odom_s._iter_items()) + v_fwd, omega, gate = odom_kinematics(odom_items) + + v_ts = np.array([t for t, _ in video_items]) + o_ts = np.array([t for t, _ in odom_items]) + v_data = [d for _, d in video_items] + o_data = [d for _, d in odom_items] + + for ts_l, data_l in lidar_s._iter_items(): + vi = _nearest(v_ts, ts_l) + oi = _nearest(o_ts, ts_l) + if abs(v_ts[vi] - ts_l) > tol or abs(o_ts[oi] - ts_l) > tol: + continue + yield ts_l, { + "lidar": data_l, + "video": v_data[vi], + "odom": o_data[oi], + "v_fwd": float(v_fwd[oi]), + "omega": float(omega[oi]), + "is_gated": bool(gate[oi]), + } + + +# ── Coordinate transform ─────────────────────────────────────────────────────── + +def world_to_body(pts_world: np.ndarray, odom_msg: dict) -> np.ndarray: + pose = odom_msg["data"]["pose"] + pos, ori = pose["position"], pose["orientation"] + rx, ry, rz = pos["x"], pos["y"], pos["z"] + qx, qy, qz, qw = ori["x"], ori["y"], ori["z"], ori["w"] + R = np.array([ + [1 - 2*(qy**2 + qz**2), 2*(qx*qy - qw*qz), 2*(qx*qz + qw*qy)], + [2*(qx*qy + qw*qz), 1 - 2*(qx**2 + qz**2), 2*(qy*qz - qw*qx)], + [2*(qx*qz - qw*qy), 2*(qy*qz + qw*qx), 1 - 2*(qx**2 + qy**2)], + ]) + return (pts_world - np.array([rx, ry, rz])) @ R + + +def min_fwd_dist(lidar_msg, odom_msg): + """Minimum distance to any forward obstacle in body frame. 
inf if none.""" + try: + pts = world_to_body(np.asarray(lidar_msg["data"]["data"]["points"]), odom_msg) + except Exception: + return np.inf + x, y, z = pts[:, 0], pts[:, 1], pts[:, 2] + dist = np.sqrt(x**2 + y**2) + mask = (x > 0.2) & (dist < MAX_RANGE) & \ + (np.abs(np.arctan2(y, x)) < np.radians(FOV_DEG / 2)) & \ + (z > -0.3) & (z < 1.5) + return float(dist[mask].min()) if mask.any() else np.inf + + +# ── Correlation helpers ──────────────────────────────────────────────────────── + +def pearson_r(x, y): + if len(x) < 3: + return float("nan") + xm, ym = x - x.mean(), y - y.mean() + denom = np.sqrt((xm**2).sum() * (ym**2).sum()) + return float((xm * ym).sum() / denom) if denom > 0 else float("nan") + + +def spearman_r(x, y): + if len(x) < 3: + return float("nan") + rx = np.argsort(np.argsort(x)).astype(float) + ry = np.argsort(np.argsort(y)).astype(float) + return pearson_r(rx, ry) + + +def auc_trapz(recalls, precisions): + pairs = sorted(zip(recalls, precisions)) + r_arr = np.array([p[0] for p in pairs]) + p_arr = np.array([p[1] for p in pairs]) + return float(np.trapezoid(p_arr, r_arr)) + + +# ── Main evaluation loop ─────────────────────────────────────────────────────── + +def run_backend(synced, BackendClass, bname): + """ + Process every frame through the OF backend (maintains tracking state), + but compute metrics only on odom-gated frames. + """ + backend = BackendClass() + + # Arrays built on gated frames only + of_tau_list = [] # optical flow min_tau (may be inf) + lidar_d_list = [] # LiDAR forward distance (m) + lidar_ttc_list = [] # LiDAR TTC = dist / v_fwd (s) + v_fwd_list = [] # forward speed on that frame + lat_list = [] # latency (ms) for all processed frames + + for _, data in synced: + frame = data["video"] + if not isinstance(frame, np.ndarray): + frame = np.array(frame) + + t0 = time.perf_counter() + result = backend.compute(frame) + lat_ms = (time.perf_counter() - t0) * 1000 + lat_list.append(lat_ms) + + if not data["is_gated"] or result is None: + continue + + of_tau = result["min_tau"] + lidar_d = min_fwd_dist(data["lidar"], data["odom"]) + v_fwd = data["v_fwd"] + + lidar_ttc = lidar_d / v_fwd if (np.isfinite(lidar_d) and v_fwd > 0.05) else np.inf + + of_tau_list.append(of_tau) + lidar_d_list.append(lidar_d) + lidar_ttc_list.append(lidar_ttc) + v_fwd_list.append(v_fwd) + + n_gated = len(of_tau_list) + mean_lat = float(np.mean(lat_list)) + + of_tau_arr = np.array(of_tau_list) + lidar_d_arr = np.array(lidar_d_list) + lidar_ttc_arr = np.array(lidar_ttc_list) + v_fwd_arr = np.array(v_fwd_list) + + print(f"\n Gated frames: {n_gated} | mean latency: {mean_lat:.2f} ms/frame") + if v_fwd_arr.size: + print(f" v_fwd on gated: " + f"p25={np.percentile(v_fwd_arr,25):.2f} " + f"p50={np.percentile(v_fwd_arr,50):.2f} " + f"p75={np.percentile(v_fwd_arr,75):.2f} m/s") + + # ── Metric 1: τ correlation ─────────────────────────────────────────────── + # Keep only frames where BOTH of_tau and lidar_ttc are finite and sane + corr_mask = (np.isfinite(of_tau_arr) & np.isfinite(lidar_ttc_arr) & + (lidar_ttc_arr < 15.0) & (of_tau_arr < 15.0)) + of_c = of_tau_arr[corr_mask] + ltc_c = lidar_ttc_arr[corr_mask] + + pr = pearson_r(of_c, ltc_c) if len(of_c) >= 3 else float("nan") + sr = spearman_r(of_c, ltc_c) if len(of_c) >= 3 else float("nan") + rmse = float(np.sqrt(np.mean((of_c - ltc_c)**2))) if len(of_c) >= 3 else float("nan") + mae = float(np.mean(np.abs(of_c - ltc_c))) if len(of_c) >= 3 else float("nan") + + print(f"\n Correlation (n={len(of_c)} paired frames)") + print(f" Pearson r : 
{pr:+.3f}") + print(f" Spearman r : {sr:+.3f}") + print(f" RMSE : {rmse:.3f} s") + print(f" MAE : {mae:.3f} s") + + # ── Metric 2 & 3: PR curve over τ threshold ─────────────────────────────── + # GT: lidar_dist < DANGER_M + gt_danger = lidar_d_arr < DANGER_M + + pr_curve = [] + f1_best = 0.0; tau_best = TAU_SWEEP[0]; prec_best = 0.0; rec_best = 0.0 + + for tau in TAU_SWEEP: + pred = of_tau_arr < tau + TP = int(np.sum(pred & gt_danger)) + FP = int(np.sum(pred & ~gt_danger)) + FN = int(np.sum(~pred & gt_danger)) + p = TP / (TP + FP + 1e-8) + r = TP / (TP + FN + 1e-8) + f1 = 2*p*r / (p + r + 1e-8) + pr_curve.append({"tau": float(tau), "precision": round(p,4), + "recall": round(r,4), "f1": round(f1,4)}) + if f1 > f1_best: + f1_best = f1; tau_best = float(tau) + prec_best = p; rec_best = r + + auc = auc_trapz([pt["recall"] for pt in pr_curve], + [pt["precision"] for pt in pr_curve]) + + n_gt_pos = int(gt_danger.sum()) + print(f"\n Binary detection (GT: lidar_dist < {DANGER_M} m)") + print(f" GT positive frames: {n_gt_pos}/{n_gated} ({n_gt_pos/max(n_gated,1)*100:.0f}%)") + print(f" AUC-PR : {auc:.4f}") + print(f" Optimal τ : {tau_best:.2f} s → " + f"P={prec_best:.3f} R={rec_best:.3f} F1={f1_best:.3f}") + + return { + "backend": bname, + "dataset": "unitree_office_walk", + "odom_gate": {"v_fwd_min": V_FWD_MIN, "omega_max": OMEGA_MAX}, + "n_gated_frames": n_gated, + "mean_latency_ms": round(mean_lat, 2), + "correlation": { + "n_paired": int(len(of_c)), + "pearson_r": round(pr, 4), + "spearman_r":round(sr, 4), + "rmse_s": round(rmse, 4), + "mae_s": round(mae, 4), + }, + "binary_detection": { + "gt_danger_m": DANGER_M, + "n_gt_positive": n_gt_pos, + "auc_pr": round(auc, 4), + "optimal_tau_s": tau_best, + "optimal_f1": round(f1_best, 4), + "optimal_prec": round(prec_best, 4), + "optimal_rec": round(rec_best, 4), + }, + "pr_curve": pr_curve, + "v_fwd_percentiles": { + "p25": round(float(np.percentile(v_fwd_arr, 25)), 3) if v_fwd_arr.size else None, + "p50": round(float(np.percentile(v_fwd_arr, 50)), 3) if v_fwd_arr.size else None, + "p75": round(float(np.percentile(v_fwd_arr, 75)), 3) if v_fwd_arr.size else None, + }, + } + + +# ── Entry point ──────────────────────────────────────────────────────────────── + +def main(): + print("=" * 68) + print("Odom-gated Optical Flow vs LiDAR Correlation") + print(f"Dataset : {DATA}") + print(f"Gate : v_fwd > {V_FWD_MIN} m/s AND |ω| < {OMEGA_MAX} rad/s") + print(f"GT : lidar_dist < {DANGER_M} m (forward FOV={FOV_DEG}°, max={MAX_RANGE}m)") + print("=" * 68) + + lidar_store = LegacyPickleStore(DATA / "lidar") + video_store = LegacyPickleStore(DATA / "video") + odom_store = LegacyPickleStore(DATA / "odom") + + print("\nSyncing frames (anchored on lidar)...", end=" ", flush=True) + synced = list(sync_frames(lidar_store, video_store, odom_store)) + n_gated = sum(1 for _, d in synced if d["is_gated"]) + print(f"done. 
Total: {len(synced)} Gated: {n_gated} " + f"({n_gated/max(len(synced),1)*100:.0f}%)") + + print(f"\n{'─'*68}") + print("Backend: LucasKanade-FAST") + print(f"{'─'*68}") + result = run_backend(synced, LucasKanadeBackend, "lucas_kanade_fast") + + # ── Interpretation ───────────────────────────────────────────────────────── + lk_c = result["correlation"]; lk_b = result["binary_detection"] + sr = lk_c["spearman_r"]; pr_r = lk_c["pearson_r"] + auc = lk_b["auc_pr"]; tau_opt = lk_b["optimal_tau_s"] + f1 = lk_b["optimal_f1"]; prec = lk_b["optimal_prec"]; rec_best = lk_b["optimal_rec"] + + tau3 = next((pt for pt in result["pr_curve"] if abs(pt["tau"] - 3.0) < 0.06), {}) + + corr_interp = ( + "τ tracks real TTC: use raw value as a continuous urgency score." + if sr > 0.4 else + "τ weakly correlated with TTC: useful for ranking imminence, not precise timing." + if sr > 0.15 else + "τ not correlated with TTC on this dataset: use as presence detector only." + ) + print(f"\n{'='*68}") + print("Interpretation") + print(f"{'='*68}") + print(f""" + Correlation r={pr_r:+.3f} (Pearson), {sr:+.3f} (Spearman) + → {corr_interp} + + Production default τ=3.0 (high-precision shoulder of PR curve) + P={tau3.get('precision','?'):.3f} R={tau3.get('recall','?'):.3f} F1={tau3.get('f1','?'):.3f} + + Data-optimal τ={tau_opt:.1f} (maximum F1 from PR sweep) + P={prec:.3f} R={rec_best:.3f} F1={f1:.3f} + → Higher recall at cost of earlier (looser) firing; tune to use-case. + + AUC-PR={auc:.3f} (random baseline ≈ {lk_b['n_gt_positive']/max(result['n_gated_frames'],1):.2f}) + Mean latency: {result['mean_latency_ms']:.2f} ms/frame +""") + + os.makedirs("results", exist_ok=True) + out = "results/optical_flow_gated.json" + with open(out, "w") as f: + json.dump(result, f, indent=2, default=str) + print(f"Saved → {out}") + + +if __name__ == "__main__": + main() diff --git a/scripts/visualize_optical_flow.py b/scripts/visualize_optical_flow.py new file mode 100644 index 0000000000..76ac849dd2 --- /dev/null +++ b/scripts/visualize_optical_flow.py @@ -0,0 +1,93 @@ +""" +Live visualization of the Optical Flow obstacle-avoidance module on +unitree_office_walk. Streams annotated frames, flow vectors, tau grid, +and min_tau time series into a Rerun viewer (spawned automatically). 
+ +Run from repo root: + PYTHONPATH=.venv/lib/python3.12/site-packages/rerun_sdk \\ + uv run python3 scripts/visualize_optical_flow.py +""" + +import sys +from pathlib import Path + +import cv2 +import numpy as np +import rerun as rr + +sys.path.insert(0, ".") +from dimos.memory.timeseries.legacy import LegacyPickleStore +from dimos.perception.optical_flow.backends.lucas_kanade import LucasKanadeBackend + +REPO_ROOT = Path(__file__).resolve().parent.parent +DATA = REPO_ROOT / "data" / "unitree_office_walk" + + +def draw_overlay(frame_bgr, result, tau_threshold, grid_size): + """Draw flow arrows + tau grid + DANGER/CLEAR label onto a copy.""" + viz = frame_bgr.copy() + h, w = viz.shape[:2] + + gp, gc = result["flow_pts"] + for p, c in zip(gp, gc): + cv2.arrowedLine(viz, (int(p[0]), int(p[1])), (int(c[0]), int(c[1])), + (0, 255, 0), 1, tipLength=0.4) + + cell_h = h / grid_size + cell_w = w / grid_size + for i in range(grid_size): + for j in range(grid_size): + tau = result["tac_grid"][i, j] + if np.isnan(tau): + continue + danger = tau < tau_threshold + color = (0, 0, 255) if danger else (0, 200, 200) + x0, y0 = int(j * cell_w), int(i * cell_h) + x1, y1 = int((j + 1) * cell_w), int((i + 1) * cell_h) + cv2.rectangle(viz, (x0, y0), (x1, y1), color, 2) + cv2.putText(viz, f"{tau:.1f}", (x0 + 4, y0 + 18), + cv2.FONT_HERSHEY_SIMPLEX, 0.45, color, 1) + + label = "DANGER" if result["danger"] else "CLEAR" + label_color = (0, 0, 255) if result["danger"] else (0, 255, 0) + cv2.putText(viz, label, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1.0, label_color, 2) + cv2.putText(viz, f"min_tau={result['min_tau']:.2f}", + (10, 58), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1) + return viz + + +def main(): + rr.init("optical_flow_1775", spawn=True) + rr.log("threshold", rr.SeriesLines(colors=[255, 128, 0], names="tau_threshold"), + static=True) + + backend = LucasKanadeBackend() + video = LegacyPickleStore(DATA / "video") + + print(f"Streaming {DATA.name} to Rerun… tau_threshold={backend.tau_threshold}") + + n = 0 + for ts, frame in video._iter_items(): + rr.set_time("video", timestamp=ts) + + if not isinstance(frame, np.ndarray): + frame = np.asarray(frame) + + result = backend.compute(frame) + if result is None: + rr.log("camera", rr.Image(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))) + n += 1 + continue + + viz = draw_overlay(frame, result, backend.tau_threshold, backend.grid_size) + rr.log("camera", rr.Image(cv2.cvtColor(viz, cv2.COLOR_BGR2RGB))) + rr.log("min_tau", rr.Scalars(float(result["min_tau"]) if np.isfinite(result["min_tau"]) else 0.0)) + rr.log("threshold", rr.Scalars(backend.tau_threshold)) + rr.log("danger", rr.TextLog("DANGER" if result["danger"] else "CLEAR")) + n += 1 + + print(f"Done. {n} frames streamed. Viewer stays open; Ctrl+C to exit.") + + +if __name__ == "__main__": + main() From 4e4f9b582a56f1c3e12d87c29fbd4150621c51c1 Mon Sep 17 00:00:00 2001 From: Jeffrey Lin Date: Thu, 23 Apr 2026 19:19:10 -0400 Subject: [PATCH 3/5] docs: results writeup for #1775 --- RESULTS.md | 146 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 146 insertions(+) create mode 100644 RESULTS.md diff --git a/RESULTS.md b/RESULTS.md new file mode 100644 index 0000000000..fb58e5cffd --- /dev/null +++ b/RESULTS.md @@ -0,0 +1,146 @@ +# Optical Flow Obstacle Avoidance — Results + +Evaluation of optical flow τ-estimation against LiDAR ground truth on +`unitree_office_walk`. Full reference: theory, methodology, results, limitations. 
+ +--- + +## Theory + +**Physical basis (Lee 1976):** when a camera moves toward a surface, pixels +expand radially outward from the Focus of Expansion. The rate of expansion — +divergence — is inversely proportional to Time-to-Contact (continuous form): +`τ = 1 / divergence`. + +**Discrete implementation (this pipeline):** per-cell divergence is estimated +from tracked flow as `mean(flow_x)/cell_w + mean(flow_y)/cell_h`, and +`τ_cell = 1 / div`. Flow is in pixels-per-frame with no dt normalization, so +τ is not literally seconds — it is proportional to physical TTC up to a +frame-rate × grid-geometry factor. Scale-free in the sense that closer/faster +approach → smaller τ at any speed or object size; thresholds are tuned +empirically rather than derived from `distance/speed`. + +**What breaks it:** rotation produces global image shift indistinguishable +from expansion, so the formula is only valid during pure forward translation. + +**Algorithm steps:** + +1. **Keypoint detection — FAST (Rosten 2006):** scans for pixels with a + surrounding ring of 16 pixels all brighter or all darker. Up to 300 per + frame, refreshed every 10 frames. ORB-SLAM uses the same FAST corners for + map-point extraction; it adds an orientation estimate (intensity centroid) + for BRIEF descriptor matching — irrelevant here, since LK tracks image + patches and has no orientation input. Plain FAST is the correct choice. + +2. **Lucas-Kanade flow:** for each keypoint, LK minimizes sum-of-squared + intensity differences in a 21×21 window (3-level pyramid) to locate where + the patch moved in the next frame. + +3. **Grid divergence:** image divided into a 5×5 cell grid. Per cell: + `τ = 1/div` if `div > 0` (approaching); `NaN` if `div ≤ 0` (static or + receding). Danger fires if `min(τ) < tau_threshold`. + +4. **Threshold calibration:** τ is a proxy for Time-to-Contact — smaller τ + means a closer/faster-approaching obstacle — but the numeric value depends + on frame rate and cell geometry, so it is not literally seconds. The + default `tau_threshold = 3.0` was chosen from the PR sweep below: it lies + on the high-precision shoulder of the curve (P=0.939 at τ=3.0 on this + dataset). Tune per-deployment from the PR curve, not from a physical + distance/speed formula. + +5. **Rotation gate (live module):** if `angular_velocity` stream is connected + and `|ω| > omega_max` (default 0.3 rad/s), `danger_signal` is suppressed. + If unused, `_last_omega` stays 0 — gate is transparent. + +--- + +## Experiment Setup + +### Dataset + +`unitree_office_walk` — 51 s indoor sequence, Go2 drives repeatedly toward +walls and re-orients (96% forward-translation motion). τ-estimation is only +valid during forward translation, making this the appropriate dataset for +evaluation. + +### Odom Gate + +| Condition | Value | Rationale | +|-----------|-------|-----------| +| v\_fwd > 0.15 m/s | forward speed | robot must be translating toward something | +| \|ω\_yaw\| < 0.15 rad/s | yaw rate | eliminates rotation-contaminated frames | + +**125 gated frames out of 390 total (33%).** Forward speed on gated frames: +p25=0.36 · p50=0.45 · p75=0.56 m/s. + +**Ground truth:** LiDAR minimum forward distance in body frame (world frame +transformed using per-frame odom quaternion). Binary GT: `dist < 1.5 m`. + +--- + +## Results + +**Latency:** ~2.5 ms/frame — < 4% of 77 ms frame budget at 13 Hz. + +### Correlation — Does τ Track Real TTC? + +`lidar_ttc = min_fwd_dist / v_fwd` compared to optical flow `min_tau`. 
+ +| Metric | Value | +|--------|-------| +| Paired frames | 82 | +| Pearson r | +0.144 | +| Spearman r | +0.173 | +| RMSE | 3.31 s | +| MAE | 2.52 s | + +Weak positive correlation — correct sign (smaller τ → closer obstacle), noisy +magnitude. Per-cell divergence mixes approach signal with parallax and tracker +drift. Use τ as a **ranked urgency signal**, not a calibrated timer. + +### Binary Detection + +GT positive rate: 84% (indoor walls are always nearby on forward frames). + +| τ threshold | Purpose | Precision | Recall | F1 | +|-------------|---------|-----------|--------|----| +| **3.0 s (default)** | **Production** | **0.939** | **0.295** | **0.449** | +| 5.8 s | Max recall | 0.918 | 0.533 | 0.675 | + +At τ=3.0, 93.9% of alarms are confirmed by LiDAR. Recall of ~30% reflects +the thresholded nature of the signal — many GT-close frames produce τ values +just above the decision boundary (noisy divergence) and are missed. Raising +the threshold to 5.8 lifts recall to 0.53 at a small precision cost. +Precision stays above 0.90 for all τ ≥ 1.5 across the operating range +(AUC-PR = 0.479; below the 0.84 baseline because 84% of gated frames are +GT-positive — precision at each recall level is the meaningful metric). + +--- + +## Known Limitations + +| Limitation | Consequence | Mitigation | +|------------|-------------|------------| +| Stationary obstacles produce no flow | Blind to static walls at constant distance | LiDAR for static proximity | +| τ noisy (RMSE ≈ 3 s) | Not a precise collision timer | Ranked alarm; wire `tac_grid` to planner | +| Rotation contaminates signal | Global image shift = false divergence | Odom gate in eval; `angular_velocity` gate in module | +| Low-texture surfaces | Few keypoints → sparse grid coverage | Edge-enhancing preprocessing | + +**Deployment guidance:** keep the default `tau_threshold = 3.0` for high +precision, or raise toward 5–6 for higher recall; retune from the PR sweep +on your own data rather than from a physical distance/speed formula. Wire +`angular_velocity` from IMU for automatic turn suppression. Use as supplement +to LiDAR — not a replacement. + +--- + +## Verdict + +Optical flow τ-estimation is a viable **complement** to LiDAR — not a standalone +replacement. When the alarm fires it is correct 93.9% of the time (P=0.939 at +τ=3.0), but it only catches ~30% of danger frames (R=0.295). Low recall is +inherent: stationary obstacles produce no flow, and noisy divergence near the +threshold causes missed detections close to the decision boundary. Deploy +alongside LiDAR: LiDAR for static proximity, optical flow for fast-approaching +and dynamic obstacles. Real-time at ~2.5 ms/frame (< 4% of frame budget), +with a single empirically tuned parameter (`tau_threshold`). From a966af5b230da9b2965dc7818477bbdedab7ff73 Mon Sep 17 00:00:00 2001 From: Jeffrey Lin Date: Sun, 26 Apr 2026 12:38:21 -0400 Subject: [PATCH 4/5] refactor(perception) divergence tau + uniform grid keypoint + alarm danger on connected-components --- RESULTS.md | 146 ---------------------------------------- scripts/run_on_video.py | 109 ++++++++++++++++++++++++++++++ 2 files changed, 109 insertions(+), 146 deletions(-) delete mode 100644 RESULTS.md create mode 100644 scripts/run_on_video.py diff --git a/RESULTS.md b/RESULTS.md deleted file mode 100644 index fb58e5cffd..0000000000 --- a/RESULTS.md +++ /dev/null @@ -1,146 +0,0 @@ -# Optical Flow Obstacle Avoidance — Results - -Evaluation of optical flow τ-estimation against LiDAR ground truth on -`unitree_office_walk`. 
Full reference: theory, methodology, results, limitations. - ---- - -## Theory - -**Physical basis (Lee 1976):** when a camera moves toward a surface, pixels -expand radially outward from the Focus of Expansion. The rate of expansion — -divergence — is inversely proportional to Time-to-Contact (continuous form): -`τ = 1 / divergence`. - -**Discrete implementation (this pipeline):** per-cell divergence is estimated -from tracked flow as `mean(flow_x)/cell_w + mean(flow_y)/cell_h`, and -`τ_cell = 1 / div`. Flow is in pixels-per-frame with no dt normalization, so -τ is not literally seconds — it is proportional to physical TTC up to a -frame-rate × grid-geometry factor. Scale-free in the sense that closer/faster -approach → smaller τ at any speed or object size; thresholds are tuned -empirically rather than derived from `distance/speed`. - -**What breaks it:** rotation produces global image shift indistinguishable -from expansion, so the formula is only valid during pure forward translation. - -**Algorithm steps:** - -1. **Keypoint detection — FAST (Rosten 2006):** scans for pixels with a - surrounding ring of 16 pixels all brighter or all darker. Up to 300 per - frame, refreshed every 10 frames. ORB-SLAM uses the same FAST corners for - map-point extraction; it adds an orientation estimate (intensity centroid) - for BRIEF descriptor matching — irrelevant here, since LK tracks image - patches and has no orientation input. Plain FAST is the correct choice. - -2. **Lucas-Kanade flow:** for each keypoint, LK minimizes sum-of-squared - intensity differences in a 21×21 window (3-level pyramid) to locate where - the patch moved in the next frame. - -3. **Grid divergence:** image divided into a 5×5 cell grid. Per cell: - `τ = 1/div` if `div > 0` (approaching); `NaN` if `div ≤ 0` (static or - receding). Danger fires if `min(τ) < tau_threshold`. - -4. **Threshold calibration:** τ is a proxy for Time-to-Contact — smaller τ - means a closer/faster-approaching obstacle — but the numeric value depends - on frame rate and cell geometry, so it is not literally seconds. The - default `tau_threshold = 3.0` was chosen from the PR sweep below: it lies - on the high-precision shoulder of the curve (P=0.939 at τ=3.0 on this - dataset). Tune per-deployment from the PR curve, not from a physical - distance/speed formula. - -5. **Rotation gate (live module):** if `angular_velocity` stream is connected - and `|ω| > omega_max` (default 0.3 rad/s), `danger_signal` is suppressed. - If unused, `_last_omega` stays 0 — gate is transparent. - ---- - -## Experiment Setup - -### Dataset - -`unitree_office_walk` — 51 s indoor sequence, Go2 drives repeatedly toward -walls and re-orients (96% forward-translation motion). τ-estimation is only -valid during forward translation, making this the appropriate dataset for -evaluation. - -### Odom Gate - -| Condition | Value | Rationale | -|-----------|-------|-----------| -| v\_fwd > 0.15 m/s | forward speed | robot must be translating toward something | -| \|ω\_yaw\| < 0.15 rad/s | yaw rate | eliminates rotation-contaminated frames | - -**125 gated frames out of 390 total (33%).** Forward speed on gated frames: -p25=0.36 · p50=0.45 · p75=0.56 m/s. - -**Ground truth:** LiDAR minimum forward distance in body frame (world frame -transformed using per-frame odom quaternion). Binary GT: `dist < 1.5 m`. - ---- - -## Results - -**Latency:** ~2.5 ms/frame — < 4% of 77 ms frame budget at 13 Hz. - -### Correlation — Does τ Track Real TTC? 
- -`lidar_ttc = min_fwd_dist / v_fwd` compared to optical flow `min_tau`. - -| Metric | Value | -|--------|-------| -| Paired frames | 82 | -| Pearson r | +0.144 | -| Spearman r | +0.173 | -| RMSE | 3.31 s | -| MAE | 2.52 s | - -Weak positive correlation — correct sign (smaller τ → closer obstacle), noisy -magnitude. Per-cell divergence mixes approach signal with parallax and tracker -drift. Use τ as a **ranked urgency signal**, not a calibrated timer. - -### Binary Detection - -GT positive rate: 84% (indoor walls are always nearby on forward frames). - -| τ threshold | Purpose | Precision | Recall | F1 | -|-------------|---------|-----------|--------|----| -| **3.0 s (default)** | **Production** | **0.939** | **0.295** | **0.449** | -| 5.8 s | Max recall | 0.918 | 0.533 | 0.675 | - -At τ=3.0, 93.9% of alarms are confirmed by LiDAR. Recall of ~30% reflects -the thresholded nature of the signal — many GT-close frames produce τ values -just above the decision boundary (noisy divergence) and are missed. Raising -the threshold to 5.8 lifts recall to 0.53 at a small precision cost. -Precision stays above 0.90 for all τ ≥ 1.5 across the operating range -(AUC-PR = 0.479; below the 0.84 baseline because 84% of gated frames are -GT-positive — precision at each recall level is the meaningful metric). - ---- - -## Known Limitations - -| Limitation | Consequence | Mitigation | -|------------|-------------|------------| -| Stationary obstacles produce no flow | Blind to static walls at constant distance | LiDAR for static proximity | -| τ noisy (RMSE ≈ 3 s) | Not a precise collision timer | Ranked alarm; wire `tac_grid` to planner | -| Rotation contaminates signal | Global image shift = false divergence | Odom gate in eval; `angular_velocity` gate in module | -| Low-texture surfaces | Few keypoints → sparse grid coverage | Edge-enhancing preprocessing | - -**Deployment guidance:** keep the default `tau_threshold = 3.0` for high -precision, or raise toward 5–6 for higher recall; retune from the PR sweep -on your own data rather than from a physical distance/speed formula. Wire -`angular_velocity` from IMU for automatic turn suppression. Use as supplement -to LiDAR — not a replacement. - ---- - -## Verdict - -Optical flow τ-estimation is a viable **complement** to LiDAR — not a standalone -replacement. When the alarm fires it is correct 93.9% of the time (P=0.939 at -τ=3.0), but it only catches ~30% of danger frames (R=0.295). Low recall is -inherent: stationary obstacles produce no flow, and noisy divergence near the -threshold causes missed detections close to the decision boundary. Deploy -alongside LiDAR: LiDAR for static proximity, optical flow for fast-approaching -and dynamic obstacles. Real-time at ~2.5 ms/frame (< 4% of frame budget), -with a single empirically tuned parameter (`tau_threshold`). 
diff --git a/scripts/run_on_video.py b/scripts/run_on_video.py new file mode 100644 index 0000000000..adbf8c882f --- /dev/null +++ b/scripts/run_on_video.py @@ -0,0 +1,109 @@ +import argparse +import sys +from pathlib import Path + +import cv2 +import numpy as np + +sys.path.insert(0, ".") +from dimos.perception.optical_flow.backends.lucas_kanade import LucasKanadeBackend + +REPO_ROOT = Path(__file__).resolve().parent.parent +OUT_DIR = REPO_ROOT / "results" + +TAU_THRESHOLD = 3.0 # alarm threshold (frames); matches OpticalFlowConfig default +TARGET_FPS = 15 # downsample iPhone 30/60fps to match Go2 dataset frame rate; + # τ is measured in frames so the absolute rate matters for the threshold + + +def draw_overlay(frame_bgr: np.ndarray, result: dict, tau_threshold: float) -> np.ndarray: + """Per-point flow arrows colored by tau band + DANGER/CLEAR banner. + Mirrors what OpticalFlowModule._draw_visualization. do""" + viz = frame_bgr.copy() + red = (0, 0, 255) + yellow = (0, 220, 220) + green = (0, 200, 0) + + for x, y, tau, u, v in result["flow_data"]: + if tau < tau_threshold: + color = red + elif tau < 2.0 * tau_threshold: + color = yellow + else: + color = green + cv2.arrowedLine(viz, (int(x), int(y)), (int(x + u), int(y + v)), + color, 1, tipLength=0.4) + + label = "DANGER" if result["danger"] else "CLEAR" + color = red if result["danger"] else (0, 255, 0) + cv2.putText(viz, label, (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 1.4, color, 3) + return viz + + +def main() -> None: + p = argparse.ArgumentParser() + p.add_argument("video", help="path to the input video file (mp4/mov/etc.)") + p.add_argument("--target-fps", type=int, default=TARGET_FPS, + help=f"downsample to this fps (default: {TARGET_FPS}, matches Go2 dataset)") + args = p.parse_args() + + video_path = Path(args.video) + if not video_path.exists(): + sys.exit(f"video file {video_path} does not exist") + OUT_DIR.mkdir(exist_ok=True) + out_path = OUT_DIR / f"annotated_{video_path.stem}.mp4" + + cap = cv2.VideoCapture(str(video_path)) + src_fps = cap.get(cv2.CAP_PROP_FPS) + n_src = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) + w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + + # Downsample by integer stride so τ-in-frames is calibrated for our threshold. 
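+    # e.g. a 30 fps source at --target-fps 15 gives stride 2 (output 15 fps);
+    # a 60 fps source gives stride 4; a 24 fps source gives stride 2 (~12 fps).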
+ stride = max(1, round(src_fps / args.target_fps)) + out_fps = src_fps / stride + print(f"input: {video_path.name} {w}x{h} @ {src_fps:.2f}fps ({n_src} frames, {n_src/src_fps:.1f}s)") + print(f"stride: every {stride}-th frame → output ~{out_fps:.1f}fps") + + backend = LucasKanadeBackend(tau_threshold=TAU_THRESHOLD) + fourcc = cv2.VideoWriter_fourcc(*"mp4v") + writer = cv2.VideoWriter(str(out_path), fourcc, out_fps, (w, h)) + + n_processed = 0 + n_danger = 0 + n_warmup = 0 + src_idx = 0 + while True: + ok, frame = cap.read() + if not ok: + break + if src_idx % stride != 0: + src_idx += 1 + continue + src_idx += 1 + + result = backend.compute(frame) + if result is None: + n_warmup += 1 + continue + + n_processed += 1 + if result["danger"]: + n_danger += 1 + + viz = draw_overlay(frame, result, TAU_THRESHOLD) + writer.write(viz) + + writer.release() + cap.release() + + print() + print("Done.") + print(f" processed: {n_processed} frames") + print(f" backend warmup/sparse: {n_warmup} frames") + print(f" DANGER fires: {n_danger} / {n_processed} ({100*n_danger/max(1,n_processed):.1f}%)") + print(f" output: {out_path}") + + +if __name__ == "__main__": + main() From 9087af4935bf8575b652c5b9745598b4438d50ac Mon Sep 17 00:00:00 2001 From: Jeffrey Lin Date: Sun, 26 Apr 2026 12:47:02 -0400 Subject: [PATCH 5/5] refactor(perception): divergence tau + uniform grid keypoint + alarm danger on connected-components MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses review feedback on #1775: - FAST corners → uniform meshgrid (light, non-sparse keypoints) - per-point FOE τ → divergence-on-grid τ - min(τ) alarm → connected-components on thresholded divergence - LK reconstruction-error filter on per-point quality - LiDAR-correlation evaluation removed --- dimos/perception/optical_flow/__init__.py | 0 .../optical_flow/backends/__init__.py | 0 .../optical_flow/backends/lucas_kanade.py | 178 +++++--- .../optical_flow/optical_flow_module.py | 109 ++--- .../perception/optical_flow/tac_estimator.py | 11 - scripts/eval_optical_flow_gated.py | 412 ------------------ scripts/visualize_optical_flow.py | 93 ---- 7 files changed, 165 insertions(+), 638 deletions(-) delete mode 100644 dimos/perception/optical_flow/__init__.py delete mode 100644 dimos/perception/optical_flow/backends/__init__.py delete mode 100644 dimos/perception/optical_flow/tac_estimator.py delete mode 100644 scripts/eval_optical_flow_gated.py delete mode 100644 scripts/visualize_optical_flow.py diff --git a/dimos/perception/optical_flow/__init__.py b/dimos/perception/optical_flow/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/dimos/perception/optical_flow/backends/__init__.py b/dimos/perception/optical_flow/backends/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/dimos/perception/optical_flow/backends/lucas_kanade.py b/dimos/perception/optical_flow/backends/lucas_kanade.py index 54c074de30..37994dd281 100644 --- a/dimos/perception/optical_flow/backends/lucas_kanade.py +++ b/dimos/perception/optical_flow/backends/lucas_kanade.py @@ -2,38 +2,55 @@ import numpy as np from typing import Optional -from dimos.perception.optical_flow.tac_estimator import divergence_to_tac - -class LucasKanadeBackend: +class GridDetector: """ - FAST keypoints + Lucas-Kanade sparse optical flow. - Computes a Time-to-Contact proxy (tau) via flow divergence in an NxN grid. 
- Based on Lee (1976): a closer/faster-approaching obstacle produces larger - image-expansion and thus smaller tau. Scale-free; no depth calibration. - - Note: the tau value is monotonically related to physical TTC but is not - literally seconds — it depends on frame rate and cell geometry. Use - tau_threshold as an empirically tuned urgency parameter. The default 3.0 - was chosen from the PR-curve sweep on unitree_office_walk (P=0.939 at the - production operating point). + Uniform pixel lattice """ - def __init__(self, max_keypoints=300, grid_size=5, - tau_threshold=3.0, refresh_interval=10, - fast_threshold=20): - self.max_keypoints = max_keypoints - self.grid_size = grid_size + def __init__(self, spacing: int = 20): + self.spacing = spacing + + def detect(self, gray: np.ndarray) -> Optional[np.ndarray]: + h, w = gray.shape + xs = np.arange(0, w, self.spacing) + ys = np.arange(0, h, self.spacing) + gx, gy = np.meshgrid(xs, ys) + pts = np.stack([gx.ravel(), gy.ravel()], axis=1).astype(np.float32) + return pts.reshape(-1, 1, 2) if len(pts) else None + + +class LucasKanadeBackend: + def __init__( + self, + detector: Optional[GridDetector] = None, + tau_threshold: float = 3.0, + refresh_interval: int = 10, + window_size: int = 21, + err_threshold: float = 30.0, + min_blob_area: int = 15, + ): + """ + Args: + detector: Generates seed points for tracking (default: GridDetector). + tau_threshold: Time-to-Contact (frames) limit. Triggers danger if τ < this. + refresh_interval: Frames between redetecting points to prevent drift. + window_size: Edge length of the Lucas-Kanade local tracking window. + err_threshold: Cutoff for LK photometric reconstruction error; drops bad tracks. + min_blob_area: Minimum connected-component area (grid cells) required to + trigger the alarm, filtering out isolated tracking noise. + """ + self.detector = detector or GridDetector() self.tau_threshold = tau_threshold self.refresh_interval = refresh_interval + self.window_size = int(window_size) + self.err_threshold = float(err_threshold) + self.min_blob_area = int(min_blob_area) self.lk_params = dict( - winSize=(21, 21), + winSize=(self.window_size, self.window_size), maxLevel=3, criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03), ) - self.fast = cv2.FastFeatureDetector_create( - threshold=fast_threshold, nonmaxSuppression=True - ) self.prev_gray: Optional[np.ndarray] = None self.prev_pts: Optional[np.ndarray] = None self.frame_count: int = 0 @@ -43,8 +60,10 @@ def compute(self, frame_bgr: np.ndarray) -> Optional[dict]: # type: ignore[type Args: frame_bgr: HxWx3 uint8 BGR numpy array Returns: - dict: {tac_grid, danger, flow_pts, min_tau} - None on first frame. + dict: {flow_data, danger} + flow_data: (N, 5) float32 [x, y, tau, u, v] per tracked point. + tau is NaN for non-expanding points. + danger: bool — true iff a low-τ blob of area ≥ min_blob_area exists. 
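+
+            Consumer-side sketch (hypothetical caller, not part of this API):
+                res = backend.compute(frame)
+                if res is not None:
+                    xs, ys, taus, us, vs = res["flow_data"].T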
""" gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY) @@ -53,11 +72,11 @@ def compute(self, frame_bgr: np.ndarray) -> Optional[dict]: # type: ignore[type self._refresh(gray) return None - curr_pts, status, _ = cv2.calcOpticalFlowPyrLK( + curr_pts, status, err = cv2.calcOpticalFlowPyrLK( self.prev_gray, gray, self.prev_pts, None, **self.lk_params ) - good_mask = status.ravel() == 1 + good_mask = (status.ravel() == 1) & (err.ravel() < self.err_threshold) good_prev = self.prev_pts[good_mask].reshape(-1, 2) good_curr = curr_pts[good_mask].reshape(-1, 2) flow_vecs = good_curr - good_prev @@ -72,36 +91,89 @@ def compute(self, frame_bgr: np.ndarray) -> Optional[dict]: # type: ignore[type if len(good_prev) < 10: return None - h, w = gray.shape - tac_grid = self._tac_grid(good_prev, flow_vecs, h, w) - min_tau = float(np.nanmin(tac_grid)) if not np.all(np.isnan(tac_grid)) else np.inf + h, w = gray.shape + spacing = self.detector.spacing + flow_data, div_smooth = self._divergence_tau( + good_prev, flow_vecs, h, w, spacing + ) + + # Alarm via connected components on the thresholded divergence map. + # Equivalent threshold: τ < tau_threshold ⇔ div > 2/tau_threshold. + div_threshold = 2.0 / self.tau_threshold + mask = (div_smooth > div_threshold).astype(np.uint8) + n_labels, _, stats, _ = cv2.connectedComponentsWithStats(mask, connectivity=8) + + danger = False + if n_labels > 1: + largest_area = int(stats[1:, cv2.CC_STAT_AREA].max()) + danger = largest_area >= self.min_blob_area return { - "tac_grid": tac_grid, - "danger": min_tau < self.tau_threshold, - "flow_pts": (good_prev, good_curr), - "min_tau": min_tau, + "flow_data": flow_data, + "danger": danger, } - def _refresh(self, gray: np.ndarray) -> None: - kps = sorted(self.fast.detect(gray, None), - key=lambda k: k.response, reverse=True)[:self.max_keypoints] - self.prev_pts = ( - np.array([[k.pt] for k in kps], dtype=np.float32) if kps else None + @staticmethod + def _divergence_tau( + pts: np.ndarray, + flows: np.ndarray, + h: int, + w: int, + spacing: int, + ) -> tuple: + """ + Returns: + flow_data: (N, 5) float32 [x, y, tau, u, v] per tracked point — + tau mapped from the per-cell divergence map, for + per-arrow visualization coloring. + div_smooth: 2D divergence map at grid resolution, for the + downstream connected-components alarm. + """ + H_grid = int(np.ceil(h / spacing)) + W_grid = int(np.ceil(w / spacing)) + + u_grid = np.zeros((H_grid, W_grid), dtype=np.float32) + v_grid = np.zeros((H_grid, W_grid), dtype=np.float32) + j_idx = np.round(pts[:, 0] / spacing).astype(np.int32) + i_idx = np.round(pts[:, 1] / spacing).astype(np.int32) + in_bounds = (i_idx >= 0) & (i_idx < H_grid) & (j_idx >= 0) & (j_idx < W_grid) + u_grid[i_idx[in_bounds], j_idx[in_bounds]] = flows[in_bounds, 0] + v_grid[i_idx[in_bounds], j_idx[in_bounds]] = flows[in_bounds, 1] + + # Pre-smooth the flow field and suppresses LK noise + u_smooth = cv2.GaussianBlur(u_grid, (3, 3), sigmaX=0) + v_smooth = cv2.GaussianBlur(v_grid, (3, 3), sigmaX=0) + + # Spatial derivatives + du_dy, du_dx = np.gradient(u_smooth, spacing) + dv_dy, dv_dx = np.gradient(v_smooth, spacing) + div = (du_dx + dv_dy).astype(np.float32) + + # Post-smooth divergence to absorb residual outliers. + div_smooth = cv2.medianBlur(div, 3) + + # τ = 2 / div, only where div is positive (point is expanding = + # surface is approaching). Negative or near-zero div → NaN. 
+ tau_grid = np.full_like(div_smooth, np.nan) + eps = 1e-3 + expanding = div_smooth > eps + tau_grid[expanding] = 2.0 / div_smooth[expanding] + + # Per-point τ + taus = np.full(len(pts), np.nan, dtype=np.float32) + taus[in_bounds] = tau_grid[i_idx[in_bounds], j_idx[in_bounds]] + + flow_data = np.stack( + [ + pts[:, 0].astype(np.float32), + pts[:, 1].astype(np.float32), + taus, + flows[:, 0].astype(np.float32), + flows[:, 1].astype(np.float32), + ], + axis=1, ) + return flow_data, div_smooth - def _tac_grid(self, pts: np.ndarray, flows: np.ndarray, h: int, w: int) -> np.ndarray: - div_grid = np.full((self.grid_size, self.grid_size), np.nan, dtype=np.float32) - cell_h = h / self.grid_size - cell_w = w / self.grid_size - for i in range(self.grid_size): - for j in range(self.grid_size): - mask = ( - (pts[:, 0] >= j * cell_w) & (pts[:, 0] < (j+1) * cell_w) & - (pts[:, 1] >= i * cell_h) & (pts[:, 1] < (i+1) * cell_h) - ) - cf = flows[mask] - if len(cf) < 3: - continue - div_grid[i, j] = np.mean(cf[:, 0]) / cell_w + np.mean(cf[:, 1]) / cell_h - return divergence_to_tac(div_grid) + def _refresh(self, gray: np.ndarray) -> None: + self.prev_pts = self.detector.detect(gray) diff --git a/dimos/perception/optical_flow/optical_flow_module.py b/dimos/perception/optical_flow/optical_flow_module.py index bfe9119b87..3f75e41493 100644 --- a/dimos/perception/optical_flow/optical_flow_module.py +++ b/dimos/perception/optical_flow/optical_flow_module.py @@ -17,45 +17,35 @@ class OpticalFlowConfig(ModuleConfig): - tau_threshold: float = 3.0 # alarm when min_tau < tau_threshold - grid_size: int = 5 # NxN tac grid - omega_max: float = 0.3 # rad/s; danger suppressed above this yaw rate + tau_threshold: float = 3.0 # Time-to-Contact limit (in frames); alarm fires when τ drops below this + omega_max: float = 0.3 # rad/s; danger gated above this yaw rate class OpticalFlowModule(Module): """ - Monocular obstacle avoidance via sparse optical flow + Time-to-Contact. - - Subscribes to a live RGB/BGR camera stream, detects FAST keypoints each - frame, tracks them with Lucas-Kanade, and flags grid cells where the - inverse of flow divergence (tau proxy) falls below tau_threshold. - - Note on units: the backend's tau is monotonically related to physical - time-to-contact but is not literally seconds — it depends on frame rate - and cell geometry. Treat tau_threshold as an empirically tuned urgency - parameter, not a calibrated timer. - - The danger_signal is gated on angular rate: if the optional - angular_velocity stream is connected, danger output is suppressed - whenever |omega| > omega_max (rad/s) to prevent false alarms during turns. + Monocular obstacle avoidance via Lucas-Kanade optical flow on a uniform + grid + per-cell time-to-contact via flow divergence. + Alarm fires when a spatially-coherent blob of low-τ cells exceeds a + minimum area (connected-components on the thresholded divergence map), + not on a per-pixel statistic — real obstacles produce coherent blobs. + + Danger is suppressed when |ω| exceeds omega_max (rotation breaks the + fronto-parallel divergence assumption). The angular_velocity stream is + optional; production stacks would feed an IMU here. 
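+
+    The backend applies the τ test as a divergence test
+    (τ < tau_threshold ⇔ div > 2 / tau_threshold) and fires only when a
+    connected component of at least min_blob_area low-τ cells is present.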
""" config: OpticalFlowConfig color_image: In[Image] - angular_velocity: In[Any] # optional: yaw rate in rad/s from IMU or odom + angular_velocity: In[Any] # optional: yaw rate in rad/s from IMU danger_signal: Out[Bool] - tac_grid: Out[Any] + flow_data: Out[Any] # (N, 5) float32: columns (x, y, tau, u, v) flow_visualization: Out[Image] def __init__(self, **kwargs: Any) -> None: super().__init__(**kwargs) - - self._backend = LucasKanadeBackend( - grid_size=self.config.grid_size, - tau_threshold=self.config.tau_threshold, - ) + self._backend = LucasKanadeBackend(tau_threshold=self.config.tau_threshold) self._last_omega: float = 0.0 @rpc @@ -67,23 +57,21 @@ def start(self) -> None: self.register_disposable(Disposable(unsub_omega)) logger.info("OpticalFlowModule started") + @rpc + def stop(self) -> None: + super().stop() + def _on_angular_velocity(self, msg: Any) -> None: - """Update last known yaw rate. Accepts raw float or .data-wrapped type.""" try: self._last_omega = float(msg.data) if hasattr(msg, "data") else float(msg) except (TypeError, ValueError): pass - @rpc - def stop(self) -> None: - super().stop() - def _on_color_image(self, msg: Image) -> None: frame = msg.data if not isinstance(frame, np.ndarray): frame = np.asarray(frame) - # Backends expect BGR; convert if the incoming image is RGB if msg.format == ImageFormat.RGB: frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) @@ -91,63 +79,46 @@ def _on_color_image(self, msg: Image) -> None: if result is None: return - # Suppress danger during turns: rotation contaminates the divergence - # estimate, producing false alarms. Requires angular_velocity stream. + # Suppress danger during turns is_turning = abs(self._last_omega) > self.config.omega_max gated_danger = bool(result["danger"]) and not is_turning self.danger_signal.publish(Bool(data=gated_danger)) - self.tac_grid.publish(result["tac_grid"]) + self.flow_data.publish(result["flow_data"]) viz = self._draw_visualization(frame, result, is_turning=is_turning) self.flow_visualization.publish(Image.from_numpy(viz, format=ImageFormat.BGR)) def _draw_visualization( - self, frame_bgr: np.ndarray, result: dict, # type: ignore[type-arg] + self, frame_bgr: np.ndarray, result: dict, is_turning: bool = False, ) -> np.ndarray: - """Draw flow vectors and tau-grid overlay onto a copy of the frame.""" + """Per-point flow arrows colored by tau band + status banner.""" viz = frame_bgr.copy() - h, w = viz.shape[:2] - - good_prev, good_curr = result["flow_pts"] - for p, c in zip(good_prev, good_curr): - p0 = (int(p[0]), int(p[1])) - p1 = (int(c[0]), int(c[1])) - cv2.arrowedLine(viz, p0, p1, (0, 255, 0), 1, tipLength=0.4) - - tac_grid = result["tac_grid"] - cell_h = h / self.config.grid_size - cell_w = w / self.config.grid_size - for i in range(self.config.grid_size): - for j in range(self.config.grid_size): - tau = tac_grid[i, j] - if np.isnan(tau): - continue - danger = tau < self.config.tau_threshold - color = (0, 0, 255) if danger else (0, 200, 200) - x0 = int(j * cell_w) - y0 = int(i * cell_h) - x1 = int((j + 1) * cell_w) - y1 = int((i + 1) * cell_h) - cv2.rectangle(viz, (x0, y0), (x1, y1), color, 2) - cv2.putText( - viz, f"{tau:.1f}s", - (x0 + 4, y0 + 18), - cv2.FONT_HERSHEY_SIMPLEX, 0.45, color, 1, - ) + thr = self.config.tau_threshold + + red = (0, 0, 255) + yellow = (0, 220, 220) + green = (0, 200, 0) + + for x, y, tau, u, v in result["flow_data"]: + if tau < thr: + color = red + elif tau < 2.0 * thr: + color = yellow + else: + # Includes NaN (non-expanding points): not a threat. 
+ color = green + p0 = (int(x), int(y)) + p1 = (int(x + u), int(y + v)) + cv2.arrowedLine(viz, p0, p1, color, 1, tipLength=0.4) if is_turning: label = f"GATED (|ω|={abs(self._last_omega):.2f} rad/s)" cv2.putText(viz, label, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 200, 255), 2) else: label = "DANGER" if result["danger"] else "CLEAR" - color = (0, 0, 255) if result["danger"] else (0, 255, 0) + color = red if result["danger"] else (0, 255, 0) cv2.putText(viz, label, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1.0, color, 2) - cv2.putText( - viz, f"min_tau={result['min_tau']:.2f}s", - (10, 58), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1, - ) - return viz diff --git a/dimos/perception/optical_flow/tac_estimator.py b/dimos/perception/optical_flow/tac_estimator.py deleted file mode 100644 index fb38b34b20..0000000000 --- a/dimos/perception/optical_flow/tac_estimator.py +++ /dev/null @@ -1,11 +0,0 @@ -import numpy as np - - -def divergence_to_tac(divergence_grid: np.ndarray) -> np.ndarray: - """ - Convert divergence grid to time-to-contact grid. - tau = 1/divergence; NaN where divergence <= ~0 (not approaching). - Accepts NaN entries (e.g. unpopulated cells) and preserves them. - """ - with np.errstate(divide='ignore', invalid='ignore'): - return np.where(divergence_grid > 1e-6, 1.0 / divergence_grid, np.nan) diff --git a/scripts/eval_optical_flow_gated.py b/scripts/eval_optical_flow_gated.py deleted file mode 100644 index 4715ed8c84..0000000000 --- a/scripts/eval_optical_flow_gated.py +++ /dev/null @@ -1,412 +0,0 @@ -""" -Odom-gated Optical Flow vs LiDAR Correlation -Issue #1775 — Real-time obstacle avoidance from monocular optical flow - -Question --------- -On frames where the robot is purely translating forward (no turning), -how well does optical flow τ-estimation correlate with LiDAR obstacle -proximity? - -Odom gate ---------- - v_fwd > 0.15 m/s (robot moving forward) - |ω_yaw| < 0.15 rad/s (robot not turning) - -This selects wall-approach segments where τ = 1/divergence is -theory-valid: pure forward translation generates a radially expanding -flow field from the focus of expansion, and divergence is proportional -to 1/τ. - -Metrics -------- -1. Correlation — Pearson r and Spearman r between of_tau and - lidar_ttc = min_fwd_dist / v_fwd -2. Binary P/R — τ < threshold vs lidar_dist < 1.5 m -3. PR curve — sweep τ ∈ [0.3, 6.0] s -4. Latency — mean per-frame compute time - -Backend -------- - FAST corner detector + Lucas-Kanade sparse flow. - ORB-SLAM uses the same FAST corners for map-point extraction; the - additional orientation step ORB computes (for BRIEF descriptor - matching) is irrelevant here — LK tracks image patches, not - descriptors. 
- -Run from repo root ------------------- - PYTHONPATH=.venv/lib/python3.12/site-packages/rerun_sdk \\ - uv run python3 scripts/eval_optical_flow_gated.py - -Output ------- - results/optical_flow_gated.json -""" - -import sys -import os -import json -import time -from pathlib import Path - -import numpy as np - -sys.path.insert(0, ".") -from dimos.memory.timeseries.legacy import LegacyPickleStore -from dimos.perception.optical_flow.backends.lucas_kanade import LucasKanadeBackend - -# ── Paths ───────────────────────────────────────────────────────────────────── -REPO_ROOT = Path(__file__).resolve().parent.parent -DATA = REPO_ROOT / "data" / "unitree_office_walk" - -# ── Odom gate thresholds ────────────────────────────────────────────────────── -V_FWD_MIN = 0.15 # m/s — must be moving forward -OMEGA_MAX = 0.15 # rad/s — must not be turning -MIN_DT = 0.02 # s — skip duplicate/near-duplicate odom timestamps - -# ── Sensor config ───────────────────────────────────────────────────────────── -FOV_DEG = 60.0 # forward FOV for lidar obstacle extraction -MAX_RANGE = 5.0 # m — max lidar range to consider -DANGER_M = 1.5 # m — lidar distance threshold → "danger" GT label - -# ── Optical flow config ──────────────────────────────────────────────────────── -GRID_SIZE = 5 -TAU_SWEEP = np.round(np.linspace(0.3, 6.0, 57), 2) # PR curve sweep - - -# ── Odom kinematics ─────────────────────────────────────────────────────────── - -def odom_kinematics(odom_items): - """ - Per-frame forward velocity (m/s) and yaw rate (rad/s). - Frames with dt < MIN_DT carry forward the last valid value. - Returns (v_fwd, omega, gate) arrays, length = len(odom_items). - """ - n = len(odom_items) - ts = np.array([t for t, _ in odom_items]) - xs = np.array([d["data"]["pose"]["position"]["x"] for _, d in odom_items]) - ys = np.array([d["data"]["pose"]["position"]["y"] for _, d in odom_items]) - - raw_yaws = [] - for _, d in odom_items: - ori = d["data"]["pose"]["orientation"] - qx, qy, qz, qw = ori["x"], ori["y"], ori["z"], ori["w"] - raw_yaws.append(np.arctan2(2*(qw*qz + qx*qy), 1 - 2*(qy**2 + qz**2))) - yaws = np.unwrap(raw_yaws) - - v_fwd = np.zeros(n) - omega = np.zeros(n) - - last_v, last_w = 0.0, 0.0 - for i in range(1, n): - dt = ts[i] - ts[i - 1] - if dt < MIN_DT: - v_fwd[i] = last_v - omega[i] = last_w - continue - dx, dy = xs[i] - xs[i - 1], ys[i] - ys[i - 1] - heading = np.array([np.cos(yaws[i - 1]), np.sin(yaws[i - 1])]) - last_v = np.dot([dx, dy], heading) / dt - last_w = abs(yaws[i] - yaws[i - 1]) / dt - v_fwd[i] = last_v - omega[i] = last_w - - v_fwd[0] = v_fwd[1] - omega[0] = omega[1] - - gate = (v_fwd > V_FWD_MIN) & (omega < OMEGA_MAX) - return v_fwd, omega, gate - - -# ── Frame sync ───────────────────────────────────────────────────────────────── - -def _nearest(arr, val): - return int(np.argmin(np.abs(arr - val))) - - -def sync_frames(lidar_s, video_s, odom_s, tol=0.1): - """ - Anchor on lidar timestamps; find nearest video and odom within tol. - Yields (ts, {lidar, video, odom, v_fwd, omega, is_gated}). 
- """ - video_items = list(video_s._iter_items()) - odom_items = list(odom_s._iter_items()) - v_fwd, omega, gate = odom_kinematics(odom_items) - - v_ts = np.array([t for t, _ in video_items]) - o_ts = np.array([t for t, _ in odom_items]) - v_data = [d for _, d in video_items] - o_data = [d for _, d in odom_items] - - for ts_l, data_l in lidar_s._iter_items(): - vi = _nearest(v_ts, ts_l) - oi = _nearest(o_ts, ts_l) - if abs(v_ts[vi] - ts_l) > tol or abs(o_ts[oi] - ts_l) > tol: - continue - yield ts_l, { - "lidar": data_l, - "video": v_data[vi], - "odom": o_data[oi], - "v_fwd": float(v_fwd[oi]), - "omega": float(omega[oi]), - "is_gated": bool(gate[oi]), - } - - -# ── Coordinate transform ─────────────────────────────────────────────────────── - -def world_to_body(pts_world: np.ndarray, odom_msg: dict) -> np.ndarray: - pose = odom_msg["data"]["pose"] - pos, ori = pose["position"], pose["orientation"] - rx, ry, rz = pos["x"], pos["y"], pos["z"] - qx, qy, qz, qw = ori["x"], ori["y"], ori["z"], ori["w"] - R = np.array([ - [1 - 2*(qy**2 + qz**2), 2*(qx*qy - qw*qz), 2*(qx*qz + qw*qy)], - [2*(qx*qy + qw*qz), 1 - 2*(qx**2 + qz**2), 2*(qy*qz - qw*qx)], - [2*(qx*qz - qw*qy), 2*(qy*qz + qw*qx), 1 - 2*(qx**2 + qy**2)], - ]) - return (pts_world - np.array([rx, ry, rz])) @ R - - -def min_fwd_dist(lidar_msg, odom_msg): - """Minimum distance to any forward obstacle in body frame. inf if none.""" - try: - pts = world_to_body(np.asarray(lidar_msg["data"]["data"]["points"]), odom_msg) - except Exception: - return np.inf - x, y, z = pts[:, 0], pts[:, 1], pts[:, 2] - dist = np.sqrt(x**2 + y**2) - mask = (x > 0.2) & (dist < MAX_RANGE) & \ - (np.abs(np.arctan2(y, x)) < np.radians(FOV_DEG / 2)) & \ - (z > -0.3) & (z < 1.5) - return float(dist[mask].min()) if mask.any() else np.inf - - -# ── Correlation helpers ──────────────────────────────────────────────────────── - -def pearson_r(x, y): - if len(x) < 3: - return float("nan") - xm, ym = x - x.mean(), y - y.mean() - denom = np.sqrt((xm**2).sum() * (ym**2).sum()) - return float((xm * ym).sum() / denom) if denom > 0 else float("nan") - - -def spearman_r(x, y): - if len(x) < 3: - return float("nan") - rx = np.argsort(np.argsort(x)).astype(float) - ry = np.argsort(np.argsort(y)).astype(float) - return pearson_r(rx, ry) - - -def auc_trapz(recalls, precisions): - pairs = sorted(zip(recalls, precisions)) - r_arr = np.array([p[0] for p in pairs]) - p_arr = np.array([p[1] for p in pairs]) - return float(np.trapezoid(p_arr, r_arr)) - - -# ── Main evaluation loop ─────────────────────────────────────────────────────── - -def run_backend(synced, BackendClass, bname): - """ - Process every frame through the OF backend (maintains tracking state), - but compute metrics only on odom-gated frames. 
- """ - backend = BackendClass() - - # Arrays built on gated frames only - of_tau_list = [] # optical flow min_tau (may be inf) - lidar_d_list = [] # LiDAR forward distance (m) - lidar_ttc_list = [] # LiDAR TTC = dist / v_fwd (s) - v_fwd_list = [] # forward speed on that frame - lat_list = [] # latency (ms) for all processed frames - - for _, data in synced: - frame = data["video"] - if not isinstance(frame, np.ndarray): - frame = np.array(frame) - - t0 = time.perf_counter() - result = backend.compute(frame) - lat_ms = (time.perf_counter() - t0) * 1000 - lat_list.append(lat_ms) - - if not data["is_gated"] or result is None: - continue - - of_tau = result["min_tau"] - lidar_d = min_fwd_dist(data["lidar"], data["odom"]) - v_fwd = data["v_fwd"] - - lidar_ttc = lidar_d / v_fwd if (np.isfinite(lidar_d) and v_fwd > 0.05) else np.inf - - of_tau_list.append(of_tau) - lidar_d_list.append(lidar_d) - lidar_ttc_list.append(lidar_ttc) - v_fwd_list.append(v_fwd) - - n_gated = len(of_tau_list) - mean_lat = float(np.mean(lat_list)) - - of_tau_arr = np.array(of_tau_list) - lidar_d_arr = np.array(lidar_d_list) - lidar_ttc_arr = np.array(lidar_ttc_list) - v_fwd_arr = np.array(v_fwd_list) - - print(f"\n Gated frames: {n_gated} | mean latency: {mean_lat:.2f} ms/frame") - if v_fwd_arr.size: - print(f" v_fwd on gated: " - f"p25={np.percentile(v_fwd_arr,25):.2f} " - f"p50={np.percentile(v_fwd_arr,50):.2f} " - f"p75={np.percentile(v_fwd_arr,75):.2f} m/s") - - # ── Metric 1: τ correlation ─────────────────────────────────────────────── - # Keep only frames where BOTH of_tau and lidar_ttc are finite and sane - corr_mask = (np.isfinite(of_tau_arr) & np.isfinite(lidar_ttc_arr) & - (lidar_ttc_arr < 15.0) & (of_tau_arr < 15.0)) - of_c = of_tau_arr[corr_mask] - ltc_c = lidar_ttc_arr[corr_mask] - - pr = pearson_r(of_c, ltc_c) if len(of_c) >= 3 else float("nan") - sr = spearman_r(of_c, ltc_c) if len(of_c) >= 3 else float("nan") - rmse = float(np.sqrt(np.mean((of_c - ltc_c)**2))) if len(of_c) >= 3 else float("nan") - mae = float(np.mean(np.abs(of_c - ltc_c))) if len(of_c) >= 3 else float("nan") - - print(f"\n Correlation (n={len(of_c)} paired frames)") - print(f" Pearson r : {pr:+.3f}") - print(f" Spearman r : {sr:+.3f}") - print(f" RMSE : {rmse:.3f} s") - print(f" MAE : {mae:.3f} s") - - # ── Metric 2 & 3: PR curve over τ threshold ─────────────────────────────── - # GT: lidar_dist < DANGER_M - gt_danger = lidar_d_arr < DANGER_M - - pr_curve = [] - f1_best = 0.0; tau_best = TAU_SWEEP[0]; prec_best = 0.0; rec_best = 0.0 - - for tau in TAU_SWEEP: - pred = of_tau_arr < tau - TP = int(np.sum(pred & gt_danger)) - FP = int(np.sum(pred & ~gt_danger)) - FN = int(np.sum(~pred & gt_danger)) - p = TP / (TP + FP + 1e-8) - r = TP / (TP + FN + 1e-8) - f1 = 2*p*r / (p + r + 1e-8) - pr_curve.append({"tau": float(tau), "precision": round(p,4), - "recall": round(r,4), "f1": round(f1,4)}) - if f1 > f1_best: - f1_best = f1; tau_best = float(tau) - prec_best = p; rec_best = r - - auc = auc_trapz([pt["recall"] for pt in pr_curve], - [pt["precision"] for pt in pr_curve]) - - n_gt_pos = int(gt_danger.sum()) - print(f"\n Binary detection (GT: lidar_dist < {DANGER_M} m)") - print(f" GT positive frames: {n_gt_pos}/{n_gated} ({n_gt_pos/max(n_gated,1)*100:.0f}%)") - print(f" AUC-PR : {auc:.4f}") - print(f" Optimal τ : {tau_best:.2f} s → " - f"P={prec_best:.3f} R={rec_best:.3f} F1={f1_best:.3f}") - - return { - "backend": bname, - "dataset": "unitree_office_walk", - "odom_gate": {"v_fwd_min": V_FWD_MIN, "omega_max": OMEGA_MAX}, - "n_gated_frames": 
n_gated, - "mean_latency_ms": round(mean_lat, 2), - "correlation": { - "n_paired": int(len(of_c)), - "pearson_r": round(pr, 4), - "spearman_r":round(sr, 4), - "rmse_s": round(rmse, 4), - "mae_s": round(mae, 4), - }, - "binary_detection": { - "gt_danger_m": DANGER_M, - "n_gt_positive": n_gt_pos, - "auc_pr": round(auc, 4), - "optimal_tau_s": tau_best, - "optimal_f1": round(f1_best, 4), - "optimal_prec": round(prec_best, 4), - "optimal_rec": round(rec_best, 4), - }, - "pr_curve": pr_curve, - "v_fwd_percentiles": { - "p25": round(float(np.percentile(v_fwd_arr, 25)), 3) if v_fwd_arr.size else None, - "p50": round(float(np.percentile(v_fwd_arr, 50)), 3) if v_fwd_arr.size else None, - "p75": round(float(np.percentile(v_fwd_arr, 75)), 3) if v_fwd_arr.size else None, - }, - } - - -# ── Entry point ──────────────────────────────────────────────────────────────── - -def main(): - print("=" * 68) - print("Odom-gated Optical Flow vs LiDAR Correlation") - print(f"Dataset : {DATA}") - print(f"Gate : v_fwd > {V_FWD_MIN} m/s AND |ω| < {OMEGA_MAX} rad/s") - print(f"GT : lidar_dist < {DANGER_M} m (forward FOV={FOV_DEG}°, max={MAX_RANGE}m)") - print("=" * 68) - - lidar_store = LegacyPickleStore(DATA / "lidar") - video_store = LegacyPickleStore(DATA / "video") - odom_store = LegacyPickleStore(DATA / "odom") - - print("\nSyncing frames (anchored on lidar)...", end=" ", flush=True) - synced = list(sync_frames(lidar_store, video_store, odom_store)) - n_gated = sum(1 for _, d in synced if d["is_gated"]) - print(f"done. Total: {len(synced)} Gated: {n_gated} " - f"({n_gated/max(len(synced),1)*100:.0f}%)") - - print(f"\n{'─'*68}") - print("Backend: LucasKanade-FAST") - print(f"{'─'*68}") - result = run_backend(synced, LucasKanadeBackend, "lucas_kanade_fast") - - # ── Interpretation ───────────────────────────────────────────────────────── - lk_c = result["correlation"]; lk_b = result["binary_detection"] - sr = lk_c["spearman_r"]; pr_r = lk_c["pearson_r"] - auc = lk_b["auc_pr"]; tau_opt = lk_b["optimal_tau_s"] - f1 = lk_b["optimal_f1"]; prec = lk_b["optimal_prec"]; rec_best = lk_b["optimal_rec"] - - tau3 = next((pt for pt in result["pr_curve"] if abs(pt["tau"] - 3.0) < 0.06), {}) - - corr_interp = ( - "τ tracks real TTC: use raw value as a continuous urgency score." - if sr > 0.4 else - "τ weakly correlated with TTC: useful for ranking imminence, not precise timing." - if sr > 0.15 else - "τ not correlated with TTC on this dataset: use as presence detector only." - ) - print(f"\n{'='*68}") - print("Interpretation") - print(f"{'='*68}") - print(f""" - Correlation r={pr_r:+.3f} (Pearson), {sr:+.3f} (Spearman) - → {corr_interp} - - Production default τ=3.0 (high-precision shoulder of PR curve) - P={tau3.get('precision','?'):.3f} R={tau3.get('recall','?'):.3f} F1={tau3.get('f1','?'):.3f} - - Data-optimal τ={tau_opt:.1f} (maximum F1 from PR sweep) - P={prec:.3f} R={rec_best:.3f} F1={f1:.3f} - → Higher recall at cost of earlier (looser) firing; tune to use-case. 
- - AUC-PR={auc:.3f} (random baseline ≈ {lk_b['n_gt_positive']/max(result['n_gated_frames'],1):.2f}) - Mean latency: {result['mean_latency_ms']:.2f} ms/frame -""") - - os.makedirs("results", exist_ok=True) - out = "results/optical_flow_gated.json" - with open(out, "w") as f: - json.dump(result, f, indent=2, default=str) - print(f"Saved → {out}") - - -if __name__ == "__main__": - main() diff --git a/scripts/visualize_optical_flow.py b/scripts/visualize_optical_flow.py deleted file mode 100644 index 76ac849dd2..0000000000 --- a/scripts/visualize_optical_flow.py +++ /dev/null @@ -1,93 +0,0 @@ -""" -Live visualization of the Optical Flow obstacle-avoidance module on -unitree_office_walk. Streams annotated frames, flow vectors, tau grid, -and min_tau time series into a Rerun viewer (spawned automatically). - -Run from repo root: - PYTHONPATH=.venv/lib/python3.12/site-packages/rerun_sdk \\ - uv run python3 scripts/visualize_optical_flow.py -""" - -import sys -from pathlib import Path - -import cv2 -import numpy as np -import rerun as rr - -sys.path.insert(0, ".") -from dimos.memory.timeseries.legacy import LegacyPickleStore -from dimos.perception.optical_flow.backends.lucas_kanade import LucasKanadeBackend - -REPO_ROOT = Path(__file__).resolve().parent.parent -DATA = REPO_ROOT / "data" / "unitree_office_walk" - - -def draw_overlay(frame_bgr, result, tau_threshold, grid_size): - """Draw flow arrows + tau grid + DANGER/CLEAR label onto a copy.""" - viz = frame_bgr.copy() - h, w = viz.shape[:2] - - gp, gc = result["flow_pts"] - for p, c in zip(gp, gc): - cv2.arrowedLine(viz, (int(p[0]), int(p[1])), (int(c[0]), int(c[1])), - (0, 255, 0), 1, tipLength=0.4) - - cell_h = h / grid_size - cell_w = w / grid_size - for i in range(grid_size): - for j in range(grid_size): - tau = result["tac_grid"][i, j] - if np.isnan(tau): - continue - danger = tau < tau_threshold - color = (0, 0, 255) if danger else (0, 200, 200) - x0, y0 = int(j * cell_w), int(i * cell_h) - x1, y1 = int((j + 1) * cell_w), int((i + 1) * cell_h) - cv2.rectangle(viz, (x0, y0), (x1, y1), color, 2) - cv2.putText(viz, f"{tau:.1f}", (x0 + 4, y0 + 18), - cv2.FONT_HERSHEY_SIMPLEX, 0.45, color, 1) - - label = "DANGER" if result["danger"] else "CLEAR" - label_color = (0, 0, 255) if result["danger"] else (0, 255, 0) - cv2.putText(viz, label, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1.0, label_color, 2) - cv2.putText(viz, f"min_tau={result['min_tau']:.2f}", - (10, 58), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1) - return viz - - -def main(): - rr.init("optical_flow_1775", spawn=True) - rr.log("threshold", rr.SeriesLines(colors=[255, 128, 0], names="tau_threshold"), - static=True) - - backend = LucasKanadeBackend() - video = LegacyPickleStore(DATA / "video") - - print(f"Streaming {DATA.name} to Rerun… tau_threshold={backend.tau_threshold}") - - n = 0 - for ts, frame in video._iter_items(): - rr.set_time("video", timestamp=ts) - - if not isinstance(frame, np.ndarray): - frame = np.asarray(frame) - - result = backend.compute(frame) - if result is None: - rr.log("camera", rr.Image(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))) - n += 1 - continue - - viz = draw_overlay(frame, result, backend.tau_threshold, backend.grid_size) - rr.log("camera", rr.Image(cv2.cvtColor(viz, cv2.COLOR_BGR2RGB))) - rr.log("min_tau", rr.Scalars(float(result["min_tau"]) if np.isfinite(result["min_tau"]) else 0.0)) - rr.log("threshold", rr.Scalars(backend.tau_threshold)) - rr.log("danger", rr.TextLog("DANGER" if result["danger"] else "CLEAR")) - n += 1 - - print(f"Done. 
{n} frames streamed. Viewer stays open; Ctrl+C to exit.") - - -if __name__ == "__main__": - main()
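
A quick way to sanity-check the new τ mapping without the unitree_office_walk
capture: synthesize a textured frame, zoom it a little more each step to imitate
a frontal approach, and watch the per-point τ shrink. The sketch below assumes
only the surface shown above (LucasKanadeBackend(tau_threshold=...), compute()
returning {"flow_data", "danger"} or None on the first frame); detector spacing,
the LK error threshold, and the minimum blob area stay at their constructor
defaults, so whether danger actually fires depends on those defaults. For a pure
zoom, τ should track roughly 1/(rate - 1), so the printed median should fall as
the ramp steepens.

import cv2
import numpy as np

from dimos.perception.optical_flow.backends.lucas_kanade import LucasKanadeBackend


def zoom(frame: np.ndarray, scale: float) -> np.ndarray:
    """Scale the frame about its center, keeping the original size."""
    h, w = frame.shape[:2]
    M = cv2.getRotationMatrix2D((w / 2, h / 2), 0, scale)
    return cv2.warpAffine(frame, M, (w, h), flags=cv2.INTER_LINEAR)


def main() -> None:
    rng = np.random.default_rng(0)
    base = rng.integers(0, 255, size=(480, 640, 3), dtype=np.uint8)
    base = cv2.GaussianBlur(base, (5, 5), 0)  # soften the noise so corners track

    backend = LucasKanadeBackend(tau_threshold=3.0)

    rate_ramp = np.linspace(1.01, 1.08, 24)  # per-frame expansion factor
    scale = 1.0
    for i, rate in enumerate(rate_ramp):
        scale *= float(rate)
        frame = zoom(base, scale)
        result = backend.compute(frame)
        if result is None:  # first frame only seeds the tracker
            continue
        taus = result["flow_data"][:, 2]
        finite = taus[np.isfinite(taus)]
        med = float(np.median(finite)) if finite.size else float("nan")
        print(f"frame {i:02d}  rate={rate:.3f}  "
              f"median_tau={med:7.2f}  danger={result['danger']}")


if __name__ == "__main__":
    main()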