Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions examples/flows/token_dashboard.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""Example: display used and remaining session tokens after flow runs."""

import asyncio

from quantmind.configs import PaperFlowCfg
from quantmind.configs.paper import RawText
from quantmind.flows import SessionTokenDashboard, paper_flow

EXAMPLE_TEXT = """
Momentum and value factors remain the most persistent cross-sectional
signals in global equity markets, but turnover-aware portfolio
construction is required to retain net alpha after implementation costs.
"""


async def main() -> None:
"""Run two flow calls and print a session token usage dashboard."""
dashboard = SessionTokenDashboard(session_token_budget=50_000)
cfg = PaperFlowCfg(model="gpt-4o-mini")

await paper_flow(
RawText(text=EXAMPLE_TEXT),
cfg=cfg,
extra_run_hooks=[dashboard],
)

await paper_flow(
RawText(text=EXAMPLE_TEXT),
cfg=cfg,
extra_run_hooks=[dashboard],
extra_instructions="Summarize in fewer than 120 words.",
)

print(dashboard.render())


if __name__ == "__main__":
asyncio.run(main())
8 changes: 8 additions & 0 deletions quantmind/flows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,23 @@
- ``batch_run`` runs any flow over a list of inputs with bounded
concurrency and aggregated results.
- ``BatchResult`` is the shape returned by ``batch_run``.
- ``SessionTokenDashboard`` tracks token usage and renders used-vs-
remaining dashboard text from run hooks.
- ``UnsupportedContentTypeError`` is raised when ``paper_flow`` cannot
route fetched bytes through the format layer.
"""

from quantmind.flows.batch import BatchResult, batch_run
from quantmind.flows.paper import UnsupportedContentTypeError, paper_flow
from quantmind.flows.token_dashboard import (
SessionTokenDashboard,
TokenDashboardSnapshot,
)

__all__ = [
"BatchResult",
"SessionTokenDashboard",
"TokenDashboardSnapshot",
"UnsupportedContentTypeError",
"batch_run",
"paper_flow",
Expand Down
141 changes: 141 additions & 0 deletions quantmind/flows/token_dashboard.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
"""Session token dashboard utilities built on Agents SDK run hooks.

`SessionTokenDashboard` can be attached to any flow call through
``extra_run_hooks``. It accumulates token usage on every LLM response and
renders a compact text dashboard showing total usage and remaining tokens from
an optional session budget.
"""

from __future__ import annotations

from dataclasses import dataclass
from typing import Any

from agents import RunHooks
from agents.items import ModelResponse
from agents.usage import Usage


@dataclass(frozen=True, slots=True)
class TokenDashboardSnapshot:
"""Immutable snapshot of session token usage."""

requests: int
input_tokens: int
output_tokens: int
total_tokens: int
session_token_budget: int | None
remaining_tokens: int | None
used_percent: float | None


class SessionTokenDashboard(RunHooks[Any]):
"""Track session token usage and render a terminal-friendly dashboard."""

def __init__(self, session_token_budget: int | None = None) -> None:
self._session_token_budget = _validate_budget(session_token_budget)
self._usage = Usage()

@property
def session_token_budget(self) -> int | None:
"""Configured budget used to compute remaining tokens."""
return self._session_token_budget

def set_session_token_budget(
self, session_token_budget: int | None
) -> None:
"""Update the session token budget used for remaining-token math."""
self._session_token_budget = _validate_budget(session_token_budget)

async def on_llm_end(
self,
context: Any,
agent: Any,
response: ModelResponse,
) -> None:
"""Accumulate usage emitted by the SDK at the end of each LLM call."""
del context, agent
self._usage.add(response.usage)

def reset(self) -> None:
"""Reset all usage counters for a new session window."""
self._usage = Usage()

def snapshot(self) -> TokenDashboardSnapshot:
"""Return an immutable snapshot of current token usage."""
remaining_tokens: int | None = None
used_percent: float | None = None
if self._session_token_budget is not None:
used_percent = (
self._usage.total_tokens / self._session_token_budget
) * 100.0
remaining_tokens = max(
self._session_token_budget - self._usage.total_tokens,
0,
)
return TokenDashboardSnapshot(
requests=self._usage.requests,
input_tokens=self._usage.input_tokens,
output_tokens=self._usage.output_tokens,
total_tokens=self._usage.total_tokens,
session_token_budget=self._session_token_budget,
remaining_tokens=remaining_tokens,
used_percent=used_percent,
)

def as_dict(self) -> dict[str, int | float | None]:
"""Return the dashboard snapshot as a serializable dictionary."""
snapshot = self.snapshot()
return {
"requests": snapshot.requests,
"input_tokens": snapshot.input_tokens,
"output_tokens": snapshot.output_tokens,
"total_tokens": snapshot.total_tokens,
"session_token_budget": snapshot.session_token_budget,
"remaining_tokens": snapshot.remaining_tokens,
"used_percent": snapshot.used_percent,
}

def render(self, *, bar_width: int = 24) -> str:
"""Render a compact text dashboard for terminal output."""
if bar_width < 1:
raise ValueError(f"bar_width must be >= 1, got {bar_width}")
snapshot = self.snapshot()
lines = [
"Session Token Dashboard",
f"Requests: {snapshot.requests}",
f"Input tokens: {snapshot.input_tokens}",
f"Output tokens: {snapshot.output_tokens}",
f"Total used: {snapshot.total_tokens}",
]
if snapshot.session_token_budget is None:
lines.append("Remaining: n/a (set session_token_budget)")
return "\n".join(lines)

used_percent = snapshot.used_percent or 0.0
filled = min(
bar_width,
int(round((min(used_percent, 100.0) / 100.0) * bar_width)),
)
bar = "█" * filled + "░" * (bar_width - filled)
lines.extend(
[
f"Budget: {snapshot.session_token_budget}",
f"Remaining: {snapshot.remaining_tokens}",
(
f"Used/Budget: {snapshot.total_tokens}"
f"/{snapshot.session_token_budget}"
),
f"[{bar}] {used_percent:.1f}% used",
]
)
return "\n".join(lines)


def _validate_budget(session_token_budget: int | None) -> int | None:
"""Validate and normalize the session budget."""
if session_token_budget is None:
return None
if session_token_budget <= 0:
raise ValueError("session_token_budget must be > 0 when provided")
return session_token_budget
148 changes: 148 additions & 0 deletions tests/flows/test_token_dashboard.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
"""Tests for ``quantmind.flows.token_dashboard``."""

import unittest

from agents.items import ModelResponse
from agents.usage import Usage

from quantmind.flows import SessionTokenDashboard


def _make_response(
*,
requests: int,
input_tokens: int,
output_tokens: int,
) -> ModelResponse:
"""Build a minimal model response with usage for dashboard tests."""
return ModelResponse(
output=[],
usage=Usage(
requests=requests,
input_tokens=input_tokens,
output_tokens=output_tokens,
total_tokens=input_tokens + output_tokens,
),
response_id=None,
request_id=None,
)


class SessionTokenDashboardTests(unittest.IsolatedAsyncioTestCase):
async def test_on_llm_end_accumulates_usage(self) -> None:
dashboard = SessionTokenDashboard(session_token_budget=500)
await dashboard.on_llm_end(
context=None,
agent=None,
response=_make_response(
requests=1,
input_tokens=100,
output_tokens=25,
),
)
await dashboard.on_llm_end(
context=None,
agent=None,
response=_make_response(
requests=1,
input_tokens=60,
output_tokens=15,
),
)
snapshot = dashboard.snapshot()
self.assertEqual(snapshot.requests, 2)
self.assertEqual(snapshot.input_tokens, 160)
self.assertEqual(snapshot.output_tokens, 40)
self.assertEqual(snapshot.total_tokens, 200)
self.assertEqual(snapshot.remaining_tokens, 300)
self.assertEqual(snapshot.used_percent, 40.0)

async def test_reset_clears_all_usage_counters(self) -> None:
dashboard = SessionTokenDashboard(session_token_budget=100)
await dashboard.on_llm_end(
context=None,
agent=None,
response=_make_response(
requests=1,
input_tokens=80,
output_tokens=10,
),
)
dashboard.reset()
snapshot = dashboard.snapshot()
self.assertEqual(snapshot.requests, 0)
self.assertEqual(snapshot.input_tokens, 0)
self.assertEqual(snapshot.output_tokens, 0)
self.assertEqual(snapshot.total_tokens, 0)
self.assertEqual(snapshot.remaining_tokens, 100)
self.assertEqual(snapshot.used_percent, 0.0)

def test_snapshot_without_budget_has_no_remaining(self) -> None:
dashboard = SessionTokenDashboard()
snapshot = dashboard.snapshot()
self.assertIsNone(snapshot.session_token_budget)
self.assertIsNone(snapshot.remaining_tokens)
self.assertIsNone(snapshot.used_percent)

def test_render_with_budget_includes_progress_and_remaining(self) -> None:
dashboard = SessionTokenDashboard(session_token_budget=200)
dashboard._usage.add( # pyright: ignore[reportPrivateUsage]
Usage(
requests=2,
input_tokens=100,
output_tokens=50,
total_tokens=150,
)
)
rendered = dashboard.render(bar_width=10)
self.assertIn("Session Token Dashboard", rendered)
self.assertIn("Remaining: 50", rendered)
self.assertIn("Used/Budget: 150/200", rendered)
self.assertIn("75.0% used", rendered)

def test_render_without_budget_mentions_missing_budget(self) -> None:
dashboard = SessionTokenDashboard()
rendered = dashboard.render()
self.assertIn("Remaining: n/a (set session_token_budget)", rendered)

def test_as_dict_returns_serializable_snapshot(self) -> None:
dashboard = SessionTokenDashboard(session_token_budget=300)
dashboard._usage.add( # pyright: ignore[reportPrivateUsage]
Usage(
requests=1,
input_tokens=90,
output_tokens=30,
total_tokens=120,
)
)
payload = dashboard.as_dict()
self.assertEqual(
payload,
{
"requests": 1,
"input_tokens": 90,
"output_tokens": 30,
"total_tokens": 120,
"session_token_budget": 300,
"remaining_tokens": 180,
"used_percent": 40.0,
},
)


class SessionTokenDashboardValidationTests(unittest.TestCase):
def test_invalid_budget_raises_value_error(self) -> None:
with self.assertRaises(ValueError):
SessionTokenDashboard(session_token_budget=0)

def test_invalid_render_width_raises_value_error(self) -> None:
dashboard = SessionTokenDashboard(session_token_budget=100)
with self.assertRaises(ValueError):
dashboard.render(bar_width=0)

def test_set_session_token_budget_validates_values(self) -> None:
dashboard = SessionTokenDashboard(session_token_budget=100)
dashboard.set_session_token_budget(250)
self.assertEqual(dashboard.session_token_budget, 250)
with self.assertRaises(ValueError):
dashboard.set_session_token_budget(-10)