async def main() -> None:
    """Run the paper flow twice and print the resulting token dashboard.

    Both calls share a single ``SessionTokenDashboard`` (budget: 50,000
    tokens) passed through ``extra_run_hooks``; the second call additionally
    supplies ``extra_instructions``.
    """
    tracker = SessionTokenDashboard(session_token_budget=50_000)
    flow_cfg = PaperFlowCfg(model="gpt-4o-mini")

    # One entry per flow call: the extra keyword arguments for that call.
    per_call_options = (
        {},
        {"extra_instructions": "Summarize in fewer than 120 words."},
    )
    for options in per_call_options:
        # Build a fresh input and hook list for every call; only the
        # dashboard instance itself is shared between the two runs.
        await paper_flow(
            RawText(text=EXAMPLE_TEXT),
            cfg=flow_cfg,
            extra_run_hooks=[tracker],
            **options,
        )

    report = tracker.render()
    print(report)
@dataclass(frozen=True, slots=True)
class TokenDashboardSnapshot:
    """Immutable snapshot of session token usage.

    The three budget-related fields are ``None`` when the dashboard that
    produced the snapshot has no session budget configured.
    """

    # Counters copied from the dashboard's accumulated usage.
    requests: int
    input_tokens: int
    output_tokens: int
    total_tokens: int
    # Configured session budget, or None when no budget is set.
    session_token_budget: int | None
    # Budget minus total tokens, clamped at 0; None without a budget.
    remaining_tokens: int | None
    # total_tokens / budget * 100. Not capped: exceeds 100 when over budget.
    used_percent: float | None


class SessionTokenDashboard(RunHooks[Any]):
    """Track session token usage and render a terminal-friendly dashboard.

    An instance can be attached to flow calls through ``extra_run_hooks``.
    Every ``on_llm_end`` callback adds that response's usage to a running
    total, which ``snapshot``, ``as_dict`` and ``render`` report against an
    optional session budget.
    """

    def __init__(self, session_token_budget: int | None = None) -> None:
        """Create a dashboard with empty usage counters.

        Args:
            session_token_budget: Positive token budget used to compute
                remaining tokens, or ``None`` to track usage only.

        Raises:
            TypeError: If the budget is a bool or not an integer.
            ValueError: If the budget is not greater than zero.
        """
        # Keep cooperative initialisation intact for the hooks base class.
        super().__init__()
        self._session_token_budget = _validate_budget(session_token_budget)
        self._usage = Usage()

    @property
    def session_token_budget(self) -> int | None:
        """Configured budget used to compute remaining tokens."""
        return self._session_token_budget

    @session_token_budget.setter
    def session_token_budget(self, session_token_budget: int | None) -> None:
        # Assignment goes through the same validation as the constructor.
        self._session_token_budget = _validate_budget(session_token_budget)

    def set_session_token_budget(
        self, session_token_budget: int | None
    ) -> None:
        """Update the session token budget used for remaining-token math.

        Equivalent to assigning to ``session_token_budget``.

        Raises:
            TypeError: If the budget is a bool or not an integer.
            ValueError: If the budget is not greater than zero.
        """
        self.session_token_budget = session_token_budget

    async def on_llm_end(
        self,
        context: Any,
        agent: Any,
        response: ModelResponse,
    ) -> None:
        """Accumulate usage emitted by the SDK at the end of each LLM call."""
        del context, agent  # Required by the hook signature; unused here.
        self._usage.add(response.usage)

    def reset(self) -> None:
        """Reset all usage counters for a new session window.

        The configured budget is left untouched.
        """
        self._usage = Usage()

    def snapshot(self) -> TokenDashboardSnapshot:
        """Return an immutable snapshot of current token usage."""
        usage = self._usage
        budget = self._session_token_budget
        remaining_tokens: int | None = None
        used_percent: float | None = None
        if budget is not None:
            used_percent = (usage.total_tokens / budget) * 100.0
            # Never report a negative remainder once the budget is exceeded.
            remaining_tokens = max(budget - usage.total_tokens, 0)
        return TokenDashboardSnapshot(
            requests=usage.requests,
            input_tokens=usage.input_tokens,
            output_tokens=usage.output_tokens,
            total_tokens=usage.total_tokens,
            session_token_budget=budget,
            remaining_tokens=remaining_tokens,
            used_percent=used_percent,
        )

    def as_dict(self) -> dict[str, int | float | None]:
        """Return the dashboard snapshot as a serializable dictionary.

        Keys are the ``TokenDashboardSnapshot`` field names, in field order.
        """
        from dataclasses import asdict

        # Derive the mapping from the dataclass so it cannot drift from it.
        return asdict(self.snapshot())

    def render(self, *, bar_width: int = 24) -> str:
        """Render a compact text dashboard for terminal output.

        Args:
            bar_width: Number of cells in the progress bar; must be >= 1.

        Returns:
            A newline-joined report. Without a budget it ends with a
            "Remaining: n/a" hint; with a budget it also shows the budget,
            remaining tokens and a progress bar.

        Raises:
            ValueError: If ``bar_width`` is smaller than 1.
        """
        if bar_width < 1:
            raise ValueError(f"bar_width must be >= 1, got {bar_width}")
        snapshot = self.snapshot()
        lines = [
            "Session Token Dashboard",
            f"Requests: {snapshot.requests}",
            f"Input tokens: {snapshot.input_tokens}",
            f"Output tokens: {snapshot.output_tokens}",
            f"Total used: {snapshot.total_tokens}",
        ]
        budget = snapshot.session_token_budget
        if budget is None:
            lines.append("Remaining: n/a (set session_token_budget)")
            return "\n".join(lines)

        # Floor on exact integer counts: a cell lights up only once it is
        # fully consumed, so the bar is completely full only when the budget
        # is exhausted. round() would use half-to-even rounding and could
        # show a full bar while tokens remain.
        filled = max(
            0,
            min(bar_width, snapshot.total_tokens * bar_width // budget),
        )
        bar = "█" * filled + "░" * (bar_width - filled)
        used_percent = snapshot.used_percent or 0.0
        lines.extend(
            [
                f"Budget: {budget}",
                f"Remaining: {snapshot.remaining_tokens}",
                f"Used/Budget: {snapshot.total_tokens}/{budget}",
                f"[{bar}] {used_percent:.1f}% used",
            ]
        )
        return "\n".join(lines)


def _validate_budget(session_token_budget: int | None) -> int | None:
    """Validate and normalize the session budget.

    Args:
        session_token_budget: Candidate budget, or ``None`` for "no budget".

    Returns:
        ``None`` when no budget is given, otherwise the budget as a plain
        ``int``.

    Raises:
        TypeError: If the value is a bool or does not behave as an integer.
        ValueError: If the value is not greater than zero.
    """
    import operator

    if session_token_budget is None:
        return None
    # bool is an int subclass; True would otherwise pass as a budget of 1.
    if isinstance(session_token_budget, bool):
        raise TypeError("session_token_budget must be an integer, not bool")
    try:
        # Accept any integer-like object and normalize it to int; floats
        # and strings are rejected here.
        budget = operator.index(session_token_budget)
    except TypeError:
        raise TypeError(
            "session_token_budget must be an integer or None, got "
            f"{type(session_token_budget).__name__}"
        ) from None
    if budget <= 0:
        raise ValueError("session_token_budget must be > 0 when provided")
    return budget
async def _record_usage(
    dashboard: "SessionTokenDashboard",
    *,
    input_tokens: int,
    output_tokens: int,
    requests: int = 1,
) -> None:
    """Feed one synthetic LLM response into *dashboard* via its public hook."""
    await dashboard.on_llm_end(
        context=None,
        agent=None,
        response=_make_response(
            requests=requests,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
        ),
    )


class SessionTokenDashboardTests(unittest.IsolatedAsyncioTestCase):
    """Usage accumulation, snapshot math and rendering.

    Usage is always recorded through ``on_llm_end`` so the tests do not
    depend on the dashboard's private attributes.
    """

    async def test_on_llm_end_accumulates_usage(self) -> None:
        dashboard = SessionTokenDashboard(session_token_budget=500)
        await _record_usage(dashboard, input_tokens=100, output_tokens=25)
        await _record_usage(dashboard, input_tokens=60, output_tokens=15)
        snapshot = dashboard.snapshot()
        self.assertEqual(snapshot.requests, 2)
        self.assertEqual(snapshot.input_tokens, 160)
        self.assertEqual(snapshot.output_tokens, 40)
        self.assertEqual(snapshot.total_tokens, 200)
        self.assertEqual(snapshot.remaining_tokens, 300)
        # used_percent is a float; avoid exact equality.
        self.assertAlmostEqual(snapshot.used_percent, 40.0)

    async def test_reset_clears_all_usage_counters(self) -> None:
        dashboard = SessionTokenDashboard(session_token_budget=100)
        await _record_usage(dashboard, input_tokens=80, output_tokens=10)
        dashboard.reset()
        snapshot = dashboard.snapshot()
        self.assertEqual(snapshot.requests, 0)
        self.assertEqual(snapshot.input_tokens, 0)
        self.assertEqual(snapshot.output_tokens, 0)
        self.assertEqual(snapshot.total_tokens, 0)
        self.assertEqual(snapshot.remaining_tokens, 100)
        self.assertAlmostEqual(snapshot.used_percent, 0.0)

    def test_snapshot_without_budget_has_no_remaining(self) -> None:
        dashboard = SessionTokenDashboard()
        snapshot = dashboard.snapshot()
        self.assertIsNone(snapshot.session_token_budget)
        self.assertIsNone(snapshot.remaining_tokens)
        self.assertIsNone(snapshot.used_percent)

    async def test_render_with_budget_includes_progress_and_remaining(
        self,
    ) -> None:
        dashboard = SessionTokenDashboard(session_token_budget=200)
        await _record_usage(
            dashboard, requests=2, input_tokens=100, output_tokens=50
        )
        rendered = dashboard.render(bar_width=10)
        self.assertIn("Session Token Dashboard", rendered)
        self.assertIn("Remaining: 50", rendered)
        self.assertIn("Used/Budget: 150/200", rendered)
        self.assertIn("75.0% used", rendered)

    def test_render_without_budget_mentions_missing_budget(self) -> None:
        dashboard = SessionTokenDashboard()
        rendered = dashboard.render()
        self.assertIn("Remaining: n/a (set session_token_budget)", rendered)

    async def test_over_budget_clamps_remaining_and_fills_bar(self) -> None:
        # Edge case: usage beyond the budget must not yield a negative
        # remainder or a bar wider than requested.
        dashboard = SessionTokenDashboard(session_token_budget=100)
        await _record_usage(dashboard, input_tokens=120, output_tokens=30)
        snapshot = dashboard.snapshot()
        self.assertEqual(snapshot.remaining_tokens, 0)
        self.assertAlmostEqual(snapshot.used_percent, 150.0)
        rendered = dashboard.render(bar_width=10)
        self.assertIn("Remaining: 0", rendered)
        self.assertIn("[" + "█" * 10 + "] 150.0% used", rendered)
        self.assertNotIn("░", rendered)

    async def test_as_dict_returns_serializable_snapshot(self) -> None:
        dashboard = SessionTokenDashboard(session_token_budget=300)
        await _record_usage(dashboard, input_tokens=90, output_tokens=30)
        payload = dashboard.as_dict()
        # Compare the float separately so the rest can use exact equality.
        self.assertAlmostEqual(payload.pop("used_percent"), 40.0)
        self.assertEqual(
            payload,
            {
                "requests": 1,
                "input_tokens": 90,
                "output_tokens": 30,
                "total_tokens": 120,
                "session_token_budget": 300,
                "remaining_tokens": 180,
            },
        )


class SessionTokenDashboardValidationTests(unittest.TestCase):
    """Argument validation for budgets and render width."""

    def test_invalid_budget_raises_value_error(self) -> None:
        with self.assertRaises(ValueError):
            SessionTokenDashboard(session_token_budget=0)

    def test_invalid_render_width_raises_value_error(self) -> None:
        dashboard = SessionTokenDashboard(session_token_budget=100)
        with self.assertRaises(ValueError):
            dashboard.render(bar_width=0)

    def test_set_session_token_budget_validates_values(self) -> None:
        dashboard = SessionTokenDashboard(session_token_budget=100)
        dashboard.set_session_token_budget(250)
        self.assertEqual(dashboard.session_token_budget, 250)
        with self.assertRaises(ValueError):
            dashboard.set_session_token_budget(-10)