Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 105 additions & 27 deletions packages/core/src/relay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1026,6 +1026,11 @@ export async function sendViaRelay(options: {
}

export const WORKER_SCRIPT = `
function getPlanConfig(env) {
const paid = (env.RELAY_PLAN || '').toLowerCase() === 'paid'
return { paid, allowWebSocket: paid, logRequests: paid }
}

async function hashBody(body) {
const bytes = new TextEncoder().encode(body)
const digest = await crypto.subtle.digest('SHA-256', bytes)
Expand Down Expand Up @@ -1073,7 +1078,7 @@ async function resolveBody(env, payload) {
return { error: 'unknown mode', status: 400 }
}

async function prepareUpstream(env, payload) {
async function prepareUpstream(env, payload, config) {
if ((payload.protocol !== 1 && payload.protocol !== 2) || payload.type !== 'request' || !payload.affinity || !payload.upstream?.url || !payload.next_hash) {
return { error: 'invalid payload', status: 400 }
}
Expand All @@ -1086,14 +1091,16 @@ async function prepareUpstream(env, payload) {
}

const stateWrite = writeState(env, payload.affinity, { body, hash: payload.next_hash, revision: payload.revision }).catch(() => {})
console.log(JSON.stringify({
relay: 'opencode-anthropic-auth',
transport: 'relay',
mode: payload.mode,
revision: payload.revision,
affinity: String(payload.affinity).slice(0, 12),
bodyBytes: body.length,
}))
if (config.logRequests) {
Comment thread
iceteaSA marked this conversation as resolved.
console.log(JSON.stringify({
relay: 'opencode-anthropic-auth',
transport: 'http',
mode: payload.mode,
revision: payload.revision,
affinity: String(payload.affinity).slice(0, 12),
bodyBytes: body.length,
}))
}

return { body, stateWrite }
}
Expand Down Expand Up @@ -1146,16 +1153,8 @@ async function prepareWebSocketUpstream(env, state, payload) {

const nextState = { body, hash: payload.next_hash, revision: payload.revision }
const checkpoint = checkpointWebSocketState(env, payload, body, nextState)
const logAccepted = () => console.log(JSON.stringify({
relay: 'opencode-anthropic-auth',
transport: 'websocket',
mode: payload.mode,
revision: payload.revision,
affinity: String(payload.affinity).slice(0, 12),
bodyBytes: body.length,
}))

return { body, state: nextState, checkpoint, logAccepted }

return { body, state: nextState, checkpoint }
}

function headersToObject(headers) {
Expand All @@ -1164,18 +1163,55 @@ function headersToObject(headers) {
return result
}

async function handleRelayPayload(env, payload) {
const prepared = await prepareUpstream(env, payload)
const SKIP_ERROR_LOG_STATUSES = new Set([429, 403])

async function logUpstreamError(env, ctx, upstream, meta) {
if (!upstream.status || upstream.status < 400 || SKIP_ERROR_LOG_STATUSES.has(upstream.status)) return
try {
// Callers pass a dedicated clone, so consume it directly — cloning again
// here would leave the intermediate response body unread (stream leak).
const body = await upstream.text()
// Random suffix so two concurrent error logs for the same id/affinity that
// land in the same millisecond do not overwrite each other.
const key = 'error:' + Date.now() + ':' + (meta.id || meta.affinity || 'unknown') + ':' + crypto.randomUUID().slice(0, 8)
const entry = JSON.stringify({
ts: new Date().toISOString(),
status: upstream.status,
statusText: upstream.statusText,
transport: meta.transport,
mode: meta.mode,
affinity: meta.affinity,
id: meta.id,
bodyBytes: meta.bodyBytes,
responseBody: body.slice(0, 50000),
responseHeaders: headersToObject(upstream.headers),
})
const kvWrite = env.RELAY_STATE.put(key, entry, { expirationTtl: 604800 }).catch(() => {})
if (ctx?.waitUntil) ctx.waitUntil(kvWrite)
else void kvWrite
console.error(JSON.stringify({
relay: 'opencode-anthropic-auth',
event: 'upstream_error',
status: upstream.status,
transport: meta.transport,
affinity: String(meta.affinity || '').slice(0, 12),
responsePreview: body.slice(0, 500),
}))
} catch {}
}

async function handleRelayPayload(env, payload, config) {
const prepared = await prepareUpstream(env, payload, config)
if (prepared.error) return prepared
const upstream = await fetch(payload.upstream.url, {
method: payload.upstream.method || 'POST',
headers: payload.upstream.headers,
body: prepared.body,
})
return { upstream, stateWrite: prepared.stateWrite }
return { upstream, stateWrite: prepared.stateWrite, bodyBytes: prepared.body.length }
}

async function handleWebSocket(socket, env, ctx, payload, getState, setState) {
async function handleWebSocket(socket, env, ctx, payload, getState, setState, config) {
const heartbeat = setInterval(() => {
try {
socket.send(JSON.stringify({ protocol: 2, type: 'keepalive' }))
Expand All @@ -1192,7 +1228,16 @@ async function handleWebSocket(socket, env, ctx, payload, getState, setState) {

setState(result.state)
socket.send(JSON.stringify({ protocol: 2, type: 'accepted', id: payload.id, hash: result.state.hash, revision: result.state.revision }))
ctx?.waitUntil?.(deferWorkerTask(result.logAccepted))
if (config.logRequests) {
console.log(JSON.stringify({
relay: 'opencode-anthropic-auth',
transport: 'websocket',
mode: payload.mode,
revision: payload.revision,
affinity: String(payload.affinity).slice(0, 12),
bodyBytes: result.body.length,
}))
}

const upstreamPromise = fetch(payload.upstream.url, {
method: payload.upstream.method || 'POST',
Expand All @@ -1201,6 +1246,19 @@ async function handleWebSocket(socket, env, ctx, payload, getState, setState) {
})
ctx?.waitUntil?.(result.checkpoint)
const upstream = await upstreamPromise
// Log non-429/403 errors to KV for debugging
if (upstream.status >= 400 && !SKIP_ERROR_LOG_STATUSES.has(upstream.status)) {
const errorClone = upstream.clone()
const errorLog = logUpstreamError(env, ctx, errorClone, {
transport: 'websocket',
mode: payload.mode,
affinity: payload.affinity,
id: payload.id,
bodyBytes: result.body.length,
})
if (ctx?.waitUntil) ctx.waitUntil(errorLog)
else void errorLog
}
socket.send(JSON.stringify({
protocol: 2,
type: 'response_start',
Expand Down Expand Up @@ -1228,7 +1286,12 @@ async function handleWebSocket(socket, env, ctx, payload, getState, setState) {

export default {
async fetch(request, env, ctx) {
const config = getPlanConfig(env)

if (request.headers.get('Upgrade') === 'websocket') {
if (!config.allowWebSocket) {
return new Response('WebSocket transport requires Workers Paid plan. Use HTTP transport or upgrade your plan.', { status: 403 })
}
const url = new URL(request.url)
const token = url.searchParams.get('token')
const affinity = url.searchParams.get('affinity')
Expand Down Expand Up @@ -1274,15 +1337,19 @@ export default {
}
payload.affinity = affinity
busy = true
const run = handleWebSocket(server, env, ctx, payload, () => state, (nextState) => { state = nextState }).finally(() => { busy = false })
const run = handleWebSocket(server, env, ctx, payload, () => state, (nextState) => { state = nextState }, config).finally(() => { busy = false })
ctx?.waitUntil?.(run)
if (!ctx?.waitUntil) void run
})
return new Response(null, { status: 101, webSocket: client })
}

if (request.method === 'GET') {
return Response.json({ status: 'ok', transports: ['http', 'websocket'] })
return Response.json({
status: 'ok',
plan: config.paid ? 'paid' : 'free',
transports: config.allowWebSocket ? ['http', 'websocket'] : ['http'],
})
}
if (request.method !== 'POST') return new Response('method not allowed', { status: 405 })
if (request.headers.get('x-relay-token') !== env.RELAY_TOKEN) {
Expand All @@ -1291,12 +1358,23 @@ export default {

try {
const payload = await request.json()
const result = await handleRelayPayload(env, payload)
const result = await handleRelayPayload(env, payload, config)
if (result.error) return Response.json({ error: result.error }, { status: result.status })

if (result.stateWrite) ctx.waitUntil(result.stateWrite)

const upstream = result.upstream
// Defer error logging so reading the upstream error body never adds
// latency to forwarding the response (mirrors the WebSocket path).
const errorClone = upstream.clone()
const errorLog = logUpstreamError(env, ctx, errorClone, {
transport: 'http',
mode: payload.mode,
affinity: payload.affinity,
bodyBytes: result.bodyBytes,
})
if (ctx?.waitUntil) ctx.waitUntil(errorLog)
else void errorLog
return new Response(upstream.body, {
status: upstream.status,
statusText: upstream.statusText,
Expand Down
26 changes: 24 additions & 2 deletions packages/opencode/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ async function uploadRelayWorker(options: {
scriptName: string
kvNamespaceId: string
relayToken: string
plan: 'free' | 'paid'
}) {
const metadata = {
main_module: 'worker.js',
Expand All @@ -129,6 +130,15 @@ async function uploadRelayWorker(options: {
name: 'RELAY_TOKEN',
text: options.relayToken,
},
// The worker gates websocket/logging on RELAY_PLAN (see relay.ts
// WORKER_SCRIPT). A worker PUT replaces ALL bindings, so this must be set
// every deploy or the worker silently reverts to free-plan behavior
// (http-only, websocket → 403).
{
type: 'plain_text',
name: 'RELAY_PLAN',
text: options.plan,
},
],
}
const form = new FormData()
Expand Down Expand Up @@ -184,16 +194,27 @@ async function relaySetup() {
'opencode-anthropic-relay'
const kvTitle = `${scriptName}-state`
const relayToken = generateRelayToken()
// Cloudflare Workers plan. The worker enables websocket transport + logging
// only when RELAY_PLAN=paid; free accounts must stay http-only. Source from
// env for non-interactive setup, else prompt (default free — the safe option
// that works on any account).
const planInput = (
process.env.RELAY_PLAN?.trim() ||
process.env.CLOUDFLARE_PLAN?.trim() ||
(await prompt('Cloudflare Workers plan [free/paid] (default free): '))
).toLowerCase()
const plan: 'free' | 'paid' = planInput === 'paid' ? 'paid' : 'free'

console.log('Creating Cloudflare KV namespace...')
const namespace = await createKvNamespace(token, accountId, kvTitle)
console.log('Uploading relay Worker...')
console.log(`Uploading relay Worker (plan: ${plan})...`)
await uploadRelayWorker({
token,
accountId,
scriptName,
kvNamespaceId: namespace.id,
relayToken,
plan,
})
await enableWorkersDev(token, accountId, scriptName).catch((error) => {
console.warn(
Expand All @@ -214,7 +235,8 @@ async function relaySetup() {
url,
token: relayToken,
fallbackToDirect: true,
transport: 'http',
// websocket is a paid-plan capability; free accounts must use http.
transport: plan === 'paid' ? 'websocket' : 'http',
}
await saveAccounts(storage)

Expand Down
21 changes: 19 additions & 2 deletions packages/opencode/src/tests/cli.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ describe('CLI relay setup', () => {
test('deploys worker resources and saves relay config', async () => {
const accountPath = join(tempDir, 'anthropic-auth.json')
const callsPath = join(tempDir, 'calls.jsonl')
const metadataPath = join(tempDir, 'metadata.json')
const preloadPath = join(tempDir, 'relay-preload.ts')

await writeFile(
Expand All @@ -213,7 +214,12 @@ globalThis.fetch = async (input, init) => {
if (url.includes('/storage/kv/namespaces')) return Response.json({ success: true, result: { id: 'kv-id' } })
if (url.includes('/workers/scripts/opencode-anthropic-relay/subdomain')) return Response.json({ success: true, result: { enabled: true } })
if (url.includes('/workers/subdomain')) return Response.json({ success: true, result: { subdomain: 'user-subdomain' } })
if (url.includes('/workers/scripts/opencode-anthropic-relay')) return Response.json({ success: true, result: {} })
if (url.includes('/workers/scripts/opencode-anthropic-relay')) {
if (init?.body instanceof FormData) {
appendFileSync(${JSON.stringify(metadataPath)}, String(init.body.get('metadata') ?? ''))
}
return Response.json({ success: true, result: {} })
}
return Response.json({ success: false, errors: [{ message: 'unexpected ' + url }] }, { status: 500 })
}
`,
Expand All @@ -229,6 +235,7 @@ globalThis.fetch = async (input, init) => {
OPENCODE_ANTHROPIC_AUTH_FILE: accountPath,
CLOUDFLARE_API_TOKEN: 'cf-token',
CLOUDFLARE_ACCOUNT_ID: 'account-id',
CLOUDFLARE_PLAN: 'paid',
},
stdin: 'pipe',
stdout: 'pipe',
Expand All @@ -254,10 +261,20 @@ globalThis.fetch = async (input, init) => {
enabled: true,
url: 'https://opencode-anthropic-relay.user-subdomain.workers.dev',
fallbackToDirect: true,
transport: 'http',
// CLOUDFLARE_PLAN=paid → websocket transport.
transport: 'websocket',
})
expect(storage.relay.token).toBeString()

// The worker deploy must carry the RELAY_PLAN binding (a worker PUT replaces
// all bindings, so omitting it would revert the worker to free-plan mode).
const metadata = JSON.parse(await readFile(metadataPath, 'utf8'))
expect(metadata.bindings).toContainEqual({
type: 'plain_text',
name: 'RELAY_PLAN',
text: 'paid',
})

const calls = (await readFile(callsPath, 'utf8')).trim().split('\n')
expect(calls).toHaveLength(4)
})
Expand Down
44 changes: 42 additions & 2 deletions packages/opencode/src/tests/relay-worker-miniflare.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ function startUpstream() {
return { server, bodies, url: `http://127.0.0.1:${server.port}/messages` }
}

async function startWorker() {
async function startWorker(plan: 'paid' | 'free' = 'paid') {
const mf = new Miniflare({
script: WORKER_SCRIPT,
modules: true,
compatibilityDate: '2026-04-28',
kvNamespaces: ['RELAY_STATE'],
bindings: { RELAY_TOKEN },
bindings: { RELAY_TOKEN, RELAY_PLAN: plan },
port: 0,
log: new NoOpLog(),
})
Expand Down Expand Up @@ -300,4 +300,44 @@ describe('relay Worker under Miniflare', () => {
upstream.server.stop(true)
}
}, 30_000)

test('free plan rejects websocket upgrade and advertises http-only transport', async () => {
const mf = await startWorker('free')
try {
const base = (await mf.ready).toString()

// Plain fetch with an Upgrade header: the worker returns a normal 403
// Response before any handshake, so no WebSocket client is involved.
const upgrade = await fetch(
`${base}ws?token=${RELAY_TOKEN}&affinity=free-plan-session`,
{ headers: { Upgrade: 'websocket' } },
)
expect(upgrade.status).toBe(403)
expect(await upgrade.text()).toContain('Workers Paid plan')

const health = await (await fetch(base, { method: 'GET' })).json()
expect(health).toMatchObject({
status: 'ok',
plan: 'free',
transports: ['http'],
})
} finally {
await mf.dispose()
}
}, 30_000)

test('paid plan health endpoint advertises websocket transport', async () => {
const mf = await startWorker('paid')
try {
const base = (await mf.ready).toString()
const health = await (await fetch(base, { method: 'GET' })).json()
expect(health).toMatchObject({
status: 'ok',
plan: 'paid',
transports: ['http', 'websocket'],
})
} finally {
await mf.dispose()
}
}, 30_000)
})