From cb6fddae470b6e4bb141deedd9b3c7504e6f26ee Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Thu, 16 Apr 2026 03:07:08 +0000 Subject: [PATCH 1/5] Add manim text lint and compose retry-manim path Co-authored-by: John Menke --- README.md | 8 ++- src/docgen/cli.py | 7 +++ src/docgen/compose.py | 4 +- src/docgen/pipeline.py | 35 ++++++++++- src/docgen/validate.py | 101 +++++++++++++++++++++++++++++++ tests/test_pipeline.py | 131 +++++++++++++++++++++++++++++++++++++++++ tests/test_validate.py | 51 ++++++++++++++++ 7 files changed, 333 insertions(+), 4 deletions(-) create mode 100644 tests/test_pipeline.py diff --git a/README.md b/README.md index 0935536..a77c7a3 100644 --- a/README.md +++ b/README.md @@ -60,7 +60,7 @@ docgen validate --pre-push # validate all outputs before committing | `docgen validate [--max-drift 2.75] [--pre-push]` | Run all validation checks | | `docgen concat [--config full-demo]` | Concatenate full demo files | | `docgen pages [--force]` | Generate index.html, pages.yml, .gitattributes, .gitignore | -| `docgen generate-all [--skip-tts] [--skip-manim] [--skip-vhs]` | Run full pipeline | +| `docgen generate-all [--skip-tts] [--skip-manim] [--skip-vhs] [--retry-manim]` | Run full pipeline (optionally auto-retry Manim after FREEZE GUARD) | | `docgen rebuild-after-audio` | Recompose + validate + concat | ## Configuration @@ -100,6 +100,12 @@ docgen sync-vhs docgen vhs docgen compose ``` + +If `compose` fails with `FREEZE GUARD` after fresh timestamps, retry Manim once automatically: + +```bash +docgen generate-all --retry-manim +``` ## System dependencies - **ffmpeg** — composition and probing diff --git a/src/docgen/cli.py b/src/docgen/cli.py index c889e7c..9519928 100644 --- a/src/docgen/cli.py +++ b/src/docgen/cli.py @@ -238,6 +238,11 @@ def pages(ctx: click.Context, force: bool) -> None: @click.option("--skip-manim", is_flag=True) @click.option("--skip-vhs", is_flag=True) @click.option("--skip-tape-sync", is_flag=True, help="Skip optional sync-vhs stage after timestamps.") +@click.option( + "--retry-manim", + is_flag=True, + help="If compose hits FREEZE GUARD, clear Manim cache and retry Manim + compose once.", +) @click.pass_context def generate_all( ctx: click.Context, @@ -245,6 +250,7 @@ def generate_all( skip_manim: bool, skip_vhs: bool, skip_tape_sync: bool, + retry_manim: bool, ) -> None: """Run full pipeline: TTS -> Manim -> VHS -> compose -> validate -> concat -> pages.""" from docgen.pipeline import Pipeline @@ -256,6 +262,7 @@ def generate_all( skip_manim=skip_manim, skip_vhs=skip_vhs, skip_tape_sync=skip_tape_sync, + retry_manim_on_freeze=retry_manim, ) diff --git a/src/docgen/compose.py b/src/docgen/compose.py index 80e41fb..ba553dc 100644 --- a/src/docgen/compose.py +++ b/src/docgen/compose.py @@ -85,7 +85,9 @@ def _compose_simple(self, seg_id: str, video_path: Path, *, strict: bool = True) msg = ( f" FREEZE GUARD: {seg_id} visual is {video_dur:.1f}s but audio " f"is {audio_dur:.1f}s → {freeze:.0%} frozen " - f"(max {max_ratio:.0%}). Re-render the visual source to be longer." + f"(max {max_ratio:.0%}). Re-render the visual source to be longer. " + "If this segment uses timing-driven Manim waits, run `docgen manim` again " + "after `docgen timestamps`, or use `docgen generate-all --retry-manim`." ) if strict: raise ComposeError(msg) diff --git a/src/docgen/pipeline.py b/src/docgen/pipeline.py index ecfa8c8..8dd26b0 100644 --- a/src/docgen/pipeline.py +++ b/src/docgen/pipeline.py @@ -2,6 +2,7 @@ from __future__ import annotations +import shutil from typing import TYPE_CHECKING if TYPE_CHECKING: @@ -18,6 +19,7 @@ def run( skip_manim: bool = False, skip_vhs: bool = False, skip_tape_sync: bool = False, + retry_manim_on_freeze: bool = False, ) -> None: if not skip_tts: print("\n=== Stage: TTS ===") @@ -47,8 +49,21 @@ def run( print(f" WARNING: {r.tape} had errors: {r.errors}") print("\n=== Stage: Compose ===") - from docgen.compose import Composer - Composer(self.config).compose_segments(self.config.segments_all) + from docgen.compose import ComposeError, Composer + composer = Composer(self.config) + try: + composer.compose_segments(self.config.segments_all) + except ComposeError as exc: + if self._should_retry_manim(exc, skip_manim, retry_manim_on_freeze): + print("\n=== Compose FREEZE GUARD detected; retrying Manim + compose once ===") + self._clear_manim_media_cache() + print("\n=== Stage: Manim (retry) ===") + from docgen.manim_runner import ManimRunner + ManimRunner(self.config).render() + print("\n=== Stage: Compose (retry) ===") + composer.compose_segments(self.config.segments_all) + else: + raise print("\n=== Stage: Validate ===") from docgen.validate import Validator @@ -65,3 +80,19 @@ def run( PagesGenerator(self.config).generate_all(force=True) print("\n=== Pipeline complete ===") + + @staticmethod + def _should_retry_manim( + exc: Exception, skip_manim: bool, retry_manim_on_freeze: bool + ) -> bool: + if skip_manim or not retry_manim_on_freeze: + return False + return "FREEZE GUARD" in str(exc).upper() + + def _clear_manim_media_cache(self) -> None: + media_dir = self.config.animations_dir / "media" + if not media_dir.exists(): + print("[pipeline] Manim cache already empty") + return + shutil.rmtree(media_dir) + print(f"[pipeline] Cleared Manim cache: {media_dir}") diff --git a/src/docgen/validate.py b/src/docgen/validate.py index 21a8c9a..8521346 100644 --- a/src/docgen/validate.py +++ b/src/docgen/validate.py @@ -7,6 +7,7 @@ from __future__ import annotations +import ast import json import subprocess from dataclasses import dataclass, field @@ -88,9 +89,88 @@ def _is_lfs_pointer(path: Path) -> bool: return False +def _is_text_call(node: ast.Call) -> bool: + func = node.func + if isinstance(func, ast.Name): + return func.id == "Text" + if isinstance(func, ast.Attribute): + return func.attr == "Text" + return False + + +def _looks_numeric(node: ast.AST) -> bool: + if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)): + return True + if isinstance(node, ast.UnaryOp) and isinstance(node.op, (ast.UAdd, ast.USub)): + return _looks_numeric(node.operand) + return False + + +def _looks_like_color_positional(node: ast.AST) -> bool: + if _looks_numeric(node): + return False + if isinstance(node, ast.Constant) and isinstance(node.value, str): + value = node.value.strip() + if value.startswith("#"): + return True + # Positional named colors are almost always accidental in Text(). + return bool(value) and value.replace("_", "").isalpha() + if isinstance(node, ast.Name): + ident = node.id.upper() + return node.id.isupper() or ident.startswith("C_") or "COLOR" in ident + if isinstance(node, ast.Attribute): + ident = node.attr.upper() + return node.attr.isupper() or ident.startswith("C_") or "COLOR" in ident + return False + + +def _is_bold_weight(node: ast.AST) -> bool: + if isinstance(node, ast.Name): + return node.id == "BOLD" + if isinstance(node, ast.Attribute): + return node.attr == "BOLD" + if isinstance(node, ast.Constant) and isinstance(node.value, str): + return node.value.strip().lower() == "bold" + return False + + +def _lint_manim_text_usage(path: Path) -> list[str]: + try: + source = path.read_text(encoding="utf-8") + except OSError as exc: + return [f"{path}: could not read scene source ({exc})"] + + try: + tree = ast.parse(source, filename=str(path)) + except SyntaxError as exc: + line = exc.lineno if exc.lineno is not None else "?" + return [f"{path}:{line} could not parse scenes.py ({exc.msg})"] + + issues: list[str] = [] + for node in ast.walk(tree): + if not isinstance(node, ast.Call) or not _is_text_call(node): + continue + + if len(node.args) >= 2 and _looks_like_color_positional(node.args[1]): + issues.append( + f"{path}:{node.lineno} Text() second positional argument looks like a color; " + "use keyword form `Text(..., color=...)`." + ) + + for kw in node.keywords: + if kw.arg == "weight" and kw.value is not None and _is_bold_weight(kw.value): + issues.append( + f"{path}:{node.lineno} Text(..., weight=BOLD) can substitute a different font; " + "prefer emphasis with color/size." + ) + + return issues + + class Validator: def __init__(self, config: Config) -> None: self.config = config + self._manim_lint_cache: CheckResult | None = None def run_all(self, max_drift_override: float | None = None) -> list[ValidationReport]: reports: list[ValidationReport] = [] @@ -121,6 +201,8 @@ def validate_segment( report.checks.append(CheckResult("recording_exists", False, [f"No recording for {seg_id}"])) report.checks.append(self._check_narration_lint(seg_id)) + if self.config.visual_map.get(seg_id, {}).get("type") == "manim": + report.checks.append(self._check_manim_scene_lint()) return report.to_dict() @@ -299,6 +381,25 @@ def _check_narration_lint(self, seg_id: str) -> CheckResult: result.issues[:10] if result.issues else [], ) + def _check_manim_scene_lint(self) -> CheckResult: + if self._manim_lint_cache is not None: + return self._manim_lint_cache + + scenes = self.config.animations_dir / "scenes.py" + if not scenes.exists(): + result = CheckResult("manim_scene_lint", True, ["No animations/scenes.py (skipped)"]) + self._manim_lint_cache = result + return result + + issues = _lint_manim_text_usage(scenes) + result = CheckResult( + "manim_scene_lint", + not issues, + issues[:15] if issues else ["No risky Text() usage detected"], + ) + self._manim_lint_cache = result + return result + # ── ffprobe-based checks ────────────────────────────────────────── def _check_streams(self, path: Path) -> CheckResult: diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py new file mode 100644 index 0000000..5b4e4f2 --- /dev/null +++ b/tests/test_pipeline.py @@ -0,0 +1,131 @@ +"""Tests for pipeline retry behavior around compose FREEZE GUARD.""" + +from __future__ import annotations + +from types import SimpleNamespace + +import pytest + +from docgen.compose import ComposeError +from docgen.pipeline import Pipeline + + +def _patch_pipeline_stages(monkeypatch, composer_cls, calls: list[str]) -> None: + class FakeTimestampExtractor: + def __init__(self, _config) -> None: + pass + + def extract_all(self) -> None: + calls.append("timestamps") + + class FakeManimRunner: + def __init__(self, _config) -> None: + pass + + def render(self, scene=None) -> None: + calls.append("manim") + + class FakeValidator: + def __init__(self, _config) -> None: + pass + + def run_all(self): + calls.append("validate") + return [] + + def print_report(self, _reports) -> None: + calls.append("print-report") + + class FakeConcatBuilder: + def __init__(self, _config) -> None: + pass + + def build(self) -> None: + calls.append("concat") + + class FakePagesGenerator: + def __init__(self, _config) -> None: + pass + + def generate_all(self, force=False) -> None: + calls.append(f"pages:{force}") + + import docgen.concat as concat_module + import docgen.manim_runner as manim_module + import docgen.pages as pages_module + import docgen.timestamps as timestamps_module + import docgen.validate as validate_module + import docgen.compose as compose_module + + monkeypatch.setattr(timestamps_module, "TimestampExtractor", FakeTimestampExtractor) + monkeypatch.setattr(manim_module, "ManimRunner", FakeManimRunner) + monkeypatch.setattr(validate_module, "Validator", FakeValidator) + monkeypatch.setattr(concat_module, "ConcatBuilder", FakeConcatBuilder) + monkeypatch.setattr(pages_module, "PagesGenerator", FakePagesGenerator) + monkeypatch.setattr(compose_module, "Composer", composer_cls) + + +def test_retry_manim_after_freeze_guard(tmp_path, monkeypatch) -> None: + calls: list[str] = [] + + class FlakyComposer: + attempts = 0 + + def __init__(self, _config) -> None: + pass + + def compose_segments(self, _segments) -> int: + FlakyComposer.attempts += 1 + calls.append(f"compose:{FlakyComposer.attempts}") + if FlakyComposer.attempts == 1: + raise ComposeError("FREEZE GUARD: short visual") + return 1 + + _patch_pipeline_stages(monkeypatch, FlakyComposer, calls) + + animations_dir = tmp_path / "animations" + media_dir = animations_dir / "media" + media_dir.mkdir(parents=True) + (media_dir / "cache.bin").write_text("cache", encoding="utf-8") + + cfg = SimpleNamespace( + animations_dir=animations_dir, + segments_all=["01"], + sync_vhs_after_timestamps=False, + ) + + Pipeline(cfg).run(skip_tts=True, skip_vhs=True, retry_manim_on_freeze=True) + + assert FlakyComposer.attempts == 2 + assert calls.count("manim") == 2, "Manim should run once initially and once on retry" + assert not media_dir.exists(), "Retry path should clear Manim cache directory" + + +def test_no_retry_when_flag_disabled(tmp_path, monkeypatch) -> None: + calls: list[str] = [] + + class AlwaysFailComposer: + def __init__(self, _config) -> None: + pass + + def compose_segments(self, _segments) -> int: + calls.append("compose") + raise ComposeError("FREEZE GUARD: short visual") + + _patch_pipeline_stages(monkeypatch, AlwaysFailComposer, calls) + + animations_dir = tmp_path / "animations" + media_dir = animations_dir / "media" + media_dir.mkdir(parents=True) + + cfg = SimpleNamespace( + animations_dir=animations_dir, + segments_all=["01"], + sync_vhs_after_timestamps=False, + ) + + with pytest.raises(ComposeError, match="FREEZE GUARD"): + Pipeline(cfg).run(skip_tts=True, skip_vhs=True, retry_manim_on_freeze=False) + + assert calls.count("manim") == 1 + assert media_dir.exists(), "Without retry flag, Manim cache should be untouched" diff --git a/tests/test_validate.py b/tests/test_validate.py index 3174776..38acf58 100644 --- a/tests/test_validate.py +++ b/tests/test_validate.py @@ -354,6 +354,57 @@ def test_static_video_does_not_fail_pre_push(self, config, cfg_dir): v.run_pre_push() # should NOT raise +# ── Manim scene lint ─────────────────────────────────────────────────── + +class TestManimSceneLint: + def _configure_manim(self, cfg_dir: Path, scenes_source: str) -> Config: + cfg_raw = yaml.safe_load((cfg_dir / "docgen.yaml").read_text(encoding="utf-8")) + cfg_raw["visual_map"]["01"] = {"type": "manim", "source": "Scene01.mp4"} + cfg_raw.setdefault("manim", {})["scenes"] = ["Scene01"] + (cfg_dir / "docgen.yaml").write_text(yaml.dump(cfg_raw), encoding="utf-8") + (cfg_dir / "animations" / "scenes.py").write_text(scenes_source, encoding="utf-8") + return Config.from_yaml(cfg_dir / "docgen.yaml") + + def test_flags_positional_text_color_and_bold_weight(self, cfg_dir): + config = self._configure_manim( + cfg_dir, + """ +from manim import * +C_BLUE = "#2979ff" + +class Demo(Scene): + def construct(self): + Text("Some label", C_BLUE, font_size=14) + Text("Heading", font_size=36, weight=BOLD) +""".strip(), + ) + v = Validator(config) + report = v.validate_segment("01") + check = next(c for c in report["checks"] if c["name"] == "manim_scene_lint") + assert not check["passed"] + details = " ".join(check["details"]) + assert "second positional argument" in details + assert "weight=BOLD" in details + + def test_clean_text_usage_passes(self, cfg_dir): + config = self._configure_manim( + cfg_dir, + """ +from manim import * +C_BLUE = "#2979ff" + +class Demo(Scene): + def construct(self): + Text("Some label", font_size=14, color=C_BLUE) + Text("Heading", font_size=36, color=WHITE) +""".strip(), + ) + v = Validator(config) + report = v.validate_segment("01") + check = next(c for c in report["checks"] if c["name"] == "manim_scene_lint") + assert check["passed"], check["details"] + + # ── Helper to create silent audio ───────────────────────────────────── def _make_silent_audio(path: Path, duration_sec: float = 10.0) -> Path: From bca9507854bf6b2a1ca41edf6b7c4853a1ebdf14 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Thu, 16 Apr 2026 03:17:56 +0000 Subject: [PATCH 2/5] Add VHS tape lint and configurable render timeouts Co-authored-by: John Menke --- README.md | 24 ++++++++ src/docgen/cli.py | 44 ++++++++++++++- src/docgen/config.py | 5 ++ src/docgen/init.py | 37 +++++++++++++ src/docgen/vhs.py | 128 ++++++++++++++++++++++++++++++++++++++++--- tests/test_config.py | 9 ++- tests/test_init.py | 2 + tests/test_vhs.py | 43 ++++++++++++++- 8 files changed, 281 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index a77c7a3..94a356e 100644 --- a/README.md +++ b/README.md @@ -55,6 +55,7 @@ docgen validate --pre-push # validate all outputs before committing | `docgen tts [--segment 01] [--dry-run]` | Generate TTS audio | | `docgen manim [--scene StackDAGScene]` | Render Manim animations | | `docgen vhs [--tape 02-quickstart.tape] [--strict]` | Render VHS terminal recordings | +| `docgen tape-lint [--tape 02-quickstart.tape]` | Lint tapes for commands likely to hang in VHS | | `docgen sync-vhs [--segment 01] [--dry-run]` | Rewrite VHS `Sleep` values from `animations/timing.json` | | `docgen compose [01 02 03] [--ffmpeg-timeout 900]` | Compose segments (audio + video) | | `docgen validate [--max-drift 2.75] [--pre-push]` | Run all validation checks | @@ -80,6 +81,7 @@ vhs: typing_ms_per_char: 55 # typing estimate used by sync-vhs max_typing_sec: 3.0 # per block cap for typing estimate min_sleep_sec: 0.05 # floor for rewritten Sleep values + render_timeout_sec: 120 # per-tape timeout for `docgen vhs` pipeline: sync_vhs_after_timestamps: false # opt-in: run sync-vhs automatically in generate-all/rebuild-after-audio @@ -91,6 +93,28 @@ compose: If you edit a `.tape` file, run `docgen vhs` before `docgen compose` so compose does not use stale rendered terminal video. +### VHS safety: avoid real long-running commands in tapes + +VHS executes commands in a real shell session. For demos, prefer simulated output with `echo` +instead of invoking real services or model inference in the tape itself. + +Example: + +```tape +Type "echo '$ python -m myapp run --image sample.png'" +Enter +Sleep 1s +Type "echo '[myapp] Loading model... done (2.1s)'" +Enter +``` + +Helpful checks: + +```bash +docgen tape-lint # flag risky commands in all tapes +docgen vhs --strict # fail if VHS output includes shell/runtime errors +``` + To auto-align tape pacing with generated narration: ```bash diff --git a/src/docgen/cli.py b/src/docgen/cli.py index 9519928..9eb4794 100644 --- a/src/docgen/cli.py +++ b/src/docgen/cli.py @@ -107,14 +107,26 @@ def manim(ctx: click.Context, scene: str | None) -> None: @main.command() @click.option("--tape", default=None, help="Render a single VHS tape.") @click.option("--strict", is_flag=True, help="Fail on any unexpected stderr output.") +@click.option( + "--timeout", + "render_timeout_sec", + default=None, + type=int, + help="Override VHS per-tape timeout seconds (default from docgen.yaml vhs.render_timeout_sec).", +) @click.pass_context -def vhs(ctx: click.Context, tape: str | None, strict: bool) -> None: +def vhs( + ctx: click.Context, + tape: str | None, + strict: bool, + render_timeout_sec: int | None, +) -> None: """Render VHS terminal recordings.""" from docgen.vhs import VHSRunner cfg = ctx.obj["config"] runner = VHSRunner(cfg) - results = runner.render(tape=tape, strict=strict) + results = runner.render(tape=tape, strict=strict, timeout_sec=render_timeout_sec) for r in results: status = "ok" if r.success else "FAIL" click.echo(f" [{status}] {r.tape}") @@ -122,6 +134,34 @@ def vhs(ctx: click.Context, tape: str | None, strict: bool) -> None: click.echo(f" {e}") +@main.command("tape-lint") +@click.option("--tape", default=None, help="Lint a single tape name or pattern.") +@click.pass_context +def tape_lint(ctx: click.Context, tape: str | None) -> None: + """Lint VHS tapes for potentially real/hanging commands.""" + from docgen.vhs import VHSRunner + + cfg = ctx.obj["config"] + runner = VHSRunner(cfg) + reports = runner.lint_tapes(tape=tape) + if not reports: + click.echo("No tape files found.") + return + + total_issues = 0 + for report in reports: + if report.issues: + click.echo(f"[WARN] {report.tape}") + for issue in report.issues: + click.echo(f" - {issue}") + total_issues += 1 + else: + click.echo(f"[ok] {report.tape}") + + if total_issues: + raise SystemExit(1) + + @main.command("sync-vhs") @click.option("--segment", default=None, help="Sync tape(s) for one segment ID/name.") @click.option("--dry-run", is_flag=True, help="Preview updates without writing files.") diff --git a/src/docgen/config.py b/src/docgen/config.py index 243fcf1..e36c614 100644 --- a/src/docgen/config.py +++ b/src/docgen/config.py @@ -102,6 +102,7 @@ def vhs_config(self) -> dict[str, Any]: "typing_ms_per_char": 35, "max_typing_sec": 3.0, "min_sleep_sec": 0.2, + "render_timeout_sec": 120, } defaults.update(self.raw.get("vhs", {})) return defaults @@ -128,6 +129,10 @@ def max_typing_sec(self) -> float: def min_sleep_sec(self) -> float: return float(self.vhs_config.get("min_sleep_sec", 0.2)) + @property + def vhs_render_timeout_sec(self) -> int: + return int(self.vhs_config.get("render_timeout_sec", 120)) + @property def sync_vhs_after_timestamps(self) -> bool: pipeline_cfg = self.raw.get("pipeline", {}) diff --git a/src/docgen/init.py b/src/docgen/init.py index 7a518e1..65f0fb6 100644 --- a/src/docgen/init.py +++ b/src/docgen/init.py @@ -206,6 +206,11 @@ def generate_files(plan: InitPlan) -> list[str]: if not narr_readme.exists(): created.append(_write_narration_readme(plan)) + # terminal/README.md with safe tape authoring guidance + terminal_readme = plan.demo_dir / "terminal" / "README.md" + if not terminal_readme.exists(): + created.append(_write_terminal_readme(plan)) + # Starter narration files (only for segments without existing files) for seg in plan.segments: narr_file = plan.demo_dir / "narration" / f"{seg['name']}.md" @@ -261,6 +266,7 @@ def _write_config(plan: InitPlan) -> str: "typing_ms_per_char": 55, "max_typing_sec": 3.0, "min_sleep_sec": 0.2, + "render_timeout_sec": 120, }, "compose": { "ffmpeg_timeout_sec": 300, @@ -424,6 +430,37 @@ def _write_narration_readme(plan: InitPlan) -> str: return str(path) +def _write_terminal_readme(plan: InitPlan) -> str: + content = textwrap.dedent("""\ + # Terminal tape authoring (VHS) + + `.tape` files run in a real shell. Avoid real long-running commands in demos. + + ## Safe pattern: simulate output with `echo` + + Prefer: + + ```tape + Type "echo '$ python app.py --serve'" + Enter + Type "echo 'Starting server on :8080'" + Enter + ``` + + Avoid in tapes unless you really want to execute them: + - `python ...` + - `curl localhost ...` + - `npm start`, `docker ...`, `kubectl ...` + + Useful checks: + - `docgen tape-lint` (warn on risky command patterns) + - `docgen vhs --strict` (fails on common shell error output) + """) + path = plan.demo_dir / "terminal" / "README.md" + path.write_text(content, encoding="utf-8") + return str(path) + + def _install_pre_push_hook(plan: InitPlan) -> str | None: git_root = detect_git_root(plan.demo_dir) if not git_root: diff --git a/src/docgen/vhs.py b/src/docgen/vhs.py index 40618e9..d51943b 100644 --- a/src/docgen/vhs.py +++ b/src/docgen/vhs.py @@ -1,4 +1,4 @@ -"""VHS terminal recorder wrapper with error scanning.""" +"""VHS terminal recorder wrapper with error scanning and tape linting.""" from __future__ import annotations @@ -6,6 +6,7 @@ import re import subprocess import tempfile +import time from dataclasses import dataclass, field from pathlib import Path from typing import TYPE_CHECKING @@ -29,6 +30,39 @@ export PS1='$ ' """ +RISKY_TAPE_COMMANDS = ( + re.compile(r"^\s*Type\s+\".*\bpython(?:3)?\b.*\"", re.IGNORECASE), + re.compile(r"^\s*Type\s+\".*\bcurl\b.*\"", re.IGNORECASE), + re.compile(r"^\s*Type\s+\".*\bwget\b.*\"", re.IGNORECASE), + re.compile(r"^\s*Type\s+\".*\bnpm\s+(start|run)\b.*\"", re.IGNORECASE), + re.compile(r"^\s*Type\s+\".*\byarn\s+(start|dev)\b.*\"", re.IGNORECASE), + re.compile(r"^\s*Type\s+\".*\bnode\b.*\"", re.IGNORECASE), + re.compile(r"^\s*Type\s+\".*\bdocker\b.*\"", re.IGNORECASE), + re.compile(r"^\s*Type\s+\".*\bdocker-compose\b.*\"", re.IGNORECASE), + re.compile(r"^\s*Type\s+\".*\bkubectl\b.*\"", re.IGNORECASE), +) + + +@dataclass +class TapeLintIssue: + tape: str + line: int + text: str + message: str + + def __str__(self) -> str: + return f"L{self.line}: {self.message} :: {self.text}" + + +@dataclass +class TapeLintResult: + tape: str + issues: list[TapeLintIssue] = field(default_factory=list) + + @property + def passed(self) -> bool: + return not self.issues + @dataclass class VHSResult: @@ -39,10 +73,16 @@ class VHSResult: class VHSRunner: - def __init__(self, config: Config) -> None: + def __init__(self, config: Config, render_timeout_sec: int | None = None) -> None: self.config = config + self.render_timeout_sec = render_timeout_sec - def render(self, tape: str | None = None, strict: bool = False) -> list[VHSResult]: + def render( + self, + tape: str | None = None, + strict: bool = False, + timeout_sec: int | None = None, + ) -> list[VHSResult]: terminal_dir = self.config.terminal_dir if not terminal_dir.exists(): print("[vhs] No terminal directory found") @@ -56,10 +96,32 @@ def render(self, tape: str | None = None, strict: bool = False) -> list[VHSResul tapes = sorted(terminal_dir.glob("*.tape")) results: list[VHSResult] = [] + effective_timeout = timeout_sec + if effective_timeout is None: + effective_timeout = ( + self.render_timeout_sec + if self.render_timeout_sec is not None + else self.config.vhs_render_timeout_sec + ) for t in tapes: - results.append(self._render_one(t, strict)) + results.append(self._render_one(t, strict, effective_timeout)) return results + def lint_tapes(self, tape: str | None = None) -> list[TapeLintResult]: + terminal_dir = self.config.terminal_dir + if not terminal_dir.exists(): + print("[vhs] No terminal directory found") + return [] + + if tape: + tapes = [terminal_dir / tape] if (terminal_dir / tape).exists() else list( + terminal_dir.glob(f"*{tape}*") + ) + else: + tapes = sorted(terminal_dir.glob("*.tape")) + + return [self._lint_one(path) for path in tapes] + @staticmethod def _clean_env() -> dict[str, str]: """Build a minimal environment that produces a clean VHS recording. @@ -104,7 +166,7 @@ def _clean_env() -> dict[str, str]: } return env - def _render_one(self, tape_path: Path, strict: bool) -> VHSResult: + def _render_one(self, tape_path: Path, strict: bool, timeout_sec: int) -> VHSResult: print(f"[vhs] Rendering {tape_path.name}") env = self._clean_env() vhs_bin = self._resolve_vhs_binary() @@ -116,19 +178,27 @@ def _render_one(self, tape_path: Path, strict: bool) -> VHSResult: "vhs not found. Install VHS or set vhs.vhs_path in docgen.yaml.", ], ) + start = time.monotonic() try: proc = subprocess.run( [vhs_bin, str(tape_path)], capture_output=True, text=True, - timeout=300, + timeout=max(1, int(timeout_sec)), cwd=str(tape_path.parent), env=env, ) except FileNotFoundError: return VHSResult(tape=tape_path.name, success=False, errors=["vhs not found in PATH"]) except subprocess.TimeoutExpired: - return VHSResult(tape=tape_path.name, success=False, errors=["VHS render timed out"]) + return VHSResult( + tape=tape_path.name, + success=False, + errors=[ + f"VHS render timed out after {timeout_sec}s.", + "Tip: use `docgen tape-lint` and prefer `Type \"echo ...\"` simulated output.", + ], + ) finally: fake_home = env.get("HOME", "") if fake_home and "vhs_home_" in fake_home: @@ -145,6 +215,8 @@ def _render_one(self, tape_path: Path, strict: bool) -> VHSResult: success = proc.returncode == 0 and not errors if not success and proc.returncode != 0: errors.append(f"Exit code {proc.returncode}") + elapsed = time.monotonic() - start + print(f"[vhs] Finished {tape_path.name} in {elapsed:.1f}s") return VHSResult( tape=tape_path.name, @@ -153,6 +225,48 @@ def _render_one(self, tape_path: Path, strict: bool) -> VHSResult: output_path=str(self.config.terminal_dir / "rendered"), ) + def _lint_one(self, tape_path: Path) -> TapeLintResult: + result = TapeLintResult(tape=tape_path.name) + result.issues.extend(self.scan_tape_for_risky_commands(tape_path)) + return result + + @staticmethod + def scan_tape_for_risky_commands(tape_path: Path) -> list[TapeLintIssue]: + """Return lint issues for risky Type commands in a tape file.""" + try: + lines = tape_path.read_text(encoding="utf-8").splitlines() + except OSError as exc: + return [ + TapeLintIssue( + tape=tape_path.name, + line=0, + text="", + message=f"Could not read tape: {exc}", + ) + ] + + issues: list[TapeLintIssue] = [] + for idx, line in enumerate(lines, start=1): + stripped = line.strip() + lowered = stripped.lower() + if lowered.startswith('type "echo ') or lowered.startswith("type 'echo "): + continue + for pattern in RISKY_TAPE_COMMANDS: + if pattern.search(stripped): + issues.append( + TapeLintIssue( + tape=tape_path.name, + line=idx, + text=stripped[:160], + message=( + "Potentially real/external command in tape. " + "Prefer simulated output with `Type \"echo ...\"`." + ), + ) + ) + break + return issues + @staticmethod def _scan_output(text: str) -> list[str]: found: list[str] = [] diff --git a/tests/test_config.py b/tests/test_config.py index b616d94..bc274fc 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -60,6 +60,7 @@ def test_defaults(): assert c.warn_stale_vhs is True assert c.manim_path is None assert c.vhs_path is None + assert c.vhs_render_timeout_sec == 120 finally: cfg_path.unlink() @@ -78,7 +79,12 @@ def test_resolved_dirs(tmp_config): def test_binary_paths_and_compose_config(tmp_path): cfg = { "manim": {"manim_path": "/opt/bin/manim"}, - "vhs": {"vhs_path": "/opt/bin/vhs", "sync_from_timing": True, "typing_ms_per_char": 40}, + "vhs": { + "vhs_path": "/opt/bin/vhs", + "sync_from_timing": True, + "typing_ms_per_char": 40, + "render_timeout_sec": 240, + }, "compose": {"ffmpeg_timeout_sec": 900, "warn_stale_vhs": False}, "pipeline": {"sync_vhs_after_timestamps": True}, } @@ -92,3 +98,4 @@ def test_binary_paths_and_compose_config(tmp_path): assert c.sync_from_timing is True assert c.sync_vhs_after_timestamps is True assert c.typing_ms_per_char == 40 + assert c.vhs_render_timeout_sec == 240 diff --git a/tests/test_init.py b/tests/test_init.py index 0b542af..65c8959 100644 --- a/tests/test_init.py +++ b/tests/test_init.py @@ -87,6 +87,7 @@ def test_generate_files_minimal(tmp_path: Path) -> None: assert (tmp_path / "demos" / "rebuild-after-audio.sh").exists() assert (tmp_path / "demos" / "validate.sh").exists() assert (tmp_path / "demos" / "narration" / "README.md").exists() + assert (tmp_path / "demos" / "terminal" / "README.md").exists() assert (tmp_path / "demos" / "narration" / "01-intro.md").exists() assert (tmp_path / "demos" / "narration" / "02-setup.md").exists() @@ -94,6 +95,7 @@ def test_generate_files_minimal(tmp_path: Path) -> None: cfg = yaml.safe_load(cfg_text.split("\n\n", 1)[-1]) assert cfg["segments"]["all"] == ["01", "02"] assert cfg["segment_names"]["01"] == "01-intro" + assert cfg["vhs"]["render_timeout_sec"] == 120 assert "test-project" in cfg["tts"]["instructions"] assert len(created) >= 7 diff --git a/tests/test_vhs.py b/tests/test_vhs.py index 0da6604..523a382 100644 --- a/tests/test_vhs.py +++ b/tests/test_vhs.py @@ -1,4 +1,8 @@ -"""Tests for docgen.vhs error pattern scanning.""" +"""Tests for docgen.vhs error pattern scanning and tape linting.""" + +from __future__ import annotations + +from pathlib import Path from docgen.vhs import VHSRunner @@ -28,3 +32,40 @@ def test_scan_output_multiple(): text = "line1\nbash: foo: command not found\nline3\nerror: something broke\n" errors = VHSRunner._scan_output(text) assert len(errors) == 2 + + +def test_scan_tape_for_risky_commands_detects_python_and_curl(tmp_path: Path) -> None: + tape = tmp_path / "demo.tape" + tape.write_text( + '\n'.join( + [ + 'Set Shell "bash --norc --noprofile"', + 'Type "python app.py"', + "Enter", + 'Type "curl http://localhost:8080/health"', + "Enter", + ] + ), + encoding="utf-8", + ) + issues = VHSRunner.scan_tape_for_risky_commands(tape) + assert len(issues) == 2 + assert "python" in issues[0].text.lower() + assert "curl" in issues[1].text.lower() + + +def test_scan_tape_for_risky_commands_ignores_echo_simulation(tmp_path: Path) -> None: + tape = tmp_path / "demo.tape" + tape.write_text( + '\n'.join( + [ + 'Type "echo \'$ python app.py\'"', + "Enter", + 'Type "echo \'[ok] done\'"', + "Enter", + ] + ), + encoding="utf-8", + ) + issues = VHSRunner.scan_tape_for_risky_commands(tape) + assert issues == [] From 8f86de4a36a37c3ee8c5ad7f7e34c2345de052d9 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Thu, 16 Apr 2026 03:26:00 +0000 Subject: [PATCH 3/5] Add Playwright visual source runner and compose support Co-authored-by: John Menke --- README.md | 41 ++++++++- src/docgen/cli.py | 36 ++++++++ src/docgen/compose.py | 15 ++++ src/docgen/config.py | 34 +++++++ src/docgen/playwright_runner.py | 152 ++++++++++++++++++++++++++++++++ src/docgen/wizard.py | 19 ++++ tests/test_compose.py | 103 ++++++++++++++++++++++ tests/test_config.py | 14 +++ tests/test_playwright_runner.py | 95 ++++++++++++++++++++ 9 files changed, 508 insertions(+), 1 deletion(-) create mode 100644 src/docgen/playwright_runner.py create mode 100644 tests/test_playwright_runner.py diff --git a/README.md b/README.md index 94a356e..0e1bc04 100644 --- a/README.md +++ b/README.md @@ -54,7 +54,8 @@ docgen validate --pre-push # validate all outputs before committing | `docgen wizard [--port 8501]` | Launch narration setup wizard (local web GUI) | | `docgen tts [--segment 01] [--dry-run]` | Generate TTS audio | | `docgen manim [--scene StackDAGScene]` | Render Manim animations | -| `docgen vhs [--tape 02-quickstart.tape] [--strict]` | Render VHS terminal recordings | +| `docgen vhs [--tape 02-quickstart.tape] [--strict] [--timeout 120]` | Render VHS terminal recordings | +| `docgen playwright --script scripts/capture.py --url http://localhost:3000 --output terminal/rendered/demo.mp4` | Capture browser demo video with Playwright script | | `docgen tape-lint [--tape 02-quickstart.tape]` | Lint tapes for commands likely to hang in VHS | | `docgen sync-vhs [--segment 01] [--dry-run]` | Rewrite VHS `Sleep` values from `animations/timing.json` | | `docgen compose [01 02 03] [--ffmpeg-timeout 900]` | Compose segments (audio + video) | @@ -83,6 +84,13 @@ vhs: min_sleep_sec: 0.05 # floor for rewritten Sleep values render_timeout_sec: 120 # per-tape timeout for `docgen vhs` +playwright: + python_path: "" # optional python executable for capture scripts + default_url: "" # fallback URL when visual_map entry omits url + default_viewport: # fallback viewport when visual_map entry omits viewport + width: 1920 + height: 1080 + pipeline: sync_vhs_after_timestamps: false # opt-in: run sync-vhs automatically in generate-all/rebuild-after-audio @@ -93,6 +101,37 @@ compose: If you edit a `.tape` file, run `docgen vhs` before `docgen compose` so compose does not use stale rendered terminal video. +### Playwright visual source (`type: playwright`) + +`visual_map` entries can now use a Playwright capture script: + +```yaml +visual_map: + "04": + type: playwright + source: 04-browser-flow.mp4 + script: scripts/demo_capture.py + url: http://localhost:3300 + viewport: + width: 1920 + height: 1080 +``` + +During `docgen compose`, docgen runs the capture script first (if `source` does not exist yet), +then muxes the generated MP4 with narration audio. + +Manual capture (useful while iterating on scripts): + +```bash +docgen playwright --script scripts/demo_capture.py --url http://localhost:3300 \ + --output terminal/rendered/04-browser-flow.mp4 +``` + +Script contract: +- receives CLI args: `--output`, optional `--url`, optional `--width`, optional `--height` +- must write an MP4 to the requested output path +- should use headless Playwright for CI compatibility + ### VHS safety: avoid real long-running commands in tapes VHS executes commands in a real shell session. For demos, prefer simulated output with `echo` diff --git a/src/docgen/cli.py b/src/docgen/cli.py index 9eb4794..835e2c2 100644 --- a/src/docgen/cli.py +++ b/src/docgen/cli.py @@ -134,6 +134,42 @@ def vhs( click.echo(f" {e}") +@main.command() +@click.option( + "--script", + "script_path", + default=None, + help="Python script to execute for browser actions (required for standalone mode).", +) +@click.option("--url", default=None, help="Target URL for browser capture.") +@click.option("--source", default="playwright-capture.mp4", help="Output filename under terminal/rendered/.") +@click.option("--width", default=1920, type=int, help="Browser viewport width.") +@click.option("--height", default=1080, type=int, help="Browser viewport height.") +@click.option("--timeout", "timeout_sec", default=120, type=int, help="Capture timeout in seconds.") +@click.pass_context +def playwright( + ctx: click.Context, + script_path: str | None, + url: str | None, + source: str, + width: int, + height: int, + timeout_sec: int, +) -> None: + """Capture a browser demo video using Playwright.""" + from docgen.playwright_runner import PlaywrightRunner + + cfg = ctx.obj["config"] + runner = PlaywrightRunner(cfg) + video = runner.capture( + script=script_path or "", + output=source, + url=url, + viewport={"width": width, "height": height}, + ) + click.echo(f"[playwright] captured: {video}") + + @main.command("tape-lint") @click.option("--tape", default=None, help="Lint a single tape name or pattern.") @click.pass_context diff --git a/src/docgen/compose.py b/src/docgen/compose.py index ba553dc..1c022f7 100644 --- a/src/docgen/compose.py +++ b/src/docgen/compose.py @@ -38,6 +38,15 @@ def compose_segments(self, segment_ids: list[str], *, strict: bool = True) -> in video_path = self._vhs_path(vmap) self._warn_if_stale_vhs(vmap, video_path) ok = self._compose_simple(seg_id, video_path, strict=strict) + elif vtype == "playwright": + from docgen.playwright_runner import PlaywrightError, PlaywrightRunner + + try: + video_path = PlaywrightRunner(self.config).capture_segment(seg_id, vmap) + except PlaywrightError as exc: + print(f" SKIP: playwright capture failed ({exc})") + video_path = Path("") + ok = video_path.exists() and self._compose_simple(seg_id, video_path, strict=strict) elif vtype == "mixed": sources = [self._resolve_source(s) for s in vmap.get("sources", [])] ok = self._compose_mixed(seg_id, sources) @@ -245,6 +254,12 @@ def _vhs_path(self, vmap: dict[str, Any]) -> Path: src = vmap.get("source", "") return self.config.terminal_dir / "rendered" / src + def _playwright_path(self, vmap: dict[str, Any]) -> Path: + src = str(vmap.get("source", "")).strip() + if not src: + return self.config.terminal_dir / "rendered" / "playwright.mp4" + return self.config.terminal_dir / "rendered" / src + def _resolve_source(self, source: str) -> Path: for base in self._manim_video_dirs(): manim_path = base / source diff --git a/src/docgen/config.py b/src/docgen/config.py index e36c614..dc94fcd 100644 --- a/src/docgen/config.py +++ b/src/docgen/config.py @@ -139,6 +139,40 @@ def sync_vhs_after_timestamps(self) -> bool: if "sync_vhs_after_timestamps" in pipeline_cfg: return bool(pipeline_cfg.get("sync_vhs_after_timestamps")) return self.sync_from_timing + + # -- Playwright ------------------------------------------------------------ + + @property + def playwright_config(self) -> dict[str, Any]: + defaults: dict[str, Any] = { + "python_path": "", + "timeout_sec": 120, + "default_url": "", + "default_viewport": {"width": 1920, "height": 1080}, + } + defaults.update(self.raw.get("playwright", {})) + return defaults + + @property + def playwright_python_path(self) -> str | None: + value = self.playwright_config.get("python_path") + return str(value) if value else None + + @property + def playwright_timeout_sec(self) -> int: + return int(self.playwright_config.get("timeout_sec", 120)) + + @property + def playwright_default_url(self) -> str | None: + value = str(self.playwright_config.get("default_url", "")).strip() + return value or None + + @property + def playwright_default_viewport(self) -> tuple[int, int]: + raw = self.playwright_config.get("default_viewport", {}) or {} + width = int(raw.get("width", 1920)) + height = int(raw.get("height", 1080)) + return width, height # -- Compose ---------------------------------------------------------------- @property diff --git a/src/docgen/playwright_runner.py b/src/docgen/playwright_runner.py new file mode 100644 index 0000000..7cebedf --- /dev/null +++ b/src/docgen/playwright_runner.py @@ -0,0 +1,152 @@ +"""Playwright visual source runner via external capture scripts.""" + +from __future__ import annotations + +import os +import subprocess +import sys +from pathlib import Path +from typing import Any, TYPE_CHECKING + +if TYPE_CHECKING: + from docgen.config import Config + + +class PlaywrightError(RuntimeError): + """Raised when Playwright capture fails.""" + + +class PlaywrightRunner: + """Runs user-provided browser capture scripts for docgen segments.""" + + def __init__(self, config: Config, timeout_sec: int | None = None) -> None: + self.config = config + self.timeout_sec = ( + int(timeout_sec) + if timeout_sec is not None + else int(self.config.playwright_timeout_sec) + ) + + def capture_segment(self, seg_id: str, vmap: dict[str, Any]) -> Path: + """Capture (or resolve) segment video for `type: playwright` visual map.""" + source = str(vmap.get("source", "")).strip() + if not source: + raise PlaywrightError( + f"visual_map[{seg_id}] type=playwright requires a 'source' output path" + ) + output_path = self._resolve_output_path(source) + + script = str(vmap.get("script", "")).strip() + if not script: + if output_path.exists(): + return output_path + raise PlaywrightError( + f"type=playwright source missing and no script configured: {output_path}" + ) + + script_path = self._resolve_path(script) + if not script_path.exists(): + raise PlaywrightError(f"Playwright script not found: {script_path}") + + url = str(vmap.get("url", "")).strip() or None + viewport = vmap.get("viewport", {}) or {} + width = int(viewport.get("width", 1920)) + height = int(viewport.get("height", 1080)) + args = [str(a) for a in (vmap.get("args", []) or [])] + + return self.capture( + script=script_path, + output=output_path, + url=url, + viewport={"width": width, "height": height}, + args=args, + segment_id=seg_id, + ) + + def capture( + self, + *, + script: Path | str | None, + output: Path | str | None = None, + source: str | None = None, + url: str | None = None, + viewport: dict[str, int] | None = None, + args: list[str] | None = None, + segment_id: str | None = None, + timeout_sec: int | None = None, + ) -> Path: + """Run one external capture script and return the output video path.""" + if script is None and url is None: + raise PlaywrightError("capture requires --script or --url") + if script is None: + raise PlaywrightError("capture requires --script") + + script_path = self._resolve_path(script) + output_value = output if output is not None else source + if output_value is None: + output_value = "playwright-capture.mp4" + output_path = self._resolve_output_path(output_value) + output_path.parent.mkdir(parents=True, exist_ok=True) + + python_bin = self.config.playwright_python_path or sys.executable + env = os.environ.copy() + env["DOCGEN_PLAYWRIGHT_OUTPUT"] = str(output_path) + if url: + env["DOCGEN_PLAYWRIGHT_URL"] = url + if segment_id: + env["DOCGEN_PLAYWRIGHT_SEGMENT"] = segment_id + vp = viewport or {} + width = int(vp.get("width", 1920)) + height = int(vp.get("height", 1080)) + env["DOCGEN_PLAYWRIGHT_WIDTH"] = str(width) + env["DOCGEN_PLAYWRIGHT_HEIGHT"] = str(height) + env["DOCGEN_PLAYWRIGHT_VIEWPORT"] = f"{width}x{height}" + + effective_timeout = max(1, int(timeout_sec if timeout_sec is not None else self.timeout_sec)) + env["DOCGEN_PLAYWRIGHT_TIMEOUT_SEC"] = str(effective_timeout) + + cmd = [python_bin, str(script_path), *(args or [])] + try: + result = subprocess.run( + cmd, + cwd=str(self.config.base_dir), + env=env, + capture_output=True, + text=True, + timeout=effective_timeout, + check=True, + ) + except FileNotFoundError: + raise PlaywrightError(f"python executable not found: {python_bin}") + except subprocess.TimeoutExpired: + raise PlaywrightError( + f"Playwright capture timed out after {effective_timeout}s ({script_path.name})" + ) + except subprocess.CalledProcessError as exc: + detail = (exc.stderr or exc.stdout or "")[:400] + raise PlaywrightError( + f"Playwright script failed ({script_path.name}): {detail}" + ) + + if not output_path.exists(): + detail = (result.stderr or result.stdout or "").strip() + hint = f" ({detail[:200]})" if detail else "" + raise PlaywrightError( + f"Playwright script finished but output is missing: {output_path}{hint}" + ) + return output_path + + def _resolve_path(self, value: Path | str) -> Path: + path = Path(value) + if path.is_absolute(): + return path + return (self.config.base_dir / path).resolve() + + def _resolve_output_path(self, value: Path | str) -> Path: + path = Path(value) + if path.is_absolute(): + return path + # Source values are normally relative to terminal/rendered. + if path.parent == Path("."): + return (self.config.terminal_dir / "rendered" / path).resolve() + return (self.config.base_dir / path).resolve() diff --git a/src/docgen/wizard.py b/src/docgen/wizard.py index 83f6d98..7608a95 100644 --- a/src/docgen/wizard.py +++ b/src/docgen/wizard.py @@ -386,6 +386,25 @@ def api_run_step(step: str, segment_id: str): comp.compose_segments([segment_id]) return jsonify({"ok": True, "step": "compose", "segment": segment_id}) + elif step == "playwright": + from docgen.playwright_runner import PlaywrightRunner + + vmap = cfg.visual_map.get(segment_id, {}) + source = str(vmap.get("source", "")).strip() + if not source: + return jsonify({"error": "visual_map source is required for playwright"}), 400 + + runner = PlaywrightRunner(cfg) + video = runner.capture_segment(segment_id, vmap) + return jsonify( + { + "ok": True, + "step": "playwright", + "segment": segment_id, + "video": str(video.relative_to(cfg.base_dir)), + } + ) + elif step == "validate": from docgen.validate import Validator v = Validator(cfg) diff --git a/tests/test_compose.py b/tests/test_compose.py index 20230b7..5737949 100644 --- a/tests/test_compose.py +++ b/tests/test_compose.py @@ -10,6 +10,7 @@ from docgen.compose import Composer from docgen.config import Config +from docgen.playwright_runner import PlaywrightError def _write_cfg(tmp_path: Path, cfg: dict) -> Config: @@ -80,3 +81,105 @@ def test_stale_vhs_warning_can_be_disabled(tmp_path: Path, capsys) -> None: composer._warn_if_stale_vhs(c.visual_map["01"], video) out = capsys.readouterr().out assert out == "" + + +def test_playwright_source_resolves_to_rendered_path(tmp_path: Path) -> None: + cfg = { + "dirs": { + "terminal": "terminal", + "audio": "audio", + "recordings": "recordings", + "animations": "animations", + }, + "segments": {"default": ["01"], "all": ["01"]}, + "visual_map": {"01": {"type": "playwright", "source": "01-browser.mp4"}}, + } + c = _write_cfg(tmp_path, cfg) + rendered = tmp_path / "terminal" / "rendered" + rendered.mkdir(parents=True, exist_ok=True) + expected = rendered / "01-browser.mp4" + expected.write_text("video", encoding="utf-8") + + composer = Composer(c) + resolved = composer._playwright_path(c.visual_map["01"]) + assert resolved == expected + + +def test_compose_playwright_runs_capture_when_source_missing(tmp_path: Path, monkeypatch) -> None: + cfg = { + "dirs": { + "terminal": "terminal", + "audio": "audio", + "recordings": "recordings", + "animations": "animations", + }, + "segments": {"default": ["01"], "all": ["01"]}, + "segment_names": {"01": "01-demo"}, + "visual_map": { + "01": { + "type": "playwright", + "source": "01-browser.mp4", + "script": "scripts/capture.py", + } + }, + } + c = _write_cfg(tmp_path, cfg) + audio = tmp_path / "audio" / "01-demo.mp3" + audio.parent.mkdir(parents=True, exist_ok=True) + audio.write_bytes(b"mp3") + + rendered = tmp_path / "terminal" / "rendered" + rendered.mkdir(parents=True, exist_ok=True) + expected_video = rendered / "01-browser.mp4" + + calls: list[str] = [] + + class FakeRunner: + def __init__(self, _config) -> None: + pass + + def capture_segment(self, seg_id: str, vmap: dict) -> Path: + calls.append(seg_id) + expected_video.write_bytes(b"video") + return expected_video + + monkeypatch.setattr("docgen.playwright_runner.PlaywrightRunner", FakeRunner) + + composer = Composer(c) + monkeypatch.setattr(composer, "_probe_duration", lambda _p: 10.0) + monkeypatch.setattr(composer, "_run_ffmpeg", lambda _cmd: None) + ok = composer.compose_segments(["01"], strict=True) + assert ok == 1 + assert calls == ["01"] + + +def test_compose_playwright_skip_on_capture_error(tmp_path: Path, monkeypatch) -> None: + cfg = { + "dirs": { + "terminal": "terminal", + "audio": "audio", + "recordings": "recordings", + "animations": "animations", + }, + "segments": {"default": ["01"], "all": ["01"]}, + "visual_map": { + "01": { + "type": "playwright", + "source": "01-browser.mp4", + "script": "scripts/capture.py", + } + }, + } + c = _write_cfg(tmp_path, cfg) + + class FakeRunner: + def __init__(self, _config) -> None: + pass + + def capture_segment(self, seg_id: str, vmap: dict) -> Path: + raise PlaywrightError("boom") + + monkeypatch.setattr("docgen.playwright_runner.PlaywrightRunner", FakeRunner) + composer = Composer(c) + ok = composer.compose_segments(["01"], strict=True) + assert ok == 0 diff --git a/tests/test_config.py b/tests/test_config.py index bc274fc..2158195 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -61,6 +61,10 @@ def test_defaults(): assert c.manim_path is None assert c.vhs_path is None assert c.vhs_render_timeout_sec == 120 + assert c.playwright_python_path is None + assert c.playwright_timeout_sec == 120 + assert c.playwright_default_url is None + assert c.playwright_default_viewport == (1920, 1080) finally: cfg_path.unlink() @@ -87,6 +91,12 @@ def test_binary_paths_and_compose_config(tmp_path): }, "compose": {"ffmpeg_timeout_sec": 900, "warn_stale_vhs": False}, "pipeline": {"sync_vhs_after_timestamps": True}, + "playwright": { + "python_path": "/opt/bin/python3", + "timeout_sec": 240, + "default_url": "http://localhost:3300", + "default_viewport": {"width": 1366, "height": 768}, + }, } p = tmp_path / "docgen.yaml" p.write_text(yaml.dump(cfg), encoding="utf-8") @@ -99,3 +109,7 @@ def test_binary_paths_and_compose_config(tmp_path): assert c.sync_vhs_after_timestamps is True assert c.typing_ms_per_char == 40 assert c.vhs_render_timeout_sec == 240 + assert c.playwright_python_path == "/opt/bin/python3" + assert c.playwright_timeout_sec == 240 + assert c.playwright_default_url == "http://localhost:3300" + assert c.playwright_default_viewport == (1366, 768) diff --git a/tests/test_playwright_runner.py b/tests/test_playwright_runner.py new file mode 100644 index 0000000..7e80cb1 --- /dev/null +++ b/tests/test_playwright_runner.py @@ -0,0 +1,95 @@ +"""Tests for Playwright runner command and output path behavior.""" + +from __future__ import annotations + +import os +import sys +from pathlib import Path + +import pytest +import yaml + +from docgen.playwright_runner import PlaywrightError, PlaywrightRunner +from docgen.config import Config + + +def _write_cfg(tmp_path: Path) -> Config: + cfg = { + "dirs": {"terminal": "terminal"}, + "segments": {"default": ["01"], "all": ["01"]}, + } + path = tmp_path / "docgen.yaml" + path.write_text(yaml.dump(cfg), encoding="utf-8") + return Config.from_yaml(path) + + +def test_capture_requires_script_or_url(tmp_path: Path) -> None: + cfg = _write_cfg(tmp_path) + runner = PlaywrightRunner(cfg) + with pytest.raises(PlaywrightError, match="requires --script or --url"): + runner.capture(script=None, url=None) + + +def test_capture_runs_script_and_outputs_mp4(tmp_path: Path) -> None: + cfg = _write_cfg(tmp_path) + runner = PlaywrightRunner(cfg) + script = tmp_path / "capture.py" + output = cfg.terminal_dir / "rendered" / "demo.mp4" + script.write_text( + ( + "import os\n" + "from pathlib import Path\n" + "out = Path(os.environ['DOCGEN_PLAYWRIGHT_OUTPUT'])\n" + "out.parent.mkdir(parents=True, exist_ok=True)\n" + "out.write_bytes(b'fake-mp4')\n" + ), + encoding="utf-8", + ) + + path = runner.capture(script=str(script), source="demo.mp4") + assert path == output + assert output.exists() + assert output.read_bytes() == b"fake-mp4" + + +def test_capture_builds_env_from_options(tmp_path: Path, monkeypatch) -> None: + cfg = _write_cfg(tmp_path) + runner = PlaywrightRunner(cfg) + script = tmp_path / "capture.py" + script.write_text("print('ok')\n", encoding="utf-8") + + observed: dict[str, str] = {} + + def _fake_run(cmd, *, cwd, env, capture_output, text, timeout, check): # noqa: ANN001 + observed["cmd0"] = cmd[0] + observed["script"] = cmd[1] + observed["cwd"] = cwd + observed["url"] = env.get("DOCGEN_PLAYWRIGHT_URL", "") + observed["viewport"] = env.get("DOCGEN_PLAYWRIGHT_VIEWPORT", "") + observed["timeout"] = env.get("DOCGEN_PLAYWRIGHT_TIMEOUT_SEC", "") + out = Path(env["DOCGEN_PLAYWRIGHT_OUTPUT"]) + out.parent.mkdir(parents=True, exist_ok=True) + out.write_text("x", encoding="utf-8") + + class _Proc: + returncode = 0 + stdout = "" + stderr = "" + + return _Proc() + + monkeypatch.setattr("subprocess.run", _fake_run) + out = runner.capture( + script=str(script), + url="http://localhost:3300", + source="custom.mp4", + viewport={"width": 1280, "height": 720}, + timeout_sec=45, + ) + assert out.name == "custom.mp4" + assert observed["cmd0"] == sys.executable + assert observed["script"] == str(script.resolve()) + assert observed["cwd"] == str(cfg.base_dir) + assert observed["url"] == "http://localhost:3300" + assert observed["viewport"] == "1280x720" + assert observed["timeout"] == "45" From be45f746e47feeb4859a898e9cf5370a9b3b8a27 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Thu, 16 Apr 2026 03:42:11 +0000 Subject: [PATCH 4/5] Fix lint failure in Playwright runner tests Co-authored-by: John Menke --- tests/test_playwright_runner.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_playwright_runner.py b/tests/test_playwright_runner.py index 7e80cb1..e1809ce 100644 --- a/tests/test_playwright_runner.py +++ b/tests/test_playwright_runner.py @@ -2,7 +2,6 @@ from __future__ import annotations -import os import sys from pathlib import Path From 936bdbfb663a7705329db1f7ba94fa620b4f018d Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Thu, 16 Apr 2026 03:54:42 +0000 Subject: [PATCH 5/5] Harden CI apt install with timeout and retries Co-authored-by: John Menke --- .github/workflows/ci.yml | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b567c78..8a41b3a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -32,7 +32,29 @@ jobs: with: python-version: ${{ matrix.python-version }} - name: Install system dependencies - run: sudo apt-get update && sudo apt-get install -y --no-install-recommends ffmpeg tesseract-ocr + run: | + set -euo pipefail + attempt=1 + max_attempts=4 + backoff=4 + + while [ "$attempt" -le "$max_attempts" ]; do + echo "apt attempt $attempt/$max_attempts" + if timeout 600s sudo apt-get -o Acquire::Retries=3 update \ + && timeout 600s sudo apt-get -o Acquire::Retries=3 install -y --no-install-recommends ffmpeg tesseract-ocr; then + exit 0 + fi + + if [ "$attempt" -eq "$max_attempts" ]; then + echo "apt failed after $max_attempts attempts" + exit 1 + fi + + echo "apt failed, retrying in ${backoff}s..." + sleep "$backoff" + backoff=$((backoff * 2)) + attempt=$((attempt + 1)) + done - run: pip install ".[dev]" - run: pytest tests/ --ignore=tests/e2e -v --tb=short