73 changes: 66 additions & 7 deletions ADWs/runner.py
@@ -184,13 +184,31 @@ def _log_to_file(log_name, prompt, stdout, stderr, returncode, duration, usage=N
_ALLOWED_CLI_COMMANDS = frozenset({"claude", "openclaude"})


def _spawn_cli(cli_command: str, prompt: str, agent: str | None, provider_env: dict) -> subprocess.Popen:
def _spawn_cli(
cli_command: str,
prompt: str,
agent: str | None,
provider_env: dict,
resume_session_id: str | None = None,
) -> subprocess.Popen:
"""Spawn a CLI process using only hardcoded command strings.

Uses a dictionary lookup so that the subprocess argument is always
a static string, satisfying semgrep/opengrep subprocess injection rules.

Args:
resume_session_id: When set, prepends `--resume <id>` so the CLI
continues an existing Claude session (preserving the context window
across consecutive webhooks for the same logical thread).
If the session has expired or been cleaned up, the CLI will error
and the caller is responsible for retrying without resume.
"""
base_args = ["--print", "--dangerously-skip-permissions", "--output-format", "json"]
if resume_session_id:
# `--resume` only meaningful with the Anthropic CLI ("claude").
# OpenClaude (Codex/OpenAI) doesn't expose session resume yet —
# caller is expected to gate this branch by provider.
base_args.extend(["--resume", resume_session_id])
if agent:
base_args.extend(["--agent", agent])
base_args.append(prompt)
@@ -258,6 +276,7 @@ def run_claude(
timeout: int = 600,
agent: str = None,
daily_output_kind: str | None = None,
resume_session_id: str | None = None,
) -> dict:
"""
Execute AI CLI (claude or openclaude) with streaming output.
@@ -273,9 +292,29 @@
daily_output_kind: When set, files written to workspace/daily-logs/ by the
subprocess are snapshotted and persisted to daily_outputs (PG mode).
In SQLite mode this is a no-op — files stay on disk as before.
resume_session_id: When set and the active provider is the Anthropic
CLI, the subprocess receives `--resume <id>` so the model
continues a previously stored conversation. The CLI uses the
id to load the conversation history from
~/.claude/projects/<workspace>/<session>.jsonl. If the session
no longer exists, the CLI exits non-zero and the caller
should retry without resume. The session_id captured from
the output JSON is returned in the result so callers can
persist it for next time.

Returns dict with keys:
success, stdout, stderr, returncode, duration, usage,
session_id — Claude session_id captured from output JSON
(None if not available, e.g. JSON parse failure
or non-Anthropic provider).
"""
cli_command, provider_env = _get_provider_config()

# `--resume` is Claude-CLI specific. Silently drop for other providers
# so callers can pass it unconditionally without checking provider.
if resume_session_id and cli_command != "claude":
resume_session_id = None

if agent:
agent_label = f"@{agent}"
else:
@@ -289,7 +328,10 @@
start_time = datetime.now()

try:
process = _spawn_cli(cli_command, prompt, agent, provider_env)
process = _spawn_cli(
cli_command, prompt, agent, provider_env,
resume_session_id=resume_session_id,
)

stdout_lines = []
line_count = 0
@@ -304,13 +346,19 @@
stdout = "".join(stdout_lines)
duration = (datetime.now() - start_time).total_seconds()

# Parse JSON output to extract result and usage
# Parse JSON output to extract result, usage, and session_id
usage = None
result_text = stdout
captured_session_id = None
try:
json_result = json.loads(stdout)
usage = _parse_usage(json_result)
result_text = json_result.get("result", stdout)
# Claude CLI returns session_id in the output JSON regardless
# of whether --resume was used (it always uses some session,
# just creates a new one when --resume is absent). Capturing
# it here lets the caller persist it for next-time --resume.
captured_session_id = json_result.get("session_id")
except (json.JSONDecodeError, TypeError):
pass

@@ -323,7 +371,8 @@
if usage:
tokens_total = usage["input_tokens"] + usage["output_tokens"]
cost_str = f" | {tokens_total:,}tok | ${usage['cost_usd']:.2f}"
console.print(f"\r [success]✓[/success] {log_name} [dim]({duration:.0f}s{cost_str})[/dim]")
resume_str = " (resumed)" if resume_session_id else ""
console.print(f"\r [success]✓[/success] {log_name} [dim]({duration:.0f}s{cost_str}){resume_str}[/dim]")
# Post-process: persist new daily-log files to DB (PG mode only)
if daily_output_kind:
_persist_new_daily_outputs(before_snapshot, daily_output_kind, agent)
@@ -340,14 +389,15 @@
"returncode": process.returncode,
"duration": duration,
"usage": usage,
"session_id": captured_session_id,
}

except subprocess.TimeoutExpired:
process.kill()
duration = (datetime.now() - start_time).total_seconds()
console.print(f"\r [error]✗[/error] {log_name} [warning](timeout {timeout}s)[/warning]")
_log_to_file(log_name, prompt, "", f"Timeout after {timeout}s", -1, duration)
return {"success": False, "stdout": "", "stderr": f"Timeout after {timeout}s", "returncode": -1, "duration": duration}
return {"success": False, "stdout": "", "stderr": f"Timeout after {timeout}s", "returncode": -1, "duration": duration, "session_id": None}

except KeyboardInterrupt:
process.kill()
@@ -360,7 +410,7 @@
duration = (datetime.now() - start_time).total_seconds()
console.print(f"\r [error]✗[/error] {log_name} [error]({e})[/error]")
_log_to_file(log_name, prompt, "", str(e), -3, duration)
return {"success": False, "stdout": "", "stderr": str(e), "returncode": -3, "duration": duration}
return {"success": False, "stdout": "", "stderr": str(e), "returncode": -3, "duration": duration, "session_id": None}


def run_skill(
@@ -371,6 +421,7 @@
agent: str = None,
notify_telegram: bool | str = False,
daily_output_kind: str | None = None,
resume_session_id: str | None = None,
) -> dict:
"""Execute a skill via CLI, optionally with an agent.

@@ -382,6 +433,7 @@
"<chat_id>" — same as True but overrides the chat_id.
daily_output_kind: When set, files written to workspace/daily-logs/ by the
skill subprocess are persisted to daily_outputs in PG mode.
resume_session_id: Forwarded to run_claude — see its docstring.
"""
prompt = f"Execute the skill /{skill_name} {args}".strip()
if notify_telegram:
@@ -400,7 +452,14 @@
f"Nunca chame reply para progresso, confirmação intermediária ou teste.\n"
f"---"
)
return run_claude(prompt, log_name or skill_name, timeout, agent=agent, daily_output_kind=daily_output_kind)
return run_claude(
prompt,
log_name or skill_name,
timeout,
agent=agent,
daily_output_kind=daily_output_kind,
resume_session_id=resume_session_id,
)


def run_script(func, log_name: str = "unnamed", timeout: int = 120) -> dict:
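
The changelog entry below notes that a stale-session resume "falls back gracefully ... retries once without resume". That retry lives in the caller rather than in `run_claude`, which only surfaces the non-zero exit. A minimal sketch of the calling pattern, with hypothetical `lookup_session`/`store_session` helpers standing in for the real `trigger_session_threads` queries:

```python
# Sketch of the caller-side resume/retry pattern around run_claude.
# lookup_session and store_session are hypothetical stand-ins for the
# trigger_session_threads queries; the import path is assumed.
from ADWs.runner import run_claude


def lookup_session(trigger_id: int, dedup_key: str) -> str | None:
    ...  # SELECT claude_session_id FROM trigger_session_threads WHERE ...


def store_session(trigger_id: int, dedup_key: str, session_id: str) -> None:
    ...  # upsert the thread row (see the upsert sketch after the migration)


def run_with_resume(prompt: str, trigger_id: int, dedup_key: str) -> dict:
    session_id = lookup_session(trigger_id, dedup_key)  # None on first turn

    # dedup_key doubles as the log name here, purely for illustration.
    result = run_claude(prompt, dedup_key, resume_session_id=session_id)

    if not result["success"] and session_id:
        # Stale or cleaned-up session: the CLI exits non-zero, so retry
        # once with a fresh session instead of failing the webhook.
        result = run_claude(prompt, dedup_key, resume_session_id=None)

    if result.get("session_id"):
        # Persist whichever session the CLI actually used so the next
        # webhook on this thread can resume it.
        store_session(trigger_id, dedup_key, result["session_id"])

    return result
```
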
42 changes: 42 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,48 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [unreleased]

### Added — clickup-session-resume

Per-trigger opt-in for `claude --resume` continuation across consecutive
webhooks of the same logical thread (e.g. ClickUp task, GitHub PR). Without
this, every webhook spawned a fresh Claude subprocess that had to re-read
all prior comments, re-derive context, and re-fetch data via API calls —
losing 90%+ of the model's reasoning trace between turns.

- **`Trigger.resume_sessions BOOLEAN`** column (default false) — opt-in
per trigger. Existing triggers keep current "fresh subprocess" behaviour.
- **`trigger_session_threads`** table (Alembic migration `0012`) maps
`(trigger_id, dedup_key) → claude_session_id`. The dedup_key is
extracted per-source: ClickUp task_id, GitHub PR number, Linear issue
id (extensible via `_extract_dedup_key` in `routes/triggers.py`; a
sketch follows this entry).
- **`run_claude(..., resume_session_id=...)`** in `ADWs/runner.py` —
prepends `--resume <id>` to the CLI invocation. Captures `session_id`
from output JSON and returns it in the result dict for upsert. Falls
back gracefully when resume fails (stale session): retries once
without resume.
- **Settings UI** at `/settings → Sessions` tab:
- Default-on toggle for new triggers
- Auto-cleanup window (days) + daily cleanup hour
- Force compaction turn count
- Storage stats (disk usage of `~/.claude/projects/`)
- Live list of active session threads with manual reset + bulk
cleanup-stale actions
- **Trigger UI** — checkbox "Enable session resume" on the trigger
edit form, with explanatory tooltip.
- **Endpoints** (usage sketch after this list):
- `GET/PUT /api/settings/sessions` — global config
- `GET /api/sessions` — list active threads
- `DELETE /api/sessions/<id>` — manual reset
- `POST /api/sessions/cleanup-stale` — bulk delete stale rows
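
A minimal sketch of driving these endpoints, assuming a local dashboard on port 8000; the payload and response field names are illustrative, only the paths come from the list above:

```python
# Hedged sketch: exercises the session endpoints listed above.
# Base URL, auth, and field names ("cleanup_days", "id") are assumptions.
import requests

BASE = "http://localhost:8000"

# Read and update the global session-resume config
cfg = requests.get(f"{BASE}/api/settings/sessions").json()
cfg["cleanup_days"] = 14  # hypothetical field name
requests.put(f"{BASE}/api/settings/sessions", json=cfg)

# List active threads, manually reset the first, then bulk-drop stale rows
threads = requests.get(f"{BASE}/api/sessions").json()
if threads:
    requests.delete(f"{BASE}/api/sessions/{threads[0]['id']}")
requests.post(f"{BASE}/api/sessions/cleanup-stale")
```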

Incident driving this: 2026-05-02 ClickUp task 86c9kyquv. User asked
for a marketing report, then 30 min later asked to "implement
recommendation 3". The fresh Oracle had to re-read the entire Google
Doc and re-derive what "rec 3" meant — wasting ~$2 of tokens that
the prior Oracle already had cached in context.
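
A minimal sketch of the per-source key extraction mentioned above; the webhook payload field names are assumptions, not confirmed against the real schemas:

```python
# Illustrative sketch of _extract_dedup_key (routes/triggers.py).
# Each source only needs to yield a stable string identifying the
# logical thread; the payload field names here are assumed.
def _extract_dedup_key(source: str, event_data: dict) -> str | None:
    if source == "clickup":
        return event_data.get("task_id")  # ClickUp task
    if source == "github":
        number = event_data.get("pull_request", {}).get("number")
        return str(number) if number is not None else None  # PR number
    if source == "linear":
        return event_data.get("data", {}).get("id")  # Linear issue id
    return None  # unknown source: skip resume, spawn a fresh session
```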

### Added — postgres-compat (carried from prior unreleased)

PostgreSQL is now a first-class storage option alongside SQLite. Two
features land together: **postgres-compat** (dual-backend schema, data
migration tool) and **pg-native-configs** (configs live in DB when on PG,
147 changes: 147 additions & 0 deletions dashboard/alembic/versions/0012_clickup_session_resume.py
@@ -0,0 +1,147 @@
"""ClickUp session resume — per-trigger toggle + dedup-keyed Claude session persistence.

Adds:
- triggers.resume_sessions BOOLEAN — per-trigger opt-in for `claude --resume`
behaviour.
- new table `trigger_session_threads` — maps (trigger_id, dedup_key) →
claude_session_id, allowing consecutive webhooks for the same logical
"thread" (e.g. ClickUp task, GitHub PR) to continue the same Claude
session window instead of starting fresh every time.

Why this matters:
Without session resume, every webhook spawns a fresh `claude --print`
subprocess that must re-read all prior comments, re-derive context, and
re-fetch data via API calls. Long multi-comment workflows (reports →
follow-up questions → implementations) lose 90% of the model's reasoning
trace between turns. With resume, the model carries forward its full
context window across turns at zero re-derivation cost.

Real-world incident driving this (2026-05-02 ClickUp task 86c9kyquv):
the user requested a marketing report, then asked to "implement
recommendation 3". The fresh Oracle had to re-read the entire Google Doc
and re-derive what "rec 3" meant — wasting ~$2 of work that the prior
Oracle already had cached in context.

Schema:
trigger_session_threads(
id PK,
trigger_id FK triggers(id) on delete cascade,
dedup_key TEXT NOT NULL, -- string extracted from event_data
-- (ClickUp task_id, GitHub PR number, etc.)
claude_session_id TEXT, -- the session_id captured from claude --print --output-format json
last_used_at DATETIME,
created_at DATETIME
)
UNIQUE (trigger_id, dedup_key)
INDEX (trigger_id, last_used_at) -- for cleanup queries

Revision ID: 0012
Revises: 0011
Create Date: 2026-05-03
"""

from __future__ import annotations

from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op
from sqlalchemy import inspect


# revision identifiers, used by Alembic.
revision: str = "0012"
down_revision: Union[str, None] = "0011"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def _is_pg() -> bool:
return op.get_bind().dialect.name == "postgresql"


def _has_table(conn, name: str) -> bool:
return inspect(conn).has_table(name)


def _has_column(conn, table: str, column: str) -> bool:
cols = {c["name"] for c in inspect(conn).get_columns(table)}
return column in cols


def upgrade() -> None:
conn = op.get_bind()

# ── 1. triggers.resume_sessions ──────────────────────────────────────
# Per-trigger toggle. Default FALSE so existing triggers keep their
# current "fresh subprocess every time" behaviour. Operator opts in
# explicitly via dashboard checkbox or YAML.
if not _has_column(conn, "triggers", "resume_sessions"):
with op.batch_alter_table("triggers") as batch:
batch.add_column(
sa.Column(
"resume_sessions",
sa.Boolean(),
nullable=False,
server_default=sa.false(),
)
)

# ── 2. trigger_session_threads table ─────────────────────────────────
if not _has_table(conn, "trigger_session_threads"):
op.create_table(
"trigger_session_threads",
sa.Column("id", sa.Integer(), primary_key=True),
sa.Column(
"trigger_id",
sa.Integer(),
sa.ForeignKey("triggers.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column("dedup_key", sa.Text(), nullable=False),
sa.Column("claude_session_id", sa.Text(), nullable=True),
sa.Column(
"last_used_at",
sa.DateTime(timezone=True),
nullable=False,
server_default=sa.func.now(),
),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
nullable=False,
server_default=sa.func.now(),
),
sa.UniqueConstraint(
"trigger_id",
"dedup_key",
name="uq_trigger_session_threads_trigger_key",
),
)

# Cleanup index on (trigger_id, last_used_at) so "find stale
# sessions older than N days" can use an index range scan instead
# of a full table scan (a B-tree scans in either direction, so no
# explicit DESC ordering is needed).
op.create_index(
"ix_trigger_session_threads_trigger_last_used",
"trigger_session_threads",
["trigger_id", "last_used_at"],
)


def downgrade() -> None:
conn = op.get_bind()

if _has_table(conn, "trigger_session_threads"):
# Drop index first (PG complains otherwise on some versions).
try:
op.drop_index(
"ix_trigger_session_threads_trigger_last_used",
table_name="trigger_session_threads",
)
except Exception:
pass
op.drop_table("trigger_session_threads")

if _has_column(conn, "triggers", "resume_sessions"):
with op.batch_alter_table("triggers") as batch:
batch.drop_column("resume_sessions")
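
For reference, one way the `session_id` returned by `run_claude` could be persisted into this table; a sketch using SQLAlchemy Core's PostgreSQL upsert (a SQLite deployment would mirror it with `sqlalchemy.dialects.sqlite.insert`), not the project's actual code:

```python
# Sketch: upsert (trigger_id, dedup_key) -> claude_session_id after each
# run, bumping last_used_at so cleanup can find stale threads later.
# Table reflection and the helper itself are assumptions.
from datetime import datetime, timezone

import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import insert as pg_insert


def upsert_session_thread(engine: sa.engine.Engine, trigger_id: int,
                          dedup_key: str, session_id: str) -> None:
    threads = sa.Table("trigger_session_threads", sa.MetaData(),
                       autoload_with=engine)
    now = datetime.now(timezone.utc)
    stmt = (
        pg_insert(threads)
        .values(trigger_id=trigger_id, dedup_key=dedup_key,
                claude_session_id=session_id, last_used_at=now)
        .on_conflict_do_update(
            # Unique constraint created by migration 0012
            constraint="uq_trigger_session_threads_trigger_key",
            set_={"claude_session_id": session_id, "last_used_at": now},
        )
    )
    with engine.begin() as conn:
        conn.execute(stmt)
```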