Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 45 additions & 2 deletions src/forge/workflow/nodes/rca_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,20 +235,21 @@ async def reflect_rca(state: BugState) -> BugState:
with tempfile.TemporaryDirectory() as tmpdir:
workspace_path = Path(tmpdir)
runner = ContainerRunner(settings)
task_key = f"{ticket_key}-reflect"
result = await runner.run(
workspace_path=workspace_path,
task_summary=f"RCA reflection for {ticket_key}",
task_description=task_description,
ticket_key=ticket_key,
task_key=f"{ticket_key}-reflect",
task_key=task_key,
)

if not result.success:
raise RuntimeError(
f"Reflection container failed with exit_code={result.exit_code}: {result.stderr}"
)

verdict = result.stdout.strip()
verdict = _extract_reflection_verdict(workspace_path, task_key, result.stdout)

if verdict.upper().strip() == "VALID":
return update_state_timestamp(
Expand Down Expand Up @@ -300,3 +301,45 @@ async def reflect_rca(state: BugState) -> BugState:

finally:
await jira.close()


def _extract_reflection_verdict(workspace_path: Path, task_key: str, stdout: str) -> str:
"""Read the reflector's final assistant message, falling back to stdout.

Container stdout contains entrypoint logs. The entrypoint saves the actual
Deep Agents conversation to ``.forge/history/{task_key}.json``; for RCA
reflection that final assistant message is the verdict we care about.
"""
history_file = workspace_path / ".forge" / "history" / f"{task_key}.json"
if history_file.exists():
try:
history = json.loads(history_file.read_text())
messages = history.get("messages", [])
for message in reversed(messages):
role = str(message.get("role", "")).lower()
if role not in {"ai", "assistant"}:
continue
content = _stringify_message_content(message.get("content", ""))
if content.strip():
return content.strip()
except Exception as e:
logger.warning(f"Could not read reflection history {history_file}: {e}")

return (stdout or "").strip()


def _stringify_message_content(content: object) -> str:
"""Convert LangChain message content variants into plain text."""
if isinstance(content, str):
return content
if isinstance(content, list):
parts = []
for item in content:
if isinstance(item, str):
parts.append(item)
elif isinstance(item, dict):
text = item.get("text") or item.get("content")
if text:
parts.append(str(text))
return "\n".join(parts)
return str(content)
37 changes: 36 additions & 1 deletion tests/unit/workflow/nodes/test_rca_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,41 @@ async def test_valid_output_routes_to_rca_option_gate(self, rca_state):

assert result["current_node"] == "rca_option_gate"

@pytest.mark.asyncio
async def test_reflection_uses_history_final_ai_message_over_stdout_logs(self, rca_state):
"""Container logs on stdout should not hide a VALID final agent response."""
mock_jira = _make_mock_jira()

class _HistoryRunner:
async def run(self, workspace_path, **_kwargs):
history_dir = workspace_path / ".forge" / "history"
history_dir.mkdir(parents=True, exist_ok=True)
(history_dir / "BUG-123-reflect.json").write_text(json.dumps({
"messages": [
{"role": "human", "content": "Review this RCA"},
{"role": "ai", "content": "VALID"},
],
}))
result = MagicMock()
result.success = True
result.exit_code = 0
result.stdout = (
"2026-06-30 [INFO] Implementing task: RCA reflection for BUG-123\n"
"2026-06-30 [INFO] Agent completed task execution\n"
"2026-06-30 [INFO] Task completed successfully\n"
)
result.stderr = ""
return result

with (
patch("forge.workflow.nodes.rca_analysis.JiraClient", return_value=mock_jira),
patch("forge.workflow.nodes.rca_analysis.ContainerRunner", return_value=_HistoryRunner()),
):
result = await reflect_rca(rca_state)

assert result["current_node"] == "rca_option_gate"
assert result["reflection_count"] == 0

@pytest.mark.asyncio
async def test_valid_output_does_not_change_reflection_count(self, rca_state):
"""reflection_count is not incremented on VALID output."""
Expand Down Expand Up @@ -497,7 +532,7 @@ async def test_container_failure_uses_own_retry_counter_not_shared_retry_count(s
mock_jira = _make_mock_jira()

class _FailingRunner:
async def run(self, workspace_path, **_kwargs):
async def run(self, _workspace_path, **_kwargs):
result = MagicMock()
result.success = False
result.exit_code = 1
Expand Down
Loading