Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 208 additions & 30 deletions src/winml/modelkit/commands/perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, cast

import click
import numpy as np
Expand All @@ -33,7 +33,10 @@


if TYPE_CHECKING:
from collections.abc import Callable

from ..models.winml.base import WinMLPreTrainedModel
from ..models.winml.composite_model import WinMLCompositeModel
Comment thread
github-advanced-security[bot] marked this conversation as resolved.
Fixed
from ..session.stats import PerfStats

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -280,34 +283,121 @@ class PerfBenchmark:
def __init__(self, config: BenchmarkConfig) -> None:
    """Initialize benchmark with configuration.

    Args:
        config: Benchmark settings; stored as ``self.config`` and read by
            the rest of the pipeline.
    """
    self.config = config
    # Either a single-session model or a composite one (exposing
    # ``sub_models``); populated later by the load step.
    self._model: WinMLPreTrainedModel | WinMLCompositeModel | None = None
    # Generated benchmark inputs, filled in by ``_generate_inputs``.
    self._inputs: dict[str, np.ndarray] | None = None
    # Lazily cached I/O config (see ``_resolved_io_config``).
    self._io_config: dict[str, Any] | None = None

@property
def _is_composite(self) -> bool:
    """Whether the loaded model is a composite of several sub-sessions.

    Composite models (e.g. CLIP/SigLIP) orchestrate multiple sub-models.
    Detection is deliberately duck-typed on the ``sub_models`` attribute
    instead of ``isinstance(..., WinMLCompositeModel)``: the ``isinstance``
    form would need a runtime import of ``composite_model``, which imports
    torch and would exceed the ``winml perf --help`` import budget (see
    tests/cli/test_import_time.py). Keeping ``WinMLCompositeModel`` as a
    TYPE_CHECKING-only import also allows unit tests to substitute
    lightweight fakes for real torch-backed composites. ``sub_models`` is
    the defining member of the composite base, so it is a reliable marker.
    """
    model = self._model
    return hasattr(model, "sub_models")

def _sub_models(self) -> dict[str, WinMLPreTrainedModel]:
    """Return the composite model's sub-models.

    Only meaningful when ``_is_composite`` is true; the cast is purely for
    the type checker and performs no runtime validation.
    """
    composite = cast("WinMLCompositeModel", self._model)
    return composite.sub_models

@property
def _single(self) -> WinMLPreTrainedModel:
    """Return the loaded model viewed as a single-session model.

    Intended for non-composite models only. Composite models are routed to
    ``_run_sub_models``, which benchmarks every sub-model via a child
    ``PerfBenchmark`` whose own ``_model`` is single-session.
    """
    model = self._model
    assert model is not None
    # Static narrowing only; nothing is checked at runtime beyond the assert.
    return cast("WinMLPreTrainedModel", model)

def _resolved_io_config(self) -> dict[str, Any]:
    """Return the single-session model's I/O config, caching it on first use."""
    cached = self._io_config
    if cached is not None:
        return cached
    # First access: read it from the model and remember it.
    self._io_config = self._single.io_config
    return self._io_config

def run(self) -> BenchmarkResult:
def _compile_model(self) -> None:
    """Compile the model's underlying session so device/EP are resolved."""
    session = self._single._session
    session.compile()

def _resolved_device(self) -> str:
    """Return the device reported by the single-session model.

    Callers invoke this after compiling so the value reflects the device
    actually bound.
    """
    model = self._single
    return model.device

def _resolved_ep(self) -> EPName | None:
    """Return the primary execution provider reported by the model.

    Callers invoke this after compiling; the value may be ``None``.
    """
    model = self._single
    return model.ep_name

def _resolved_task(self) -> str | None:
    """Return the model's task, or the requested task when the model has none."""
    model_task = self._single.task
    if model_task:
        return model_task
    # Model did not report a (truthy) task: use the one from the config.
    return self.config.task

def run(self) -> BenchmarkResult | dict[str, BenchmarkResult]:
    """Execute full benchmark pipeline.

    Loads the model, then dispatches on whether it is a composite:
    composites are benchmarked per sub-model, everything else goes through
    the single-session pipeline.

    Returns:
        A single ``BenchmarkResult`` for single-session models, or a
        ``{sub_model_name: BenchmarkResult}`` mapping for composite models
        (e.g. CLIP/SigLIP dual-encoders). Composite models have no single
        ORT session, so each sub-model is benchmarked individually rather
        than timing the aggregate ``forward()`` pass.
    """
    # [1] Load model
    logger.info("Loading model: %s", self.config.model_id)
    self._load_model()
    # Invariant after a successful load; also narrows the type for checkers.
    assert self._model is not None

    if self._is_composite:
        return self._run_sub_models()
    return self._run_single()

def _run_sub_models(self) -> dict[str, BenchmarkResult]:
    """Benchmark every sub-model of a composite model on its own.

    Each sub-model is itself a single-session ``WinMLAutoModel``, so it is
    pushed through the regular single-model pipeline: a child
    ``PerfBenchmark`` sharing this benchmark's config is created, handed
    the already-loaded sub-model, and run via ``_run_single``.

    Returns:
        Mapping of sub-model name to its ``BenchmarkResult``, in the
        iteration order of the composite's ``sub_models``.
    """
    per_component: dict[str, BenchmarkResult] = {}
    sub_models = self._sub_models()
    for component, sub_model in sub_models.items():
        logger.info("Benchmarking sub-model '%s'", component)
        # Announce the component on stderr so stdout stays clean for reports.
        Console(stderr=True).print(f"\n[bold]Sub-model:[/bold] {component}")
        runner = PerfBenchmark(self.config)
        runner._model = sub_model  # skip loading: the sub-model is already built
        per_component[component] = runner._run_single()
    return per_component

def _run_single(self) -> BenchmarkResult:
"""Benchmark the loaded single-session model.

Returns:
BenchmarkResult with timing statistics
"""
assert self._model is not None

# [2] Generate inputs
logger.info("Generating benchmark inputs")
self._generate_inputs()

# Compile session early so model.device is resolved for display
self._model._session.compile()
self._compile_model()

# Print model info before benchmark starts
_print_model_info(
self._model.io_config,
task=self._model.task or self.config.task,
self._resolved_io_config(),
task=self._resolved_task(),
req_device=self.config.device,
act_device=self._model.device,
ep_name=self._model.ep_name,
act_device=self._resolved_device(),
ep_name=self._resolved_ep(),
)

# [3] Run benchmark
Expand Down Expand Up @@ -380,7 +470,7 @@ def _load_model(self) -> None:
def _generate_inputs(self) -> None:
"""Generate random inputs based on model io_config."""
assert self._model is not None
io_config = self._model.io_config
io_config = self._resolved_io_config()
self._inputs = generate_random_inputs(
io_config=io_config,
batch_size=self.config.batch_size,
Expand All @@ -394,11 +484,10 @@ def _run_benchmark(self) -> PerfStats:

def _run_benchmark_simple(self) -> PerfStats:
"""Execute benchmark without live monitoring."""
assert self._model is not None
assert self._inputs is not None
session = self._model._session
total_iterations = self.config.warmup + self.config.iterations

session = self._single._session
with session.perf(warmup=self.config.warmup) as stats:
_run_simple_loop(session, self._inputs, total_iterations)

Expand All @@ -416,9 +505,7 @@ def _run_benchmark_monitored(self) -> PerfStats:
from ..session.monitor.hw_monitor import HWMonitor
from ..session.monitor.vitisai_monitor import VitisAIMonitor

assert self._model is not None
assert self._inputs is not None
session = self._model._session
total_iterations = self.config.warmup + self.config.iterations

if not HWMonitor.is_available():
Expand All @@ -432,31 +519,37 @@ def _run_benchmark_monitored(self) -> PerfStats:
# GPU when --device gpu is specified, NPU when --device npu, etc.
# ep_name lets the monitor resolve the exact LUID via ORT's autoEP
# metadata so we follow the adapter the session actually binds to.
monitor_device = self._model.device or self.config.device or "auto"
ep_name = self._resolved_ep()
monitor_device = self._resolved_device() or self.config.device or "auto"
hw_monitor = HWMonitor(
poll_interval_ms=_HW_POLL_INTERVAL_MS,
device=monitor_device,
ep_name=session.ep_name,
ep_name=ep_name,
)

# EP-specific proof-of-execution monitor.
# When QNN/OpenVINO monitors become real, add entries here.
_ep_monitors: dict[EPName, Any] = {"VitisAIExecutionProvider": VitisAIMonitor}
monitor_cls = _ep_monitors.get(session.ep_name) if session.ep_name else None
monitor_cls = _ep_monitors.get(ep_name) if ep_name else None
ep_monitor: Any
if monitor_cls and monitor_cls.is_available():
ep_monitor = monitor_cls()
else:
ep_monitor = NullEPMonitor()

session = self._single._session
with (
session.perf(warmup=self.config.warmup) as stats,
hw_monitor as hw,
ep_monitor as ep_mon,
):
inputs = self._inputs

def run_iteration() -> None:
session.run(inputs)

_run_monitored_loop(
session,
self._inputs,
run_iteration,
stats,
hw,
total_iterations=total_iterations,
Expand All @@ -476,7 +569,7 @@ def _run_benchmark_monitored(self) -> PerfStats:
def _collect_results(self, stats: PerfStats) -> BenchmarkResult:
"""Collect benchmark results from PerfStats."""
assert self._model is not None
io_config = self._model.io_config
io_config = self._resolved_io_config()

# Calculate throughput
mean_latency_sec = stats.mean_ms / 1000.0
Expand Down Expand Up @@ -514,9 +607,9 @@ def _collect_results(self, stats: PerfStats) -> BenchmarkResult:
samples_per_sec=samples_per_sec,
batches_per_sec=batches_per_sec,
# Actual values (resolved after build + compile)
actual_device=self._model.device,
actual_task=self._model.task or self.config.task or "auto-detected",
actual_ep=self._model.ep_name,
actual_device=self._resolved_device(),
actual_task=self._resolved_task() or "auto-detected",
actual_ep=self._resolved_ep(),
# Hardware monitor metrics (only present when --monitor is used)
hw_monitor=getattr(self, "_hw_metrics", None),
)
Expand Down Expand Up @@ -888,6 +981,74 @@ def write_json_report(result: BenchmarkResult, output_path: Path) -> None:
json.dump(result.to_dict(), f, indent=2)


def _composite_report_dict(
    results: dict[str, BenchmarkResult],
    *,
    model_id: str,
    task: str | None,
) -> dict[str, Any]:
    """Assemble the combined JSON-ready report for a composite model.

    Args:
        results: Per-sub-model benchmark results keyed by sub-model name.
        model_id: Identifier of the composite model, copied into the report.
        task: Task to record in the report (may be ``None``).

    Returns:
        A dict with ``model_id``, ``task``, ``component_count`` and
        ``components`` (each sub-model's ``to_dict()`` output, keyed by name).
    """
    components: dict[str, Any] = {}
    for sub_name, sub_result in results.items():
        components[sub_name] = sub_result.to_dict()

    report: dict[str, Any] = {"model_id": model_id, "task": task}
    report["component_count"] = len(components)
    report["components"] = components
    return report


def report_composite_results(
    results: dict[str, BenchmarkResult],
    *,
    console: Console,
    json_mode: bool,
    output_path: Path,
    model_id: str,
    task: str | None,
) -> None:
    """Show and save the per-sub-model results of a composite model.

    Composite models (e.g. CLIP/SigLIP dual-encoders) have no single ORT
    session, so every sub-model is benchmarked separately (like
    ``--module``) and shown as its own summary row instead of timing the
    aggregate ``forward()`` pass. The combined JSON nests each sub-model's
    full ``BenchmarkResult.to_dict()`` under ``components``.

    Args:
        results: Per-sub-model benchmark results keyed by sub-model name.
        console: Console used for the table in non-JSON mode.
        json_mode: When true, echo the combined JSON instead of a table.
        output_path: File the combined JSON is written to; parent
            directories are created if missing.
        model_id: Model identifier recorded in the report.
        task: Task recorded in the report (may be ``None``).
    """
    report = _composite_report_dict(results, model_id=model_id, task=task)

    if json_mode:
        click.echo(json.dumps(report, indent=2))
    else:
        summary = Table(title="Per-Sub-Model Perf", show_header=True)
        summary.add_column("Sub-Model", style="cyan")
        for heading in ("Task", "Device"):
            summary.add_column(heading)
        for heading in ("Mean (ms)", "P90 (ms)", "Min (ms)", "Max (ms)"):
            summary.add_column(heading, justify="right")

        for sub_name, sub_result in results.items():
            device_label = _device_string(
                sub_result.config.device,
                sub_result.actual_device,
                sub_result.actual_ep,
            )
            latencies = (
                sub_result.mean_ms,
                sub_result.p90_ms,
                sub_result.min_ms,
                sub_result.max_ms,
            )
            summary.add_row(
                sub_name,
                sub_result.actual_task,
                device_label,
                *(f"{latency:.2f}" for latency in latencies),
            )

        # Blank lines above and below keep the table visually separated.
        console.print()
        console.print(summary)
        console.print()

    # The combined report is always persisted, regardless of display mode.
    destination = Path(output_path)
    destination.parent.mkdir(parents=True, exist_ok=True)
    with destination.open("w", encoding="utf-8") as handle:
        json.dump(report, handle, indent=2)


def generate_output_path(model_id: str, *, module_class: str | None = None) -> Path:
r"""Generate default output path under the user's cache directory.

Expand Down Expand Up @@ -963,8 +1124,7 @@ def _print_model_info(


def _run_monitored_loop(
session: Any,
inputs: dict[str, Any],
run_iteration: Callable[[], Any],
stats: PerfStats,
hw: Any,
*,
Expand All @@ -973,7 +1133,12 @@ def _run_monitored_loop(
model_id: str,
device: str,
) -> None:
"""Run the benchmark iteration loop with live hardware monitoring."""
"""Run the benchmark iteration loop with live hardware monitoring.

``run_iteration`` runs (and times into ``stats``) a single inference. For
single-session models it invokes ``session.run`` inside the session's
perf() context; for composite models it records a full ``forward()`` pass.
"""
display = LiveMonitorDisplay(
total_iterations=total_iterations,
warmup=warmup,
Expand All @@ -983,7 +1148,7 @@ def _run_monitored_loop(
)
with display:
for i in range(total_iterations):
session.run(inputs)
run_iteration()

latest_latency = stats.all_samples_ms[-1] if stats.all_samples_ms else 0
display.update(
Expand Down Expand Up @@ -1310,6 +1475,21 @@ def perf(
benchmark = PerfBenchmark(config)
result = benchmark.run()

# Composite models (e.g. CLIP/SigLIP dual-encoders) have no single ORT
# session; each sub-model is benchmarked individually and reported as
# its own row (like --module), not as one aggregate forward() timing.
if isinstance(result, dict):
report_composite_results(
result,
console=console,
json_mode=json_mode,
output_path=output,
model_id=hf_model,
task=task,
)
console.print(f"[green]Results saved to:[/green] {output}")
return

# Display results
if json_mode:
click.echo(json.dumps(result.to_dict(), indent=2))
Expand Down Expand Up @@ -1341,9 +1521,7 @@ def perf(
# For HF models the ONNX is built internally by PerfBenchmark.
try:
onnx_for_trace = (
model_path
if is_onnx
else (benchmark._model._onnx_path if benchmark._model else None)
model_path if is_onnx else getattr(benchmark._model, "_onnx_path", None)
)
if onnx_for_trace is None:
raise AttributeError("benchmark._model not initialized")
Expand Down
Loading
Loading