Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ jobs:
build:
name: Build and test
runs-on: ubuntu-latest
permissions:
contents: write

steps:
- name: Check out repository
Expand Down Expand Up @@ -38,6 +40,11 @@ jobs:
name: distributions
path: dist/

- name: Attach Demo.qtm to GitHub Release
env:
GH_TOKEN: ${{ github.token }}
run: gh release upload "${{ github.event.release.tag_name }}" examples/Demo.qtm --repo "${{ github.repository }}" --clobber

publish:
name: Publish to PyPI
runs-on: ubuntu-latest
Expand Down
58 changes: 58 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Project overview

`qtm_rt` is the Qualisys SDK for Python — a client library that implements Qualisys' RealTime (RT) protocol for talking to QTM (Qualisys Track Manager). Published to PyPI as `qtm-rt`. Targets Python 3.10+ and RT protocol version 1.8+. Little-endian only; default port 22223.

## Common commands

```bash
# Setup (from README.md)
python -m venv .venv
source ./.venv/Scripts/activate # Windows bash; on PowerShell: .\.venv\Scripts\Activate.ps1
pip install -r requirements-dev.txt

# Tests
pytest test/
pytest test/qrtconnection_test.py::test_connect_no_loop # single test

# Build sdist + wheel into dist/
python -m build

# Build Sphinx docs into docs/_build/html/
make -C docs html

# Enable debug logging at runtime
QTM_LOGGING=debug python your_script.py
```

Release flow — bump `version` in `pyproject.toml` (and `docs/conf.py`), open a PR, merge to master, then create a GitHub Release on the tag `vX.Y.Z`. Publishing the release fires `.github/workflows/release.yml`, which runs tests, builds the sdist+wheel, twine-checks, uploads to PyPI via Trusted Publishing, and attaches `examples/Demo.qtm` to the release. Docs are still a manual step: build with `make -C docs html`, copy `docs/_build/html/*` into the sibling `../qualisys_python_sdk_gh_pages` checkout **preserving the legacy `v102/`, `v103/`, `v212/` directories**, commit/push that branch.

## Architecture

Everything is `asyncio`-based and built around `qtm_rt.connect()` → returns a `QRTConnection`. The data flow:

- **`qrt.py`** — public API surface. `connect()` opens a TCP connection wrapped in `QTMProtocol`, negotiates the RT protocol version (default `"1.25"`), and returns a `QRTConnection` exposing async methods for every RT command (`stream_frames`, `get_current_frame`, `get_parameters`, `take_control`, `start`, `stop`, `load`, `save`, `calibrate`, etc.). The `@validate_response([...])` decorator asserts the server's reply starts with an expected prefix and raises `QRTCommandException` otherwise.
- **`protocol.py`** — `QTMProtocol` is an `asyncio.Protocol` subclass. Outgoing commands are framed with `RTheader` (`<II`: size + packet type) and one of the `QRTPacketType` variants. Responses are dispatched through a `_handlers` dict keyed on packet type. Single-shot command replies are delivered via a `request_queue` of futures (FIFO via `popleft`, with done-future skipping so a late response to a cancelled caller doesn't crash the transport); streaming data goes to the user-supplied `on_packet` callback instead. The first `streamframes` response is synthesized as `b"Ok"` so callers can await the command even though real data packets arrive on the streaming path.
- **`receiver.py`** — buffers raw bytes, slices them into complete packets using the header size field, converts the type byte to `QRTPacketType`, wraps data/event payloads into `QRTPacket`/`QRTEvent`, and routes to the handler.
- **`packet.py`** — all binary layouts. Uses `struct.Struct` and `namedtuple` for every RT component type (2D/3D/6D/Analog/Force/GazeVector/EyeTracker/Image/Skeleton/Timecode/etc.). `QRTPacket` exposes `get_*` accessors that lazily parse components on demand. `QRTPacketType` and `QRTEvent` are enums matching the protocol byte values.
- **`discovery.py`** — UDP broadcast discovery of QTM instances on the LAN (`Discover`).
- **`control.py`** — `TakeControl`, an async context manager wrapping `take_control`/`release_control`.
- **`reboot.py`** — utility to reboot cameras.

Key invariants when modifying:

- Component and parameter names accepted by `stream_frames` / `get_current_frame` / `get_parameters` are validated against hardcoded allow-lists in `qrt.py` (`_validate_components` and the inline list in `get_parameters`). Adding a new RT component requires updating **both** that list and `packet.py`.
- `request_queue` is FIFO (`popleft`). When a caller's `asyncio.wait_for` cancels the queued future, the next QTM response is dropped rather than delivered to it — keep this property when adding code paths that enqueue futures.
- `on_packet` callbacks are sync; they run inside the asyncio event loop, so blocking work there will stall the protocol.
- `QRTConnection` is an async context manager; prefer `async with await qtm_rt.connect(...) as c:` in new examples and tests.

## Testing notes

Tests use `pytest`, `pytest-asyncio` (markers required: `@pytest.mark.asyncio`), and `pytest-mock`. They exercise `connect()` and `QRTConnection` by mocking `loop.create_connection` — there is no live QTM in CI. New protocol commands should get analogous mocked tests in `test/qrtconnection_test.py` or `test/qtmprotocol_test.py`.

## Known gaps (from README)

`GetCaptureC3D`, `GetCaptureQTM`, and per-channel analog selection are intentionally not implemented.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ https://qualisys.github.io/qualisys_python_sdk/index.html
Examples
--------

See the examples folder.
See the examples folder. Some examples load a `Demo.qtm` recording — it ships
alongside the example scripts in this repo, and is also attached to each
[GitHub Release](https://github.com/qualisys/qualisys_python_sdk/releases)
for users who installed via pip.

Logging
-------
Expand Down
2 changes: 1 addition & 1 deletion docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ This document describes the Qualisys SDK for Python version 3.0.2
Installation:
-------------

This package is a pure python package and requires at least Python 3.5.3, the easiest way to install it is:
This package is a pure python package and requires at least Python 3.7, the easiest way to install it is:

.. code-block:: console

Expand Down
File renamed without changes.
19 changes: 10 additions & 9 deletions examples/advanced_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async def package_receiver(queue):
LOG.info("Exiting package_receiver")


async def shutdown(delay, connection, receiver_future, queue):
async def shutdown(delay, connection, receiver_future, queue, done):

# wait desired time before exiting
await asyncio.sleep(delay)
Expand All @@ -41,19 +41,19 @@ async def shutdown(delay, connection, receiver_future, queue):
# tell qtm to stop streaming
await connection.stream_frames_stop()

# stop the event loop, thus exiting the run_forever call
loop.stop()
# signal main() to return so asyncio.run() can exit
done.set()


async def setup():
async def main():
""" main function """

connection = await qtm_rt.connect("127.0.0.1")

if connection is None:
return -1

async with qtm_rt.TakeControl(connection, "password"):
async with connection, qtm_rt.TakeControl(connection, "password"):

state = await connection.get_state()
if state != qtm_rt.QRTEvent.EventConnected:
Expand All @@ -65,15 +65,16 @@ async def setup():
return -1

queue = asyncio.Queue()
done = asyncio.Event()

receiver_future = asyncio.ensure_future(package_receiver(queue))

await connection.stream_frames(components=["2d"], on_packet=queue.put_nowait)

asyncio.ensure_future(shutdown(30, connection, receiver_future, queue))
asyncio.ensure_future(shutdown(30, connection, receiver_future, queue, done))

await done.wait()


if __name__ == "__main__":
loop = asyncio.get_event_loop()
asyncio.ensure_future(setup())
loop.run_forever()
asyncio.run(main())
95 changes: 47 additions & 48 deletions examples/asyncio_everything.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
import asyncio
import argparse
import pkg_resources
import os

import qtm_rt

Expand All @@ -12,7 +12,7 @@
LOG = logging.getLogger("example")


QTM_FILE = pkg_resources.resource_filename("qtm_rt", "data/Demo.qtm")
QTM_FILE = os.path.join(os.path.dirname(__file__), "Demo.qtm")


class AsyncEnumerate:
Expand Down Expand Up @@ -80,74 +80,73 @@ async def main(interface=None):
if connection is None:
return

await connection.get_state()
await connection.byte_order()
async with connection:
await connection.get_state()
await connection.byte_order()

async with qtm_rt.TakeControl(connection, "password"):

async with qtm_rt.TakeControl(connection, "password"):
result = await connection.close()
if result == b"Closing connection":
await connection.await_event(qtm_rt.QRTEvent.EventConnectionClosed)

result = await connection.close()
if result == b"Closing connection":
await connection.await_event(qtm_rt.QRTEvent.EventConnectionClosed)
await connection.load(QTM_FILE)

await connection.load(QTM_FILE)
await connection.start(rtfromfile=True)

await connection.start(rtfromfile=True)
(await connection.get_current_frame(components=["3d"])).get_3d_markers()

(await connection.get_current_frame(components=["3d"])).get_3d_markers()
queue = asyncio.Queue()

queue = asyncio.Queue()
asyncio.ensure_future(packet_receiver(queue))

asyncio.ensure_future(packet_receiver(queue))
try:
await connection.stream_frames(
components=["incorrect"], on_packet=queue.put_nowait
)
except qtm_rt.QRTCommandException as exception:
LOG.info("exception %s", exception)

try:
await connection.stream_frames(
components=["incorrect"], on_packet=queue.put_nowait
components=["3d"], on_packet=queue.put_nowait
)
except qtm_rt.QRTCommandException as exception:
LOG.info("exception %s", exception)

await connection.stream_frames(
components=["3d"], on_packet=queue.put_nowait
)

await asyncio.sleep(0.5)
await connection.byte_order()
await asyncio.sleep(0.5)
await connection.stream_frames_stop()
queue.put_nowait(None)

await connection.get_parameters(parameters=["3d"])
await connection.stop()
await asyncio.sleep(0.5)
await connection.byte_order()
await asyncio.sleep(0.5)
await connection.stream_frames_stop()
queue.put_nowait(None)

await connection.await_event()
await connection.get_parameters(parameters=["3d"])
await connection.stop()

await connection.new()
await connection.await_event(qtm_rt.QRTEvent.EventConnected)
await connection.await_event()

await connection.start()
await connection.await_event(qtm_rt.QRTEvent.EventWaitingForTrigger)
await connection.new()
await connection.await_event(qtm_rt.QRTEvent.EventConnected)

await connection.trig()
await connection.await_event(qtm_rt.QRTEvent.EventCaptureStarted)
await connection.start()
await connection.await_event(qtm_rt.QRTEvent.EventWaitingForTrigger)

await asyncio.sleep(0.5)
await connection.trig()
await connection.await_event(qtm_rt.QRTEvent.EventCaptureStarted)

await connection.set_qtm_event()
await asyncio.sleep(0.001)
await connection.set_qtm_event("with_label")
await asyncio.sleep(0.5)

await asyncio.sleep(0.5)
await connection.set_qtm_event()
await asyncio.sleep(0.001)
await connection.set_qtm_event("with_label")

await connection.stop()
await connection.await_event(qtm_rt.QRTEvent.EventCaptureStopped)
await asyncio.sleep(0.5)

await connection.save(r"measurement.qtm")
await connection.stop()
await connection.await_event(qtm_rt.QRTEvent.EventCaptureStopped)

await asyncio.sleep(3)
await connection.save(r"measurement.qtm")

await connection.close()
await asyncio.sleep(3)

connection.disconnect()
await connection.close()


def parse_args():
Expand All @@ -165,4 +164,4 @@ def parse_args():

if __name__ == "__main__":
args = parse_args()
asyncio.get_event_loop().run_until_complete(main(interface=args.ip))
asyncio.run(main(interface=args.ip))
12 changes: 8 additions & 4 deletions examples/basic_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@ def on_packet(packet):
print("\t", marker)


async def setup():
async def main():
""" Main function """
connection = await qtm_rt.connect("127.0.0.1")
if connection is None:
return

await connection.stream_frames(components=["3d"], on_packet=on_packet)
async with connection:
await connection.stream_frames(components=["3d"], on_packet=on_packet)

# stream_frames returns immediately after registering the callback; keep the
# loop alive so packets keep arriving. Ctrl-C to stop.
await asyncio.Event().wait()


if __name__ == "__main__":
asyncio.ensure_future(setup())
asyncio.get_event_loop().run_forever()
asyncio.run(main())
6 changes: 3 additions & 3 deletions examples/calibration_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async def setup():
if connection is None:
return -1

async with qtm_rt.TakeControl(connection, "password"):
async with connection, qtm_rt.TakeControl(connection, "password"):

state = await connection.get_state()
if state != qtm_rt.QRTEvent.EventConnected:
Expand All @@ -37,8 +37,8 @@ async def setup():
root = ET.fromstring(cal_response)
print(ET.tostring(root, pretty_print=True).decode())

# tell qtm to stop streaming
await connection.stream_frames_stop()
# tell qtm to stop streaming
await connection.stream_frames_stop()

if __name__ == "__main__":
asyncio.run(setup())
7 changes: 2 additions & 5 deletions examples/control_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async def setup():
if connection is None:
return -1

async with qtm_rt.TakeControl(connection, "password"):
async with connection, qtm_rt.TakeControl(connection, "password"):
state = await connection.get_state()
if state != qtm_rt.QRTEvent.EventConnected:
await connection.new()
Expand All @@ -49,8 +49,5 @@ async def setup():

LOG.info("Measurement saved to Demo.qtm")

connection.disconnect()

if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(setup())
asyncio.run(setup())
29 changes: 14 additions & 15 deletions examples/image_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,20 @@ async def main(password, target_camera_id, output_path):
if connection is None:
raise RuntimeError("Failed to connect")

settings = await connection.get_parameters(parameters=["image"])
updated_settings = enable_disable_cameras(output_path, target_camera_id, settings)

async with qtm_rt.TakeControl(connection, password):
logging.debug("%s", await connection.send_xml(updated_settings))

frame = await connection.get_current_frame(components=["image"])
info, images = frame.get_image()
logging.info("%s", info)
logging.info("%s", images[0][0])
with open(output_path, "wb") as f:
logging.info("Writing %s", output_path)
f.write(images[0][1])

connection.disconnect()
async with connection:
settings = await connection.get_parameters(parameters=["image"])
updated_settings = enable_disable_cameras(output_path, target_camera_id, settings)

async with qtm_rt.TakeControl(connection, password):
logging.debug("%s", await connection.send_xml(updated_settings))

frame = await connection.get_current_frame(components=["image"])
info, images = frame.get_image()
logging.info("%s", info)
logging.info("%s", images[0][0])
with open(output_path, "wb") as f:
logging.info("Writing %s", output_path)
f.write(images[0][1])


if __name__ == "__main__":
Expand Down
Loading