From 2e7467c304264f37360906c86581439899a2034d Mon Sep 17 00:00:00 2001 From: iceteaSA <171169159+iceteaSA@users.noreply.github.com> Date: Thu, 21 May 2026 21:05:45 +0200 Subject: [PATCH 1/3] feat(relay): add plan-based gating, KV error logging, and request logging Worker script improvements: - Plan-based gating: WebSocket transport requires paid plan (RELAY_PLAN=paid) - KV error logging: non-429/403 upstream errors logged to KV with 7-day TTL - Request logging: HTTP and WebSocket requests logged on paid plan - GET health endpoint returns plan info and available transports --- packages/core/src/relay.ts | 132 ++++++++++++++---- .../src/tests/relay-worker-miniflare.test.ts | 44 +++++- 2 files changed, 147 insertions(+), 29 deletions(-) diff --git a/packages/core/src/relay.ts b/packages/core/src/relay.ts index ad766e0..4d973e5 100644 --- a/packages/core/src/relay.ts +++ b/packages/core/src/relay.ts @@ -1026,6 +1026,11 @@ export async function sendViaRelay(options: { } export const WORKER_SCRIPT = ` +function getPlanConfig(env) { + const paid = (env.RELAY_PLAN || '').toLowerCase() === 'paid' + return { paid, allowWebSocket: paid, logRequests: paid } +} + async function hashBody(body) { const bytes = new TextEncoder().encode(body) const digest = await crypto.subtle.digest('SHA-256', bytes) @@ -1073,7 +1078,7 @@ async function resolveBody(env, payload) { return { error: 'unknown mode', status: 400 } } -async function prepareUpstream(env, payload) { +async function prepareUpstream(env, payload, config) { if ((payload.protocol !== 1 && payload.protocol !== 2) || payload.type !== 'request' || !payload.affinity || !payload.upstream?.url || !payload.next_hash) { return { error: 'invalid payload', status: 400 } } @@ -1086,14 +1091,16 @@ async function prepareUpstream(env, payload) { } const stateWrite = writeState(env, payload.affinity, { body, hash: payload.next_hash, revision: payload.revision }).catch(() => {}) - console.log(JSON.stringify({ - relay: 'opencode-anthropic-auth', - transport: 'relay', - mode: payload.mode, - revision: payload.revision, - affinity: String(payload.affinity).slice(0, 12), - bodyBytes: body.length, - })) + if (config.logRequests) { + console.log(JSON.stringify({ + relay: 'opencode-anthropic-auth', + transport: 'http', + mode: payload.mode, + revision: payload.revision, + affinity: String(payload.affinity).slice(0, 12), + bodyBytes: body.length, + })) + } return { body, stateWrite } } @@ -1146,16 +1153,8 @@ async function prepareWebSocketUpstream(env, state, payload) { const nextState = { body, hash: payload.next_hash, revision: payload.revision } const checkpoint = checkpointWebSocketState(env, payload, body, nextState) - const logAccepted = () => console.log(JSON.stringify({ - relay: 'opencode-anthropic-auth', - transport: 'websocket', - mode: payload.mode, - revision: payload.revision, - affinity: String(payload.affinity).slice(0, 12), - bodyBytes: body.length, - })) - - return { body, state: nextState, checkpoint, logAccepted } + + return { body, state: nextState, checkpoint } } function headersToObject(headers) { @@ -1164,18 +1163,55 @@ function headersToObject(headers) { return result } -async function handleRelayPayload(env, payload) { - const prepared = await prepareUpstream(env, payload) +const SKIP_ERROR_LOG_STATUSES = new Set([429, 403]) + +async function logUpstreamError(env, ctx, upstream, meta) { + if (!upstream.status || upstream.status < 400 || SKIP_ERROR_LOG_STATUSES.has(upstream.status)) return + try { + // Callers pass a dedicated clone, so consume it directly — cloning again + // here would leave the intermediate response body unread (stream leak). + const body = await upstream.text() + // Random suffix so two concurrent error logs for the same id/affinity that + // land in the same millisecond do not overwrite each other. + const key = 'error:' + Date.now() + ':' + (meta.id || meta.affinity || 'unknown') + ':' + crypto.randomUUID().slice(0, 8) + const entry = JSON.stringify({ + ts: new Date().toISOString(), + status: upstream.status, + statusText: upstream.statusText, + transport: meta.transport, + mode: meta.mode, + affinity: meta.affinity, + id: meta.id, + bodyBytes: meta.bodyBytes, + responseBody: body.slice(0, 50000), + responseHeaders: headersToObject(upstream.headers), + }) + const kvWrite = env.RELAY_STATE.put(key, entry, { expirationTtl: 604800 }).catch(() => {}) + if (ctx?.waitUntil) ctx.waitUntil(kvWrite) + else void kvWrite + console.error(JSON.stringify({ + relay: 'opencode-anthropic-auth', + event: 'upstream_error', + status: upstream.status, + transport: meta.transport, + affinity: String(meta.affinity || '').slice(0, 12), + responsePreview: body.slice(0, 500), + })) + } catch {} +} + +async function handleRelayPayload(env, payload, config) { + const prepared = await prepareUpstream(env, payload, config) if (prepared.error) return prepared const upstream = await fetch(payload.upstream.url, { method: payload.upstream.method || 'POST', headers: payload.upstream.headers, body: prepared.body, }) - return { upstream, stateWrite: prepared.stateWrite } + return { upstream, stateWrite: prepared.stateWrite, bodyBytes: prepared.body.length } } -async function handleWebSocket(socket, env, ctx, payload, getState, setState) { +async function handleWebSocket(socket, env, ctx, payload, getState, setState, config) { const heartbeat = setInterval(() => { try { socket.send(JSON.stringify({ protocol: 2, type: 'keepalive' })) @@ -1192,7 +1228,16 @@ async function handleWebSocket(socket, env, ctx, payload, getState, setState) { setState(result.state) socket.send(JSON.stringify({ protocol: 2, type: 'accepted', id: payload.id, hash: result.state.hash, revision: result.state.revision })) - ctx?.waitUntil?.(deferWorkerTask(result.logAccepted)) + if (config.logRequests) { + console.log(JSON.stringify({ + relay: 'opencode-anthropic-auth', + transport: 'websocket', + mode: payload.mode, + revision: payload.revision, + affinity: String(payload.affinity).slice(0, 12), + bodyBytes: result.body.length, + })) + } const upstreamPromise = fetch(payload.upstream.url, { method: payload.upstream.method || 'POST', @@ -1201,6 +1246,19 @@ async function handleWebSocket(socket, env, ctx, payload, getState, setState) { }) ctx?.waitUntil?.(result.checkpoint) const upstream = await upstreamPromise + // Log non-429/403 errors to KV for debugging + if (upstream.status >= 400 && !SKIP_ERROR_LOG_STATUSES.has(upstream.status)) { + const errorClone = upstream.clone() + const errorLog = logUpstreamError(env, ctx, errorClone, { + transport: 'websocket', + mode: payload.mode, + affinity: payload.affinity, + id: payload.id, + bodyBytes: result.body.length, + }) + if (ctx?.waitUntil) ctx.waitUntil(errorLog) + else void errorLog + } socket.send(JSON.stringify({ protocol: 2, type: 'response_start', @@ -1228,7 +1286,12 @@ async function handleWebSocket(socket, env, ctx, payload, getState, setState) { export default { async fetch(request, env, ctx) { + const config = getPlanConfig(env) + if (request.headers.get('Upgrade') === 'websocket') { + if (!config.allowWebSocket) { + return new Response('WebSocket transport requires Workers Paid plan. Use HTTP transport or upgrade your plan.', { status: 403 }) + } const url = new URL(request.url) const token = url.searchParams.get('token') const affinity = url.searchParams.get('affinity') @@ -1274,7 +1337,7 @@ export default { } payload.affinity = affinity busy = true - const run = handleWebSocket(server, env, ctx, payload, () => state, (nextState) => { state = nextState }).finally(() => { busy = false }) + const run = handleWebSocket(server, env, ctx, payload, () => state, (nextState) => { state = nextState }, config).finally(() => { busy = false }) ctx?.waitUntil?.(run) if (!ctx?.waitUntil) void run }) @@ -1282,7 +1345,11 @@ export default { } if (request.method === 'GET') { - return Response.json({ status: 'ok', transports: ['http', 'websocket'] }) + return Response.json({ + status: 'ok', + plan: config.paid ? 'paid' : 'free', + transports: config.allowWebSocket ? ['http', 'websocket'] : ['http'], + }) } if (request.method !== 'POST') return new Response('method not allowed', { status: 405 }) if (request.headers.get('x-relay-token') !== env.RELAY_TOKEN) { @@ -1291,12 +1358,23 @@ export default { try { const payload = await request.json() - const result = await handleRelayPayload(env, payload) + const result = await handleRelayPayload(env, payload, config) if (result.error) return Response.json({ error: result.error }, { status: result.status }) if (result.stateWrite) ctx.waitUntil(result.stateWrite) const upstream = result.upstream + // Defer error logging so reading the upstream error body never adds + // latency to forwarding the response (mirrors the WebSocket path). + const errorClone = upstream.clone() + const errorLog = logUpstreamError(env, ctx, errorClone, { + transport: 'http', + mode: payload.mode, + affinity: payload.affinity, + bodyBytes: result.bodyBytes, + }) + if (ctx?.waitUntil) ctx.waitUntil(errorLog) + else void errorLog return new Response(upstream.body, { status: upstream.status, statusText: upstream.statusText, diff --git a/packages/opencode/src/tests/relay-worker-miniflare.test.ts b/packages/opencode/src/tests/relay-worker-miniflare.test.ts index aac32c3..3e623fa 100644 --- a/packages/opencode/src/tests/relay-worker-miniflare.test.ts +++ b/packages/opencode/src/tests/relay-worker-miniflare.test.ts @@ -42,13 +42,13 @@ function startUpstream() { return { server, bodies, url: `http://127.0.0.1:${server.port}/messages` } } -async function startWorker() { +async function startWorker(plan: 'paid' | 'free' = 'paid') { const mf = new Miniflare({ script: WORKER_SCRIPT, modules: true, compatibilityDate: '2026-04-28', kvNamespaces: ['RELAY_STATE'], - bindings: { RELAY_TOKEN }, + bindings: { RELAY_TOKEN, RELAY_PLAN: plan }, port: 0, log: new NoOpLog(), }) @@ -300,4 +300,44 @@ describe('relay Worker under Miniflare', () => { upstream.server.stop(true) } }, 30_000) + + test('free plan rejects websocket upgrade and advertises http-only transport', async () => { + const mf = await startWorker('free') + try { + const base = (await mf.ready).toString() + + // Plain fetch with an Upgrade header: the worker returns a normal 403 + // Response before any handshake, so no WebSocket client is involved. + const upgrade = await fetch( + `${base}ws?token=${RELAY_TOKEN}&affinity=free-plan-session`, + { headers: { Upgrade: 'websocket' } }, + ) + expect(upgrade.status).toBe(403) + expect(await upgrade.text()).toContain('Workers Paid plan') + + const health = await (await fetch(base, { method: 'GET' })).json() + expect(health).toMatchObject({ + status: 'ok', + plan: 'free', + transports: ['http'], + }) + } finally { + await mf.dispose() + } + }, 30_000) + + test('paid plan health endpoint advertises websocket transport', async () => { + const mf = await startWorker('paid') + try { + const base = (await mf.ready).toString() + const health = await (await fetch(base, { method: 'GET' })).json() + expect(health).toMatchObject({ + status: 'ok', + plan: 'paid', + transports: ['http', 'websocket'], + }) + } finally { + await mf.dispose() + } + }, 30_000) }) From 367ae2177c7201767ecf43f676e0e05e4bd554f4 Mon Sep 17 00:00:00 2001 From: iceteaSA <171169159+iceteaSA@users.noreply.github.com> Date: Thu, 4 Jun 2026 10:38:33 +0200 Subject: [PATCH 2/3] fix(opencode): set RELAY_PLAN on relay worker deploy and match transport to plan The worker gates websocket + logging on RELAY_PLAN, but the CLI never set it, so a worker PUT silently reverted to free-plan behaviour. relaySetup now selects the plan (env or prompt, default free) and passes it through uploadRelayWorker as the RELAY_PLAN binding; transport defaults to websocket for paid, http for free. --- packages/opencode/src/cli.ts | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/packages/opencode/src/cli.ts b/packages/opencode/src/cli.ts index 3f6afc8..d4c6fcf 100644 --- a/packages/opencode/src/cli.ts +++ b/packages/opencode/src/cli.ts @@ -114,6 +114,7 @@ async function uploadRelayWorker(options: { scriptName: string kvNamespaceId: string relayToken: string + plan: 'free' | 'paid' }) { const metadata = { main_module: 'worker.js', @@ -129,6 +130,15 @@ async function uploadRelayWorker(options: { name: 'RELAY_TOKEN', text: options.relayToken, }, + // The worker gates websocket/logging on RELAY_PLAN (see relay.ts + // WORKER_SCRIPT). A worker PUT replaces ALL bindings, so this must be set + // every deploy or the worker silently reverts to free-plan behavior + // (http-only, websocket → 403). + { + type: 'plain_text', + name: 'RELAY_PLAN', + text: options.plan, + }, ], } const form = new FormData() @@ -184,16 +194,27 @@ async function relaySetup() { 'opencode-anthropic-relay' const kvTitle = `${scriptName}-state` const relayToken = generateRelayToken() + // Cloudflare Workers plan. The worker enables websocket transport + logging + // only when RELAY_PLAN=paid; free accounts must stay http-only. Source from + // env for non-interactive setup, else prompt (default free — the safe option + // that works on any account). + const planInput = ( + process.env.RELAY_PLAN?.trim() || + process.env.CLOUDFLARE_PLAN?.trim() || + (await prompt('Cloudflare Workers plan [free/paid] (default free): ')) + ).toLowerCase() + const plan: 'free' | 'paid' = planInput === 'paid' ? 'paid' : 'free' console.log('Creating Cloudflare KV namespace...') const namespace = await createKvNamespace(token, accountId, kvTitle) - console.log('Uploading relay Worker...') + console.log(`Uploading relay Worker (plan: ${plan})...`) await uploadRelayWorker({ token, accountId, scriptName, kvNamespaceId: namespace.id, relayToken, + plan, }) await enableWorkersDev(token, accountId, scriptName).catch((error) => { console.warn( @@ -214,7 +235,8 @@ async function relaySetup() { url, token: relayToken, fallbackToDirect: true, - transport: 'http', + // websocket is a paid-plan capability; free accounts must use http. + transport: plan === 'paid' ? 'websocket' : 'http', } await saveAccounts(storage) From b4939fefe224da9b434a1bb4cbe771ed8dabca01 Mon Sep 17 00:00:00 2001 From: iceteaSA <171169159+iceteaSA@users.noreply.github.com> Date: Thu, 4 Jun 2026 11:37:14 +0200 Subject: [PATCH 3/3] test(opencode): cover RELAY_PLAN binding in relay setup deploy --- packages/opencode/src/tests/cli.test.ts | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/packages/opencode/src/tests/cli.test.ts b/packages/opencode/src/tests/cli.test.ts index 8a1c043..331b315 100644 --- a/packages/opencode/src/tests/cli.test.ts +++ b/packages/opencode/src/tests/cli.test.ts @@ -202,6 +202,7 @@ describe('CLI relay setup', () => { test('deploys worker resources and saves relay config', async () => { const accountPath = join(tempDir, 'anthropic-auth.json') const callsPath = join(tempDir, 'calls.jsonl') + const metadataPath = join(tempDir, 'metadata.json') const preloadPath = join(tempDir, 'relay-preload.ts') await writeFile( @@ -213,7 +214,12 @@ globalThis.fetch = async (input, init) => { if (url.includes('/storage/kv/namespaces')) return Response.json({ success: true, result: { id: 'kv-id' } }) if (url.includes('/workers/scripts/opencode-anthropic-relay/subdomain')) return Response.json({ success: true, result: { enabled: true } }) if (url.includes('/workers/subdomain')) return Response.json({ success: true, result: { subdomain: 'user-subdomain' } }) - if (url.includes('/workers/scripts/opencode-anthropic-relay')) return Response.json({ success: true, result: {} }) + if (url.includes('/workers/scripts/opencode-anthropic-relay')) { + if (init?.body instanceof FormData) { + appendFileSync(${JSON.stringify(metadataPath)}, String(init.body.get('metadata') ?? '')) + } + return Response.json({ success: true, result: {} }) + } return Response.json({ success: false, errors: [{ message: 'unexpected ' + url }] }, { status: 500 }) } `, @@ -229,6 +235,7 @@ globalThis.fetch = async (input, init) => { OPENCODE_ANTHROPIC_AUTH_FILE: accountPath, CLOUDFLARE_API_TOKEN: 'cf-token', CLOUDFLARE_ACCOUNT_ID: 'account-id', + CLOUDFLARE_PLAN: 'paid', }, stdin: 'pipe', stdout: 'pipe', @@ -254,10 +261,20 @@ globalThis.fetch = async (input, init) => { enabled: true, url: 'https://opencode-anthropic-relay.user-subdomain.workers.dev', fallbackToDirect: true, - transport: 'http', + // CLOUDFLARE_PLAN=paid → websocket transport. + transport: 'websocket', }) expect(storage.relay.token).toBeString() + // The worker deploy must carry the RELAY_PLAN binding (a worker PUT replaces + // all bindings, so omitting it would revert the worker to free-plan mode). + const metadata = JSON.parse(await readFile(metadataPath, 'utf8')) + expect(metadata.bindings).toContainEqual({ + type: 'plain_text', + name: 'RELAY_PLAN', + text: 'paid', + }) + const calls = (await readFile(callsPath, 'utf8')).trim().split('\n') expect(calls).toHaveLength(4) })