diff --git a/androidctl/src/androidctl/commands/run_pipeline.py b/androidctl/src/androidctl/commands/run_pipeline.py index f4243a9..ca1b7fb 100644 --- a/androidctl/src/androidctl/commands/run_pipeline.py +++ b/androidctl/src/androidctl/commands/run_pipeline.py @@ -113,10 +113,15 @@ def run_command(cli_request: CliCommandRequest, ctx: AppContext) -> CommandOutco _raise_pre_dispatch_error(error, cli_request.command) try: - runtime_payload = daemon.get_runtime() + daemon, runtime_payload = _get_runtime_with_single_rediscovery_retry( + daemon=daemon, + ctx=ctx, + workspace_root=workspace_root, + ) except ( DaemonApiError, DaemonProtocolError, + click.ClickException, OSError, ValidationError, httpx.HTTPStatusError, @@ -247,6 +252,41 @@ def _resolve_command_daemon( ) from error +def _get_runtime_with_single_rediscovery_retry( + *, + daemon: RuntimeCommandClient, + ctx: AppContext, + workspace_root: Path, +) -> tuple[RuntimeCommandClient, RuntimePayload]: + try: + return daemon, daemon.get_runtime() + except ( + DaemonProtocolError, + OSError, + httpx.HTTPStatusError, + httpx.RequestError, + ): + if not _should_rediscover_daemon(ctx): + raise + except DaemonApiError as error: + if not _should_rediscover_daemon(ctx) or not _is_daemon_shutdown_busy(error): + raise + + refreshed_daemon = _resolve_command_daemon(ctx, workspace_root) + return refreshed_daemon, refreshed_daemon.get_runtime() + + +def _should_rediscover_daemon(ctx: AppContext) -> bool: + return ctx.daemon is None and ctx.daemon_discovery is not None + + +def _is_daemon_shutdown_busy(error: DaemonApiError) -> bool: + return ( + error.code == "RUNTIME_BUSY" + and error.details.get("reason") == "daemon_shutting_down" + ) + + def _build_command_run_request(command: CliCommandPayload) -> CommandRunRequest: if isinstance( command, diff --git a/androidctl/tests/unit/test_run_pipeline.py b/androidctl/tests/unit/test_run_pipeline.py index f6edaad..7fa9f4f 100644 --- a/androidctl/tests/unit/test_run_pipeline.py +++ b/androidctl/tests/unit/test_run_pipeline.py @@ -3,6 +3,7 @@ from pathlib import Path import click +import httpx import pytest from pydantic import ValidationError @@ -654,6 +655,98 @@ def discover_for_test(resolved_workspace_root: Path) -> ScriptedRecordingDaemon: assert outcome.payload["nextScreenId"] == "screen-next" +def test_run_command_rediscovers_daemon_when_runtime_get_loses_closing_daemon( + tmp_path: Path, +) -> None: + workspace_root = tmp_path / "workspace" + runtime_request = httpx.Request("POST", "http://127.0.0.1:8765/runtime/get") + + class ClosingDaemon(ScriptedRecordingDaemon): + def get_runtime(self) -> RuntimePayload: + raise httpx.ConnectError("daemon stopped", request=runtime_request) + + stale_daemon = ClosingDaemon(root=workspace_root.resolve()) + fresh_daemon = ScriptedRecordingDaemon( + root=workspace_root.resolve(), + command_handlers={"observe": _run_pipeline_result}, + ) + daemon_iter = iter([stale_daemon, fresh_daemon]) + discovery_calls: list[Path] = [] + + def discover_for_test(resolved_workspace_root: Path) -> ScriptedRecordingDaemon: + discovery_calls.append(resolved_workspace_root) + return next(daemon_iter) + + ctx = AppContext( + daemon=None, + cwd=workspace_root, + env={}, + daemon_discovery=discover_for_test, + ) + + outcome = run_command( + CliCommandRequest( + public_command="observe", + command=ObserveCommandPayload(kind="observe"), + ), + ctx, + ) + + assert discovery_calls == [workspace_root.resolve(), workspace_root.resolve()] + assert stale_daemon.runtime_requests == 0 + assert stale_daemon.run_requests == [] + assert fresh_daemon.runtime_requests == 1 + assert fresh_daemon.runs == [{"command": {"kind": "observe"}}] + assert outcome.payload["command"] == "observe" + + +def test_run_command_rediscovers_daemon_when_runtime_get_reports_shutdown( + tmp_path: Path, +) -> None: + workspace_root = tmp_path / "workspace" + + class ShuttingDownDaemon(ScriptedRecordingDaemon): + def get_runtime(self) -> RuntimePayload: + raise DaemonApiError( + code="RUNTIME_BUSY", + message="daemon is shutting down", + details={"reason": "daemon_shutting_down"}, + ) + + stale_daemon = ShuttingDownDaemon(root=workspace_root.resolve()) + fresh_daemon = ScriptedRecordingDaemon( + root=workspace_root.resolve(), + command_handlers={"observe": _run_pipeline_result}, + ) + daemon_iter = iter([stale_daemon, fresh_daemon]) + discovery_calls: list[Path] = [] + + def discover_for_test(resolved_workspace_root: Path) -> ScriptedRecordingDaemon: + discovery_calls.append(resolved_workspace_root) + return next(daemon_iter) + + ctx = AppContext( + daemon=None, + cwd=workspace_root, + env={}, + daemon_discovery=discover_for_test, + ) + + outcome = run_command( + CliCommandRequest( + public_command="observe", + command=ObserveCommandPayload(kind="observe"), + ), + ctx, + ) + + assert discovery_calls == [workspace_root.resolve(), workspace_root.resolve()] + assert stale_daemon.run_requests == [] + assert fresh_daemon.runtime_requests == 1 + assert fresh_daemon.runs == [{"command": {"kind": "observe"}}] + assert outcome.payload["command"] == "observe" + + def test_run_command_returns_canonical_payload_for_explicit_null_result( tmp_path: Path, ) -> None: