diff --git a/src/winml/modelkit/commands/perf.py b/src/winml/modelkit/commands/perf.py index 5f8b0114..622d30f1 100644 --- a/src/winml/modelkit/commands/perf.py +++ b/src/winml/modelkit/commands/perf.py @@ -19,7 +19,7 @@ from dataclasses import dataclass, field from datetime import datetime, timezone from pathlib import Path -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, cast import click import numpy as np @@ -34,6 +34,7 @@ if TYPE_CHECKING: from ..models.winml.base import WinMLPreTrainedModel + from ..models.winml.composite_model import WinMLCompositeModel from ..session.stats import PerfStats logger = logging.getLogger(__name__) @@ -290,34 +291,101 @@ class PerfBenchmark: def __init__(self, config: BenchmarkConfig) -> None: """Initialize benchmark with configuration.""" self.config = config - self._model: WinMLPreTrainedModel | None = None + self._model: WinMLPreTrainedModel | WinMLCompositeModel | None = None self._inputs: dict[str, np.ndarray] | None = None - def run(self) -> BenchmarkResult: + @property + def _is_composite(self) -> bool: + """Composite models orchestrate multiple sub-sessions (e.g. CLIP/SigLIP). + + Duck-typed on ``sub_models`` rather than ``isinstance(..., WinMLCompositeModel)`` + on purpose: an ``isinstance`` check needs a runtime import of + ``composite_model``, which imports torch and would blow the + ``winml perf --help`` import budget (see tests/cli/test_import_time.py) by + pulling torch in at module load. Keeping ``WinMLCompositeModel`` a + TYPE_CHECKING-only import also lets the unit tests use lightweight + duck-typed fakes instead of constructing real torch-backed composites. + ``sub_models`` is the defining member of the composite base, so it is a + reliable marker. + """ + return hasattr(self._model, "sub_models") + + def _sub_models(self) -> dict[str, WinMLPreTrainedModel]: + """Sub-models of a composite model (only valid when ``_is_composite``).""" + return cast("WinMLCompositeModel", self._model).sub_models + + @property + def _single(self) -> WinMLPreTrainedModel: + """The model under benchmark, narrowed to a single-session model. + + Only valid for non-composite models: composites dispatch to + ``_run_sub_models``, which benchmarks each sub-model through a child + ``PerfBenchmark`` whose ``_model`` is itself single-session. Exposes + ``io_config`` / ``device`` / ``ep_name`` / ``task`` directly (the + session caches ``io_config``), so callers read ``self._single.*`` + rather than going through per-attribute wrappers. + """ + assert self._model is not None + return cast("WinMLPreTrainedModel", self._model) + + def run(self) -> BenchmarkResult | dict[str, BenchmarkResult]: """Execute full benchmark pipeline. Returns: - BenchmarkResult with timing statistics + A single ``BenchmarkResult`` for single-session models, or a + ``{sub_model_name: BenchmarkResult}`` mapping for composite models + (e.g. CLIP/SigLIP dual-encoders). Composite models have no single + ORT session, so each sub-model is benchmarked individually rather + than timing the aggregate ``forward()`` pass. """ # [1] Load model logger.info("Loading model: %s", self.config.model_id) self._load_model() assert self._model is not None + if self._is_composite: + return self._run_sub_models() + return self._run_single() + + def _run_sub_models(self) -> dict[str, BenchmarkResult]: + """Benchmark each sub-model of a composite individually. + + Each sub-model is itself a single-session ``WinMLAutoModel``, so it is + benchmarked through the standard single-model pipeline by spawning a + child ``PerfBenchmark`` with the already-loaded sub-model. Results are + keyed by sub-model name for per-component reporting. + """ + results: dict[str, BenchmarkResult] = {} + for name, sub in self._sub_models().items(): + logger.info("Benchmarking sub-model '%s'", name) + Console(stderr=True).print(f"\n[bold]Sub-model:[/bold] {name}") + child = PerfBenchmark(self.config) + child._model = sub + results[name] = child._run_single() + return results + + def _run_single(self) -> BenchmarkResult: + """Benchmark the loaded single-session model. + + Returns: + BenchmarkResult with timing statistics + """ + assert self._model is not None + # [2] Generate inputs logger.info("Generating benchmark inputs") self._generate_inputs() # Compile session early so model.device is resolved for display - self._model._session.compile() + self._single._session.compile() # Print model info before benchmark starts _print_model_info( - self._model.io_config, - task=self._model.task or self.config.task, + self._single.io_config, + task=self._single.task or self.config.task, req_device=self.config.device, - act_device=self._model.device, - ep_name=self._model.ep_name, + act_device=self._single.device, + ep_name=self._single.ep_name, ) # [3] Run benchmark @@ -389,10 +457,8 @@ def _load_model(self) -> None: def _generate_inputs(self) -> None: """Generate random inputs based on model io_config.""" - assert self._model is not None - io_config = self._model.io_config self._inputs = generate_random_inputs( - io_config=io_config, + io_config=self._single.io_config, batch_size=self.config.batch_size, ) @@ -404,11 +470,10 @@ def _run_benchmark(self) -> PerfStats: def _run_benchmark_simple(self) -> PerfStats: """Execute benchmark without live monitoring.""" - assert self._model is not None assert self._inputs is not None - session = self._model._session total_iterations = self.config.warmup + self.config.iterations + session = self._single._session with session.perf(warmup=self.config.warmup) as stats: _run_simple_loop(session, self._inputs, total_iterations) @@ -426,9 +491,7 @@ def _run_benchmark_monitored(self) -> PerfStats: from ..session.monitor.hw_monitor import HWMonitor from ..session.monitor.vitisai_monitor import VitisAIMonitor - assert self._model is not None assert self._inputs is not None - session = self._model._session total_iterations = self.config.warmup + self.config.iterations if not HWMonitor.is_available(): @@ -442,23 +505,25 @@ def _run_benchmark_monitored(self) -> PerfStats: # GPU when --device gpu is specified, NPU when --device npu, etc. # ep_name lets the monitor resolve the exact LUID via ORT's autoEP # metadata so we follow the adapter the session actually binds to. - monitor_device = self._model.device or self.config.device or "auto" + ep_name = self._single.ep_name + monitor_device = self._single.device or self.config.device or "auto" hw_monitor = HWMonitor( poll_interval_ms=_HW_POLL_INTERVAL_MS, device=monitor_device, - ep_name=session.ep_name, + ep_name=ep_name, ) # EP-specific proof-of-execution monitor. # When QNN/OpenVINO monitors become real, add entries here. _ep_monitors: dict[EPName, Any] = {"VitisAIExecutionProvider": VitisAIMonitor} - monitor_cls = _ep_monitors.get(session.ep_name) if session.ep_name else None + monitor_cls = _ep_monitors.get(ep_name) if ep_name else None ep_monitor: Any if monitor_cls and monitor_cls.is_available(): ep_monitor = monitor_cls() else: ep_monitor = NullEPMonitor() + session = self._single._session with ( session.perf(warmup=self.config.warmup) as stats, hw_monitor as hw, @@ -485,8 +550,7 @@ def _run_benchmark_monitored(self) -> PerfStats: def _collect_results(self, stats: PerfStats) -> BenchmarkResult: """Collect benchmark results from PerfStats.""" - assert self._model is not None - io_config = self._model.io_config + io_config = self._single.io_config # Calculate throughput mean_latency_sec = stats.mean_ms / 1000.0 @@ -525,10 +589,10 @@ def _collect_results(self, stats: PerfStats) -> BenchmarkResult: samples_per_sec=samples_per_sec, batches_per_sec=batches_per_sec, # Actual values (resolved after build + compile) - actual_device=self._model.device, - actual_task=self._model.task or self.config.task or "auto-detected", - actual_ep=self._model.ep_name, - running_model_path=str(self._model.running_model_path), + actual_device=self._single.device, + actual_task=self._single.task or self.config.task or "auto-detected", + actual_ep=self._single.ep_name, + running_model_path=str(self._single.running_model_path), # Hardware monitor metrics (only present when --monitor is used) hw_monitor=getattr(self, "_hw_metrics", None), ) @@ -548,6 +612,7 @@ def _perf_modules( warmup: int, batch_size: int, no_quantize: bool, + no_compile: bool, output: Path | None, verbose: bool, console: Console, @@ -570,7 +635,8 @@ def _perf_modules( iterations: Number of benchmark iterations. warmup: Number of warmup iterations. batch_size: Batch size for input generation. - no_quantize: If True, skip quantization and compilation. + no_quantize: If True, skip quantization during the per-module build. + no_compile: If True, skip the build's compile stage for each module. output: Output JSON path, or None for auto-generated path. verbose: If True, log exceptions at DEBUG level. console: Rich console for output. @@ -664,9 +730,11 @@ def _perf_modules( submodule = parent_model.get_submodule(module_path) - # Skip quant/compile for faster iteration when requested + # Skip quant/compile for faster iteration when requested. Quantization + # and compilation are independent toggles (mirrors the single-model path). if no_quantize: cfg.quant = None + if no_compile: cfg.compile = None with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir: @@ -901,6 +969,74 @@ def write_json_report(result: BenchmarkResult, output_path: Path) -> None: json.dump(result.to_dict(), f, indent=2) +def _composite_report_dict( + results: dict[str, BenchmarkResult], + *, + model_id: str, + task: str | None, +) -> dict[str, Any]: + """Build the combined JSON report for a composite model's sub-models.""" + return { + "model_id": model_id, + "task": task, + "component_count": len(results), + "components": {name: result.to_dict() for name, result in results.items()}, + } + + +def report_composite_results( + results: dict[str, BenchmarkResult], + *, + console: Console, + json_mode: bool, + output_path: Path, + model_id: str, + task: str | None, +) -> None: + """Display and persist per-sub-model results for a composite model. + + Composite models (e.g. CLIP/SigLIP dual-encoders) have no single ORT + session; each sub-model is benchmarked individually (like ``--module``) + and reported as its own summary row rather than timing the aggregate + ``forward()`` pass. The combined JSON nests each sub-model's full + ``BenchmarkResult.to_dict()`` under ``components``. + """ + combined = _composite_report_dict(results, model_id=model_id, task=task) + + if json_mode: + click.echo(json.dumps(combined, indent=2)) + else: + table = Table(title="Per-Sub-Model Perf", show_header=True) + table.add_column("Sub-Model", style="cyan") + table.add_column("Task") + table.add_column("Device") + table.add_column("Mean (ms)", justify="right") + table.add_column("P90 (ms)", justify="right") + table.add_column("Min (ms)", justify="right") + table.add_column("Max (ms)", justify="right") + for name, result in results.items(): + device_str = _device_string( + result.config.device, result.actual_device, result.actual_ep + ) + table.add_row( + name, + result.actual_task, + device_str, + f"{result.mean_ms:.2f}", + f"{result.p90_ms:.2f}", + f"{result.min_ms:.2f}", + f"{result.max_ms:.2f}", + ) + console.print() + console.print(table) + console.print() + + output_path = Path(output_path) + output_path.parent.mkdir(parents=True, exist_ok=True) + with output_path.open("w", encoding="utf-8") as f: + json.dump(combined, f, indent=2) + + def generate_output_path(model_id: str, *, module_class: str | None = None) -> Path: r"""Generate default output path under the user's cache directory. @@ -1242,6 +1378,7 @@ def perf( warmup=warmup, batch_size=batch_size, no_quantize=not quantize, + no_compile=no_compile, output=output, verbose=bool(verbose), console=console, @@ -1323,6 +1460,21 @@ def perf( benchmark = PerfBenchmark(config) result = benchmark.run() + # Composite models (e.g. CLIP/SigLIP dual-encoders) have no single ORT + # session; each sub-model is benchmarked individually and reported as + # its own row (like --module), not as one aggregate forward() timing. + if isinstance(result, dict): + report_composite_results( + result, + console=console, + json_mode=json_mode, + output_path=output, + model_id=hf_model, + task=task, + ) + console.print(f"[green]Results saved to:[/green] {output}") + return + # Display results if json_mode: click.echo(json.dumps(result.to_dict(), indent=2)) @@ -1354,9 +1506,7 @@ def perf( # For HF models the ONNX is built internally by PerfBenchmark. try: onnx_for_trace = ( - model_path - if is_onnx - else (benchmark._model._onnx_path if benchmark._model else None) + model_path if is_onnx else getattr(benchmark._model, "_onnx_path", None) ) if onnx_for_trace is None: raise AttributeError("benchmark._model not initialized") diff --git a/src/winml/modelkit/models/auto.py b/src/winml/modelkit/models/auto.py index a8e4036c..4fe69793 100644 --- a/src/winml/modelkit/models/auto.py +++ b/src/winml/modelkit/models/auto.py @@ -160,6 +160,7 @@ def from_onnx( use_cache=use_cache, force_rebuild=force_rebuild, skip_build=skip_build, + no_compile=no_compile, session_options=session_options, **kwargs, ) @@ -365,6 +366,7 @@ def from_pretrained( config=config, cache_dir=cache_dir, allow_unsupported_nodes=allow_unsupported_nodes, + no_compile=no_compile, **kwargs, ) diff --git a/tests/unit/commands/test_perf_composite.py b/tests/unit/commands/test_perf_composite.py new file mode 100644 index 00000000..f19e9484 --- /dev/null +++ b/tests/unit/commands/test_perf_composite.py @@ -0,0 +1,264 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +# -------------------------------------------------------------------------- +"""Tests for winml perf support of composite (multi-session) models. + +Composite models (e.g. CLIP/SigLIP dual-encoders) have no single ONNX +session; they orchestrate several sub-models. ``winml perf`` benchmarks +each sub-model individually (like ``--module``) and reports one row per +sub-model rather than timing the aggregate ``forward()`` pass. + +Regression guard: previously ``PerfBenchmark`` assumed every model exposed +``io_config`` / ``_session`` and raised ``AttributeError`` on composites. +""" + +from __future__ import annotations + +import json +from contextlib import contextmanager +from typing import TYPE_CHECKING, Any + +from rich.console import Console + +from winml.modelkit.commands.perf import ( + BenchmarkConfig, + BenchmarkResult, + PerfBenchmark, + report_composite_results, +) +from winml.modelkit.session.stats import PerfStats + + +if TYPE_CHECKING: + from collections.abc import Generator + from pathlib import Path + + +class _FakeSession: + """Stand-in for a WinMLSession that times runs via a real PerfStats.""" + + def __init__(self, io_config: dict[str, Any], device: str, ep_name: str) -> None: + self.io_config = io_config + self.device = device + self.ep_name = ep_name + self.running_model_path = "model.onnx" + self.compiled = False + self.run_log: list[dict[str, Any]] = [] + self._perf_stats: PerfStats | None = None + + def compile(self) -> None: + self.compiled = True + + @contextmanager + def perf(self, warmup: int = 0) -> Generator[PerfStats, None, None]: + self._perf_stats = PerfStats(warmup=warmup) + try: + yield self._perf_stats + finally: + self._perf_stats = None + + def run(self, inputs: dict[str, Any]) -> dict[str, Any]: + self.run_log.append(inputs) + if self._perf_stats is not None: + self._perf_stats.record(lambda: None) + return {} + + +class _FakeSubModel: + """Stand-in for a single-session WinMLAutoModel sub-component.""" + + def __init__( + self, + io_config: dict[str, Any], + task: str, + *, + device: str = "GPU", + ep_name: str = "OpenVINOExecutionProvider", + ) -> None: + self._session = _FakeSession(io_config, device, ep_name) + self.task = task + + @property + def io_config(self) -> dict[str, Any]: + return self._session.io_config + + @property + def device(self) -> str: + return self._session.device + + @property + def ep_name(self) -> str: + return self._session.ep_name + + @property + def running_model_path(self) -> str: + return self._session.running_model_path + + +class _FakeComposite: + """Stand-in for a WinMLCompositeModel (duck-typed via ``sub_models``).""" + + def __init__(self, sub_models: dict[str, Any]) -> None: + self.sub_models = sub_models + + +def _io_config( + input_names: list[str], + input_shapes: list[list[int]], + input_types: list[str], + output_names: list[str], + output_shapes: list[list[int]], + *, + precision: str | None = "fp16", +) -> dict[str, Any]: + return { + "input_names": input_names, + "input_shapes": input_shapes, + "input_types": input_types, + "output_names": output_names, + "output_shapes": output_shapes, + "output_types": ["float32"] * len(output_names), + "precision": precision, + } + + +def _siglip_like() -> _FakeComposite: + image_encoder = _FakeSubModel( + _io_config( + ["pixel_values"], + [[1, 3, 224, 224]], + ["float32"], + ["image_embeds"], + [[1, 768]], + ), + task="image-feature-extraction", + ) + text_encoder = _FakeSubModel( + _io_config( + ["input_ids", "attention_mask"], + [[1, 64], [1, 64]], + ["int64", "int64"], + ["text_embeds"], + [[1, 768]], + ), + task="feature-extraction", + ) + return _FakeComposite({"image-encoder": image_encoder, "text-encoder": text_encoder}) + + +def _composite_benchmark() -> tuple[PerfBenchmark, _FakeComposite]: + config = BenchmarkConfig( + model_id="google/siglip-base-patch16-224", + task="zero-shot-image-classification", + device="gpu", + iterations=3, + warmup=1, + ) + bench = PerfBenchmark(config) + model = _siglip_like() + bench._model = model # bypass _load_model (no HF download in unit tests) + return bench, model + + +class TestPerfBenchmarkComposite: + """PerfBenchmark benchmarks each sub-model of a composite individually.""" + + def test_detects_composite(self) -> None: + bench, _ = _composite_benchmark() + assert bench._is_composite is True + + def test_run_returns_result_per_sub_model(self) -> None: + bench, _ = _composite_benchmark() + results = bench._run_sub_models() + + assert set(results) == {"image-encoder", "text-encoder"} + assert all(isinstance(r, BenchmarkResult) for r in results.values()) + + def test_each_sub_model_reports_its_own_io(self) -> None: + # No aggregation: each result carries only its sub-model's inputs. + bench, _ = _composite_benchmark() + results = bench._run_sub_models() + + assert results["image-encoder"].input_names == ["pixel_values"] + assert results["text-encoder"].input_names == ["input_ids", "attention_mask"] + assert results["image-encoder"].output_names == ["image_embeds"] + assert results["text-encoder"].output_names == ["text_embeds"] + + def test_each_sub_model_reports_its_own_task(self) -> None: + bench, _ = _composite_benchmark() + results = bench._run_sub_models() + + assert results["image-encoder"].actual_task == "image-feature-extraction" + assert results["text-encoder"].actual_task == "feature-extraction" + + def test_resolved_device_and_ep_per_sub_model(self) -> None: + bench, _ = _composite_benchmark() + results = bench._run_sub_models() + + for result in results.values(): + assert result.actual_device == "GPU" + assert result.actual_ep == "OpenVINOExecutionProvider" + + def test_compiles_and_runs_every_sub_session(self) -> None: + bench, model = _composite_benchmark() + bench._run_sub_models() + + for sub in model.sub_models.values(): + assert sub._session.compiled is True + # warmup(1) + iterations(3) == 4 run() calls per sub-session. + assert len(sub._session.run_log) == 4 + + def test_each_sub_model_stats_exclude_warmup(self) -> None: + bench, _ = _composite_benchmark() + results = bench._run_sub_models() + + for result in results.values(): + assert len(result.raw_samples_ms) == 3 + + +class TestReportCompositeResults: + """report_composite_results writes a combined per-component JSON report.""" + + def test_combined_json_nests_each_component(self, tmp_path: Path) -> None: + bench, _ = _composite_benchmark() + results = bench._run_sub_models() + output = tmp_path / "perf.json" + + report_composite_results( + results, + console=Console(), + json_mode=False, + output_path=output, + model_id="google/siglip-base-patch16-224", + task="zero-shot-image-classification", + ) + + data = json.loads(output.read_text()) + assert data["model_id"] == "google/siglip-base-patch16-224" + assert data["task"] == "zero-shot-image-classification" + assert data["component_count"] == 2 + assert set(data["components"]) == {"image-encoder", "text-encoder"} + # Each component holds a full BenchmarkResult.to_dict() payload. + img = data["components"]["image-encoder"] + assert img["model_info"]["input_names"] == ["pixel_values"] + assert "latency_ms" in img + + def test_json_mode_emits_combined_payload_to_stdout(self, tmp_path: Path, capsys: Any) -> None: + bench, _ = _composite_benchmark() + results = bench._run_sub_models() + output = tmp_path / "perf.json" + + report_composite_results( + results, + console=Console(stderr=True), + json_mode=True, + output_path=output, + model_id="google/siglip-base-patch16-224", + task="zero-shot-image-classification", + ) + + payload = json.loads(capsys.readouterr().out) + assert set(payload["components"]) == {"image-encoder", "text-encoder"} + # File is written regardless of json_mode. + assert output.exists() diff --git a/tests/unit/commands/test_perf_module.py b/tests/unit/commands/test_perf_module.py index f08f5d3d..80298c02 100644 --- a/tests/unit/commands/test_perf_module.py +++ b/tests/unit/commands/test_perf_module.py @@ -314,3 +314,93 @@ def test_running_model_path_in_module_result(self, tmp_path: Path) -> None: report = json.loads(out_path.read_text(encoding="utf-8")) instance = report["instances"][0] assert instance["running_model_path"] == str(running_model_path) + + +class TestPerfModuleQuantCompileToggles: + """--no-quantize and --compile/--no-compile clear cfg.quant / cfg.compile + independently in the per-module build (mirrors the single-model path).""" + + @staticmethod + def _run(tmp_path: Path, extra_args: list[str]) -> MagicMock: + """Invoke ``perf --module`` with mocked build and return the module cfg. + + The cfg is mutated (quant/compile cleared) before ``build_hf_model``, + so short-circuiting the benchmark via a failing ``session.perf()`` + still lets us inspect the mutation. + """ + fake_cfg = MagicMock() + fake_cfg.loader.model_type = "bert" + fake_cfg.loader.module_path = "encoder.layer.0" + + fake_build_result = MagicMock() + fake_build_result.final_onnx_path = tmp_path / "model.onnx" + + fake_session = MagicMock() + fake_session.perf.side_effect = RuntimeError("test-skip-benchmark") + + fake_loader_cfg = MagicMock() + fake_loader_cfg.task = "fill-mask" + + with ( + patch( + "winml.modelkit.sysinfo.resolve_device", + return_value=("cpu", ["cpu"]), + ), + patch( + "winml.modelkit.config.generate_hf_build_config", + return_value=[fake_cfg], + ), + patch( + "winml.modelkit.loader.resolve_loader_config", + return_value=(fake_loader_cfg, MagicMock(), MagicMock()), + ), + patch( + "winml.modelkit.commands.build._instantiate_parent_model", + return_value=MagicMock(), + ), + patch( + "winml.modelkit.build.build_hf_model", + return_value=fake_build_result, + ), + patch( + "winml.modelkit.session.WinMLSession", + return_value=fake_session, + ), + ): + runner = CliRunner() + result = runner.invoke( + main, + [ + "perf", + "-m", + "fake/model", + "--module", + "BertLayer", + "--iterations", + "1", + "--warmup", + "0", + "-o", + str(tmp_path / "out.json"), + *extra_args, + ], + ) + assert result.exit_code == 0, result.output + return fake_cfg + + def test_default_skips_compile_keeps_quant(self, tmp_path: Path) -> None: + # perf defaults to --no-compile and --quantize. + cfg = self._run(tmp_path, []) + assert cfg.compile is None + assert cfg.quant is not None + + def test_compile_flag_preserves_compile(self, tmp_path: Path) -> None: + cfg = self._run(tmp_path, ["--compile"]) + assert cfg.compile is not None + assert cfg.quant is not None + + def test_no_quantize_clears_only_quant(self, tmp_path: Path) -> None: + # --no-quantize must not also clear compile when --compile is set. + cfg = self._run(tmp_path, ["--no-quantize", "--compile"]) + assert cfg.quant is None + assert cfg.compile is not None