diff --git a/src/microbots/auto_memory/callbacks.py b/src/microbots/auto_memory/callbacks.py index ba46431..5cfb070 100644 --- a/src/microbots/auto_memory/callbacks.py +++ b/src/microbots/auto_memory/callbacks.py @@ -145,6 +145,11 @@ def _run_one( stdout_path.open("w", encoding="utf-8") as out_fh, stderr_path.open("w", encoding="utf-8") as err_fh, ): + # Security note: spec.command is intentionally run with + # shell=True for developer convenience (supports pipes, + # redirects, etc.). This runner assumes configs are loaded + # from trusted local files only. Do NOT use with configs + # sourced from untrusted input. proc = subprocess.run( spec.command, shell=True, diff --git a/src/microbots/auto_memory/config.py b/src/microbots/auto_memory/config.py index 231c0bf..ed1cce3 100644 --- a/src/microbots/auto_memory/config.py +++ b/src/microbots/auto_memory/config.py @@ -140,6 +140,11 @@ def validate(self) -> None: f"'output_path' must be a relative path, got '{self.output_path}'" ) + if ".." in Path(self.output_path).parts: + raise ConfigError( + f"'output_path' must not contain '..', got '{self.output_path}'" + ) + if not self.callbacks: raise ConfigError("'callbacks' must contain at least one entry") diff --git a/src/microbots/auto_memory/context.py b/src/microbots/auto_memory/context.py index 7f0ed9b..363825f 100644 --- a/src/microbots/auto_memory/context.py +++ b/src/microbots/auto_memory/context.py @@ -7,6 +7,8 @@ from microbots.auto_memory.config import TaskConfig from microbots.auto_memory.data_models import Feedback +_JINJA_ENV = JinjaEnvironment(undefined=StrictUndefined, keep_trailing_newline=True) + def build_iteration_context( config: TaskConfig, @@ -40,7 +42,7 @@ def build_iteration_context( str The rendered prompt text, ready to be sent to the agent. """ - env = JinjaEnvironment(undefined=StrictUndefined, keep_trailing_newline=True) + env = _JINJA_ENV template = env.from_string(config.prompt_template) return template.render( task=config.task_definition, diff --git a/src/microbots/auto_memory/errors.py b/src/microbots/auto_memory/errors.py index 1310576..4b8720a 100644 --- a/src/microbots/auto_memory/errors.py +++ b/src/microbots/auto_memory/errors.py @@ -17,7 +17,7 @@ class CallbackError(AutoMemoryError): """Raised when a callback cannot be spawned or set up (not a failing assertion).""" -class TimeoutError(AutoMemoryError): # noqa: A001 — intentional shadow of builtin +class AutoMemoryTimeoutError(AutoMemoryError): """Raised when the per-iteration or total run timeout is exceeded.""" diff --git a/src/microbots/auto_memory/memory.py b/src/microbots/auto_memory/memory.py index cabc4ef..baba722 100644 --- a/src/microbots/auto_memory/memory.py +++ b/src/microbots/auto_memory/memory.py @@ -105,16 +105,20 @@ def read_all(self) -> list[Feedback]: """Return all persisted :class:`~microbots.auto_memory.data_models.Feedback` entries. Returns an empty list if the feedback file does not exist yet. + Lines that contain invalid JSON, non-object JSON, or fields that cannot + be mapped to :class:`~microbots.auto_memory.data_models.Feedback` are + skipped with a ``WARNING`` log message rather than raising an exception. Returns ------- list[Feedback] - All feedback entries in the order they were appended. + Successfully parsed entries in the order they were appended. + Corrupt or malformed lines are omitted. Raises ------ MemoryStoreError - If not mounted or on I/O / parse error. + If not mounted, or if the file cannot be opened (I/O error). """ self._require_mounted() if not self._feedback_path.exists(): # type: ignore[union-attr] @@ -130,23 +134,32 @@ def read_all(self) -> list[Feedback]: try: data = json.loads(line) except json.JSONDecodeError as exc: - raise MemoryStoreError( - f"Invalid JSON on line {lineno} of " - f"{self._feedback_path}: {exc}" - ) from exc + logger.warning( + "Skipping corrupt JSON on line %d of %s: %s", + lineno, + self._feedback_path, + exc, + ) + continue if not isinstance(data, dict): - raise MemoryStoreError( - f"Expected a JSON object on line {lineno} of " - f"{self._feedback_path}, got {type(data).__name__}" + logger.warning( + "Skipping non-object entry on line %d of %s (got %s)", + lineno, + self._feedback_path, + type(data).__name__, ) + continue known = {f.name for f in dataclasses.fields(Feedback)} try: entries.append(Feedback(**{k: v for k, v in data.items() if k in known})) except TypeError as exc: - raise MemoryStoreError( - f"Cannot construct Feedback from line {lineno} of " - f"{self._feedback_path}: {exc}" - ) from exc + logger.warning( + "Skipping malformed Feedback on line %d of %s: %s", + lineno, + self._feedback_path, + exc, + ) + continue except OSError as exc: raise MemoryStoreError(f"Failed to read feedback: {exc}") from exc @@ -190,3 +203,23 @@ def _require_mounted(self) -> None: raise MemoryStoreError( "MemoryStore has not been mounted; call mount() first" ) + + @property + def memory_dir(self) -> Path: + """Absolute path to the memory directory. + + Returns + ------- + Path + The directory that holds the feedback file. + + Raises + ------ + MemoryStoreError + If the store has not been mounted yet. + """ + if self._memory_dir is None: + raise MemoryStoreError( + "MemoryStore has not been mounted; call mount() first" + ) + return self._memory_dir diff --git a/src/microbots/auto_memory/orchestrator.py b/src/microbots/auto_memory/orchestrator.py new file mode 100644 index 0000000..ed47c00 --- /dev/null +++ b/src/microbots/auto_memory/orchestrator.py @@ -0,0 +1,383 @@ +"""TrainingLoopOrchestrator — wires all auto_memory components into a full iteration loop.""" + +from __future__ import annotations + +import time +from dataclasses import dataclass, field +from logging import getLogger +from pathlib import Path + +from microbots.auto_memory.analyzer import analyze_failure +from microbots.auto_memory.callbacks import CallbackResult, CallbackRunner +from microbots.auto_memory.config import TaskConfig +from microbots.auto_memory.context import build_iteration_context +from microbots.auto_memory.data_models import Feedback, FinalStatus, IterationStatus +from microbots.auto_memory.errors import AgentError +from microbots.auto_memory.runners.base import AgentResult, AgentRunner, IterationContext +from microbots.auto_memory.workspace import WorkspaceManager + +logger = getLogger(__name__) + + +# --------------------------------------------------------------------------- +# RunSummary +# --------------------------------------------------------------------------- + + +@dataclass +class IterationRecord: + """Record of a single completed iteration.""" + + idx: int + status: IterationStatus + feedback: Feedback | None = None + error: str | None = None + + +@dataclass +class RunSummary: + """Summary of a completed auto_memory run. + + Attributes + ---------- + final_status : FinalStatus + Overall outcome of the run. + iterations_run : int + Total number of iterations executed. + iteration_records : list[IterationRecord] + Per-iteration outcomes and feedback. + elapsed_s : float + Total wall-clock time for the run in seconds. + error_message : str | None + Set when ``final_status`` is ``ERROR``; otherwise ``None``. + """ + + final_status: FinalStatus + iterations_run: int + iteration_records: list[IterationRecord] = field(default_factory=list) + elapsed_s: float = 0.0 + error_message: str | None = None + + +# --------------------------------------------------------------------------- +# TrainingLoopOrchestrator +# --------------------------------------------------------------------------- + + +class TrainingLoopOrchestrator: + """Wires all auto_memory components and drives the full iteration loop. + + Example usage:: + + orchestrator = TrainingLoopOrchestrator( + config=cfg, + agent_runner=WritingBotRunner(model="azure/gpt-4o"), + callback_runner=ShellCallbackRunner(), + workspace=WorkspaceManager(run_dir=Path("runs/my_task")), + ) + summary = orchestrator.run() + + Parameters + ---------- + config : TaskConfig + Task configuration (max_iterations, timeout_min, etc.). + agent_runner : AgentRunner + Structural-protocol object that executes one agent iteration. + callback_runner : CallbackRunner + Runs validation callbacks against the agent's output. + workspace : WorkspaceManager + Manages the on-disk layout for the run. + max_agent_retries : int, optional + Maximum number of consecutive transient agent ``ERROR`` results to + tolerate before aborting the run. Each retry consumes one iteration + slot. Defaults to ``2``. + """ + + def __init__( + self, + config: TaskConfig, + agent_runner: AgentRunner, + callback_runner: CallbackRunner, + workspace: WorkspaceManager, + max_agent_retries: int = 2, + ) -> None: + """Store injected collaborators used throughout the iteration loop. + + Parameters + ---------- + config : TaskConfig + Task configuration (max_iterations, timeout_min, etc.). + agent_runner : AgentRunner + Structural-protocol object that executes one agent iteration. + callback_runner : CallbackRunner + Runs validation callbacks against the agent's output. + workspace : WorkspaceManager + Manages the on-disk layout for the run. + max_agent_retries : int, optional + Maximum consecutive transient agent errors before aborting. + Defaults to ``2``. + """ + self._config = config + self._agent_runner = agent_runner + self._callback_runner = callback_runner + self._workspace = workspace + self._max_agent_retries = max_agent_retries + + # ------------------------------------------------------------------ + # Public API + + def run(self) -> RunSummary: + """Execute the full iteration loop and return a :class:`RunSummary`. + + The loop terminates on one of four conditions: all callbacks pass + (``PASSED``), ``max_iterations`` is exhausted without a pass + (``LIMIT_REACHED``), total wall-clock time exceeds + ``timeout_min * 60`` seconds (``TIMEOUT``), or the agent runner + raises :class:`~microbots.auto_memory.errors.AgentError` or returns + ``IterationStatus.ERROR`` more than ``max_agent_retries`` consecutive + times (``ERROR``). + + Returns + ------- + RunSummary + Complete summary of the run including the final status and all + per-iteration records. + """ + self._workspace.prepare() + + start_time = time.monotonic() + timeout_s = self._config.timeout_min * 60 + records: list[IterationRecord] = [] + last_feedback: Feedback | None = None + consecutive_errors = 0 + + for iteration_idx in range(self._config.max_iterations): + elapsed = time.monotonic() - start_time + + # --- total timeout check --- + if elapsed >= timeout_s: + logger.info( + "Orchestrator: total timeout reached after %.1fs (limit %ds)", + elapsed, + timeout_s, + ) + return RunSummary( + final_status=FinalStatus.TIMEOUT, + iterations_run=iteration_idx, + iteration_records=records, + elapsed_s=elapsed, + ) + + # --- run one iteration --- + try: + record = self.run_iteration( + iteration_idx=iteration_idx, + feedback=last_feedback, + ) + except AgentError as exc: + elapsed = time.monotonic() - start_time + logger.error( + "Orchestrator: AgentError on iteration %d: %s", iteration_idx, exc + ) + return RunSummary( + final_status=FinalStatus.ERROR, + iterations_run=iteration_idx + 1, + iteration_records=records, + elapsed_s=elapsed, + error_message=str(exc), + ) + + records.append(record) + + if record.status == IterationStatus.ERROR: + consecutive_errors += 1 + if consecutive_errors <= self._max_agent_retries: + logger.warning( + "Orchestrator: transient agent error on iteration %d " + "(retry %d/%d), continuing", + iteration_idx, + consecutive_errors, + self._max_agent_retries, + ) + continue + elapsed = time.monotonic() - start_time + error_msg = ( + record.error + or f"Agent returned ERROR on iteration {iteration_idx}" + ) + logger.error( + "Orchestrator: iteration %d returned ERROR (%d consecutive)", + iteration_idx, + consecutive_errors, + ) + return RunSummary( + final_status=FinalStatus.ERROR, + iterations_run=iteration_idx + 1, + iteration_records=records, + elapsed_s=elapsed, + error_message=error_msg, + ) + + consecutive_errors = 0 + + if record.status == IterationStatus.TIMEOUT: + elapsed = time.monotonic() - start_time + logger.info( + "Orchestrator: per-iteration timeout on iteration %d", iteration_idx + ) + return RunSummary( + final_status=FinalStatus.TIMEOUT, + iterations_run=iteration_idx + 1, + iteration_records=records, + elapsed_s=elapsed, + ) + + if record.status == IterationStatus.PASSED: + elapsed = time.monotonic() - start_time + logger.info( + "Orchestrator: PASSED on iteration %d (%.1fs)", iteration_idx, elapsed + ) + return RunSummary( + final_status=FinalStatus.PASSED, + iterations_run=iteration_idx + 1, + iteration_records=records, + elapsed_s=elapsed, + ) + + # FAILED — persist feedback and continue + last_feedback = record.feedback + + # All iterations exhausted without a pass + elapsed = time.monotonic() - start_time + logger.info( + "Orchestrator: limit reached after %d iteration(s)", self._config.max_iterations + ) + return RunSummary( + final_status=FinalStatus.LIMIT_REACHED, + iterations_run=self._config.max_iterations, + iteration_records=records, + elapsed_s=elapsed, + ) + + def run_iteration( + self, + iteration_idx: int, + feedback: Feedback | None = None, + ) -> IterationRecord: + """Execute a single agent iteration and return an :class:`IterationRecord`. + + Prepares the workspace directory, builds the prompt, runs the agent, + and — when the agent succeeds — runs callbacks. Returns ``PASSED`` if + all callbacks pass; otherwise calls :meth:`analyze_failure`, persists + the feedback, and returns ``FAILED``. Returns immediately on agent + ``TIMEOUT`` or ``ERROR`` without invoking callbacks. + + Parameters + ---------- + iteration_idx : int + Zero-based index of the iteration. + feedback : Feedback | None + Feedback from the previous iteration, or ``None`` on the first. + + Returns + ------- + IterationRecord + Status and (when failed) structured feedback for this iteration. + + Raises + ------ + AgentError + Re-raised from the agent runner when the error is unrecoverable. + """ + logger.debug("Orchestrator: starting iteration %d", iteration_idx) + + idir = self._workspace.prepare_iteration(iteration_idx) + candidate_path = idir / self._config.output_path + logs_dir = idir / "logs" + memory_dir = str(self._workspace.memory.memory_dir) + + task_prompt = build_iteration_context( + self._config, + iteration_idx, + feedback=feedback, + ) + + ctx = IterationContext(task=task_prompt, memory_dir=memory_dir) + + # Run agent + agent_result: AgentResult = self._agent_runner.run( + ctx, timeout_s=self._config.per_iteration_timeout + ) + + if agent_result.status == IterationStatus.TIMEOUT: + logger.debug("Orchestrator: agent timed out on iteration %d", iteration_idx) + return IterationRecord(idx=iteration_idx, status=IterationStatus.TIMEOUT) + + if agent_result.status == IterationStatus.ERROR: + logger.debug( + "Orchestrator: agent error on iteration %d: %s", + iteration_idx, + agent_result.error, + ) + return IterationRecord( + idx=iteration_idx, + status=IterationStatus.ERROR, + error=agent_result.error, + ) + + # Run callbacks + callback_results: list[CallbackResult] = self._callback_runner.run_all( + specs=self._config.callbacks, + logs_dir=logs_dir, + candidate_path=candidate_path, + ) + + all_passed = all(r.passed for r in callback_results) + if all_passed: + logger.debug("Orchestrator: all callbacks passed on iteration %d", iteration_idx) + return IterationRecord(idx=iteration_idx, status=IterationStatus.PASSED) + + # Analyse failure and persist feedback + new_feedback = self.analyze_failure( + callback_results=callback_results, + candidate_path=candidate_path, + iteration_idx=iteration_idx, + ) + self._workspace.memory.append_feedback(new_feedback) + + logger.debug( + "Orchestrator: iteration %d FAILED — %s", iteration_idx, new_feedback.summary + ) + return IterationRecord( + idx=iteration_idx, + status=IterationStatus.FAILED, + feedback=new_feedback, + ) + + def analyze_failure( + self, + callback_results: list[CallbackResult], + candidate_path: Path, + iteration_idx: int, + ) -> Feedback: + """Delegate to :func:`~microbots.auto_memory.analyzer.analyze_failure`. + + Parameters + ---------- + callback_results : list[CallbackResult] + Results from the callback runner. + candidate_path : Path + Path to the candidate output for this iteration. + iteration_idx : int + Zero-based iteration index. + + Returns + ------- + Feedback + Structured failure summary. + """ + return analyze_failure( + callback_results=callback_results, + candidate_path=candidate_path, + iteration_idx=iteration_idx, + ) diff --git a/test/auto_memory/test_config.py b/test/auto_memory/test_config.py index f159be0..f7a5100 100644 --- a/test/auto_memory/test_config.py +++ b/test/auto_memory/test_config.py @@ -177,6 +177,13 @@ def test_absolute_output_path(self): with pytest.raises(ConfigError, match="output_path"): cfg.validate() + def test_dotdot_output_path_rejected(self): + """output_path containing '..' is rejected to prevent path traversal.""" + cfg = self._base() + cfg.output_path = "../../etc/passwd" + with pytest.raises(ConfigError, match="output_path"): + cfg.validate() + def test_callback_empty_name(self): from microbots.auto_memory.data_models import CallbackSpec cfg = self._base() diff --git a/test/auto_memory/test_memory.py b/test/auto_memory/test_memory.py index f12f8a6..c9857c8 100644 --- a/test/auto_memory/test_memory.py +++ b/test/auto_memory/test_memory.py @@ -91,6 +91,11 @@ def test_unmounted_raises_on_persist(self): with pytest.raises(MemoryStoreError, match="not been mounted"): store.persist() + def test_unmounted_raises_on_memory_dir(self): + store = MemoryStore() + with pytest.raises(MemoryStoreError, match="not been mounted"): + _ = store.memory_dir + # --------------------------------------------------------------------------- # Read / write @@ -164,36 +169,60 @@ def test_read_all_no_file_returns_empty(self, tmp_path): feedback_file.unlink() # remove the empty file clear() created assert store.read_all() == [] - def test_read_all_raises_on_corrupt_jsonl(self, tmp_path): - """read_all() raises MemoryStoreError when a line is not valid JSON.""" + def test_read_all_skips_corrupt_jsonl(self, tmp_path, caplog): + """read_all() skips and warns on a line that is not valid JSON.""" + import logging run_dir = tmp_path / "run3" store = MemoryStore() store.mount(run_dir) feedback_file = run_dir / "memory" / "feedback.jsonl" feedback_file.write_text("{not valid json}\n", encoding="utf-8") - with pytest.raises(MemoryStoreError, match="Invalid JSON"): - store.read_all() - - def test_read_all_raises_on_non_object_json(self, tmp_path): - """read_all() raises MemoryStoreError when a line is valid JSON but not an object.""" + with caplog.at_level(logging.WARNING): + entries = store.read_all() + assert entries == [] + assert any("corrupt" in r.message.lower() for r in caplog.records) + + def test_read_all_skips_non_object_json(self, tmp_path, caplog): + """read_all() skips and warns on a valid-JSON line that is not an object.""" + import logging run_dir = tmp_path / "run4" store = MemoryStore() store.mount(run_dir) feedback_file = run_dir / "memory" / "feedback.jsonl" feedback_file.write_text("[1, 2, 3]\n", encoding="utf-8") - with pytest.raises(MemoryStoreError, match="Expected a JSON object"): - store.read_all() - - def test_read_all_raises_on_missing_required_fields(self, tmp_path): - """read_all() raises MemoryStoreError when required Feedback fields are absent.""" + with caplog.at_level(logging.WARNING): + entries = store.read_all() + assert entries == [] + assert any("non-object" in r.message.lower() for r in caplog.records) + + def test_read_all_skips_missing_required_fields(self, tmp_path, caplog): + """read_all() skips and warns when required Feedback fields are absent.""" + import logging run_dir = tmp_path / "run5" store = MemoryStore() store.mount(run_dir) feedback_file = run_dir / "memory" / "feedback.jsonl" # Missing both iteration_idx and summary (required, no default). feedback_file.write_text('{"root_causes": []}\n', encoding="utf-8") - with pytest.raises(MemoryStoreError, match="Cannot construct Feedback"): - store.read_all() + with caplog.at_level(logging.WARNING): + entries = store.read_all() + assert entries == [] + assert any("malformed" in r.message.lower() for r in caplog.records) + + def test_read_all_skips_blank_lines(self, tmp_path): + """Blank lines in the feedback file are silently skipped.""" + run_dir = tmp_path / "run6" + store = MemoryStore() + store.mount(run_dir) + feedback_file = run_dir / "memory" / "feedback.jsonl" + feedback_file.write_text( + '\n{"iteration_idx": 0, "summary": "ok", "root_causes": [], ' + '"validator_failures": [], "suggested_actions": []}\n\n', + encoding="utf-8", + ) + entries = store.read_all() + assert len(entries) == 1 + assert entries[0].summary == "ok" def test_read_all_ignores_unknown_fields(self, tmp_path): """read_all() silently drops fields not present in Feedback.""" diff --git a/test/auto_memory/test_orchestrator.py b/test/auto_memory/test_orchestrator.py new file mode 100644 index 0000000..737a9a0 --- /dev/null +++ b/test/auto_memory/test_orchestrator.py @@ -0,0 +1,575 @@ +"""Tests for TrainingLoopOrchestrator — all 5 termination paths.""" + +from __future__ import annotations + +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from microbots.auto_memory.callbacks import CallbackResult, CallbackRunner +from microbots.auto_memory.config import TaskConfig +from microbots.auto_memory.data_models import ( + CallbackSpec, + FinalStatus, + IterationStatus, +) +from microbots.auto_memory.errors import AgentError +from microbots.auto_memory.orchestrator import RunSummary, TrainingLoopOrchestrator +from microbots.auto_memory.runners.base import AgentResult, AgentRunner +from microbots.auto_memory.workspace import WorkspaceManager + + +# --------------------------------------------------------------------------- +# Helpers / factories +# --------------------------------------------------------------------------- + + +def _make_config( + *, + max_iterations: int = 5, + timeout_min: int = 60, + per_iteration_timeout: int = 600, +) -> TaskConfig: + """Minimal TaskConfig suitable for unit tests.""" + return TaskConfig( + task_definition="Write hello world", + prompt_template="{{ task }}", + callbacks=[CallbackSpec(name="check", command="echo ok")], + max_iterations=max_iterations, + timeout_min=timeout_min, + per_iteration_timeout=per_iteration_timeout, + ) + + +def _make_agent_result(status: IterationStatus, *, error: str | None = None) -> AgentResult: + return AgentResult( + status=status, + output="output" if status == IterationStatus.PASSED else None, + error=error, + ) + + +def _make_callback_result(tmp_path: Path, *, passed: bool = True) -> CallbackResult: + spec = CallbackSpec(name="check", command="echo ok") + tmp_path.mkdir(parents=True, exist_ok=True) + stdout = tmp_path / "check.stdout" + stderr = tmp_path / "check.stderr" + stdout.write_text("") + stderr.write_text("") + return CallbackResult( + spec=spec, + return_code=0 if passed else 1, + stdout_path=stdout, + stderr_path=stderr, + passed=passed, + ) + + +class MockAgentRunner: + """Controllable AgentRunner that returns results from a pre-configured queue.""" + + def __init__(self, results: list[AgentResult]) -> None: + self._results = list(results) + self._calls: list[tuple] = [] + + def run(self, ctx, timeout_s: int) -> AgentResult: + self._calls.append((ctx, timeout_s)) + if not self._results: + raise AgentError("MockAgentRunner: no more results configured") + return self._results.pop(0) + + @property + def call_count(self) -> int: + return len(self._calls) + + +class RaisingAgentRunner: + """AgentRunner that always raises AgentError.""" + + def __init__(self, message: str = "agent exploded") -> None: + self._message = message + + def run(self, ctx, timeout_s: int) -> AgentResult: + raise AgentError(self._message) + + +class MockCallbackRunner: + """Controllable CallbackRunner.""" + + def __init__(self, results_per_call: list[list[CallbackResult]]) -> None: + """Each inner list is returned on successive run_all() calls.""" + self._results = list(results_per_call) + self._calls: list[tuple] = [] + + def run_all(self, specs, logs_dir, candidate_path) -> list[CallbackResult]: + self._calls.append((specs, logs_dir, candidate_path)) + if not self._results: + raise RuntimeError("MockCallbackRunner: no more results configured") + return self._results.pop(0) + + @property + def call_count(self) -> int: + return len(self._calls) + + +def _build_orchestrator( + config: TaskConfig, + agent_runner: AgentRunner, + callback_runner: CallbackRunner, + tmp_path: Path, +) -> TrainingLoopOrchestrator: + workspace = WorkspaceManager(run_dir=tmp_path / "run") + return TrainingLoopOrchestrator( + config=config, + agent_runner=agent_runner, + callback_runner=callback_runner, + workspace=workspace, + ) + + +# --------------------------------------------------------------------------- +# PATH 1: PASSED — agent succeeds and all callbacks pass +# --------------------------------------------------------------------------- + + +@pytest.mark.unit +class TestPassedPath: + def test_final_status_is_passed(self, tmp_path): + config = _make_config() + agent = MockAgentRunner([_make_agent_result(IterationStatus.PASSED)]) + callbacks = MockCallbackRunner([[_make_callback_result(tmp_path, passed=True)]]) + orch = _build_orchestrator(config, agent, callbacks, tmp_path) + + summary = orch.run() + + assert summary.final_status == FinalStatus.PASSED + + def test_iterations_run_is_one(self, tmp_path): + config = _make_config() + agent = MockAgentRunner([_make_agent_result(IterationStatus.PASSED)]) + callbacks = MockCallbackRunner([[_make_callback_result(tmp_path, passed=True)]]) + orch = _build_orchestrator(config, agent, callbacks, tmp_path) + + summary = orch.run() + + assert summary.iterations_run == 1 + + def test_agent_called_once(self, tmp_path): + config = _make_config() + agent = MockAgentRunner([_make_agent_result(IterationStatus.PASSED)]) + callbacks = MockCallbackRunner([[_make_callback_result(tmp_path, passed=True)]]) + orch = _build_orchestrator(config, agent, callbacks, tmp_path) + + orch.run() + + assert agent.call_count == 1 + + def test_passed_on_second_attempt(self, tmp_path): + """Callbacks fail on iteration 0 then pass on iteration 1 — status must be PASSED after 2 iterations.""" + config = _make_config() + agent = MockAgentRunner([ + _make_agent_result(IterationStatus.PASSED), + _make_agent_result(IterationStatus.PASSED), + ]) + callbacks = MockCallbackRunner([ + [_make_callback_result(tmp_path / "a", passed=False)], + [_make_callback_result(tmp_path / "b", passed=True)], + ]) + orch = _build_orchestrator(config, agent, callbacks, tmp_path) + + summary = orch.run() + + assert summary.final_status == FinalStatus.PASSED + assert summary.iterations_run == 2 + + +# --------------------------------------------------------------------------- +# PATH 2: FAILED → retry (callback fails but retries remain) +# --------------------------------------------------------------------------- + + +@pytest.mark.unit +class TestFailedRetryPath: + def test_callback_failure_triggers_retry(self, tmp_path): + config = _make_config(max_iterations=3) + agent = MockAgentRunner([ + _make_agent_result(IterationStatus.PASSED), + _make_agent_result(IterationStatus.PASSED), + _make_agent_result(IterationStatus.PASSED), + ]) + callbacks = MockCallbackRunner([ + [_make_callback_result(tmp_path / "a", passed=False)], + [_make_callback_result(tmp_path / "b", passed=False)], + [_make_callback_result(tmp_path / "c", passed=True)], + ]) + orch = _build_orchestrator(config, agent, callbacks, tmp_path) + + summary = orch.run() + + assert summary.final_status == FinalStatus.PASSED + assert summary.iterations_run == 3 + + def test_feedback_persisted_after_failure(self, tmp_path): + config = _make_config(max_iterations=2) + agent = MockAgentRunner([ + _make_agent_result(IterationStatus.PASSED), + _make_agent_result(IterationStatus.PASSED), + ]) + callbacks = MockCallbackRunner([ + [_make_callback_result(tmp_path / "a", passed=False)], + [_make_callback_result(tmp_path / "b", passed=True)], + ]) + workspace = WorkspaceManager(run_dir=tmp_path / "run") + orch = TrainingLoopOrchestrator( + config=config, + agent_runner=agent, + callback_runner=callbacks, + workspace=workspace, + ) + + orch.run() + + # One feedback entry must have been written to the memory store + feedback_entries = workspace.memory.read_all() + assert len(feedback_entries) == 1 + assert feedback_entries[0].iteration_idx == 0 + + def test_failed_record_has_feedback(self, tmp_path): + config = _make_config(max_iterations=2) + agent = MockAgentRunner([ + _make_agent_result(IterationStatus.PASSED), + _make_agent_result(IterationStatus.PASSED), + ]) + callbacks = MockCallbackRunner([ + [_make_callback_result(tmp_path / "a", passed=False)], + [_make_callback_result(tmp_path / "b", passed=True)], + ]) + orch = _build_orchestrator(config, agent, callbacks, tmp_path) + + summary = orch.run() + + failed_records = [r for r in summary.iteration_records if r.status == IterationStatus.FAILED] + assert len(failed_records) == 1 + assert failed_records[0].feedback is not None + + +# --------------------------------------------------------------------------- +# PATH 3: LIMIT_REACHED — all iterations exhausted without a pass +# --------------------------------------------------------------------------- + + +@pytest.mark.unit +class TestLimitReachedPath: + def test_final_status_is_limit_reached(self, tmp_path): + config = _make_config(max_iterations=3) + agent = MockAgentRunner([ + _make_agent_result(IterationStatus.PASSED), + _make_agent_result(IterationStatus.PASSED), + _make_agent_result(IterationStatus.PASSED), + ]) + callbacks = MockCallbackRunner([ + [_make_callback_result(tmp_path / "a", passed=False)], + [_make_callback_result(tmp_path / "b", passed=False)], + [_make_callback_result(tmp_path / "c", passed=False)], + ]) + orch = _build_orchestrator(config, agent, callbacks, tmp_path) + + summary = orch.run() + + assert summary.final_status == FinalStatus.LIMIT_REACHED + + def test_iterations_run_equals_max_iterations(self, tmp_path): + config = _make_config(max_iterations=3) + agent = MockAgentRunner([ + _make_agent_result(IterationStatus.PASSED), + _make_agent_result(IterationStatus.PASSED), + _make_agent_result(IterationStatus.PASSED), + ]) + callbacks = MockCallbackRunner([ + [_make_callback_result(tmp_path / "a", passed=False)], + [_make_callback_result(tmp_path / "b", passed=False)], + [_make_callback_result(tmp_path / "c", passed=False)], + ]) + orch = _build_orchestrator(config, agent, callbacks, tmp_path) + + summary = orch.run() + + assert summary.iterations_run == 3 + + def test_all_records_have_failed_status(self, tmp_path): + config = _make_config(max_iterations=2) + agent = MockAgentRunner([ + _make_agent_result(IterationStatus.PASSED), + _make_agent_result(IterationStatus.PASSED), + ]) + callbacks = MockCallbackRunner([ + [_make_callback_result(tmp_path / "a", passed=False)], + [_make_callback_result(tmp_path / "b", passed=False)], + ]) + orch = _build_orchestrator(config, agent, callbacks, tmp_path) + + summary = orch.run() + + assert all(r.status == IterationStatus.FAILED for r in summary.iteration_records) + + +# --------------------------------------------------------------------------- +# PATH 4: TIMEOUT — total wall-clock limit exceeded between iterations +# --------------------------------------------------------------------------- + + +@pytest.mark.unit +class TestTimeoutPath: + def test_final_status_is_timeout_when_total_elapsed(self, tmp_path): + """Simulate total timeout expiring before the second iteration begins.""" + config = _make_config(max_iterations=5, timeout_min=1) + agent = MockAgentRunner([_make_agent_result(IterationStatus.PASSED)]) + callbacks = MockCallbackRunner([ + [_make_callback_result(tmp_path / "a", passed=False)], + ]) + orch = _build_orchestrator(config, agent, callbacks, tmp_path) + + # Patch time.monotonic so it jumps past timeout after iteration 0 + call_count = 0 + + def fake_monotonic(): + nonlocal call_count + call_count += 1 + # First call (start_time) returns 0; next call returns timeout + 1 + return 0.0 if call_count == 1 else config.timeout_min * 60 + 1.0 + + with patch("microbots.auto_memory.orchestrator.time.monotonic", side_effect=fake_monotonic): + summary = orch.run() + + assert summary.final_status == FinalStatus.TIMEOUT + + def test_per_iteration_timeout_yields_timeout_status(self, tmp_path): + """Agent itself times out → final status TIMEOUT.""" + config = _make_config() + agent = MockAgentRunner([_make_agent_result(IterationStatus.TIMEOUT)]) + callbacks = MockCallbackRunner([]) + orch = _build_orchestrator(config, agent, callbacks, tmp_path) + + summary = orch.run() + + assert summary.final_status == FinalStatus.TIMEOUT + + def test_per_iteration_timeout_callbacks_not_called(self, tmp_path): + """Callbacks must not be invoked when the agent itself times out.""" + config = _make_config() + agent = MockAgentRunner([_make_agent_result(IterationStatus.TIMEOUT)]) + callbacks = MockCallbackRunner([]) + orch = _build_orchestrator(config, agent, callbacks, tmp_path) + + orch.run() + + assert callbacks.call_count == 0 + + def test_timeout_iterations_run_reflects_completed(self, tmp_path): + config = _make_config() + agent = MockAgentRunner([_make_agent_result(IterationStatus.TIMEOUT)]) + callbacks = MockCallbackRunner([]) + orch = _build_orchestrator(config, agent, callbacks, tmp_path) + + summary = orch.run() + + # One iteration was started and returned TIMEOUT + assert summary.iterations_run == 1 + + +# --------------------------------------------------------------------------- +# PATH 5: AGENT_ERROR — runner raises AgentError or returns ERROR status +# --------------------------------------------------------------------------- + + +@pytest.mark.unit +class TestAgentErrorPath: + def test_raised_agent_error_yields_error_status(self, tmp_path): + config = _make_config() + agent = RaisingAgentRunner("disk full") + callbacks = MockCallbackRunner([]) + orch = _build_orchestrator(config, agent, callbacks, tmp_path) + + summary = orch.run() + + assert summary.final_status == FinalStatus.ERROR + + def test_raised_agent_error_iterations_run_is_one(self, tmp_path): + """AgentError on iteration 0 → iterations_run == 1 (attempt counts).""" + config = _make_config() + agent = RaisingAgentRunner("disk full") + callbacks = MockCallbackRunner([]) + orch = _build_orchestrator(config, agent, callbacks, tmp_path) + + summary = orch.run() + + assert summary.iterations_run == 1 + + def test_raised_agent_error_message_captured(self, tmp_path): + config = _make_config() + agent = RaisingAgentRunner("disk full") + callbacks = MockCallbackRunner([]) + orch = _build_orchestrator(config, agent, callbacks, tmp_path) + + summary = orch.run() + + assert "disk full" in (summary.error_message or "") + + def test_returned_error_status_yields_error(self, tmp_path): + """Agent returns IterationStatus.ERROR → RunSummary.final_status == ERROR.""" + config = _make_config() + agent = MockAgentRunner([_make_agent_result(IterationStatus.ERROR, error="OOM")]) + callbacks = MockCallbackRunner([]) + workspace = WorkspaceManager(run_dir=tmp_path / "run") + orch = TrainingLoopOrchestrator( + config=config, + agent_runner=agent, + callback_runner=callbacks, + workspace=workspace, + max_agent_retries=0, + ) + + summary = orch.run() + + assert summary.final_status == FinalStatus.ERROR + + def test_returned_error_callbacks_not_called(self, tmp_path): + config = _make_config() + agent = MockAgentRunner([_make_agent_result(IterationStatus.ERROR, error="OOM")]) + callbacks = MockCallbackRunner([]) + workspace = WorkspaceManager(run_dir=tmp_path / "run") + orch = TrainingLoopOrchestrator( + config=config, + agent_runner=agent, + callback_runner=callbacks, + workspace=workspace, + max_agent_retries=0, + ) + + orch.run() + + assert callbacks.call_count == 0 + + def test_error_iterations_run_is_one(self, tmp_path): + config = _make_config() + agent = MockAgentRunner([_make_agent_result(IterationStatus.ERROR, error="OOM")]) + callbacks = MockCallbackRunner([]) + workspace = WorkspaceManager(run_dir=tmp_path / "run") + orch = TrainingLoopOrchestrator( + config=config, + agent_runner=agent, + callback_runner=callbacks, + workspace=workspace, + max_agent_retries=0, + ) + + summary = orch.run() + + assert summary.iterations_run == 1 + + def test_retries_transient_errors_then_fails(self, tmp_path): + """After max_agent_retries consecutive ERROR results the run aborts.""" + config = _make_config(max_iterations=10) + # With max_agent_retries=2 (default): 3 consecutive ERRORs exhaust retries. + agent = MockAgentRunner([ + _make_agent_result(IterationStatus.ERROR, error="transient"), + _make_agent_result(IterationStatus.ERROR, error="transient"), + _make_agent_result(IterationStatus.ERROR, error="transient"), + ]) + callbacks = MockCallbackRunner([]) + workspace = WorkspaceManager(run_dir=tmp_path / "run") + orch = TrainingLoopOrchestrator( + config=config, + agent_runner=agent, + callback_runner=callbacks, + workspace=workspace, + max_agent_retries=2, + ) + + summary = orch.run() + + assert summary.final_status == FinalStatus.ERROR + assert agent.call_count == 3 + assert summary.error_message == "transient" + + def test_retries_reset_after_success(self, tmp_path): + """Consecutive-error counter resets when an iteration succeeds.""" + config = _make_config(max_iterations=10) + # Two errors, then a pass — should NOT abort. + agent = MockAgentRunner([ + _make_agent_result(IterationStatus.ERROR, error="t1"), + _make_agent_result(IterationStatus.ERROR, error="t2"), + _make_agent_result(IterationStatus.PASSED), + ]) + callbacks = MockCallbackRunner([[_make_callback_result(tmp_path / "cb", passed=True)]]) + workspace = WorkspaceManager(run_dir=tmp_path / "run") + orch = TrainingLoopOrchestrator( + config=config, + agent_runner=agent, + callback_runner=callbacks, + workspace=workspace, + max_agent_retries=2, + ) + + summary = orch.run() + + assert summary.final_status == FinalStatus.PASSED + + +# --------------------------------------------------------------------------- +# RunSummary data model +# --------------------------------------------------------------------------- + + +@pytest.mark.unit +class TestRunSummary: + def test_defaults(self): + s = RunSummary(final_status=FinalStatus.PASSED, iterations_run=1) + assert s.iteration_records == [] + assert s.elapsed_s == 0.0 + assert s.error_message is None + + def test_is_dataclass(self): + s = RunSummary(final_status=FinalStatus.ERROR, iterations_run=0, error_message="boom") + assert s.final_status == FinalStatus.ERROR + assert s.error_message == "boom" + + +# --------------------------------------------------------------------------- +# analyze_failure delegation +# --------------------------------------------------------------------------- + + +@pytest.mark.unit +class TestAnalyzeFailureDelegation: + def test_delegates_to_module_function(self, tmp_path): + config = _make_config() + workspace = WorkspaceManager(run_dir=tmp_path / "run") + workspace.prepare() + workspace.prepare_iteration(0) + + orch = TrainingLoopOrchestrator( + config=config, + agent_runner=MockAgentRunner([]), + callback_runner=MockCallbackRunner([]), + workspace=workspace, + ) + + cb_result = _make_callback_result(tmp_path, passed=False) + + with patch( + "microbots.auto_memory.orchestrator.analyze_failure", + return_value=MagicMock(), + ) as mock_analyze: + orch.analyze_failure( + callback_results=[cb_result], + candidate_path=tmp_path / "cand", + iteration_idx=0, + ) + + mock_analyze.assert_called_once_with( + callback_results=[cb_result], + candidate_path=tmp_path / "cand", + iteration_idx=0, + ) diff --git a/test/auto_memory/test_workspace.py b/test/auto_memory/test_workspace.py index cb1b019..1a65370 100644 --- a/test/auto_memory/test_workspace.py +++ b/test/auto_memory/test_workspace.py @@ -330,6 +330,14 @@ def test_ignores_dirs_with_non_digit_suffix(self, tmp_path): wm2.prepare(resume=True) assert wm2.iteration_count == 3 # only iter_02 counts → max_idx=2, +1=3 + def test_returns_zero_when_iterations_dir_missing(self, tmp_path): + """_detect_iteration_count() returns 0 when iterations/ does not exist.""" + run_dir = tmp_path / "run" + run_dir.mkdir() + wm = WorkspaceManager(run_dir=run_dir) + # _iterations_dir has not been created — simulate a partial workspace. + assert wm._detect_iteration_count() == 0 + # --------------------------------------------------------------------------- # _write_meta() error path