diff --git a/.gitignore b/.gitignore
index 45dc651..f3c3ed5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,25 @@
+# Runtime artifacts
+.supervisor.log
+.supervisor.lock
+.supervisor_snapshot/
+.logger_runs/
+.log/
+.mle_log.jsonl
+gym_log.json
+
+# Python/editor cruft
__pycache__/
-*.egg-info/
-dist/
-build/
-.DS_Store
*.pyc
+.DS_Store
+
+# gym-environment
+.claudeignore
+.copilotignore
+.cursorignore
+.cursorrules
+.geminiignore
+.github
+.gitignore
+AGENTS.md
+CLAUDE.md
+GEMINI.md
diff --git a/README.md b/README.md
index f64b306..bfb6d22 100644
--- a/README.md
+++ b/README.md
@@ -54,7 +54,10 @@ aicodinggym configure --user-id USER_ID [--workspace-dir DIR]
#### `aicodinggym swe fetch PROBLEM_ID`
-Fetch a problem and clone the repo locally.
+Fetch a problem and clone the repo locally. After a successful `swe fetch`, `mle download`, or `cr fetch`, the CLI downloads agent instruction files from [AICodingGym/gym-environment](https://github.com/AICodingGym/gym-environment) via the GitHub Contents API. By default it uses the **`test` branch**. Override with environment variables:
+
+- `AICODINGGYM_GYM_ENV_REPO` — `owner/repo` (default: `AICodingGym/gym-environment`)
+- `AICODINGGYM_GYM_ENV_REF` — branch, tag, or commit SHA for `?ref=` (default: `test` when unset)
```
aicodinggym swe fetch PROBLEM_ID [--user-id ID] [--workspace-dir DIR]
diff --git a/__init__.py b/__init__.py
index c5f223c..40ec2de 100644
--- a/__init__.py
+++ b/__init__.py
@@ -1,3 +1,44 @@
-"""AI Coding Gym CLI."""
+"""AI Coding Gym CLI.
-__version__ = "0.3.0"
+Imports are lazy so tooling that loads this file without package context
+(e.g. some pytest collection paths) does not fail on relative imports.
+"""
+
+from __future__ import annotations
+
+import importlib
+import importlib.metadata
+from typing import TYPE_CHECKING, Any
+
+try:
+ __version__ = importlib.metadata.version("aicodinggym-cli")
+except importlib.metadata.PackageNotFoundError: # pragma: no cover - dev without install
+ __version__ = "0.0.0"
+
+__all__ = [
+ "__version__",
+ "ExperimentLog",
+ "LogEntry",
+ "capture_mle_provenance",
+ "log_entry",
+ "print_summary",
+ "set_log_path",
+ "gym_logger",
+]
+
+
+def __getattr__(name: str) -> Any:
+    """Resolve the package's public names lazily on attribute access.
+
+    ``gym_logger`` resolves to the ``aicodinggym.gym_logger`` module itself.
+    The other exported names are fetched from the module that defines them,
+    which is imported only when the name is first requested. Any other name
+    raises ``AttributeError``.
+    """
+    if name == "gym_logger":
+        return importlib.import_module("aicodinggym.gym_logger")
+    if name in ("ExperimentLog", "LogEntry", "capture_mle_provenance"):
+        module_path = "aicodinggym.experiment_log"
+    elif name in ("log_entry", "print_summary", "set_log_path"):
+        module_path = "aicodinggym.gym_logger"
+    else:
+        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
+    return getattr(importlib.import_module(module_path), name)
+
+
+if TYPE_CHECKING:
+ from .experiment_log import ExperimentLog, LogEntry, capture_mle_provenance
+ from .gym_logger import log_entry, print_summary, set_log_path
diff --git a/api.py b/api.py
index 08c6e85..bd7c24a 100644
--- a/api.py
+++ b/api.py
@@ -76,8 +76,11 @@ def fetch_problem(user_id: str, problem_id: str) -> dict:
def submit_notification(problem_id: str, user_id: str, commit_hash: str,
- branch: str, commit_message: str, timestamp: str) -> dict:
- """Notify backend of a submission."""
+ branch: str, commit_message: str, timestamp: str,
+ tool: str | None = None,
+ tool_version: str | None = None,
+ ai_model: str | None = None) -> dict:
+ """Notify backend of a SWE submission, optionally attributing the tool/model used."""
return _post("submissions", {
"problem_id": problem_id,
"user_id": user_id,
@@ -85,6 +88,9 @@ def submit_notification(problem_id: str, user_id: str, commit_hash: str,
"branch": branch,
"commit_message": commit_message,
"timestamp": timestamp,
+ "tool": tool,
+ "tool_version": tool_version,
+ "ai_model": ai_model,
})
@@ -93,18 +99,73 @@ def fetch_pr(user_id: str, problem_id: str) -> dict:
return _post("code-review-fetch", {"user_id": user_id, "problem_id": problem_id})
-def cr_submit_review(user_id: str, problem_id: str, review: str) -> dict:
+def cr_submit_review(user_id: str, problem_id: str, review: str,
+ tool: str | None = None,
+ tool_version: str | None = None,
+ ai_model: str | None = None) -> dict:
"""Submit a code review."""
return _post("code-review-submit", {
"user_id": user_id,
"problem_id": problem_id,
"review": review,
+ "tool": tool,
+ "tool_version": tool_version,
+ "ai_model": ai_model,
})
+def notify_mle_progress(user_id: str, problem_slug: str, best_percentile: float,
+                        tool: str | None = None,
+                        tool_version: str | None = None,
+                        ai_model: str | None = None) -> dict:
+    """Report a graded MLE-bench result to the user's progress endpoint.
+
+    Posts the problem slug, a fixed ``"solved"`` status, the best percentile
+    and the tool/model attribution to ``users/{user_id}/progress`` and returns
+    whatever ``_post`` returns. Per the original author's note, the backend
+    records this against the user's progress row for the leaderboard.
+    """
+    endpoint = f"users/{user_id}/progress"
+    return _post(endpoint, {
+        "problemSlug": problem_slug,
+        "status": "solved",
+        "bestPercentile": best_percentile,
+        "tool": tool,
+        "tool_version": tool_version,
+        "ai_model": ai_model,
+    })
+
+
def mlebench_download_info(user_id: str, competition_id: str, dest_path: str) -> None:
- """Download dataset for an MLE-bench competition directly to dest_path."""
- resp = _get(f"competitions/{competition_id}/download", stream=True)
+ """Download dataset for an MLE-bench competition directly to dest_path.
+
+ Uses a long read timeout: large zips can take many minutes between chunks
+ over slow links; the default 30s read timeout would abort mid-stream.
+ """
+ read_s = int(os.environ.get("AICODINGGYM_DOWNLOAD_READ_TIMEOUT", "0"))
+ if read_s <= 0:
+ read_s = 7200 # seconds between reads; large zips need headroom
+ url = f"{API_BASE}/competitions/{competition_id}/download"
+ try:
+ resp = requests.get(
+ url,
+ stream=True,
+ timeout=(120, read_s),
+ )
+ resp.raise_for_status()
+ except requests.ConnectionError:
+ raise APIError(
+ f"Cannot connect to {API_BASE}.\n"
+ "Check your internet connection and try again."
+ )
+ except requests.Timeout:
+ raise APIError(f"Download from {url} timed out.")
+ except requests.HTTPError as e:
+ body = ""
+ try:
+ body = e.response.json().get("detail", e.response.text)
+ except Exception:
+ body = e.response.text
+ raise APIError(f"API error (HTTP {e.response.status_code}): {body}")
+ except requests.RequestException as e:
+ raise APIError(f"Request failed: {e}")
+
with open(dest_path, "wb") as f:
for chunk in resp.iter_content(chunk_size=8192):
f.write(chunk)
@@ -122,15 +183,50 @@ def mlebench_download_file(url: str, dest_path: str, timeout: int = 300) -> None
raise APIError(f"Download failed: {e}")
-def mlebench_submit_csv(user_id: str, competition_id: str, csv_path: str) -> dict:
+def record_mle_submission(user_id: str, competition_id: str,
+                          score: float | None,
+                          percentile: float | None,
+                          status: str,
+                          csv_name: str | None = None,
+                          error: str | None = None,
+                          tool: str | None = None,
+                          tool_version: str | None = None,
+                          ai_model: str | None = None) -> dict:
+    """Post one MLE-bench grading result to the ``mle-submission`` endpoint.
+
+    Every argument is forwarded under a key of the same name, ``None`` values
+    included, and the result of ``_post`` is returned. Per the original
+    author's note, the backend stores this as a per-attempt submission row
+    carrying tool/model attribution.
+    """
+    submission = dict(
+        user_id=user_id,
+        competition_id=competition_id,
+        score=score,
+        percentile=percentile,
+        status=status,
+        csv_name=csv_name,
+        error=error,
+        tool=tool,
+        tool_version=tool_version,
+        ai_model=ai_model,
+    )
+    return _post("mle-submission", submission)
+
+
+def mlebench_submit_csv(user_id: str, competition_id: str, csv_path: str,
+ tool: str | None = None,
+ tool_version: str | None = None,
+ ai_model: str | None = None) -> dict:
"""Upload a prediction CSV for an MLE-bench competition."""
try:
csv_name = Path(csv_path).name
with open(csv_path, "rb") as f:
compressed = gzip.compress(f.read())
+ form = {
+ "user_id": user_id,
+ "competition_id": competition_id,
+ "tool": tool or "",
+ "tool_version": tool_version or "",
+ "ai_model": ai_model or "",
+ }
resp = requests.post(
f"{API_BASE}/competitions/{competition_id}/submit",
- data={"user_id": user_id, "competition_id": competition_id},
+ data=form,
files={"file": (csv_name + ".gz", compressed, "application/gzip")},
timeout=120,
)
diff --git a/cli.py b/cli.py
index ebc1c78..39c9c93 100644
--- a/cli.py
+++ b/cli.py
@@ -28,9 +28,11 @@
import re
import subprocess
import sys
+import time
import urllib.request
from datetime import datetime
from pathlib import Path
+from typing import Any
import click
@@ -44,11 +46,18 @@
mlebench_download_file,
mlebench_download_info,
mlebench_submit_csv,
+ notify_mle_progress,
+ record_mle_submission,
submit_notification,
)
+from .cli_env import read_solution_log_model, resolve as resolve_env
from .config import (
+ ATTRIBUTION_PATH,
+ clear_attribution,
+ load_attribution,
load_config,
load_credentials,
+ save_attribution,
save_config,
save_credentials,
)
@@ -80,14 +89,50 @@ def _warn(msg: str) -> None:
click.echo(f"Warning: {msg}", err=True)
-_GYM_ENV_API = "https://api.github.com/repos/AICodingGym/gym-environment/contents"
_GYM_ENV_SKIP = {"README.md"}
+_GYM_ENV_MLE_ONLY: set[str] = set()
-def _install_gym_environment(dest: Path) -> None:
- """Download gym-environment files into dest and add them to .gitignore."""
+def _gym_env_repo() -> str:
+    """Return the GitHub ``owner/repo`` slug that hosts gym-environment assets.
+
+    ``AICODINGGYM_GYM_ENV_REPO`` wins when it is set to a non-blank value;
+    otherwise the default ``AICodingGym/gym-environment`` is returned.
+    """
+    override = os.environ.get("AICODINGGYM_GYM_ENV_REPO", "").strip()
+    if override:
+        return override
+    return "AICodingGym/gym-environment"
+
+
+def _gym_env_ref() -> str:
+    """Return the git ref (branch, tag, or commit) used for Contents API ``?ref=``.
+
+    A non-blank ``AICODINGGYM_GYM_ENV_REF`` is returned with surrounding
+    whitespace removed. When the variable is unset or blank the ref falls
+    back to ``test``; set ``AICODINGGYM_GYM_ENV_REF=main`` (or another
+    branch) to override.
+    """
+    return os.environ.get("AICODINGGYM_GYM_ENV_REF", "").strip() or "test"
+
+
+def _gym_env_contents_api_url(subpath: str = "") -> str:
+    """Build the GitHub Contents API URL for gym-environment at the configured ref.
+
+    Args:
+        subpath: Optional path inside the repository; leading and trailing
+            slashes are ignored. Empty means the repository root.
+
+    Returns:
+        ``https://api.github.com/repos/<owner/repo>/contents[/<subpath>]?ref=<ref>``
+        with the subpath and ref percent-encoded.
+    """
+    from urllib.parse import quote
+
+    base = f"https://api.github.com/repos/{_gym_env_repo()}/contents"
+    subpath = subpath.strip("/")
+    if subpath:
+        # "/" stays a path separator; spaces, "#", "?" and the like are escaped.
+        base = f"{base}/{quote(subpath)}"
+    # The ref comes from the environment. Escape it fully so characters such
+    # as "&", "#" or spaces cannot truncate or extend the query string.
+    return f"{base}?ref={quote(_gym_env_ref(), safe='')}"
+
+
+def _install_gym_environment(dest: Path, challenge: str | None = None) -> None:
+ """Download gym-environment files from GitHub into dest and add to .gitignore.
+
+ Ref and repo are configurable via ``AICODINGGYM_GYM_ENV_REF`` and
+ ``AICODINGGYM_GYM_ENV_REPO``. When ref is unset, the ``test`` branch is used.
+ """
try:
- req = urllib.request.Request(_GYM_ENV_API, headers={"Accept": "application/vnd.github.v3+json"})
+ req = urllib.request.Request(
+ _gym_env_contents_api_url(),
+ headers={"Accept": "application/vnd.github.v3+json"},
+ )
with urllib.request.urlopen(req, timeout=15) as resp:
entries = json.loads(resp.read())
except Exception as e:
@@ -117,7 +162,7 @@ def _install_gym_environment(dest: Path) -> None:
# Fetch subdirectory contents recursively (one level deep)
try:
sub_req = urllib.request.Request(
- f"{_GYM_ENV_API}/{name}",
+ _gym_env_contents_api_url(name),
headers={"Accept": "application/vnd.github.v3+json"},
)
with urllib.request.urlopen(sub_req, timeout=15) as r:
@@ -140,19 +185,117 @@ def _install_gym_environment(dest: Path) -> None:
_warn(f"Failed to download {name}/{sub_name}: {e}")
downloaded.append(name)
- if not downloaded:
- return
+ # Seed empty solution_log.json if absent (AI agent populates it after each prompt)
+ log_file = dest / "solution_log.json"
+ if not log_file.exists():
+ log_file.write_text(
+ '{"version": "1.0", "problem": "", "problem_type": "mle", "prompts": []}\n',
+ encoding="utf-8",
+ )
# Append to .gitignore
gitignore = dest / ".gitignore"
existing = gitignore.read_text(encoding="utf-8") if gitignore.exists() else ""
existing_lines = set(existing.splitlines())
- new_entries = [f for f in downloaded if f not in existing_lines and f"/{f}" not in existing_lines]
- if new_entries:
- block = "\n# gym-environment\n" + "\n".join(new_entries) + "\n"
- with open(gitignore, "a", encoding="utf-8", newline="\n") as fh:
- fh.write(block)
+ gym_artifacts = [".gym_watcher.lock", ".gym_watcher.log", "solution_log.json", ".dashboard.tmp", ".gym_attribution.json"]
+ if downloaded:
+ new_entries = [f for f in downloaded if f not in existing_lines and f"/{f}" not in existing_lines]
+ new_entries += [a for a in gym_artifacts if a not in existing_lines and f"/{a}" not in existing_lines]
+ if new_entries:
+ block = "\n# gym-environment\n" + "\n".join(new_entries) + "\n"
+ with open(gitignore, "a", encoding="utf-8", newline="\n") as fh:
+ fh.write(block)
+
+def _open_in_browser(path: Path) -> bool:
+    """Best-effort open a local file in the user's default browser.
+
+    If ``path`` does not exist yet, a small placeholder page is written first
+    so the browser has something to load. Returns True if ``webbrowser.open``
+    reported success, False otherwise. Never raises - a missing display /
+    headless box should not break ``fetch``.
+    """
+    try:
+        if not path.exists():
+            # Create a minimal placeholder so the browser has something to load;
+            # the watcher will overwrite it moments later.
+            path.parent.mkdir(parents=True, exist_ok=True)
+            # NOTE(review): this literal was split across lines with its markup
+            # missing, which is not valid Python. The tags below are a
+            # reconstruction around the surviving text - confirm against the
+            # real commit.
+            path.write_text(
+                "<!doctype html><title>AI Coding Gym</title>"
+                "<p>Preparing dashboard\u2026</p>\n",
+                encoding="utf-8",
+            )
+        import webbrowser
+        return bool(webbrowser.open(path.resolve().as_uri()))
+    except Exception:
+        return False
+
+
+def _autostart_watcher(problem_dir: Path) -> None:
+    """Launch gym_watcher.py in background inside problem_dir. Non-fatal.
+
+    Does nothing when ``gym_watcher.py`` is absent. If ``.gym_watcher.lock``
+    holds the PID of a live process the launch is skipped; otherwise the lock
+    is removed and a detached watcher is started with stdout/stderr appended
+    to ``.gym_watcher.log``. The dashboard is then opened in a browser on a
+    best-effort basis. Any failure is reported as a warning, never raised.
+    """
+    problem_dir = Path(problem_dir)
+    watcher = problem_dir / "gym_watcher.py"
+    if not watcher.exists():
+        return
+    lock = problem_dir / ".gym_watcher.lock"
+    if lock.exists():
+        try:
+            pid = int(lock.read_text(encoding="utf-8").strip())
+        except (OSError, ValueError):
+            pid = None
+        if pid and _pid_alive(pid):
+            click.echo("Gym watcher already running; skipping auto-start.")
+            return
+        # Stale or unreadable lock: drop it so the new watcher can take over.
+        try:
+            lock.unlink()
+        except OSError:
+            pass
+    log_path = problem_dir / ".gym_watcher.log"
+    try:
+        cmd = [sys.executable, str(watcher), str(problem_dir)]
+        # The child gets its own copy of the log handle during Popen, so the
+        # parent's copy is closed as soon as Popen returns (or fails) instead
+        # of leaking for the rest of the CLI run.
+        with open(log_path, "ab", buffering=0) as log_fh:
+            kwargs: dict[str, Any] = {
+                "stdout": log_fh,
+                "stderr": log_fh,
+                "stdin": subprocess.DEVNULL,
+                "cwd": str(problem_dir),
+            }
+            if platform.system() == "Windows":
+                # Detach from the console so the watcher outlives this CLI.
+                DETACHED_PROCESS = 0x00000008
+                CREATE_NEW_PROCESS_GROUP = 0x00000200
+                kwargs["creationflags"] = DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP
+                kwargs["close_fds"] = False
+            else:
+                # New session: the watcher is not killed with the terminal.
+                kwargs["start_new_session"] = True
+            subprocess.Popen(cmd, **kwargs)  # type: ignore[arg-type]
+        dashboard = problem_dir / "dashboard.html"
+        opened = _open_in_browser(dashboard)
+        msg = "Gym watcher started (logs: .gym_watcher.log)."
+        if not opened:
+            msg += f" Open {dashboard} to view dashboard."
+        click.echo(msg)
+    except Exception as exc:
+        _warn(f"Could not auto-start gym_watcher.py: {exc}.")
+
+
+def _pid_alive(pid: int) -> bool:
+    """Return True if a process with ``pid`` currently exists.
+
+    Cross-platform ``kill(pid, 0)`` equivalent. Non-positive PIDs are never
+    reported alive. Any failure to run the probe is treated as "not alive".
+    """
+    if pid <= 0:
+        # 0 and negative values address process groups for ``os.kill``, not
+        # a single process, so a corrupt lock file must not match them.
+        return False
+    if platform.system() == "Windows":
+        # Per the Python docs, ``os.kill`` with signal 0 terminates the target
+        # on Windows, so ``tasklist`` is used as a read-only probe instead.
+        try:
+            out = subprocess.run(
+                ["tasklist", "/FI", f"PID eq {pid}", "/NH"],
+                capture_output=True, text=True, check=False, timeout=5,
+            )
+        except (subprocess.TimeoutExpired, OSError):
+            return False
+        # Match the PID as a whole column value rather than as a substring.
+        return str(pid) in out.stdout.split()
+    try:
+        os.kill(pid, 0)
+    except PermissionError:
+        # The process exists but belongs to another user.
+        return True
+    except OSError:
+        # Includes ProcessLookupError: no such process.
+        return False
+    return True
+
+
+def _shquote(text: str) -> str:
+    """Return ``text`` quoted for safe use as one POSIX-shell word.
+
+    Delegates to ``shlex.quote``: the empty string becomes ``''``, text made
+    only of ASCII letters, digits and ``@%+=:,./-_`` is returned unchanged,
+    and anything else is wrapped in single quotes with embedded single
+    quotes escaped.
+    """
+    import shlex
+
+    return shlex.quote(text)
def _resolve_user_id(config: dict, user_id: str | None) -> str:
@@ -430,6 +573,123 @@ def configure(user_id: str, workspace_dir: str | None):
_error(f"Configuration failed: {e}")
+# ── attribution commands ─────────────────────────────────────────────────────
+
+
+def _require_attribution(env: dict[str, Any], submit_cmd: str, problem_id: str) -> None:
+    """Hard-reject a submission that lacks tool + model attribution.
+
+    Both ``env["tool"]`` and ``env["ai_model"]`` must be truthy; otherwise the
+    row would land on the leaderboard with no attribution. On failure the
+    user gets a copy-paste block they can hand to their AI IDE to self-report
+    and resubmit, delivered through ``_error`` (presumably exits non-zero -
+    confirm against ``_error``).
+
+    Args:
+        env: Resolved attribution mapping; only ``tool`` and ``ai_model`` are read.
+        submit_cmd: Subcommand path, e.g. ``"swe submit"``.
+        problem_id: Problem or competition id used in the suggested commands.
+    """
+    missing = [label for label, key in (("tool", "tool"), ("model", "ai_model")) if not env.get(key)]
+    if not missing:
+        return
+
+    full_cmd = f"aicodinggym {submit_cmd} {problem_id}"
+    rule = "-" * 60
+    _error(
+        f"Submission rejected - missing {' and '.join(missing)} attribution.\n\n"
+        f"The leaderboard must know which AI tool and model produced this\n"
+        f"solution. Nothing was auto-detected and no .gym_attribution.json was\n"
+        f"found in the problem folder.\n\n"
+        f"Paste this into your AI IDE / agent to log attribution and resubmit:\n"
+        f"{rule}\n"
+        f"Create a file named .gym_attribution.json in the {problem_id}\n"
+        f"problem folder, filling in your real tool and model:\n\n"
+        f"{{\n"
+        f' \"tool\": \"\",\n'
+        f' \"tool_version\": \"\",\n'
+        f' \"ai_model\": \"\"\n'
+        f"}}\n\n"
+        f"Then run: {full_cmd}\n"
+        f"{rule}\n\n"
+        f"Alternatives (no file needed):\n"
+        # Each flag takes a value; show a placeholder so the hint is usable.
+        f" * One-off: {full_cmd} --tool <tool> --ai-model <model>\n"
+        f" * Persist: aicodinggym set-attribution --tool <tool> --model <model>"
+    )
+
+
+@main.command("set-attribution")
+@click.option(
+    "--tool", default=None,
+    help="Coding tool name (e.g. claude-code, cursor, aider, codex-cli).",
+)
+@click.option(
+    "--tool-version", default=None,
+    help="Optional version string for the tool.",
+)
+@click.option(
+    "--model", "ai_model", default=None,
+    help="AI model identifier (e.g. claude-opus-4-7, gpt-5, gemini-2.5-pro).",
+)
+def set_attribution(tool: str | None, tool_version: str | None, ai_model: str | None):
+    """Persist tool/model attribution used as the reliable fallback when
+    auto-detection cannot identify the current session.
+
+    Every subsequent submission picks up these values automatically unless
+    overridden by per-command flags (``--tool``, ``--tool-version``,
+    ``--ai-model``) or live auto-detection.
+
+    \b
+    EXAMPLE:
+    aicodinggym set-attribution --tool claude-code --model claude-opus-4-7
+    aicodinggym set-attribution --tool cursor --model claude-sonnet-4-5
+    """
+    requested = {"tool": tool, "tool_version": tool_version, "ai_model": ai_model}
+    # Nothing usable was passed (every option missing or empty).
+    if not any(requested.values()):
+        _error(
+            "Provide at least one of --tool, --tool-version, --model.\n\n"
+            "Example:\n"
+            " aicodinggym set-attribution --tool claude-code --model claude-opus-4-7"
+        )
+
+    current = load_attribution()
+    # Only options that were actually passed overwrite stored values; an
+    # explicit empty string still counts as passed.
+    for field, value in requested.items():
+        if value is not None:
+            current[field] = value
+    save_attribution(current)
+
+    summary = [
+        f"Saved attribution to {ATTRIBUTION_PATH}:",
+        f" tool: {current.get('tool') or '(unset)'}",
+        f" tool_version: {current.get('tool_version') or '(unset)'}",
+        f" ai_model: {current.get('ai_model') or '(unset)'}",
+    ]
+    click.echo("\n".join(summary))
+
+
+@main.command("show-attribution")
+def show_attribution():
+    """Print the persisted attribution and the live auto-detected values."""
+    persisted = load_attribution()
+    resolved = resolve_env(None, None, None)
+    # Print the same ATTRIBUTION_PATH constant that set-attribution and
+    # clear-attribution report, rather than a hard-coded copy that can drift
+    # from the config module.
+    click.echo(f"Persisted attribution ({ATTRIBUTION_PATH}):")
+    click.echo(f" tool: {persisted.get('tool') or '(unset)'}")
+    click.echo(f" tool_version: {persisted.get('tool_version') or '(unset)'}")
+    click.echo(f" ai_model: {persisted.get('ai_model') or '(unset)'}")
+    click.echo("")
+    click.echo("Effective values for the next submission (auto-detect ∪ persisted):")
+    click.echo(f" tool: {resolved.get('tool') or '(none)'}")
+    click.echo(f" tool_version: {resolved.get('tool_version') or '(none)'}")
+    click.echo(f" ai_model: {resolved.get('ai_model') or '(none)'}")
+
+
+@main.command("clear-attribution")
+def clear_attribution_cmd():
+    """Remove the persisted attribution file."""
+    # clear_attribution() reports whether there was anything to delete.
+    removed = clear_attribution()
+    message = (
+        f"Removed {ATTRIBUTION_PATH}."
+        if removed
+        else "No persisted attribution to remove."
+    )
+    click.echo(message)
+
+
# ── swe group ────────────────────────────────────────────────────────────────
@@ -528,7 +788,8 @@ def swe_fetch(problem_id: str, user_id: str | None, workspace_dir: str | None):
if not success:
_error(msg)
- _install_gym_environment(workspace / problem_id)
+ _install_gym_environment(workspace / problem_id, "swe")
+ _autostart_watcher(workspace / problem_id)
click.echo(
f"\nSuccessfully fetched problem: {problem_id}\n"
@@ -555,8 +816,21 @@ def swe_fetch(problem_id: str, user_id: str | None, workspace_dir: str | None):
"--workspace-dir", default=None, type=click.Path(),
help="Workspace directory. Overrides configured/cached value.",
)
+@click.option(
+ "--tool", default=None,
+ help="Override detected coding tool (e.g. claude-code, cursor, antigravity).",
+)
+@click.option(
+ "--tool-version", default=None,
+ help="Override detected tool version string.",
+)
+@click.option(
+ "--ai-model", default=None,
+ help="Override detected AI model (e.g. opus-4.7, gpt-5, gemini-2.5-pro).",
+)
def swe_submit(problem_id: str, user_id: str | None, message: str | None,
- force: bool, workspace_dir: str | None):
+ force: bool, workspace_dir: str | None,
+ tool: str | None, tool_version: str | None, ai_model: str | None):
"""Submit your SWE-bench solution by committing and pushing changes.
Stages all changes, commits them, pushes to the remote, and notifies
@@ -620,6 +894,11 @@ def swe_submit(problem_id: str, user_id: str | None, message: str | None,
branch = creds["branch"]
commit_msg = message or f"Solution submission for {problem_id} at {datetime.now().isoformat()}"
+ # Resolve + enforce attribution BEFORE pushing, so a rejection leaves no
+ # side effects (nothing committed/pushed, backend not notified).
+ env = resolve_env(tool, tool_version, ai_model, problem_dir=problem_dir)
+ _require_attribution(env, "swe submit", problem_id)
+
click.echo(f"Submitting solution for '{problem_id}'...")
success, msg, commit_hash = add_commit_push(str(problem_dir), branch, key_path, commit_msg, force)
@@ -635,13 +914,24 @@ def swe_submit(problem_id: str, user_id: str | None, message: str | None,
branch=branch,
commit_message=commit_msg,
timestamp=datetime.now().isoformat(),
+ **env,
)
except APIError as e:
_warn(f"Changes pushed, but failed to notify backend: {e}")
+ tool_line = ""
+ if env["tool"] or env["ai_model"]:
+ bits = []
+ if env["tool"]:
+ bits.append(env["tool"] + (f" {env['tool_version']}" if env["tool_version"] else ""))
+ if env["ai_model"]:
+ bits.append(f"model={env['ai_model']}")
+ tool_line = f" Tool: {' · '.join(bits)}\n"
+
click.echo(
f"\nSuccessfully submitted solution for {problem_id}\n"
f"\n"
+ f"{tool_line}"
f" Commit: {commit_hash[:8]}\n"
f" Branch: {branch}\n"
f" Status: Pushed and backend notified\n"
@@ -1021,7 +1311,8 @@ def cr_fetch(problem_id: str, user_id: str | None, workspace_dir: str | None):
if not success:
_error(msg)
- _install_gym_environment(workspace / problem_id)
+ _install_gym_environment(workspace / problem_id, "cr")
+ _autostart_watcher(workspace / problem_id)
problem_dir = workspace / problem_id
@@ -1072,8 +1363,21 @@ def cr_fetch(problem_id: str, user_id: str | None, workspace_dir: str | None):
"-m", "--message", "review_text",
help="Inline review text.",
)
+@click.option(
+ "--tool", default=None,
+ help="Override detected coding tool (e.g. claude-code, cursor, antigravity).",
+)
+@click.option(
+ "--tool-version", default=None,
+ help="Override detected tool version string.",
+)
+@click.option(
+ "--ai-model", default=None,
+ help="Override detected AI model (e.g. opus-4.7, gpt-5, gemini-2.5-pro).",
+)
def cr_submit(problem_id: str, user_id: str | None, review_file: str | None,
- review_text: str | None):
+ review_text: str | None,
+ tool: str | None, tool_version: str | None, ai_model: str | None):
"""Submit a code review for a Code Review challenge.
Reads your review from a file (-f), inline text (-m), or piped stdin,
@@ -1112,14 +1416,27 @@ def cr_submit(problem_id: str, user_id: str | None, review_file: str | None,
f" aicodinggym cr submit {problem_id} -f review.md"
)
+ cr_problem_dir = _resolve_workspace(config, None) / problem_id
+ env = resolve_env(tool, tool_version, ai_model, problem_dir=cr_problem_dir)
+ _require_attribution(env, "cr submit", problem_id)
try:
- result = cr_submit_review(uid, problem_id, review.strip())
+ result = cr_submit_review(uid, problem_id, review.strip(), **env)
except APIError as e:
_error(str(e))
+ tool_line = ""
+ if env["tool"] or env["ai_model"]:
+ bits = []
+ if env["tool"]:
+ bits.append(env["tool"] + (f" {env['tool_version']}" if env["tool_version"] else ""))
+ if env["ai_model"]:
+ bits.append(f"model={env['ai_model']}")
+ tool_line = f" Tool: {' · '.join(bits)}\n"
+
click.echo(
f"\nSuccessfully submitted code review for {problem_id}\n"
f"\n"
+ f"{tool_line}"
f" Status: {result.get('status', 'COMPLETED')}\n"
f"\n"
f"View results at: {_hyperlink(f'https://aicodinggym.com/challenges/cr/{problem_id}')}"
@@ -1182,7 +1499,8 @@ def mle_download(competition_id: str, user_id: str | None, workspace_dir: str |
except APIError as e:
_error(str(e))
- _install_gym_environment(workspace / competition_id)
+ _install_gym_environment(workspace / competition_id, "mle")
+ _autostart_watcher(workspace / competition_id)
click.echo(
f"\nDataset downloaded to: {dest_path}\n"
@@ -1202,8 +1520,21 @@ def mle_download(competition_id: str, user_id: str | None, workspace_dir: str |
"--message", "-m", default=None,
help="Description of your submission (optional).",
)
+@click.option(
+ "--tool", default=None,
+ help="Override detected coding tool (e.g. claude-code, cursor, antigravity).",
+)
+@click.option(
+ "--tool-version", default=None,
+ help="Override detected tool version string.",
+)
+@click.option(
+ "--ai-model", default=None,
+ help="Override detected AI model (e.g. opus-4.7, gpt-5, gemini-2.5-pro).",
+)
def mle_submit(competition_id: str, csv_path: str, user_id: str | None,
- message: str | None):
+ message: str | None,
+ tool: str | None, tool_version: str | None, ai_model: str | None):
"""Submit a prediction CSV for an MLE-bench competition.
Uploads your prediction CSV directly to the AI Coding Gym server
@@ -1239,22 +1570,66 @@ def mle_submit(competition_id: str, csv_path: str, user_id: str | None,
csv_src = Path(csv_path).resolve()
+ # solution_log.json (per CLAUDE.md) is the most accurate model record for MLE
+ log_model = read_solution_log_model(csv_src.parent)
+ env = resolve_env(tool, tool_version, ai_model or log_model, problem_dir=csv_src.parent)
+ _require_attribution(env, "mle submit", competition_id)
+
click.echo(f"Uploading {csv_src.name} for '{competition_id}'...")
try:
- result = mlebench_submit_csv(uid, competition_id, str(csv_src))
+ result = mlebench_submit_csv(uid, competition_id, str(csv_src), **env)
except APIError as e:
_error(str(e))
score_msg = result.get("message", "Submission received for scoring.")
score = result.get("score")
+ percentile = result.get("leaderboard_percentile")
+ grade_status = result.get("status") or ("graded" if score is not None else "invalid")
+ grade_error = result.get("error")
+
+ # Persist one Submission row per MLE upload (mirrors SWE/CR) so per-attempt
+ # history with tool/model attribution lands in Prisma even when grading
+ # fails. Fire-and-forget — never fail the submit if the call errors.
+ try:
+ record_mle_submission(
+ user_id=uid,
+ competition_id=competition_id,
+ score=score,
+ percentile=percentile,
+ status=grade_status,
+ csv_name=csv_src.name,
+ error=grade_error,
+ **env,
+ )
+ except APIError as e:
+ _warn(f"Submitted, but failed to record submission row: {e}")
+
+ # Also update UserProgress (best-percentile leaderboard view).
+ if percentile is not None:
+ try:
+ notify_mle_progress(uid, competition_id, float(percentile), **env)
+ except APIError as e:
+ _warn(f"Submitted, but failed to log progress: {e}")
+
+ tool_line = ""
+ if env["tool"] or env["ai_model"]:
+ bits = []
+ if env["tool"]:
+ bits.append(env["tool"] + (f" {env['tool_version']}" if env["tool_version"] else ""))
+ if env["ai_model"]:
+ bits.append(f"model={env['ai_model']}")
+ tool_line = f" Tool: {' · '.join(bits)}\n"
click.echo(
f"\nSuccessfully submitted prediction for {competition_id}\n"
f"\n"
+ f"{tool_line}"
f" CSV: {csv_src.name}\n"
f" Status: {score_msg}\n"
)
if score is not None:
click.echo(f" Score: {score}\n")
+ if percentile is not None:
+ click.echo(f" Top %: {percentile}\n")
click.echo(f"View results at: {_hyperlink(f'https://aicodinggym.com/challenges/mle/{competition_id}')}")
diff --git a/cli_env.py b/cli_env.py
new file mode 100644
index 0000000..18dbd9b
--- /dev/null
+++ b/cli_env.py
@@ -0,0 +1,582 @@
+"""Detect AI coding tool + model used for the current shell session.
+
+Reads only an allowlist of well-known env vars — never the full environment —
+so secrets cannot accidentally leak into the submission payload.
+"""
+
+from __future__ import annotations
+
+import json
+import os
+import shutil
+import subprocess
+import sys
+from pathlib import Path
+
+from .config import load_attribution
+
+# Universal self-report file. Any AI coding tool can write this into the
+# challenge folder (instructed via AGENTS.md) to declare the tool + model it
+# runs as. This is the catch-all that makes attribution work for *every* tool
+# — the agent self-identifies instead of us reverse-engineering each tool's
+# private on-disk format.
+AGENT_REPORT_FILENAME = ".gym_attribution.json"
+
+# Canonical tool identifiers. Every tool name returned by ``detect_tool`` and
+# every value in ``_PROCESS_NAME_MAP`` below is one of these strings.
+# NOTE(review): not referenced by the detection code in this module —
+# presumably consumed by callers for validation; confirm.
+ALLOWED_TOOLS = (
+ "claude-code",
+ "cursor",
+ "antigravity",
+ "aider",
+ "codex-cli",
+ "copilot-cli",
+ "windsurf",
+ "continue",
+ "cline",
+ "gemini-cli",
+)
+
+# Substring → canonical tool name. Lowercased process-image basenames are
+# matched against the keys; first match wins so order matters (longer / more
+# specific substrings first).
+# The match is a plain ``needle in name`` containment test, so a generic
+# needle such as "continue" also matches any process whose name merely
+# contains that word.
+_PROCESS_NAME_MAP: tuple[tuple[str, str], ...] = (
+ ("antigravity", "antigravity"),
+ ("claude", "claude-code"),
+ ("cursor", "cursor"),
+ ("windsurf", "windsurf"),
+ ("gemini", "gemini-cli"),
+ ("codex", "codex-cli"),
+ ("aider", "aider"),
+ ("cline", "cline"),
+ ("continue", "continue"),
+ ("copilot", "copilot-cli"),
+)
+
+# CLI binary to invoke for ``--version`` per tool. Missing entries mean we
+# don't know how to interrogate that tool for a version string.
+# Only consulted on the process-tree fallback path of ``detect_tool``; the
+# env-var branches name their binary directly.
+_TOOL_VERSION_CMD: dict[str, str] = {
+ "claude-code": "claude",
+ "aider": "aider",
+ "codex-cli": "codex",
+ "gemini-cli": "gemini",
+ "cursor": "cursor",
+ "windsurf": "windsurf",
+}
+
+
+def detect_tool() -> tuple[str | None, str | None]:
+    """Identify the AI coding tool driving this shell session.
+
+    Environment-variable signals are checked first, in a fixed priority
+    order; if none fires, the ancestor process names are inspected.
+
+    Returns:
+        ``(tool_name, version)``. ``version`` may be ``None`` even when a
+        tool is found; ``(None, None)`` means no tool was identified.
+    """
+    getenv = os.environ.get
+
+    if getenv("CLAUDECODE") == "1":
+        return "claude-code", _version("claude")
+    if getenv("CURSOR_TRACE_ID") or getenv("TERM_PROGRAM") == "cursor":
+        return "cursor", getenv("CURSOR_VERSION") or _version("cursor")
+    if getenv("ANTIGRAVITY"):
+        # Version comes from the environment only; no binary is probed.
+        return "antigravity", getenv("ANTIGRAVITY_VERSION")
+    if getenv("AIDER_MODEL"):
+        return "aider", _version("aider")
+    if getenv("CODEX_CLI"):
+        return "codex-cli", _version("codex")
+    if getenv("WINDSURF"):
+        return "windsurf", getenv("WINDSURF_VERSION") or _version("windsurf")
+    if getenv("CONTINUE_CLI"):
+        return "continue", _version("continue")
+    if getenv("CLINE_CLI"):
+        return "cline", _version("cline")
+    if getenv("GEMINI_CLI"):
+        return "gemini-cli", _version("gemini")
+
+    # No env signal: fall back to matching well-known binary names among the
+    # ancestor processes, which works even for tools that export nothing.
+    detected = detect_tool_from_process_tree()
+    if not detected:
+        return None, None
+    binary = _TOOL_VERSION_CMD.get(detected)
+    return detected, (_version(binary) if binary else None)
+
+
+def detect_tool_from_process_tree() -> str | None:
+    """Return the tool matching the nearest recognisable ancestor process.
+
+    Each ancestor image name is lowercased, stripped of a trailing ``.exe``
+    and tested against the ``_PROCESS_NAME_MAP`` needles in table order; the
+    first needle contained in the name decides the tool. Returns ``None``
+    when no ancestor matches any needle.
+    """
+    for raw_name in _process_ancestor_names():
+        image = raw_name.lower()
+        if image.endswith(".exe"):
+            image = image[: -len(".exe")]
+        matched = next(
+            (tool for needle, tool in _PROCESS_NAME_MAP if needle in image),
+            None,
+        )
+        if matched is not None:
+            return matched
+    return None
+
+
+def _process_ancestor_names(max_depth: int = 16) -> list[str]:
+    """Collect process image names walking from this process towards the root.
+
+    ``psutil`` is used when it can be imported; in that case the walk starts
+    at the current process. If ``psutil`` is missing or the walk fails
+    outright, a platform-specific probe is used instead. At most
+    ``max_depth`` names are gathered.
+    """
+    try:
+        import psutil  # type: ignore[import-not-found]
+    except ImportError:
+        psutil = None  # type: ignore[assignment]
+
+    if psutil is not None:
+        try:
+            collected: list[str] = []
+            current = psutil.Process()
+            while current and len(collected) < max_depth:
+                try:
+                    collected.append(current.name() or "")
+                    current = current.parent()
+                except Exception:
+                    # Process vanished or access denied: keep what we have.
+                    break
+            return list(filter(None, collected))
+        except Exception:
+            pass
+
+    fallback = _ancestor_names_windows if sys.platform == "win32" else _ancestor_names_posix
+    return fallback(max_depth)
+
+
+def _ancestor_names_posix(max_depth: int) -> list[str]:
+    """Walk parent PIDs with ``ps`` and return their command names.
+
+    Starts at this process's parent and stops at PID 1, on a repeated PID,
+    after ``max_depth`` names, or as soon as ``ps`` fails or prints
+    something that cannot be parsed.
+    """
+    chain: list[str] = []
+    visited: set[int] = set()
+    current = os.getppid()
+    while True:
+        if not current or current <= 1:
+            break
+        if len(chain) >= max_depth or current in visited:
+            break
+        visited.add(current)
+        try:
+            listing = subprocess.check_output(
+                ["ps", "-o", "ppid=,comm=", "-p", str(current)],
+                text=True,
+                timeout=2,
+                stderr=subprocess.DEVNULL,
+            ).strip()
+        except Exception:
+            break
+        # Expected shape: "<ppid> <command>"; the command may contain spaces.
+        fields = listing.split(None, 1)
+        if len(fields) < 2:
+            break
+        parent_field, command = fields
+        try:
+            parent = int(parent_field)
+        except ValueError:
+            break
+        chain.append(command.strip())
+        current = parent
+    return chain
+
+
+def _ancestor_names_windows(max_depth: int) -> list[str]:
+    """Return ancestor process names on Windows from one PowerShell snapshot.
+
+    The whole process table is fetched once (launching PowerShell per
+    ancestor would be slow), indexed by PID, and then walked upwards from
+    this process's parent. Any failure to obtain or parse the snapshot
+    yields an empty list.
+    """
+    snapshot_cmd = [
+        "powershell", "-NoProfile", "-NonInteractive", "-Command",
+        "Get-CimInstance Win32_Process | "
+        "Select-Object ProcessId,ParentProcessId,Name | "
+        "ConvertTo-Json -Compress",
+    ]
+    try:
+        raw = subprocess.check_output(
+            snapshot_cmd, text=True, timeout=5, stderr=subprocess.DEVNULL,
+        )
+        rows = json.loads(raw) if raw.strip() else []
+    except Exception:
+        return []
+
+    # A single process is serialised as one object rather than a list.
+    if isinstance(rows, dict):
+        rows = [rows]
+
+    by_pid: dict[int, tuple[int, str]] = {}
+    for row in rows:
+        try:
+            own_pid = int(row.get("ProcessId"))
+            parent_pid = int(row.get("ParentProcessId"))
+            image = str(row.get("Name") or "")
+        except (TypeError, ValueError):
+            continue
+        by_pid[own_pid] = (parent_pid, image)
+
+    chain: list[str] = []
+    visited: set[int] = set()
+    current = os.getppid()
+    while current and current not in visited and len(chain) < max_depth:
+        visited.add(current)
+        record = by_pid.get(current)
+        if not record:
+            break
+        next_pid, image = record
+        if image:
+            chain.append(image)
+        current = next_pid
+    return chain
+
+
+def detect_model() -> str | None:
+    """Best-effort detection of the AI model in use.
+
+    Order: explicit env vars, then a tool-aware reader for whichever coding
+    tool ``detect_tool`` reports. The tool-aware path is what makes
+    auto-detection work for CLIs that do not export their model to the
+    shell environment.
+
+    Returns:
+        The lowercased model name, or ``None`` when nothing could be
+        determined. Never returns an empty string.
+    """
+    env_vars = (
+        "ANTHROPIC_MODEL",
+        "CLAUDE_CODE_MODEL",
+        "OPENAI_MODEL",
+        "AIDER_MODEL",
+        "GEMINI_MODEL",
+        "CURSOR_MODEL",
+    )
+    for var in env_vars:
+        # Strip before testing: a whitespace-only value must neither win nor
+        # block the lower-priority variables and the on-disk fallbacks.
+        value = os.environ.get(var, "").strip()
+        if value:
+            return value.lower()
+
+    tool, _ = detect_tool()
+    if tool == "claude-code":
+        return read_claude_code_session_model()
+    if tool == "codex-cli":
+        return read_codex_session_model() or read_codex_config_model()
+    # aider: AIDER_MODEL is already covered by the env block above, and no
+    # reader for on-disk aider state is implemented.
+    return None
+
+
+def read_codex_config_model() -> str | None:
+    """Return the default model from ``~/.codex/config.toml``.
+
+    Codex CLI persists ``model = "<name>"`` as the top-level default. We
+    avoid a full TOML parse (``tomllib`` is 3.11+) and just scan for the
+    first top-level ``model`` assignment before any ``[section]`` header.
+
+    Only the exact key ``model`` is accepted, so sibling keys such as
+    ``model_provider`` or ``model_reasoning_effort`` are never mistaken for
+    it, and a trailing ``# comment`` after the value is ignored.
+
+    Returns:
+        The lowercased model name, or ``None`` if the file is missing,
+        unreadable, or has no non-empty top-level ``model`` entry.
+    """
+    cfg = Path.home() / ".codex" / "config.toml"
+    try:
+        text = cfg.read_text(encoding="utf-8", errors="replace")
+    except OSError:
+        return None
+
+    for line in text.splitlines():
+        stripped = line.strip()
+        if not stripped or stripped.startswith("#"):
+            continue
+        if stripped.startswith("["):
+            break  # entered a sub-section; the top-level default lives above
+        key, sep, rhs = stripped.partition("=")
+        if not sep or key.strip() != "model":
+            continue
+        value = rhs.strip()
+        if value[:1] in ('"', "'"):
+            # Quoted string: keep only what sits between the quotes.
+            closing = value.find(value[0], 1)
+            value = value[1:closing] if closing != -1 else value[1:]
+        else:
+            # Bare value: drop any trailing inline comment.
+            value = value.split("#", 1)[0]
+        value = value.strip()
+        if value:
+            return value.lower()
+    return None
+
+
+def read_codex_session_model(cwd: Path | None = None) -> str | None:
+    """Find the model recorded by the most recent Codex CLI session for ``cwd``.
+
+    Session logs live under ``~/.codex/sessions`` as ``rollout-*.jsonl``
+    files. Only the eight most recently modified logs are considered; a log
+    qualifies when the ``cwd`` stored on its first line equals (ignoring
+    case) the working directory or one of its ancestors. The first
+    qualifying log that yields a model wins.
+
+    Args:
+        cwd: Directory to match; defaults to the current working directory.
+
+    Returns:
+        The lowercased model name, or ``None`` if none was found.
+    """
+    here = (cwd or Path.cwd()).resolve()
+    root = Path.home() / ".codex" / "sessions"
+    if not root.is_dir():
+        return None
+
+    accepted = {str(here).lower()} | {str(parent).lower() for parent in here.parents}
+
+    try:
+        rollouts = list(root.rglob("rollout-*.jsonl"))
+        rollouts.sort(key=lambda p: p.stat().st_mtime, reverse=True)
+    except OSError:
+        return None
+
+    for rollout in rollouts[:8]:
+        origin = _codex_session_cwd(rollout)
+        if origin is not None and origin.lower() in accepted:
+            found = _scan_codex_jsonl_for_model(rollout)
+            if found:
+                return found
+    return None
+
+
+def _scan_codex_jsonl_for_model(path: Path, max_bytes: int = 256 * 1024) -> str | None:
+    """Return the last non-blank ``payload.model`` in a Codex session JSONL.
+
+    Only the final ``max_bytes`` of the file are read. Lines are examined
+    newest-first; lines that are blank, not valid JSON, or lack a string
+    ``payload.model`` are skipped. The result is stripped and lowercased;
+    ``None`` is returned if the file is unreadable or has no usable model.
+    """
+    try:
+        total = path.stat().st_size
+        with open(path, "rb") as handle:
+            if total > max_bytes:
+                handle.seek(total - max_bytes)
+                handle.readline()  # drop the line the seek landed inside
+            lines = handle.read().splitlines()
+    except OSError:
+        return None
+
+    for line in reversed(lines):
+        if not line.strip():
+            continue
+        try:
+            record = json.loads(line)
+        except Exception:
+            continue
+        if not isinstance(record, dict):
+            continue
+        payload = record.get("payload")
+        if not isinstance(payload, dict):
+            continue
+        candidate = payload.get("model")
+        if isinstance(candidate, str) and candidate.strip():
+            return candidate.strip().lower()
+    return None
+
+
+def _codex_session_cwd(path: Path) -> str | None:
+    """Return ``payload.cwd`` from the first line of a Codex session log.
+
+    Yields ``None`` when the file cannot be read, the first line is blank or
+    not valid JSON, or it carries no non-empty string ``payload.cwd``.
+    """
+    try:
+        with open(path, "rb") as handle:
+            header = handle.readline()
+    except OSError:
+        return None
+    if not header.strip():
+        return None
+
+    try:
+        record = json.loads(header)
+    except Exception:
+        return None
+    if not isinstance(record, dict):
+        return None
+
+    payload = record.get("payload")
+    if not isinstance(payload, dict):
+        return None
+    origin = payload.get("cwd")
+    return origin if isinstance(origin, str) and origin else None
+
+
+def read_claude_code_session_model(cwd: Path | None = None) -> str | None:
+    """Return the newest assistant model from a Claude Code transcript.
+
+    Transcripts are looked up under ``~/.claude/projects/<slug>/*.jsonl``,
+    where ``<slug>`` is the absolute working directory with ``:``, ``\\``
+    and ``/`` each replaced by ``-``. The working directory is tried first,
+    then each ancestor; a project folder is also accepted when its name
+    matches the slug only case-insensitively. Within a project folder the
+    three most recently modified transcripts are scanned.
+
+    Args:
+        cwd: Directory to match; defaults to the current working directory.
+
+    Returns:
+        The model name found in ``message.model``, or ``None``.
+    """
+    here = (cwd or Path.cwd()).resolve()
+    root = Path.home() / ".claude" / "projects"
+    if not root.is_dir():
+        return None
+
+    try:
+        by_lower_name = {
+            entry.name.lower(): entry for entry in root.iterdir() if entry.is_dir()
+        }
+    except OSError:
+        return None
+
+    for location in (here, *here.parents):
+        slug = _claude_project_slug(location)
+        project_dir = root / slug
+        if not project_dir.is_dir():
+            # Fall back to a case-insensitive match on the folder name.
+            project_dir = by_lower_name.get(slug.lower())
+        if project_dir is None:
+            continue
+        try:
+            transcripts = sorted(
+                (p for p in project_dir.glob("*.jsonl") if p.is_file()),
+                key=lambda p: p.stat().st_mtime,
+                reverse=True,
+            )
+        except OSError:
+            continue
+        for transcript in transcripts[:3]:
+            found = _scan_jsonl_for_model(transcript)
+            if found:
+                return found
+    return None
+
+
+def _claude_project_slug(path: Path) -> str:
+    """Turn ``path`` into a project slug by mapping ``:``, ``\\`` and ``/`` to ``-``."""
+    separators_to_dash = str.maketrans({":": "-", "\\": "-", "/": "-"})
+    return str(path).translate(separators_to_dash)
+
+
+def _scan_jsonl_for_model(path: Path, max_bytes: int = 256 * 1024) -> str | None:
+    """Tail the JSONL and return the newest non-synthetic assistant model.
+
+    Only the final ``max_bytes`` of the file are read. Lines are examined
+    newest-first; a line qualifies when it is a JSON object whose
+    ``message.model`` is a non-blank string other than the ``<synthetic>``
+    placeholder.
+
+    Returns:
+        The stripped, lowercased model name, or ``None`` if the file is
+        unreadable or contains no qualifying line.
+    """
+    try:
+        size = path.stat().st_size
+        with open(path, "rb") as f:
+            if size > max_bytes:
+                f.seek(size - max_bytes)
+                f.readline()  # discard partial line
+            tail = f.read()
+    except OSError:
+        return None
+
+    for raw in reversed(tail.splitlines()):
+        if not raw.strip():
+            continue
+        try:
+            obj = json.loads(raw)
+        except ValueError:
+            continue
+        if not isinstance(obj, dict):
+            continue
+        msg = obj.get("message")
+        if not isinstance(msg, dict):
+            continue
+        model = msg.get("model")
+        if not isinstance(model, str):
+            continue
+        model = model.strip().lower()
+        # "<synthetic>" is a placeholder, not a real model name (presumably
+        # written for locally generated messages — confirm); reporting it
+        # would mis-attribute the submission, so keep looking further back.
+        if model and model != "<synthetic>":
+            return model
+    return None
+
+
+def read_agent_report(problem_dir: Path | None = None) -> dict[str, str | None]:
+    """Load the agent's self-reported attribution (``.gym_attribution.json``).
+
+    The file lets any AI coding tool declare the tool and model it runs as
+    (per AGENTS.md), so attribution does not depend on decoding each tool's
+    own on-disk format.
+
+    ``problem_dir`` is checked first (when given), then the current working
+    directory. The first candidate that exists and parses to a JSON object
+    is used, even if it defines none of the fields. Keys may be snake_case
+    or camelCase, and ``model`` is accepted as an alias of ``ai_model``.
+
+    Returns:
+        A dict with keys ``tool``, ``tool_version`` and ``ai_model``; each
+        value is a stripped non-empty string or ``None``. Missing or
+        malformed files produce all ``None``.
+    """
+    empty: dict[str, str | None] = {"tool": None, "tool_version": None, "ai_model": None}
+
+    locations: list[Path] = (
+        [Path(problem_dir) / AGENT_REPORT_FILENAME] if problem_dir is not None else []
+    )
+    in_cwd = Path.cwd() / AGENT_REPORT_FILENAME
+    if in_cwd not in locations:
+        locations.append(in_cwd)
+
+    def _clean(value: object) -> str | None:
+        # Non-strings and blank strings both count as "not provided".
+        if not isinstance(value, str):
+            return None
+        return value.strip() or None
+
+    for location in locations:
+        try:
+            if not location.is_file():
+                continue
+            report = json.loads(location.read_text(encoding="utf-8"))
+        except (OSError, ValueError):
+            continue
+        if not isinstance(report, dict):
+            continue
+        version_raw = report.get("tool_version") or report.get("toolVersion")
+        model_raw = report.get("ai_model") or report.get("aiModel") or report.get("model")
+        return {
+            "tool": _clean(report.get("tool")),
+            "tool_version": _clean(version_raw),
+            "ai_model": _clean(model_raw),
+        }
+    return empty
+
+
+def resolve(
+    cli_tool: str | None,
+    cli_version: str | None,
+    cli_model: str | None,
+    problem_dir: Path | None = None,
+) -> dict[str, str | None]:
+    """Work out the tool / version / model attribution for a submission.
+
+    Sources, highest precedence first:
+
+    1. CLI flags (``--tool``, ``--tool-version``, ``--ai-model``)
+    2. Live auto-detection (``detect_tool`` / ``detect_model``)
+    3. Agent self-report file (``.gym_attribution.json``)
+    4. Persistent attribution config (``load_attribution``)
+
+    ``tool`` and ``tool_version`` travel together: both come from the first
+    source that names a tool, so a version is never paired with a tool from
+    another source. The one exception is an explicit ``cli_version``, which
+    always replaces the version. ``ai_model`` is resolved independently as
+    the first truthy value in the same precedence order.
+
+    Returns:
+        A dict with keys ``tool``, ``tool_version`` and ``ai_model``.
+    """
+    auto_tool, auto_ver = detect_tool()
+    auto_model = detect_model()
+    agent = read_agent_report(problem_dir)
+    persisted = load_attribution()
+
+    tool_sources: tuple[tuple[str | None, str | None], ...] = (
+        (cli_tool, cli_version),
+        (auto_tool, auto_ver),
+        (agent.get("tool"), agent.get("tool_version")),
+        (persisted.get("tool"), persisted.get("tool_version")),
+    )
+    chosen_tool, chosen_version = next(
+        (pair for pair in tool_sources if pair[0]),
+        (None, None),
+    )
+    # An explicit --tool-version wins regardless of who supplied the tool.
+    chosen_version = cli_version or chosen_version
+
+    chosen_model = (
+        cli_model
+        or auto_model
+        or agent.get("ai_model")
+        or persisted.get("ai_model")
+    )
+    return {
+        "tool": chosen_tool,
+        "tool_version": chosen_version,
+        "ai_model": chosen_model,
+    }
+
+
+def read_solution_log_model(problem_dir: Path) -> str | None:
+    """Return the model recorded in ``<problem_dir>/solution_log.json``.
+
+    For MLE the agent is expected to record its model in this file after
+    each prompt (per CLAUDE.md). Accepted shapes, in order of preference:
+
+    * ``{"model": "..."}`` or ``{"model_id": "..."}`` at the top level
+    * ``{"entries": [..., {"model": "..."}]}`` — only the last entry is read
+
+    Blank or non-string values are treated as absent so the next candidate
+    is tried.
+
+    Returns:
+        The stripped, lowercased model name, or ``None`` if the file is
+        missing, unreadable, malformed, or names no model.
+    """
+    log_path = problem_dir / "solution_log.json"
+    try:
+        # Read directly instead of checking exists() first: no race with a
+        # concurrent writer, and JSON is UTF-8 regardless of platform locale.
+        data = json.loads(log_path.read_text(encoding="utf-8"))
+    except (OSError, ValueError):
+        return None
+    if not isinstance(data, dict):
+        return None
+
+    def _pick(record: dict) -> str | None:
+        # First non-blank string under "model", then "model_id".
+        for key in ("model", "model_id"):
+            value = record.get(key)
+            if isinstance(value, str) and value.strip():
+                return value.strip().lower()
+        return None
+
+    top_level = _pick(data)
+    if top_level:
+        return top_level
+
+    entries = data.get("entries")
+    if isinstance(entries, list) and entries and isinstance(entries[-1], dict):
+        return _pick(entries[-1])
+    return None
+
+
+def _version(cmd: str) -> str | None:
+    """Run ``<cmd> --version`` and extract a short version token.
+
+    Args:
+        cmd: Name of the CLI binary to look up on ``PATH``.
+
+    Returns:
+        The first whitespace-separated token on the first output line that
+        contains a digit (e.g. ``"2.1.141"`` from ``"2.1.141 (Claude Code)"``
+        or from ``"aider 2.1.141"``), falling back to the very first token;
+        truncated to 32 characters. ``None`` if the binary is not on
+        ``PATH``, fails, times out, or prints nothing.
+    """
+    executable = shutil.which(cmd)
+    if not executable:
+        return None
+    try:
+        # Launch the resolved path, not the bare name: on Windows many CLIs
+        # are ``.cmd``/``.bat`` shims that ``which`` finds via PATHEXT but
+        # that cannot be started from the extension-less name.
+        out = subprocess.check_output(
+            [executable, "--version"],
+            text=True,
+            timeout=3,
+            stderr=subprocess.DEVNULL,
+        )
+    except (OSError, ValueError, subprocess.SubprocessError):
+        return None
+
+    lines = out.strip().splitlines()
+    if not lines:
+        return None
+    tokens = lines[0].split()
+    if not tokens:
+        return None
+    for token in tokens:
+        if any(ch.isdigit() for ch in token):
+            return token[:32]
+    return tokens[0][:32]
diff --git a/config.py b/config.py
index 7c5065e..83575a8 100644
--- a/config.py
+++ b/config.py
@@ -16,9 +16,11 @@
CONFIG_DIR = Path.home() / ".aicodinggym"
CONFIG_PATH = CONFIG_DIR / "config.json"
CREDENTIALS_PATH = CONFIG_DIR / "credentials.json"
+ATTRIBUTION_PATH = CONFIG_DIR / "attribution.json"
# Fields persisted in config.json
_CONFIG_FIELDS = ("user_id", "repo_name", "private_key_path", "workspace_dir")
+_ATTRIBUTION_FIELDS = ("tool", "tool_version", "ai_model")
def ensure_config_dir() -> Path:
@@ -80,6 +82,47 @@ def save_credentials(credentials: dict[str, dict[str, Any]]) -> None:
CREDENTIALS_PATH.write_text(json.dumps(credentials, indent=2) + "\n")
+def load_attribution() -> dict[str, str]:
+    """Load persistent tool/model attribution from ~/.aicodinggym/attribution.json.
+
+    Used as a fallback when auto-detection cannot identify the coding tool
+    or model — set once via ``aicodinggym set-attribution`` and every
+    subsequent submission picks it up automatically.
+
+    Returns:
+        The recognised attribution fields whose values are non-blank
+        strings (stripped). An empty dict if the file is missing,
+        unreadable, not valid UTF-8 JSON, or not a JSON object.
+    """
+    try:
+        # Read directly rather than checking exists() first (no race), and
+        # decode as UTF-8 regardless of the platform's default encoding.
+        data = json.loads(ATTRIBUTION_PATH.read_text(encoding="utf-8"))
+    except (OSError, ValueError):
+        # ValueError covers JSONDecodeError and also UnicodeDecodeError, so
+        # a corrupt file degrades to "no attribution" instead of crashing.
+        return {}
+    if not isinstance(data, dict):
+        return {}
+    return {
+        k: v.strip()
+        for k, v in data.items()
+        if k in _ATTRIBUTION_FIELDS and isinstance(v, str) and v.strip()
+    }
+
+
+def save_attribution(attribution: dict[str, str]) -> None:
+    """Write the attribution to ~/.aicodinggym/attribution.json.
+
+    Only the recognised attribution fields are kept, and only when their
+    value is a non-blank string; values are stripped before being written
+    as indented JSON followed by a newline.
+    """
+    ensure_config_dir()
+    cleaned: dict[str, str] = {}
+    for field in _ATTRIBUTION_FIELDS:
+        value = attribution.get(field)
+        if isinstance(value, str) and value.strip():
+            cleaned[field] = value.strip()
+    ATTRIBUTION_PATH.write_text(json.dumps(cleaned, indent=2) + "\n")
+
+
+def clear_attribution() -> bool:
+    """Remove persistent attribution.
+
+    Returns:
+        True if a file was deleted, False if there was nothing to delete.
+    """
+    try:
+        # Delete first and treat "not found" as the no-op case: avoids the
+        # check-then-act race of exists() followed by unlink(), and also
+        # removes a dangling symlink that exists() would report as absent.
+        ATTRIBUTION_PATH.unlink()
+    except FileNotFoundError:
+        return False
+    return True
+
+
def require_config(config: dict[str, str], field: str, label: str) -> str:
"""Get a required config field or raise a descriptive error."""
value = config.get(field)
diff --git a/pyproject.toml b/pyproject.toml
index 9f749e9..a3c733b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "aicodinggym-cli"
-version = "0.5.1"
+version = "0.6.0"
description = "CLI tool for AI Coding Gym platform"
readme = "README.md"
requires-python = ">=3.10"
@@ -37,3 +37,7 @@ packages = ["aicodinggym"]
[tool.setuptools.package-dir]
aicodinggym = "."
+
+[tool.pytest.ini_options]
+testpaths = ["tests"]
+python_files = ["test_*.py"]