Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion androidctl/src/androidctl/commands/run_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,15 @@ def run_command(cli_request: CliCommandRequest, ctx: AppContext) -> CommandOutco
_raise_pre_dispatch_error(error, cli_request.command)

try:
runtime_payload = daemon.get_runtime()
daemon, runtime_payload = _get_runtime_with_single_rediscovery_retry(
daemon=daemon,
ctx=ctx,
workspace_root=workspace_root,
)
except (
DaemonApiError,
DaemonProtocolError,
click.ClickException,
OSError,
ValidationError,
httpx.HTTPStatusError,
Expand Down Expand Up @@ -247,6 +252,41 @@ def _resolve_command_daemon(
) from error


def _get_runtime_with_single_rediscovery_retry(
*,
daemon: RuntimeCommandClient,
ctx: AppContext,
workspace_root: Path,
) -> tuple[RuntimeCommandClient, RuntimePayload]:
try:
return daemon, daemon.get_runtime()
except (
DaemonProtocolError,
OSError,
httpx.HTTPStatusError,
httpx.RequestError,
):
if not _should_rediscover_daemon(ctx):
raise
except DaemonApiError as error:
if not _should_rediscover_daemon(ctx) or not _is_daemon_shutdown_busy(error):
raise

refreshed_daemon = _resolve_command_daemon(ctx, workspace_root)
return refreshed_daemon, refreshed_daemon.get_runtime()


def _should_rediscover_daemon(ctx: AppContext) -> bool:
return ctx.daemon is None and ctx.daemon_discovery is not None


def _is_daemon_shutdown_busy(error: DaemonApiError) -> bool:
return (
error.code == "RUNTIME_BUSY"
and error.details.get("reason") == "daemon_shutting_down"
)


def _build_command_run_request(command: CliCommandPayload) -> CommandRunRequest:
if isinstance(
command,
Expand Down
93 changes: 93 additions & 0 deletions androidctl/tests/unit/test_run_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from pathlib import Path

import click
import httpx
import pytest
from pydantic import ValidationError

Expand Down Expand Up @@ -654,6 +655,98 @@ def discover_for_test(resolved_workspace_root: Path) -> ScriptedRecordingDaemon:
assert outcome.payload["nextScreenId"] == "screen-next"


def test_run_command_rediscovers_daemon_when_runtime_get_loses_closing_daemon(
tmp_path: Path,
) -> None:
workspace_root = tmp_path / "workspace"
runtime_request = httpx.Request("POST", "http://127.0.0.1:8765/runtime/get")

class ClosingDaemon(ScriptedRecordingDaemon):
def get_runtime(self) -> RuntimePayload:
raise httpx.ConnectError("daemon stopped", request=runtime_request)

stale_daemon = ClosingDaemon(root=workspace_root.resolve())
fresh_daemon = ScriptedRecordingDaemon(
root=workspace_root.resolve(),
command_handlers={"observe": _run_pipeline_result},
)
daemon_iter = iter([stale_daemon, fresh_daemon])
discovery_calls: list[Path] = []

def discover_for_test(resolved_workspace_root: Path) -> ScriptedRecordingDaemon:
discovery_calls.append(resolved_workspace_root)
return next(daemon_iter)

ctx = AppContext(
daemon=None,
cwd=workspace_root,
env={},
daemon_discovery=discover_for_test,
)

outcome = run_command(
CliCommandRequest(
public_command="observe",
command=ObserveCommandPayload(kind="observe"),
),
ctx,
)

assert discovery_calls == [workspace_root.resolve(), workspace_root.resolve()]
assert stale_daemon.runtime_requests == 0
assert stale_daemon.run_requests == []
assert fresh_daemon.runtime_requests == 1
assert fresh_daemon.runs == [{"command": {"kind": "observe"}}]
assert outcome.payload["command"] == "observe"


def test_run_command_rediscovers_daemon_when_runtime_get_reports_shutdown(
tmp_path: Path,
) -> None:
workspace_root = tmp_path / "workspace"

class ShuttingDownDaemon(ScriptedRecordingDaemon):
def get_runtime(self) -> RuntimePayload:
raise DaemonApiError(
code="RUNTIME_BUSY",
message="daemon is shutting down",
details={"reason": "daemon_shutting_down"},
)

stale_daemon = ShuttingDownDaemon(root=workspace_root.resolve())
fresh_daemon = ScriptedRecordingDaemon(
root=workspace_root.resolve(),
command_handlers={"observe": _run_pipeline_result},
)
daemon_iter = iter([stale_daemon, fresh_daemon])
discovery_calls: list[Path] = []

def discover_for_test(resolved_workspace_root: Path) -> ScriptedRecordingDaemon:
discovery_calls.append(resolved_workspace_root)
return next(daemon_iter)

ctx = AppContext(
daemon=None,
cwd=workspace_root,
env={},
daemon_discovery=discover_for_test,
)

outcome = run_command(
CliCommandRequest(
public_command="observe",
command=ObserveCommandPayload(kind="observe"),
),
ctx,
)

assert discovery_calls == [workspace_root.resolve(), workspace_root.resolve()]
assert stale_daemon.run_requests == []
assert fresh_daemon.runtime_requests == 1
assert fresh_daemon.runs == [{"command": {"kind": "observe"}}]
assert outcome.payload["command"] == "observe"


def test_run_command_returns_canonical_payload_for_explicit_null_result(
tmp_path: Path,
) -> None:
Expand Down