Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 61 additions & 4 deletions containers/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,18 @@ def configure_git() -> None:
def git_commit(workspace: Path, message: str) -> bool:
"""Stage all changes and create a commit."""
try:
# Keep Forge handoff/history files local even if ignore setup is missing
# or an earlier run accidentally staged them.
subprocess.run(
["git", "rm", "-r", "--cached", "--ignore-unmatch", ".forge"],
cwd=workspace,
capture_output=True,
text=True,
)

# Stage all changes
result = subprocess.run(
["git", "add", "-A"],
["git", "add", "-A", "--", ".", ":!.forge", ":!.forge/**"],
cwd=workspace,
capture_output=True,
text=True,
Expand Down Expand Up @@ -285,13 +294,43 @@ def build_system_prompt(
)


def resolve_container_trace_fields(trace_state: dict[str, Any]) -> tuple[list[str], dict[str, Any]]:
"""Resolve Langfuse tags/metadata from container env without full app settings."""
try:
from forge.integrations.langfuse.fields import (
parse_trace_fields,
resolve_field,
)
except Exception as e:
logger.debug(f"Trace field resolution unavailable: {e}")
return [], {}

tags: list[str] = []
for field in parse_trace_fields(os.environ.get("LANGFUSE_TRACE_TAGS", ""), allow_tags=True):
value = resolve_field(field, trace_state)
if value:
tags.append(value)

metadata: dict[str, Any] = {}
for field in parse_trace_fields(
os.environ.get("LANGFUSE_TRACE_METADATA", ""),
allow_tags=False,
):
value = resolve_field(field, trace_state)
if value is not None:
metadata[field.value] = value

return tags, metadata


async def run_agent_task(
workspace: Path,
task_key: str,
task_summary: str,
task_description: str,
guardrails: str,
previous_task_keys: list[str] | None = None,
trace_context: dict[str, Any] | None = None,
) -> bool:
"""Run Deep Agents to implement the task.

Expand All @@ -302,6 +341,7 @@ async def run_agent_task(
task_description: Detailed task description.
guardrails: Repository guidelines.
previous_task_keys: List of previously implemented task keys for handoff context.
trace_context: Workflow fields forwarded to Langfuse only.
"""
# Support both new (LLM_MODEL) and legacy (CLAUDE_MODEL) env var names
model_name = os.environ.get("LLM_MODEL") or os.environ.get("CLAUDE_MODEL", "claude-sonnet-4-5@20250929")
Expand Down Expand Up @@ -336,6 +376,11 @@ async def run_agent_task(
system_prompt = build_system_prompt(
workspace, task_key, task_summary, task_description, guardrails, previous_task_keys
)
trace_state = {
**(trace_context or {}),
"system_prompt_length": len(system_prompt),
"llm_model": model_name,
}

# Determine model type (Gemini vs Claude)
is_gemini = model_name.lower().startswith(("gemini", "models/gemini"))
Expand Down Expand Up @@ -448,10 +493,13 @@ async def run_agent_task(
}

if langfuse_enabled:
trace_tags, trace_metadata = resolve_container_trace_fields(trace_state)
tags = ["forge-container", "task-implementation", *trace_tags]
metadata = {"task_summary": task_summary, **trace_metadata}
with propagate_attributes(
session_id=task_key,
tags=["forge-container", "task-implementation"],
metadata={"task_summary": task_summary},
tags=tags,
metadata=metadata,
):
result = await agent.ainvoke(initial_message, config=config)
else:
Expand Down Expand Up @@ -535,6 +583,7 @@ def main():

# Load task details
previous_task_keys: list[str] = []
trace_context: dict[str, Any] = {}
task_key: str = "UNKNOWN"
if args.task_file:
if not args.task_file.exists():
Expand All @@ -546,6 +595,8 @@ def main():
task_summary = task_data.get("summary", "")
task_description = task_data.get("description", "")
previous_task_keys = task_data.get("previous_task_keys", [])
raw_trace_context = task_data.get("trace_context", {})
trace_context = raw_trace_context if isinstance(raw_trace_context, dict) else {}
elif args.task_summary and args.task_description:
task_summary = args.task_summary
task_description = args.task_description
Expand Down Expand Up @@ -583,7 +634,13 @@ def main():
# - Committing changes when ready
if not asyncio.run(
run_agent_task(
workspace, task_key, task_summary, task_description, guardrails, previous_task_keys
workspace,
task_key,
task_summary,
task_description,
guardrails,
previous_task_keys,
trace_context,
)
):
logger.error("Task implementation failed")
Expand Down
6 changes: 6 additions & 0 deletions src/forge/sandbox/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import shutil
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

from forge.config import Settings, get_settings
from forge.prompts import load_prompt
Expand Down Expand Up @@ -149,6 +150,8 @@ def _build_env_vars(
env["LANGFUSE_PUBLIC_KEY"] = self.settings.langfuse_public_key
env["LANGFUSE_SECRET_KEY"] = self.settings.langfuse_secret_key.get_secret_value()
env["LANGFUSE_HOST"] = self.settings.langfuse_host
env["LANGFUSE_TRACE_TAGS"] = self.settings.langfuse_trace_tags
env["LANGFUSE_TRACE_METADATA"] = self.settings.langfuse_trace_metadata
logger.debug("Container Langfuse tracing enabled")

# Pass system prompt template (unformatted - entrypoint will interpolate)
Expand Down Expand Up @@ -373,6 +376,7 @@ async def run(
task_key: str | None = None,
repo_name: str | None = None,
previous_task_keys: list[str] | None = None,
trace_context: dict[str, Any] | None = None,
) -> ContainerResult:
"""Run a task in a container sandbox.

Expand All @@ -385,6 +389,7 @@ async def run(
task_key: Jira task key being implemented.
repo_name: Repository name (e.g., "owner/repo") for container naming.
previous_task_keys: List of previously implemented task keys for handoff context.
trace_context: Workflow fields forwarded to Langfuse only.

Returns:
ContainerResult with execution status and logs.
Expand All @@ -400,6 +405,7 @@ async def run(
"summary": task_summary,
"description": task_description,
"previous_task_keys": previous_task_keys or [],
"trace_context": trace_context or {},
}
task_file.write_text(json.dumps(task_data, indent=2))

Expand Down
6 changes: 6 additions & 0 deletions src/forge/workflow/nodes/ci_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
post_status_comment,
remove_implementing_label,
set_ci_pending_label,
set_review_pending_label,
)
from forge.workspace.git_ops import GitOperations
from forge.workspace.manager import Workspace
Expand Down Expand Up @@ -150,6 +151,11 @@ def _is_skipped(check: dict) -> bool:

if all_passed:
logger.info(f"All CI checks passed for {ticket_key}")
jira = JiraClient()
try:
await set_review_pending_label(jira, ticket_key)
finally:
await jira.close()
return update_state_timestamp(
{
**state,
Expand Down
34 changes: 34 additions & 0 deletions src/forge/workflow/nodes/human_review.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,16 @@ async def aggregate_epic_status(state: WorkflowState) -> WorkflowState:
"""
ticket_key = state["ticket_key"]
epic_keys = state.get("epic_keys", [])
implemented_tasks = state.get("implemented_tasks", [])

logger.info(f"Aggregating Epic status for {ticket_key}")

jira = JiraClient()

try:
if not epic_keys:
epic_keys = await _derive_epic_keys_from_tasks(jira, implemented_tasks)

all_epics_done = True

for epic_key in epic_keys:
Expand All @@ -142,6 +146,7 @@ async def aggregate_epic_status(state: WorkflowState) -> WorkflowState:
return update_state_timestamp(
{
**state,
"epic_keys": epic_keys,
"epics_completed": True,
"current_node": "aggregate_feature_status",
}
Expand Down Expand Up @@ -248,3 +253,32 @@ async def _check_epic_completion(jira: JiraClient, epic_key: str) -> bool:
logger.error(f"Failed to check Epic completion for {epic_key}: {e}")
# On error, don't falsely report completion
return False


async def _derive_epic_keys_from_tasks(
jira: JiraClient,
task_keys: list[str],
) -> list[str]:
"""Derive Epic keys from implemented Task parents when state lost them."""
epic_keys: list[str] = []
seen: set[str] = set()

for task_key in task_keys:
try:
issue = await jira.get_issue(task_key)
except Exception as e:
logger.warning(f"Failed to fetch Task {task_key} while deriving Epics: {e}")
continue

if not issue.parent_key or issue.parent_key in seen:
continue

seen.add(issue.parent_key)
epic_keys.append(issue.parent_key)

if epic_keys:
logger.info(f"Derived Epic keys from implemented Tasks: {epic_keys}")
else:
logger.warning("No Epic keys available or derivable from implemented Tasks")

return epic_keys
52 changes: 22 additions & 30 deletions src/forge/workflow/nodes/implementation.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,6 @@ async def implement_task(state: WorkflowState) -> WorkflowState:
f"Uncommitted changes found after all tasks for {ticket_key} — "
"committing as fallback"
)
# Remove the .forge/ entry setup_workspace injected into .gitignore
# so we don't pollute the repo's gitignore with Forge internals.
_clean_forge_gitignore(Path(workspace_path))
git.stage_all()
git.commit(f"[{ticket_key}] chore: commit uncommitted changes after implementation")

Expand Down Expand Up @@ -149,6 +146,11 @@ async def implement_task(state: WorkflowState) -> WorkflowState:
task_key=current_task,
repo_name=current_repo,
previous_task_keys=implemented_tasks,
trace_context=_build_implementation_trace_context(
state,
implementation_node=implementation_node,
current_repo=current_repo,
),
)

if result.success:
Expand Down Expand Up @@ -201,33 +203,23 @@ def _implementation_node_name(state: WorkflowState) -> str:
return "implement_bug_fix" if state.get("ticket_type") == TicketType.BUG else "implement_task"


def _clean_forge_gitignore(workspace_path: Path) -> None:
"""Remove the .forge/ entry that setup_workspace injected into .gitignore.

setup_workspace adds a .forge/ exclusion to prevent accidental commits of
workflow state. Before the fallback commit we strip it out so the target
repo's .gitignore isn't polluted with Forge-internal entries.
"""
gitignore_path = workspace_path / ".gitignore"
if not gitignore_path.exists():
return

content = gitignore_path.read_text()
if ".forge" not in content:
return

cleaned = (
"\n".join(
line
for line in content.splitlines()
if ".forge" not in line and "Forge workflow state" not in line
).rstrip("\n")
+ "\n"
)

if cleaned != content:
gitignore_path.write_text(cleaned)
logger.debug("Removed .forge/ entry from .gitignore before fallback commit")
def _build_implementation_trace_context(
state: WorkflowState,
*,
implementation_node: str,
current_repo: str,
) -> dict[str, object]:
"""Build trace-only fields for the container's Langfuse labels/metadata."""
return {
"ticket_key": state.get("ticket_key"),
"ticket_type": state.get("ticket_type"),
"current_node": implementation_node,
"current_repo": current_repo,
"repo": current_repo,
"current_pr_number": state.get("current_pr_number"),
"pr_number": state.get("current_pr_number"),
"retry_count": state.get("retry_count"),
}


def _build_task_description(
Expand Down
21 changes: 10 additions & 11 deletions src/forge/workflow/nodes/workspace_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,17 +212,16 @@ async def setup_workspace(state: WorkflowState) -> WorkflowState:
forge_dir.mkdir(exist_ok=True)
(forge_dir / "history").mkdir(exist_ok=True)

# Ensure .forge/ is in .gitignore to prevent accidental commits
gitignore_path = workspace.path / ".gitignore"
if gitignore_path.exists():
content = gitignore_path.read_text()
if ".forge" not in content:
if not content.endswith("\n"):
content += "\n"
content += "\n# Forge workflow state (do not commit)\n.forge/\n"
gitignore_path.write_text(content)
else:
gitignore_path.write_text("# Forge workflow state (do not commit)\n.forge/\n")
# Keep Forge handoff files local to this clone without modifying the
# target repository's tracked .gitignore.
exclude_path = workspace.path / ".git" / "info" / "exclude"
exclude_path.parent.mkdir(parents=True, exist_ok=True)
exclude_content = exclude_path.read_text() if exclude_path.exists() else ""
if ".forge/" not in exclude_content:
if exclude_content and not exclude_content.endswith("\n"):
exclude_content += "\n"
exclude_content += "\n# Forge workflow state (do not commit)\n.forge/\n"
exclude_path.write_text(exclude_content)

logger.info("Created .forge directory for task handoff")

Expand Down
2 changes: 2 additions & 0 deletions src/forge/workflow/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
remove_implementing_label,
set_ci_pending_label,
set_implementing_label,
set_review_pending_label,
transition_tasks_to_in_progress,
)
from forge.workflow.utils.qa_summary import post_qa_summary_if_needed
Expand Down Expand Up @@ -90,6 +91,7 @@ def set_error(state: dict[str, Any], error: str) -> dict[str, Any]:
"set_error",
"set_implementing_label",
"set_paused",
"set_review_pending_label",
"transition_tasks_to_in_progress",
"update_state_timestamp",
]
26 changes: 26 additions & 0 deletions src/forge/workflow/utils/jira_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,29 @@ async def set_ci_pending_label(
logger.info(f"Set forge:ci-pending label on {feature_key}")
except Exception as e:
logger.warning(f"Failed to set ci-pending label on {feature_key}: {e}")


async def set_review_pending_label(
jira_client: JiraClient,
feature_key: str,
) -> None:
"""Set the forge:review-pending label on a feature issue.

This function suppresses all exceptions to prevent Jira API failures from
blocking workflow execution. Errors are logged at WARNING level.

Args:
jira_client: JiraClient instance for API calls.
feature_key: The feature/bug key to label.

Returns:
None. Exceptions are suppressed and logged.
"""
try:
await jira_client.set_workflow_label(
feature_key,
ForgeLabel.TASK_REVIEW_PENDING,
)
logger.info(f"Set forge:review-pending label on {feature_key}")
except Exception as e:
logger.warning(f"Failed to set review-pending label on {feature_key}: {e}")
Loading
Loading