From 4c24f28a8818a9a8d386319b946dbb21f9f77bca Mon Sep 17 00:00:00 2001 From: Aron Novak Date: Thu, 11 Jun 2026 10:34:00 +0200 Subject: [PATCH 1/3] Add universal viewer with TC002C Duo radiometric support New src/thermalcam.py auto-detects InfiRay/Topdon USB thermal cameras and decodes real temperatures: TC001 (256x384, /64) and TC002C Duo (512x484, /16, which also yields a clean 512x384 image), with a labelled relative fallback otherwise. Adds docs/TC002C-DUO.md (frame layout + reverse-engineered vendor protocol), requirements.txt, and an experimental command module. Co-Authored-By: Claude Opus 4.8 (1M context) --- README.md | 56 ++++ docs/TC002C-DUO.md | 169 ++++++++++ requirements.txt | 7 + src/thermal_protocol.py | 146 ++++++++ src/thermalcam.py | 712 ++++++++++++++++++++++++++++++++++++++++ 5 files changed, 1090 insertions(+) create mode 100644 docs/TC002C-DUO.md create mode 100644 requirements.txt create mode 100644 src/thermal_protocol.py create mode 100644 src/thermalcam.py diff --git a/README.md b/README.md index 409cad6..cb25716 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,62 @@ https://www.eevblog.com/forum/thermal-imaging/infiray-and-their-p2-pro-discussio Check out Leo's Github here: https://github.com/LeoDJ/P2Pro-Viewer/tree/main +## Universal viewer — `src/thermalcam.py` + +The original program (`src/tc001v4.2.py`, below) is hard-coded for the **Topdon +TC001** frame format. `src/thermalcam.py` is a camera-agnostic rewrite that also +works with newer Topdon / InfiRay models such as the **TC002C Duo**, which expose +a different USB frame geometry. + +What it adds: + +- **Auto-detects** the thermal camera on the USB bus — no `--device` needed. +- **Auto-detects the frame geometry** instead of assuming `256x384`, so it + handles the TC001 (`256x384`) and the TC002C Duo family (`256x392`, …). +- **Real error handling** (the original has none). +- **Real temperatures.** It auto-detects the camera's radiometric mode and + decodes true °C — the TC001 at `256x384` and the **TC002C Duo at `512x484`** + (which also gives a crisp 512×384 image). Cameras with no 16-bit mode fall back + to a clearly-labelled *relative* scale. The absolute reading can be offset- + calibrated with `--temp-offset` / the `[` `]` keys. See `docs/TC002C-DUO.md`. +- Same niceties: colormaps, HUD, recording, snapshots, scaling, blur, contrast, + hot/cold spot tracking — plus fixed-pattern-noise removal and a contrast stretch + for cameras that don't pre-AGC their preview. + +### Install + +```bash +sudo apt-get install python3-opencv # Debian/Ubuntu/Raspberry Pi +# or: pip install -r requirements.txt +``` + +### Run + +```bash +python3 src/thermalcam.py # auto-detect everything +python3 src/thermalcam.py --device /dev/video2 +python3 src/thermalcam.py --resolution 256x384 # e.g. force TC001 radiometric mode +python3 src/thermalcam.py --selftest 20 # headless: save a snapshot + diagnostics, no GUI +``` + +Useful options: `--temp-offset N` to correct the absolute °C, `--temp-scale +{auto,64,16}` / `--temp-order {le,be}` to force a radiometric decode, +`--no-stretch` / `--no-destripe` / `--smooth` for the image cleanups. + +### Keys + +`a/z` blur · `s/x` min-max threshold · `d/c` scale · `f/v` contrast · `m` colormap +· `h` HUD · `n` swap bands · `g` temporal smoothing · `e/w` fullscreen on/off · +`r/t` record/stop · `p` snapshot · `[`/`]` temperature-offset calibration · `q`/ESC quit + +### Camera support + +| Camera | Image | Temperature | +|--------|-------|-------------| +| Topdon TC001 | ✅ | ✅ real °C (`256x384`, /64) | +| Topdon TC002C Duo | ✅ hi-res 512×384 | ✅ real °C (`512x484`, /16; offset-calibratable) | +| Other InfiRay UVC | ✅ likely | auto-detected if a radiometric mode is present | + ## Introduction diff --git a/docs/TC002C-DUO.md b/docs/TC002C-DUO.md new file mode 100644 index 0000000..a9825a0 --- /dev/null +++ b/docs/TC002C-DUO.md @@ -0,0 +1,169 @@ +# Topdon TC002C Duo (and other InfiRay UVC cameras) on Linux + +Notes from getting `src/thermalcam.py` working with a **Topdon TC002C Duo**, and a +reverse-engineering log of the radiometric (true-temperature) protocol pulled +out of the Android app. Written so the work can be continued. + +The TC002C Duo is a dual-sensor (thermal + visible) camera built on the InfiRay +`iruvc` engine — the same family as the TC001 this repo originally targeted, but +it behaves differently over USB. + +USB id: `2bdf:0102` (`Topdon`). It enumerates as a standard UVC camera +(`/dev/videoN`), so `thermalcam.py` auto-detects it. + +## Frame geometry + +`v4l2-ctl -d /dev/video2 --list-formats-ext` reports a partly-garbage table (the +firmware exposes bogus descriptors like `4x12305`, `60x3299`). The real, usable +YUYV modes are: + +| Resolution | What it contains | +|-----------|------------------| +| `256x192` | thermal preview only (8-bit) | +| `256x392` | stacked: two 8-bit thermal planes (**no** 16-bit data) | +| `512x384` | 2× thermal preview, 8-bit only | +| `512x484` | **16-bit temperature band (top) + 512x384 thermal image** ← use this | +| `644x384`, `520x192` | other banded/fused layouts | + +**The `512x484` mode is the one to use.** It is a stack of horizontal bands: + +``` +rows 0 - 95 : 16-bit temperature data (256x192, little-endian, /16 Kelvin), + packed two sensor rows per 512-wide line -> 96*512*2 = 256*192*2 bytes +rows 96 - 97 : telemetry +rows 98 -481 : 512x384 thermal AGC image (8-bit, what we display) +rows 482 -483 : telemetry +``` + +Read naively as one YUYV image the temperature band looks like a green/torn +stripe (its high byte is mis-read as chroma) — which is exactly why it was first +mistaken for garbage. Decoded as 16-bit it is a clean radiometric frame. + +Two gotchas cost real debugging time and are handled in `thermalcam.py`: + +* **OpenCV buffering** hands back stale, mid-frame-desynced buffers. Set + `CAP_PROP_BUFFERSIZE = 1`. (`v4l2-ctl --stream-mmap` never desyncs.) +* **Opening an unsupported resolution desyncs the next stream.** Enumerate the + advertised modes (`VIDIOC_ENUM_FRAMESIZES`) and never probe one the camera + doesn't list. The occasional desynced frame is also detected (its shifted + bytes decode to absurd temperatures) and skipped. + +## Temperature + +Decode the band as little-endian 16-bit and apply the standard InfiRay TPD +formula: + +``` +temp_C = raw / 16 - 273.15 +``` + +This gives a stable, spatially-correct field (the warm/cool structure matches the +scene exactly). The **absolute** value can be offset by several °C because we are +not running the camera's shutter-based NUC or ambient/emissivity correction — use +`--temp-offset` / the `[` `]` keys to dial it in against a known reference (e.g. +your own skin ~34 °C). The original TC001 uses the same scheme at `256x384` but +with a `/64` divisor; `thermalcam.py` knows both via `KNOWN_LAYOUTS`. + +> Earlier notes in this file claimed the Duo exposes *no* 16-bit data over UVC. +> That was wrong — it was based on only testing `256x392`. The `512x484` mode +> carries full radiometric data with no vendor command required. + +## Vendor command protocol (still useful for image quality) + +True temperatures need no command, but the **image** still shows the sensor's +uncorrected non-uniformity (blotchy on flat scenes) because we can't trigger the +camera's shutter NUC over plain UVC. That — and finer temperature calibration — +is what the vendor command protocol below is for. It was reverse-engineered from +the Android app's native libraries before the `512x484` mode was found. + +## The vendor command protocol (reverse-engineered) + +The Android APK's Kotlin/Java only holds the InfiRay SDK *wrapper* +(`com.energy.iruvc.*`); the real protocol is in the native `.so` libraries, +which live in the **`split_config.arm64_v8a.apk`** split (pull from the phone via +`adb`, see below). The relevant libs: + +- `libircmd.so` — the `ir_cmd` command set + temperature math +- `libUSBUVCCamera.so` — a libuvc + libusb fork; `UVCCamera::sendCommand` is the transport +- `libUSBDualCamera.so` — the Duo dual-sensor path +- `libadvirtemp.so`, `libdualcalibration.so` — Y16→°C + Duo calibration + +### Transport: `UVCCamera::sendCommand` + +Commands are **USB vendor control transfers** (not UVC-class), i.e. plain +`libusb_control_transfer`: + +``` +sendCommand(this, bmRequestType, bRequest, wValue, wIndex, data, wLength) + -> libusb_control_transfer(handle, bmRequestType, bRequest, + wValue, wIndex, data, wLength, timeout=1000) +``` + +| Operation | bmRequestType | bRequest | wValue | +|-----------|--------------|----------|--------| +| register WRITE | `0x41` (OUT, vendor, interface) | `0x45` (`'E'`) | `0x0078` | +| register READ | `0xc1` (IN, vendor, interface) | `0x44` (`'D'`) | `0x0078` | + +`wIndex` is a **device register**: + +| Register | Meaning | +|----------|---------| +| `0x9d00` | command descriptor (opcode + arg length) | +| `0x9d08` | command payload (bulk data) | +| `0x1d00`, `0x1d08`, `0x1d40` | command parameters | +| `0x1d04`, `0x1d08` | result read-back | +| `0x200` | status register (poll: bit0 = busy, retry; error if >3) | + +### Worked example: `preview_start` + +Decoded from `Java_com_energy_iruvc_ircmd_LibIRCMD_preview_1start` in `libircmd.so`: + +``` +1. WRITE 0x9d00 <- 0f c1 00 00 00 00 00 08 # opcode 0xc10f, 8-byte arg block +2. WRITE 0x1d08 <- [fps, + (source==1 ? 0x80 : 0x00), # source: 0=SENSOR, 1=FIX_PATTERN + width_hi, width_lo, # big-endian + height_hi, height_lo, # big-endian + mode, # StartPreviewMode (0=VOC_DVP_MODE) + path] # PreviewPathChannel +3. POLL 0x200 (read 1 byte) until (status & 1) == 0; abort if status > 3 +``` + +`thermal_protocol.py` in this directory implements this transport and command. +**It is experimental and unverified on hardware** — see the safety notes there. + +## Optional next steps (nice-to-have, not required) + +Basic temperatures and a clean hi-res image already work from the `512x484` +stream with no commands. The vendor protocol would let us go further: + +* **Shutter NUC** — periodically trigger the sensor's flat-field correction + (`shutter_manual_switch` / `shutter_sta_set`) to remove the slowly-drifting + blotchy non-uniformity that's visible on flat scenes. +* **Calibrated °C** — apply the camera's emissivity / ambient / NUC correction + (`libadvirtemp.so` + calibration assets `nuc_table_high.bin`, `tau*.bin`, + `kt_high.bin`, `bt_high.bin`, `dual_calibration_parameters2.bin`) instead of the + raw linear formula, removing the absolute offset. +* **Visible / fused image** — the `644x384` and other modes appear to carry the + Duo's visible camera for thermal+visible fusion (`libUSBDualCamera.so`). + +To pursue these, either continue the static RE the way `preview_start` was +decoded, or capture the real app's USB traffic (phone with `usbmon`+root, or +Waydroid on Linux with the camera passed through). None of it is needed for the +working radiometric viewer. + +Once the stream delivers genuine 16-bit data, `thermalcam.py` already +auto-detects it (`auto_detect_radiometric`) and shows real °C — or force it with +`--temp-scale`. + +## Pulling the native libs from the phone + +```bash +adb shell pm path com.topdon.topInfrared # find base + split APKs +adb pull <.../split_config.arm64_v8a.apk> /tmp/split.apk +unzip -o /tmp/split.apk 'lib/*' -d /tmp/topdon_libs +ls /tmp/topdon_libs/lib/arm64-v8a/ # *.so +``` + +The libs are ARM64 Android (won't run on an x86_64 laptop) but disassemble fine +(Ghidra/radare2), which is how the protocol above was recovered. diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..ffe746e --- /dev/null +++ b/requirements.txt @@ -0,0 +1,7 @@ +# Core viewer (src/thermalcam.py) +opencv-python>=4.5 +numpy>=1.20 + +# Only for the experimental radiometric-unlock work (src/thermal_protocol.py). +# Not needed for the viewer. +# pyusb>=1.2 diff --git a/src/thermal_protocol.py b/src/thermal_protocol.py new file mode 100644 index 0000000..f9f4f53 --- /dev/null +++ b/src/thermal_protocol.py @@ -0,0 +1,146 @@ +#!/usr/bin/env python3 +""" +thermal_protocol.py - EXPERIMENTAL InfiRay/Topdon vendor command transport. + +Reverse-engineered from the Topdon Android app's native libraries +(libircmd.so / libUSBUVCCamera.so). See docs/TC002C-DUO.md for the full writeup. + +NOTE: real temperatures already work in thermalcam.py with NO commands at all +(via the 512x484 radiometric mode). This module is NOT needed for that. It exists +for the optional next steps - shutter NUC, full °C calibration, the visible/fused +image - which do need the vendor command channel. + +>>> THIS IS UNVERIFIED ON HARDWARE AND NOT WIRED INTO thermalcam.py. <<< +By default it runs DRY (prints the control transfers it WOULD send). Pass --run +to actually talk to the camera, at your own risk. + +Safety: + * Only the safe register reads and the `preview_start` command are implemented. + * The flash/OEM/erase commands from the SDK are deliberately NOT implemented. + * A USB replug resets the camera if it ends up in a bad state. + +Linux caveat: these are USB *vendor* control transfers (bmRequestType 0x41/0xc1), +not UVC-class controls, so the kernel uvcvideo driver does not expose them. To +send them, libusb must talk to the device directly, which conflicts with +streaming the same interface via v4l2 at the same time. Expect to claim/detach +the interface here and do the video capture through libuvc rather than v4l2 once +this path works. That integration is the open task. +""" + +import argparse +import sys +import time + +VID, PID = 0x2BDF, 0x0102 + +# Transport constants (see docs/TC002C-DUO.md). +REQTYPE_WRITE = 0x41 # OUT | vendor | interface +REQTYPE_READ = 0xC1 # IN | vendor | interface +BREQUEST_WRITE = 0x45 # 'E' +BREQUEST_READ = 0x44 # 'D' +WVALUE = 0x0078 +TIMEOUT_MS = 1000 + +# Registers. +REG_CMD = 0x9D00 # command descriptor +REG_PAYLOAD = 0x9D08 # bulk payload +REG_PARAM = 0x1D08 # command parameters +REG_STATUS = 0x0200 # status poll + + +class InfiRayDevice: + def __init__(self, dry_run=True): + self.dry_run = dry_run + self.dev = None + self.usb = None + + def open(self): + import usb.core # pyusb; only needed for a real run + self.usb = usb.core + dev = usb.core.find(idVendor=VID, idProduct=PID) + if dev is None: + raise RuntimeError(f"No {VID:04x}:{PID:04x} device found") + self.dev = dev + if not self.dry_run: + # The kernel uvcvideo driver owns the interface; vendor control + # transfers need it detached. (This is what stops simultaneous v4l2.) + for cfg in dev: + for intf in cfg: + n = intf.bInterfaceNumber + if dev.is_kernel_driver_active(n): + dev.detach_kernel_driver(n) + return self + + def write_reg(self, reg, data): + """Vendor WRITE of `data` bytes to register `reg`.""" + data = bytes(data) + if self.dry_run: + print(f" WRITE reg=0x{reg:04x} <- {data.hex(' ')}") + return len(data) + return self.dev.ctrl_transfer(REQTYPE_WRITE, BREQUEST_WRITE, WVALUE, reg, data, TIMEOUT_MS) + + def read_reg(self, reg, length): + """Vendor READ of `length` bytes from register `reg`.""" + if self.dry_run: + print(f" READ reg=0x{reg:04x} ({length} bytes) [dry-run -> zeros]") + return bytes(length) + return bytes(self.dev.ctrl_transfer(REQTYPE_READ, BREQUEST_READ, WVALUE, reg, length, TIMEOUT_MS)) + + def poll_status(self, tries=1000): + """Poll REG_STATUS until the command completes (bit0 clear). Returns the + last status byte; values > 3 indicate an error per the SDK.""" + for _ in range(tries): + status = self.read_reg(REG_STATUS, 1)[0] + if self.dry_run: + return 0 + if status & 1: # busy + continue + if (status >> 1) & 1 and status > 3: + raise RuntimeError(f"command error, status=0x{status:02x}") + return status + raise TimeoutError("status poll timed out") + + def preview_start(self, width, height, fps=25, mode=0, source=0, path=0): + """The decoded `preview_start` command (see docs/TC002C-DUO.md). + + NOTE: starts the sensor preview; it does not by itself switch the UVC + stream to 16-bit. The data-flow-mode command is still to be recovered. + """ + # 1. command descriptor: opcode 0xc10f, 8-byte arg block. + self.write_reg(REG_CMD, bytes([0x0F, 0xC1, 0, 0, 0, 0, 0, 0x08])) + # 2. parameter block (width/height big-endian). + params = bytes([ + fps & 0xFF, + 0x80 if source == 1 else 0x00, + (width >> 8) & 0xFF, width & 0xFF, + (height >> 8) & 0xFF, height & 0xFF, + mode & 0xFF, + path & 0xFF, + ]) + self.write_reg(REG_PARAM, params) + # 3. wait for completion. + return self.poll_status() + + +def main(argv=None): + p = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) + p.add_argument("--run", action="store_true", + help="Actually send to the camera (default: dry-run, prints only).") + p.add_argument("--width", type=int, default=256) + p.add_argument("--height", type=int, default=384) + args = p.parse_args(argv) + + dev = InfiRayDevice(dry_run=not args.run) + if args.run: + try: + dev.open() + except Exception as exc: # pyusb missing, no perms, device busy, ... + sys.exit(f"open failed: {exc}\n(Try: pip install pyusb; run as root; unplug other users.)") + print(f"preview_start({args.width}x{args.height}){' [DRY-RUN]' if not args.run else ''}:") + dev.preview_start(args.width, args.height) + print("done.") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/src/thermalcam.py b/src/thermalcam.py new file mode 100644 index 0000000..2dc43a7 --- /dev/null +++ b/src/thermalcam.py @@ -0,0 +1,712 @@ +#!/usr/bin/env python3 +""" +thermalcam.py - Universal Linux viewer for InfiRay-based USB thermal cameras. + +Originally written by Les Wright for the Topdon TC001 (see tc001v4.2.py), this +version is a camera-agnostic rewrite that also works with newer Topdon/InfiRay +models such as the TC002C Duo. + +These cameras pack a frame as a stack of horizontal bands: a high-res thermal +*image* band (8-bit AGC, neutral chroma) plus a 16-bit *temperature* band (raw +radiometric data, which shows up with extreme chroma when mis-read as YUYV). The +band geometry differs per model: + + * TC001 256x384 : image 256x192 + temp 256x192 (16-bit, /64 Kelvin) + * TC002C Duo 512x484 : temp 256x192 (16-bit, /16 Kelvin) + image 512x384 + +This viewer auto-detects the camera, picks a mode that carries real temperature +data, decodes it, and shows real degrees C. If no radiometric mode is available +it falls back to a clearly-labelled relative scale. See docs/TC002C-DUO.md. +""" + +import argparse +import fcntl +import glob +import os +import re +import struct +import sys +import time + +try: + import cv2 + import numpy as np +except ImportError as exc: # pragma: no cover - environment guard + sys.exit( + f"Missing dependency: {exc.name}. Install with:\n" + " sudo apt-get install python3-opencv (Debian/Ubuntu/Raspberry Pi)\n" + " pip install opencv-python numpy (everything else)" + ) + +# USB vendor IDs known to ship InfiRay-based thermal cameras. 0x2BDF is Topdon. +KNOWN_THERMAL_VIDS = {0x2BDF, 0x0BDA, 0x1514, 0x3474} + +KELVIN = 273.15 # temp_C = raw/scale - 273.15 + +# Exact band geometry per capture resolution, keyed by (width, height): +# temp_rows, image_rows, sensor (w,h), scale (raw units per Kelvin), byte order. +# Fixed geometry is far more robust than per-frame chroma detection, which breaks +# on the occasional desynced/metadata-header frame these cameras emit. +KNOWN_LAYOUTS = { + (256, 384): ((192, 384), (0, 192), (256, 192), 64.0, "le"), # TC001 / P2 family + (512, 484): ((0, 96), (98, 482), (256, 192), 16.0, "le"), # TC002C Duo +} +# Resolutions to try for genuine temperature data, in order of preference. +RADIOMETRIC_MODES = [(256, 384), (512, 484)] +# Plain image modes (no temperature), used only if no radiometric mode works. +IMAGE_MODES = [(512, 384), (256, 392), (256, 192)] + +COLORMAPS = [ + (cv2.COLORMAP_JET, "Jet"), + (cv2.COLORMAP_INFERNO, "Inferno"), + (cv2.COLORMAP_HOT, "Hot"), + (cv2.COLORMAP_MAGMA, "Magma"), + (cv2.COLORMAP_PLASMA, "Plasma"), + (cv2.COLORMAP_BONE, "Bone"), + (cv2.COLORMAP_VIRIDIS, "Viridis"), + (cv2.COLORMAP_RAINBOW, "Rainbow"), +] + + +def is_raspberrypi(): + try: + with open("/sys/firmware/devicetree/base/model", "r") as m: + return "raspberry pi" in m.read().lower() + except OSError: + return False + + +def _sysfs_usb_ids(video_node): + """Return (vid, pid, name) for a /dev/videoN node by walking sysfs, or None.""" + name = os.path.basename(video_node) + base = f"/sys/class/video4linux/{name}" + try: + modalias = open(os.path.join(base, "device", "modalias")).read().strip() + match = re.search(r"v([0-9A-Fa-f]{4})p([0-9A-Fa-f]{4})", modalias) + if not match: + return None + vid, pid = int(match.group(1), 16), int(match.group(2), 16) + except OSError: + return None + try: + product = open(os.path.join(base, "name")).read().strip() + except OSError: + product = "" + return vid, pid, product + + +def find_thermal_device(): + """Locate the thermal camera's /dev/videoN, preferring known thermal VIDs.""" + nodes = sorted(glob.glob("/dev/video*"), + key=lambda p: int(p[10:]) if p[10:].isdigit() else 999) + fallback = None + for node in nodes: + ids = _sysfs_usb_ids(node) + if ids is None: + continue + vid, _pid, product = ids + if vid in KNOWN_THERMAL_VIDS and _can_capture(node): + return node + if fallback is None and "USB Camera" in product and _can_capture(node): + fallback = node + return fallback + + +def supported_resolutions(device): + """Set of (w, h) YUYV modes the device advertises, via VIDIOC_ENUM_FRAMESIZES. + + Used to avoid even *opening* an unsupported resolution: doing so desyncs these + cameras' subsequent streams. Returns None if enumeration fails. + """ + yuyv = ord("Y") | (ord("U") << 8) | (ord("Y") << 16) | (ord("V") << 24) + # _IOWR('V', 74, sizeof(struct v4l2_frmsizeenum)==44) + vidioc_enum_framesizes = (3 << 30) | (44 << 16) | (ord("V") << 8) | 74 + try: + fd = os.open(device, os.O_RDWR) + except OSError: + return None + sizes = set() + try: + for idx in range(128): + buf = struct.pack("III", idx, yuyv, 0) + b"\x00" * 32 + try: + res = fcntl.ioctl(fd, vidioc_enum_framesizes, buf) + except OSError: + break + _i, _fmt, ftype, w, h = struct.unpack("IIIII", res[:20]) + if ftype != 1: # 1 == V4L2_FRMSIZE_TYPE_DISCRETE + break + sizes.add((w, h)) + finally: + os.close(fd) + return sizes or None + + +def _can_capture(node): + cap = cv2.VideoCapture(node, cv2.CAP_V4L2) + try: + if not cap.isOpened(): + return False + cap.set(cv2.CAP_PROP_CONVERT_RGB, 0) + ok, _ = cap.read() + return ok + finally: + cap.release() + + +# -- frame parsing -------------------------------------------------------- +def _largest_run(mask): + """Return (start, end) of the longest contiguous True run in a 1-D mask.""" + best_len = best = 0 + start = None + for i, v in enumerate(mask): + if v and start is None: + start = i + elif not v and start is not None: + if i - start > best_len: + best_len, best = i - start, (start, i) + start = None + if start is not None and len(mask) - start > best_len: + best = (start, len(mask)) + return best or None + + +def classify_bands(raw, swap=False): + """Split a raw YUYV frame into (image_band, temp_band) row ranges. + + The image band reads as a real YUYV picture (chroma near the neutral 128); + the temperature band is 16-bit data that reads as extreme chroma. Telemetry + rows (luma pinned to 0/255) are excluded. Either range may be None. + """ + chroma = raw[..., 1].mean(axis=1) + luma = raw[..., 0].mean(axis=1) + telemetry = (luma < 8) | (luma > 248) + is_image = (np.abs(chroma - 128.0) < 40) & ~telemetry + is_temp = ~is_image & ~telemetry + image_band = _largest_run(is_image) + temp_band = _largest_run(is_temp) + if swap: + image_band, temp_band = temp_band, image_band + return image_band, temp_band + + +class Frame: + """One parsed frame: a display image and (optionally) a temperature map, + both as float32 arrays at the image band's native resolution. + + `valid` is False when a known radiometric layout produced an implausible + temperature field - i.e. the camera emitted a desynced/garbage frame that the + caller should skip rather than display. + """ + + def __init__(self, image, temp, is_real, valid=True): + self.image = image # (H, W) luma + self.temp = temp # (H, W) degrees C, or None + self.is_real = is_real # True when temp is genuine radiometric + self.valid = valid + self.h, self.w = image.shape + + +def _plausible(temp): + """True for a physically sensible temperature field. Rejects desynced frames, + whose shifted image/telemetry bytes decode to a big chunk of absurd values, + while still allowing a small genuinely-hot region (a soldering iron, a flame).""" + median = float(np.median(temp)) + if not -25.0 <= median <= 160.0: + return False + garbage = float(np.mean((temp < -40.0) | (temp > 300.0))) + return garbage < 0.02 + + +def _decode_temp(raw, temp_rows, sensor, scale, order): + """Decode a temperature band into a sensor-resolution °C array, or None.""" + t0, t1 = temp_rows + sw, sh = sensor + band = np.ascontiguousarray(raw[t0:t1]).reshape(-1) + lo, hi = band[0::2].astype(np.uint16), band[1::2].astype(np.uint16) + u16 = (lo + (hi << 8)) if order == "le" else (hi + (lo << 8)) + if u16.size < sw * sh: + return None + temp = u16[:sw * sh].astype(np.float32) / float(scale) - KELVIN + temp = temp.reshape(sh, sw) + return temp if _plausible(temp) else None + + +def parse_frame(raw, scale=None, order="le", swap=False): + """Decode a raw YUYV frame into a Frame (image + optional temperature).""" + h, w, _ = raw.shape + layout = KNOWN_LAYOUTS.get((w, h)) + if layout is not None: + temp_rows, image_rows, sensor, def_scale, def_order = layout + if swap: + temp_rows, image_rows = image_rows, temp_rows + i0, i1 = image_rows + image = raw[i0:i1, :, 0].astype(np.float32) + temp = _decode_temp(raw, temp_rows, sensor, scale or def_scale, order or def_order) + if temp is None: + # Known radiometric layout but bad data this frame -> skip it. + return Frame(image, None, False, valid=False) + if temp.shape != image.shape: + temp = cv2.resize(temp, (image.shape[1], image.shape[0]), interpolation=cv2.INTER_NEAREST) + return Frame(image, temp, True) + + # Unknown resolution: fall back to chroma-based band detection (best effort). + image_band, temp_band = classify_bands(raw, swap) + if image_band is None: + return Frame(raw[..., 0].astype(np.float32), None, False) + i0, i1 = image_band + image = raw[i0:i1, :, 0].astype(np.float32) + if temp_band is None or not scale: + return Frame(image, None, False) + ih = i1 - i0 + npix = (temp_band[1] - temp_band[0]) * w + sensor = (w, ih) if ih * w <= npix else (w // 2, ih // 2) + temp = _decode_temp(raw, temp_band, sensor, scale, order) + if temp is None: + return Frame(image, None, False) + if temp.shape != image.shape: + temp = cv2.resize(temp, (image.shape[1], image.shape[0]), interpolation=cv2.INTER_NEAREST) + return Frame(image, temp, True) + + +class ThermalApp: + def __init__(self, args): + self.args = args + self.is_pi = is_raspberrypi() + + self.alpha = args.contrast + self.colormap = 0 + self.blur = 0 + self.threshold = 2.0 + self.hud = True + self.recording = False + self.video_out = None + self.rec_start = 0.0 + self.elapsed = "00:00:00" + self.snaptime = "None" + self.fullscreen = False + self.swap = args.swap_halves + self.smooth = args.smooth + self._ema = None + + self.temp_scale = None # raw-units-per-Kelvin, or None for relative + self.temp_order = args.temp_order + self.temp_offset = args.temp_offset + self.radiometric = False + + self.cap = None + self.native_w = self.native_h = 0 + self.scale = args.scale # finalised once native size is known + + # -- camera lifecycle ------------------------------------------------- + @staticmethod + def _open_at(device, w, h): + cap = cv2.VideoCapture(device, cv2.CAP_V4L2) + if not cap.isOpened(): + return None, None + cap.set(cv2.CAP_PROP_CONVERT_RGB, 0) + cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"YUYV")) + cap.set(cv2.CAP_PROP_FRAME_WIDTH, w) + cap.set(cv2.CAP_PROP_FRAME_HEIGHT, h) + # Without this OpenCV hands back stale buffered frames that are desynced + # mid-frame (the band layout shifts and the parse fails). + cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) + # Read a few frames: the first ones after a mode change can be warm-up + # garbage that wouldn't classify correctly. + raw = None + for _ in range(5): + ok, frame = cap.read() + raw = ThermalApp._as_raw(frame) if ok else None + if raw is None or raw.shape[1] != w or raw.shape[0] != h: + cap.release() + return None, None + return cap, raw + + def _probe_radiometric(self, cap, raw, scale): + """True if this mode yields genuine temperature data. Checks the probe + frame plus a few live ones, since the occasional frame desyncs.""" + if parse_frame(raw, scale, self.temp_order, self.swap).is_real: + return True + for _ in range(10): + ok, frame = cap.read() + r = self._as_raw(frame) if ok else None + if r is not None and parse_frame(r, scale, self.temp_order, self.swap).is_real: + return True + return False + + def open_camera(self): + device = self.args.device or find_thermal_device() + if device is None: + sys.exit( + "No thermal camera found. Plug it in and check it appears in\n" + " v4l2-ctl --list-devices\n" + "then pass it explicitly, e.g. --device /dev/video2" + ) + if device.isdigit(): + device = "/dev/video" + device + + forced = self.args.temp_scale != "auto" + forced_scale = float(self.args.temp_scale) if forced else None + + # Only probe modes the camera actually advertises - opening an + # unsupported resolution desyncs the stream that follows it. + supported = supported_resolutions(device) + ok = (lambda wh: supported is None or wh in supported) + + pick = None # (cap, frame, scale, order) + if self.args.resolution: + w, h = (int(x) for x in self.args.resolution.lower().split("x")) + cap, raw = self._open_at(device, w, h) + if cap is not None: + pick = (cap, raw, forced_scale, self.temp_order) + else: + # Prefer a mode that yields genuine temperature data. Probe a few + # frames since the occasional one desyncs. + for w, h in RADIOMETRIC_MODES: + if not ok((w, h)): + continue + cap, raw = self._open_at(device, w, h) + if cap is None: + continue + scale = forced_scale if forced else KNOWN_LAYOUTS[(w, h)][3] + if self._probe_radiometric(cap, raw, scale): + pick = (cap, raw, scale, self.temp_order) + break + cap.release() + if pick is None: + for w, h in IMAGE_MODES: + if not ok((w, h)): + continue + cap, raw = self._open_at(device, w, h) + if cap is not None: + pick = (cap, raw, forced_scale, self.temp_order) + break + + if pick is None: + sys.exit(f"{device}: no usable YUYV mode. Try --resolution 256x192.") + + self.cap, raw, self.temp_scale, self.temp_order = pick + self.radiometric = self.temp_scale is not None + if not self.radiometric: + self.temp_scale = None + frame = parse_frame(raw, self.temp_scale, self.temp_order, self.swap) + self.native_h, self.native_w = frame.h, frame.w + if not self.args.scale: # auto: aim for ~768px wide + self.scale = max(1, min(5, round(768 / self.native_w))) + print(f"Opened {device}: frame {raw.shape[1]}x{raw.shape[0]}, " + f"image {self.native_w}x{self.native_h}, " + f"temperature={'REAL °C' if self.radiometric else 'relative (uncalibrated)'}") + if self.radiometric: + print(f" radiometric: scale=1/{self.temp_scale:g} K, order={self.temp_order}, " + f"offset={self.temp_offset:+.1f} °C (adjust with [ and ])") + else: + print(" no 16-bit data in this stream - see docs/TC002C-DUO.md") + + @staticmethod + def _as_raw(frame): + if frame is None: + return None + if frame.ndim == 3 and frame.shape[2] == 2: + return frame + if frame.ndim == 2 and frame.shape[1] % 2 == 0: + return frame.reshape(frame.shape[0], frame.shape[1] // 2, 2) + return None + + # -- per-frame processing -------------------------------------------- + @staticmethod + def _destripe(luma): + """Remove per-row and per-column fixed-pattern offsets while keeping + smooth gradients (uncorrected previews carry 2D FPN that a stretch + turns into stripes; we can't run the camera's shutter NUC over UVC).""" + row = np.median(luma, axis=1) + luma = luma - (row - cv2.blur(row.reshape(-1, 1), (1, 9)).ravel())[:, None] + col = np.median(luma, axis=0) + return luma - (col - cv2.blur(col.reshape(1, -1), (9, 1)).ravel())[None, :] + + def _display_luma(self, image): + """Turn the raw image band into a stretched, denoised 8-bit luma.""" + luma = image.copy() + if self.smooth > 0: + if self._ema is None or self._ema.shape != luma.shape: + self._ema = luma + else: + self._ema = self.smooth * self._ema + (1.0 - self.smooth) * luma + luma = self._ema + if not self.args.no_destripe: + luma = self._destripe(luma) + if not self.args.no_stretch: + lo, hi = np.percentile(luma, [1, 99]) + span = max(hi - lo, 45.0) + luma = np.clip((luma - lo) * (255.0 / span), 0, 255) + return luma.astype(np.uint8) + + def render(self, raw): + frame = parse_frame(raw, self.temp_scale, self.temp_order, self.swap) + if not frame.valid: + return None # desynced/garbage frame - caller reuses the last one + is_real = frame.is_real + if is_real: + temp_map = frame.temp + self.temp_offset + unit = "C" + else: + temp_map = self.args.rel_gain * frame.image + self.args.rel_offset + unit = "lvl" + + h, w = frame.h, frame.w + center = temp_map[h // 2, w // 2] + inner = temp_map[2:-2, 2:-2] + max_pos = tuple(p + 2 for p in np.unravel_index(np.argmax(inner), inner.shape)) + min_pos = tuple(p + 2 for p in np.unravel_index(np.argmin(inner), inner.shape)) + tmax, tmin = float(temp_map[max_pos]), float(temp_map[min_pos]) + tavg = float(temp_map.mean()) + + luma = self._display_luma(frame.image) + luma = cv2.convertScaleAbs(luma, alpha=self.alpha) + new_w, new_h = w * self.scale, h * self.scale + luma = cv2.resize(luma, (new_w, new_h), interpolation=cv2.INTER_CUBIC) + if self.blur > 0: + luma = cv2.blur(luma, (self.blur, self.blur)) + + cmap, cmap_name = COLORMAPS[self.colormap] + heatmap = cv2.applyColorMap(luma, cmap) + + self._draw_crosshair(heatmap, new_w, new_h, center, unit) + self._draw_markers(heatmap, max_pos, min_pos, tmax, tmin, tavg, unit, w, h, new_w, new_h) + if self.hud: + self._draw_hud(heatmap, cmap_name, tavg, unit, is_real) + return heatmap + + def _draw_crosshair(self, img, w, h, center_temp, unit): + cx, cy = w // 2, h // 2 + for color, thick in (((255, 255, 255), 2), ((0, 0, 0), 1)): + cv2.line(img, (cx, cy + 20), (cx, cy - 20), color, thick) + cv2.line(img, (cx + 20, cy), (cx - 20, cy), color, thick) + label = f"{center_temp:.1f} {unit}" + cv2.putText(img, label, (cx + 10, cy - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 0, 0), 2, cv2.LINE_AA) + cv2.putText(img, label, (cx + 10, cy - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 255), 1, cv2.LINE_AA) + + def _draw_markers(self, img, max_pos, min_pos, tmax, tmin, tavg, unit, w, h, nw, nh): + sx, sy = nw / w, nh / h + for (row, col), col_bgr, value in ((max_pos, (0, 0, 255), tmax), (min_pos, (255, 0, 0), tmin)): + if abs(value - tavg) < self.threshold: + continue + x, y = int(col * sx), int(row * sy) + cv2.circle(img, (x, y), 5, (0, 0, 0), 2) + cv2.circle(img, (x, y), 5, col_bgr, -1) + text = f"{value:.1f} {unit}" + cv2.putText(img, text, (x + 10, y + 5), cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 0, 0), 2, cv2.LINE_AA) + cv2.putText(img, text, (x + 10, y + 5), cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 255), 1, cv2.LINE_AA) + + def _draw_hud(self, img, cmap_name, tavg, unit, is_real): + cv2.rectangle(img, (0, 0), (180, 130), (0, 0, 0), -1) + rows = [ + f"Avg: {tavg:.1f} {unit}", + f"Threshold: {self.threshold:.0f}", + f"Colormap: {cmap_name}", + f"Blur: {self.blur} Scale: {self.scale}", + f"Contrast: {self.alpha:.1f}", + f"Snapshot: {self.snaptime}", + ] + for i, text in enumerate(rows): + cv2.putText(img, text, (8, 14 + i * 14), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0, 255, 255), 1, cv2.LINE_AA) + mode = "RADIOMETRIC" if is_real else "RELATIVE (uncal)" + mode_color = (0, 255, 0) if is_real else (0, 165, 255) + cv2.putText(img, mode, (8, 14 + len(rows) * 14), cv2.FONT_HERSHEY_SIMPLEX, 0.4, mode_color, 1, cv2.LINE_AA) + rec_color = (40, 40, 255) if self.recording else (200, 200, 200) + cv2.putText(img, f"REC {self.elapsed}", (8, 14 + (len(rows) + 1) * 14), cv2.FONT_HERSHEY_SIMPLEX, 0.4, rec_color, 1, cv2.LINE_AA) + + # -- recording / snapshots ------------------------------------------- + def start_recording(self, frame_size): + now = time.strftime("%Y%m%d--%H%M%S") + self.video_out = cv2.VideoWriter(now + "-output.avi", cv2.VideoWriter_fourcc(*"XVID"), 25, frame_size) + self.recording = True + self.rec_start = time.time() + + def stop_recording(self): + self.recording = False + self.elapsed = "00:00:00" + if self.video_out is not None: + self.video_out.release() + self.video_out = None + + def snapshot(self, heatmap): + now = time.strftime("%Y%m%d-%H%M%S") + cv2.imwrite(f"thermal-{now}.png", heatmap) + self.snaptime = time.strftime("%H:%M:%S") + print(f"Saved thermal-{now}.png") + + # -- main loop -------------------------------------------------------- + def run(self): + self.open_camera() + win = "Thermal" + cv2.namedWindow(win, cv2.WINDOW_GUI_NORMAL) + cv2.resizeWindow(win, self.native_w * self.scale, self.native_h * self.scale) + self._print_keys() + while True: + ok, frame = self.cap.read() + if not ok or frame is None: + time.sleep(0.05) + continue + raw = self._as_raw(frame) + if raw is None: + continue + heatmap = self.render(raw) + if heatmap is None: # desynced frame; show the previous one + continue + cv2.imshow(win, heatmap) + if self.recording and self.video_out is not None: + self.elapsed = time.strftime("%H:%M:%S", time.gmtime(time.time() - self.rec_start)) + self.video_out.write(heatmap) + if self.handle_key(cv2.waitKey(1) & 0xFF, heatmap): + break + self.cleanup() + + def handle_key(self, key, heatmap): + if key in (ord("q"), 27): + return True + elif key == ord("a"): + self.blur += 1 + elif key == ord("z"): + self.blur = max(0, self.blur - 1) + elif key == ord("s"): + self.threshold += 1 + elif key == ord("x"): + self.threshold = max(0, self.threshold - 1) + elif key == ord("d"): + self.scale = min(5, self.scale + 1) + self._resize_window() + elif key == ord("c"): + self.scale = max(1, self.scale - 1) + self._resize_window() + elif key == ord("f"): + self.alpha = min(3.0, round(self.alpha + 0.1, 1)) + elif key == ord("v"): + self.alpha = max(0.0, round(self.alpha - 0.1, 1)) + elif key == ord("m"): + self.colormap = (self.colormap + 1) % len(COLORMAPS) + elif key == ord("h"): + self.hud = not self.hud + elif key == ord("n"): + self.swap = not self.swap + elif key == ord("g"): + self.smooth = 0.0 if self.smooth > 0 else (self.args.smooth or 0.5) + elif key == ord("w"): + self._set_fullscreen(False) + elif key == ord("e"): + self._set_fullscreen(True) + elif key == ord("p"): + self.snapshot(heatmap) + elif key == ord("r") and not self.recording: + self.start_recording((heatmap.shape[1], heatmap.shape[0])) + elif key == ord("t"): + self.stop_recording() + elif key == ord("]"): # calibration: nudge temperature offset / relative gain + if self.radiometric: + self.temp_offset = round(self.temp_offset + 0.5, 1) + else: + self.args.rel_gain = round(self.args.rel_gain + 0.05, 3) + elif key == ord("["): + if self.radiometric: + self.temp_offset = round(self.temp_offset - 0.5, 1) + else: + self.args.rel_gain = round(self.args.rel_gain - 0.05, 3) + return False + + def _resize_window(self): + if not self.fullscreen and not self.is_pi: + cv2.resizeWindow("Thermal", self.native_w * self.scale, self.native_h * self.scale) + + def _set_fullscreen(self, on): + self.fullscreen = on + cv2.setWindowProperty("Thermal", cv2.WND_PROP_FULLSCREEN, + cv2.WINDOW_FULLSCREEN if on else cv2.WINDOW_NORMAL) + if not on: + self._resize_window() + + def cleanup(self): + if self.recording: + self.stop_recording() + if self.cap is not None: + self.cap.release() + cv2.destroyAllWindows() + + def _print_keys(self): + print( + "\nKey bindings:\n" + " a/z blur +/- s/x min-max label threshold +/-\n" + " d/c scale +/- f/v contrast +/-\n" + " m cycle colormap h toggle HUD\n" + " n swap bands g toggle temporal smoothing\n" + " e/w fullscreen on/off\n" + " r/t record / stop p snapshot\n" + " [/] temperature offset (or relative gain) calibration\n" + " q/ESC quit\n" + ) + + # -- headless self-test ---------------------------------------------- + def selftest(self, frames): + self.open_camera() + last = None + for i in range(frames): + ok, frame = self.cap.read() + if not ok or frame is None: + continue + raw = self._as_raw(frame) + if raw is None: + continue + f = parse_frame(raw, self.temp_scale, self.temp_order, self.swap) + rendered = self.render(raw) + if rendered is not None: + last = rendered + if i == frames - 1 and f.valid: + if f.is_real: + t = f.temp + self.temp_offset + print(f"frame {i}: REAL temps center={t[f.h//2, f.w//2]:.1f}C " + f"min={t.min():.1f} max={t.max():.1f}") + else: + print(f"frame {i}: relative (no radiometric data)") + self.cap.release() + if last is None: + print("SELFTEST FAILED: no frames decoded") + return 1 + cv2.imwrite("selftest-thermal.png", last) + print(f"SELFTEST OK: wrote selftest-thermal.png ({last.shape[1]}x{last.shape[0]})") + return 0 + + +def parse_args(argv=None): + p = argparse.ArgumentParser(description="Universal Linux thermal camera viewer (TC001, TC002C Duo, InfiRay).") + p.add_argument("--device", help="Video device, e.g. /dev/video2 or 2. Default: auto-detect.") + p.add_argument("--resolution", help="Force capture WxH, e.g. 512x484. Default: best radiometric mode.") + p.add_argument("--scale", type=int, default=0, help="Display upscaling 1-5 (0 = auto from image size).") + p.add_argument("--contrast", type=float, default=1.0, help="Initial contrast (0.0-3.0).") + p.add_argument("--temp-scale", default="auto", + help="Radiometric raw-units-per-Kelvin divisor: 'auto', 64 (TC001), or 16 (Duo).") + p.add_argument("--temp-order", default="le", choices=["le", "be"], help="16-bit byte order.") + p.add_argument("--temp-offset", type=float, default=0.0, + help="°C added to decoded temperatures (calibrate against a known reference; live keys [ ]).") + p.add_argument("--rel-gain", type=float, default=0.2, help="Relative-mode level->pseudo-temp gain.") + p.add_argument("--rel-offset", type=float, default=0.0, help="Relative-mode offset.") + p.add_argument("--no-stretch", action="store_true", help="Disable the display contrast stretch.") + p.add_argument("--no-destripe", action="store_true", help="Disable fixed-pattern-noise removal.") + p.add_argument("--smooth", type=float, default=0.5, help="Temporal smoothing 0.0-0.9 (0 disables). Live: 'g'.") + p.add_argument("--swap-halves", action="store_true", help="Swap which band is image vs data. Live: 'n'.") + p.add_argument("--selftest", type=int, metavar="N", help="Headless: grab N frames, save a snapshot, exit.") + return p.parse_args(argv) + + +def main(argv=None): + args = parse_args(argv) + app = ThermalApp(args) + if args.selftest: + return app.selftest(args.selftest) + try: + app.run() + except KeyboardInterrupt: + app.cleanup() + print("\nBye.") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) From 33fb0ff1aba133c363339220c511212522125530 Mon Sep 17 00:00:00 2001 From: Aron Novak Date: Thu, 11 Jun 2026 10:55:44 +0200 Subject: [PATCH 2/3] Add MJPEG bridge, orientation controls, reusable frame API - src/thermal_bridge.py: serve the parsed/colormapped thermal image as an MJPEG-over-HTTP stream so any app (OpenCV, ffmpeg, browser) can consume it like a webcam, no dependency. - thermalcam: --rotate / --flip (live 'o' key) and a clean colormap_frame() method (overlay-free) used by the bridge. Co-Authored-By: Claude Opus 4.8 (1M context) --- README.md | 26 ++++++ src/thermal_bridge.py | 178 ++++++++++++++++++++++++++++++++++++++++++ src/thermalcam.py | 50 ++++++++++-- 3 files changed, 249 insertions(+), 5 deletions(-) create mode 100644 src/thermal_bridge.py diff --git a/README.md b/README.md index cb25716..692723b 100644 --- a/README.md +++ b/README.md @@ -54,6 +54,32 @@ Useful options: `--temp-offset N` to correct the absolute °C, `--temp-scale · `h` HUD · `n` swap bands · `g` temporal smoothing · `e/w` fullscreen on/off · `r/t` record/stop · `p` snapshot · `[`/`]` temperature-offset calibration · `q`/ESC quit +### Feed it to other apps (MJPEG bridge) + +`src/thermal_bridge.py` re-serves the parsed, colormapped thermal image as an +MJPEG-over-HTTP stream, so any app can consume it like a webcam — no kernel +module, no root, no dependency: + +```bash +python3 src/thermal_bridge.py --colormap inferno --rotate 90 # serves http://127.0.0.1:8090 +``` + +```python +import cv2 +cap = cv2.VideoCapture("http://127.0.0.1:8090/stream.mjpg") # OpenCV, ffmpeg, browsers, ... +``` + +Options mirror the viewer (`--colormap`, `--rotate`, `--flip`, `--width/--height`, +`--temp-scale`, `--device`, …). Open `http://127.0.0.1:8090/` in a browser for a +live preview. + +### Orientation + +The sensor is landscape (4:3). Rotate/mirror in software with `--rotate +{0,90,180,270}` and `--flip {none,h,v}`, or live with the `o` key. (The InfiRay +sensor also has a hardware mirror/flip, reachable only via the vendor command +protocol — software rotation is simpler.) + ### Camera support | Camera | Image | Temperature | diff --git a/src/thermal_bridge.py b/src/thermal_bridge.py new file mode 100644 index 0000000..67c8ceb --- /dev/null +++ b/src/thermal_bridge.py @@ -0,0 +1,178 @@ +#!/usr/bin/env python3 +""" +thermal_bridge.py - Serve the thermal camera as an MJPEG-over-HTTP stream. + +Re-exposes an InfiRay/Topdon thermal camera (parsed + colormapped by thermalcam) +as a plain MJPEG stream that ANY app can open like a webcam - no kernel module, +no root, no reboot, no code dependency: + + python3 src/thermal_bridge.py --colormap inferno --rotate 90 + + # then, in any consumer: + OpenCV: cv2.VideoCapture("http://127.0.0.1:8090/stream.mjpg") + ffmpeg: ffmpeg -i http://127.0.0.1:8090/stream.mjpg ... + browser: http://127.0.0.1:8090/ (preview page) + +This exists because the camera's default UVC output is a torn/garbage frame; the +bridge does the 512x484 radiometric parse and hands out a clean thermal image. +""" + +import argparse +import sys +import threading +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer + +import cv2 + +import thermalcam + +BOUNDARY = "frame" + +# Latest encoded JPEG, published by the capture thread and consumed by clients. +_latest = {"jpeg": None} +_new_frame = threading.Condition() +_stop = threading.Event() + + +def capture_loop(app, out_size, quality): + """Grab frames, colormap them, publish the newest JPEG. Skips desynced frames.""" + encode = [cv2.IMWRITE_JPEG_QUALITY, quality] + while not _stop.is_set(): + ok, frame = app.cap.read() + raw = app._as_raw(frame) if ok else None + if raw is None: + continue + bgr = app.colormap_frame(raw) + if bgr is None: # desynced frame; keep the previous one + continue + if out_size is not None: + bgr = cv2.resize(bgr, out_size, interpolation=cv2.INTER_AREA) + ok, buf = cv2.imencode(".jpg", bgr, encode) + if not ok: + continue + with _new_frame: + _latest["jpeg"] = buf.tobytes() + _new_frame.notify_all() + + +class Handler(BaseHTTPRequestHandler): + protocol_version = "HTTP/1.0" + + def log_message(self, *a): # quiet + pass + + def do_GET(self): + if self.path.startswith("/stream") or self.path.rstrip("/") == "": + if self.path.rstrip("/") == "": + return self._page() + return self._stream() + if self.path.startswith("/snapshot"): + return self._snapshot() + self.send_error(404) + + def _page(self): + html = (b"" + b"") + self.send_response(200) + self.send_header("Content-Type", "text/html") + self.send_header("Content-Length", str(len(html))) + self.end_headers() + self.wfile.write(html) + + def _snapshot(self): + with _new_frame: + _new_frame.wait(timeout=2.0) + jpeg = _latest["jpeg"] + if jpeg is None: + return self.send_error(503) + self.send_response(200) + self.send_header("Content-Type", "image/jpeg") + self.send_header("Content-Length", str(len(jpeg))) + self.end_headers() + self.wfile.write(jpeg) + + def _stream(self): + self.send_response(200) + self.send_header("Content-Type", f"multipart/x-mixed-replace; boundary={BOUNDARY}") + self.send_header("Cache-Control", "no-cache") + self.end_headers() + try: + while not _stop.is_set(): + with _new_frame: + _new_frame.wait(timeout=2.0) + jpeg = _latest["jpeg"] + if jpeg is None: + continue + self.wfile.write(b"--" + BOUNDARY.encode() + b"\r\n") + self.wfile.write(b"Content-Type: image/jpeg\r\n") + self.wfile.write(f"Content-Length: {len(jpeg)}\r\n\r\n".encode()) + self.wfile.write(jpeg) + self.wfile.write(b"\r\n") + except (BrokenPipeError, ConnectionResetError): + pass + + +def build_app(args): + """Open the camera through thermalcam with the requested settings.""" + names = [n.lower() for _c, n in thermalcam.COLORMAPS] + if args.colormap.lower() not in names: + sys.exit(f"--colormap must be one of: {', '.join(names)}") + + tc_argv = ["--scale", "1", "--rotate", str(args.rotate), "--flip", args.flip, + "--temp-scale", args.temp_scale, "--temp-order", args.temp_order] + if args.device: + tc_argv += ["--device", args.device] + if args.resolution: + tc_argv += ["--resolution", args.resolution] + if args.no_destripe: + tc_argv += ["--no-destripe"] + tc_argv += ["--smooth", str(args.smooth)] + + app = thermalcam.ThermalApp(thermalcam.parse_args(tc_argv)) + app.colormap = names.index(args.colormap.lower()) + app.open_camera() + return app + + +def main(argv=None): + p = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) + p.add_argument("--host", default="127.0.0.1", help="Bind address (0.0.0.0 to expose on the LAN).") + p.add_argument("--port", type=int, default=8090) + p.add_argument("--colormap", default="inferno", help="Palette (jet, inferno, hot, ...). Default inferno.") + p.add_argument("--width", type=int, help="Output width (default: camera native).") + p.add_argument("--height", type=int, help="Output height (default: camera native).") + p.add_argument("--quality", type=int, default=85, help="JPEG quality 1-100.") + p.add_argument("--device", help="Video device. Default: auto-detect.") + p.add_argument("--resolution", help="Force capture WxH.") + p.add_argument("--rotate", type=int, default=0, choices=[0, 90, 180, 270]) + p.add_argument("--flip", default="none", choices=["none", "h", "v"]) + p.add_argument("--temp-scale", default="auto") + p.add_argument("--temp-order", default="le", choices=["le", "be"]) + p.add_argument("--no-destripe", action="store_true") + p.add_argument("--smooth", type=float, default=0.5) + args = p.parse_args(argv) + + app = build_app(args) + out_size = (args.width, args.height) if args.width and args.height else None + + worker = threading.Thread(target=capture_loop, args=(app, out_size, args.quality), daemon=True) + worker.start() + + server = ThreadingHTTPServer((args.host, args.port), Handler) + url = f"http://{args.host}:{args.port}" + print(f"Thermal MJPEG stream at {url}/stream.mjpg (preview: {url}/ snapshot: {url}/snapshot.jpg)") + print("Open it from any app, e.g.:") + print(f" python3 -c \"import cv2; c=cv2.VideoCapture('{url}/stream.mjpg'); print(c.read()[0])\"") + try: + server.serve_forever() + except KeyboardInterrupt: + pass + finally: + _stop.set() + server.shutdown() + app.cap.release() + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/src/thermalcam.py b/src/thermalcam.py index 2dc43a7..ba43f62 100644 --- a/src/thermalcam.py +++ b/src/thermalcam.py @@ -288,6 +288,8 @@ def __init__(self, args): self.swap = args.swap_halves self.smooth = args.smooth self._ema = None + self.rotate = args.rotate # 0/90/180/270 + self.flip = args.flip # none/h/v self.temp_scale = None # raw-units-per-Kelvin, or None for relative self.temp_order = args.temp_order @@ -440,19 +442,36 @@ def _display_luma(self, image): luma = np.clip((luma - lo) * (255.0 / span), 0, 255) return luma.astype(np.uint8) + def _orient(self, arr): + """Apply the current rotation/flip to an image or temperature array.""" + if arr is None: + return None + if self.rotate == 90: + arr = cv2.rotate(arr, cv2.ROTATE_90_CLOCKWISE) + elif self.rotate == 180: + arr = cv2.rotate(arr, cv2.ROTATE_180) + elif self.rotate == 270: + arr = cv2.rotate(arr, cv2.ROTATE_90_COUNTERCLOCKWISE) + if self.flip == "h": + arr = cv2.flip(arr, 1) + elif self.flip == "v": + arr = cv2.flip(arr, 0) + return arr + def render(self, raw): frame = parse_frame(raw, self.temp_scale, self.temp_order, self.swap) if not frame.valid: return None # desynced/garbage frame - caller reuses the last one is_real = frame.is_real + image = self._orient(frame.image) if is_real: - temp_map = frame.temp + self.temp_offset + temp_map = self._orient(frame.temp) + self.temp_offset unit = "C" else: - temp_map = self.args.rel_gain * frame.image + self.args.rel_offset + temp_map = self.args.rel_gain * image + self.args.rel_offset unit = "lvl" - h, w = frame.h, frame.w + h, w = image.shape center = temp_map[h // 2, w // 2] inner = temp_map[2:-2, 2:-2] max_pos = tuple(p + 2 for p in np.unravel_index(np.argmax(inner), inner.shape)) @@ -460,7 +479,7 @@ def render(self, raw): tmax, tmin = float(temp_map[max_pos]), float(temp_map[min_pos]) tavg = float(temp_map.mean()) - luma = self._display_luma(frame.image) + luma = self._display_luma(image) luma = cv2.convertScaleAbs(luma, alpha=self.alpha) new_w, new_h = w * self.scale, h * self.scale luma = cv2.resize(luma, (new_w, new_h), interpolation=cv2.INTER_CUBIC) @@ -476,6 +495,22 @@ def render(self, raw): self._draw_hud(heatmap, cmap_name, tavg, unit, is_real) return heatmap + def colormap_frame(self, raw): + """A clean colormapped BGR frame (oriented, no crosshair/HUD/markers), + for feeding other apps. Returns None for a desynced frame.""" + frame = parse_frame(raw, self.temp_scale, self.temp_order, self.swap) + if not frame.valid: + return None + image = self._orient(frame.image) + luma = self._display_luma(image) + luma = cv2.convertScaleAbs(luma, alpha=self.alpha) + nw, nh = image.shape[1] * self.scale, image.shape[0] * self.scale + luma = cv2.resize(luma, (nw, nh), interpolation=cv2.INTER_CUBIC) + if self.blur > 0: + luma = cv2.blur(luma, (self.blur, self.blur)) + cmap, _ = COLORMAPS[self.colormap] + return cv2.applyColorMap(luma, cmap) + def _draw_crosshair(self, img, w, h, center_temp, unit): cx, cy = w // 2, h // 2 for color, thick in (((255, 255, 255), 2), ((0, 0, 0), 1)): @@ -590,6 +625,8 @@ def handle_key(self, key, heatmap): self.swap = not self.swap elif key == ord("g"): self.smooth = 0.0 if self.smooth > 0 else (self.args.smooth or 0.5) + elif key == ord("o"): # cycle rotation 0->90->180->270 + self.rotate = (self.rotate + 90) % 360 elif key == ord("w"): self._set_fullscreen(False) elif key == ord("e"): @@ -637,7 +674,7 @@ def _print_keys(self): " d/c scale +/- f/v contrast +/-\n" " m cycle colormap h toggle HUD\n" " n swap bands g toggle temporal smoothing\n" - " e/w fullscreen on/off\n" + " o rotate 90 deg e/w fullscreen on/off\n" " r/t record / stop p snapshot\n" " [/] temperature offset (or relative gain) calibration\n" " q/ESC quit\n" @@ -691,6 +728,9 @@ def parse_args(argv=None): p.add_argument("--no-destripe", action="store_true", help="Disable fixed-pattern-noise removal.") p.add_argument("--smooth", type=float, default=0.5, help="Temporal smoothing 0.0-0.9 (0 disables). Live: 'g'.") p.add_argument("--swap-halves", action="store_true", help="Swap which band is image vs data. Live: 'n'.") + p.add_argument("--rotate", type=int, default=0, choices=[0, 90, 180, 270], + help="Rotate the image clockwise. Live: cycle with 'o'.") + p.add_argument("--flip", default="none", choices=["none", "h", "v"], help="Mirror the image horizontally/vertically.") p.add_argument("--selftest", type=int, metavar="N", help="Headless: grab N frames, save a snapshot, exit.") return p.parse_args(argv) From 2f49a730402198ae9307cfba6ecfc5abdfef3294 Mon Sep 17 00:00:00 2001 From: Aron Novak Date: Thu, 11 Jun 2026 11:12:08 +0200 Subject: [PATCH 3/3] Reject desynced/tiled frames; auto-recover the stream These cameras occasionally emit a horizontally-tiled/desynced frame. The frame validity check only looked at the temperature band, so such frames slipped through as garbage. Add an image-band alignment guard (mean-abs-difference vs a shifted copy; gradients and flat scenes are exempt) so the viewer and the MJPEG bridge never emit them. The bridge also reopens the camera after a run of unusable frames to recover from a stuck desync. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/thermal_bridge.py | 21 +++++++++++++-------- src/thermalcam.py | 38 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 8 deletions(-) diff --git a/src/thermal_bridge.py b/src/thermal_bridge.py index 67c8ceb..a51f759 100644 --- a/src/thermal_bridge.py +++ b/src/thermal_bridge.py @@ -35,16 +35,21 @@ def capture_loop(app, out_size, quality): - """Grab frames, colormap them, publish the newest JPEG. Skips desynced frames.""" + """Grab frames, colormap them, publish the newest JPEG. Skips desynced/tiled + frames, and reopens the camera if the stream wedges into a stuck desync.""" encode = [cv2.IMWRITE_JPEG_QUALITY, quality] + skips = 0 while not _stop.is_set(): ok, frame = app.cap.read() raw = app._as_raw(frame) if ok else None - if raw is None: - continue - bgr = app.colormap_frame(raw) - if bgr is None: # desynced frame; keep the previous one + bgr = app.colormap_frame(raw) if raw is not None else None + if bgr is None: # bad frame; keep serving the previous one + skips += 1 + if skips % 60 == 0: # ~2-4 s of nothing usable -> recover + print(f"[bridge] {skips} unusable frames, reopening camera...", flush=True) + app.reopen() continue + skips = 0 if out_size is not None: bgr = cv2.resize(bgr, out_size, interpolation=cv2.INTER_AREA) ok, buf = cv2.imencode(".jpg", bgr, encode) @@ -160,9 +165,9 @@ def main(argv=None): server = ThreadingHTTPServer((args.host, args.port), Handler) url = f"http://{args.host}:{args.port}" - print(f"Thermal MJPEG stream at {url}/stream.mjpg (preview: {url}/ snapshot: {url}/snapshot.jpg)") - print("Open it from any app, e.g.:") - print(f" python3 -c \"import cv2; c=cv2.VideoCapture('{url}/stream.mjpg'); print(c.read()[0])\"") + print(f"Thermal MJPEG stream at {url}/stream.mjpg (preview: {url}/ snapshot: {url}/snapshot.jpg)", flush=True) + print("Open it from any app, e.g.:", flush=True) + print(f" python3 -c \"import cv2; c=cv2.VideoCapture('{url}/stream.mjpg'); print(c.read()[0])\"", flush=True) try: server.serve_forever() except KeyboardInterrupt: diff --git a/src/thermalcam.py b/src/thermalcam.py index ba43f62..06cb524 100644 --- a/src/thermalcam.py +++ b/src/thermalcam.py @@ -218,6 +218,27 @@ def _plausible(temp): return garbage < 0.02 +def _image_aligned(luma): + """False if the image band looks horizontally tiled - the signature of a + desynced frame whose line stride is wrong (content repeats at width/4 or /2). + + Tested by mean-absolute-difference against a shifted copy: tiled panels are + near-identical (diff ~0), whereas a normal scene - even a smooth left-to-right + temperature gradient - differs substantially when shifted. Flat/near-uniform + scenes are exempt.""" + s = float(luma.std()) + if s < 8.0: # too uniform to judge; treat as fine + return True + w = luma.shape[1] + for period in (w // 4, w // 2): + if period < 2: + continue + diff = float(np.abs(luma[:, period:] - luma[:, :-period]).mean()) + if diff < max(3.0, 0.12 * s): # panels ~identical at this period => tiled + return False + return True + + def _decode_temp(raw, temp_rows, sensor, scale, order): """Decode a temperature band into a sensor-resolution °C array, or None.""" t0, t1 = temp_rows @@ -242,6 +263,8 @@ def parse_frame(raw, scale=None, order="le", swap=False): temp_rows, image_rows = image_rows, temp_rows i0, i1 = image_rows image = raw[i0:i1, :, 0].astype(np.float32) + if not _image_aligned(image): + return Frame(image, None, False, valid=False) temp = _decode_temp(raw, temp_rows, sensor, scale or def_scale, order or def_order) if temp is None: # Known radiometric layout but bad data this frame -> skip it. @@ -388,6 +411,8 @@ def open_camera(self): sys.exit(f"{device}: no usable YUYV mode. Try --resolution 256x192.") self.cap, raw, self.temp_scale, self.temp_order = pick + self._device = device + self._wh = (raw.shape[1], raw.shape[0]) # for reopen() after a desync self.radiometric = self.temp_scale is not None if not self.radiometric: self.temp_scale = None @@ -404,6 +429,19 @@ def open_camera(self): else: print(" no 16-bit data in this stream - see docs/TC002C-DUO.md") + def reopen(self): + """Reopen the camera at the same resolution to recover from a stuck + desync (the stream can wedge into a misaligned state).""" + try: + self.cap.release() + except Exception: + pass + cap, _raw = self._open_at(self._device, *self._wh) + if cap is not None: + self.cap = cap + return True + return False + @staticmethod def _as_raw(frame): if frame is None: