Skip to content
Merged
15 changes: 12 additions & 3 deletions src/api/routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ import { handleOrbIngest, readOrbIngestBody } from "../orb/ingest";
import { handleOrbWebhook } from "../orb/webhook";
import { handleOrbOAuthCallback } from "../orb/oauth";
import { brokerOrbToken, isOrbBrokerEnabled, issueOrbEnrollment } from "../orb/broker";
import { registerOrbRelay } from "../orb/relay";
import { readOrbRelayRegisterBody, registerValidatedOrbRelay, validateOrbRelayEnrollment } from "../orb/relay";
import { computeFleetAnalytics } from "../orb/analytics";
import { handleMcpRequest } from "../mcp/server";
import { buildOpenApiSpec } from "../openapi/spec";
Expand Down Expand Up @@ -2916,10 +2916,19 @@ export function createApp() {
const auth = c.req.header("authorization") ?? "";
const secret = auth.startsWith("Bearer ") ? auth.slice(7).trim() : "";
if (!secret) return c.json({ error: "missing_enrollment_secret" }, 401);
const body = (await c.req.json().catch(() => null)) as { relayUrl?: unknown } | null;
const enrollment = await validateOrbRelayEnrollment(c.env, secret);
if ("error" in enrollment) return c.json(enrollment, enrollment.error === "invalid_enrollment" ? 401 : 403);
const rawBody = await readOrbRelayRegisterBody(c.req.raw, c.req.header("content-length"));
if (rawBody === null) return c.json({ error: "payload_too_large" }, 413);
let body: { relayUrl?: unknown } | null;
try {
body = JSON.parse(rawBody) as { relayUrl?: unknown };
} catch {
body = null;
}
const relayUrl = typeof body?.relayUrl === "string" ? body.relayUrl.trim() : "";
if (!relayUrl) return c.json({ error: "missing_relay_url" }, 400);
const result = await registerOrbRelay(c.env, secret, relayUrl);
const result = await registerValidatedOrbRelay(c.env, enrollment, secret, relayUrl);
if ("error" in result) {
const status = result.error === "invalid_enrollment" ? 401 : result.error === "installation_not_eligible" ? 403 : result.error === "encryption_unavailable" ? 500 : 400;
return c.json(result, status);
Expand Down
58 changes: 51 additions & 7 deletions src/orb/relay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,41 @@ export type RegisterResult =
| { ok: true; installationId: number }
| { error: "invalid_enrollment" | "installation_not_eligible" | "invalid_relay_url" | "encryption_unavailable" };

/** Register (or update) the container's relay target for a valid enrollment. Validates the secret (→ the bound,
* registered, non-suspended install — same gate as the token broker), SSRF-validates the relay URL, then stores
* the URL + the enrollment secret encrypted at rest (for the forward-time HMAC). The container presents its OWN
* plaintext enrollment secret as the Bearer, so this is self-service + bound to that install. */
export async function registerOrbRelay(env: Env, secret: string, relayUrl: string): Promise<RegisterResult> {
export const MAX_ORB_RELAY_REGISTER_BODY_BYTES = 4096;

function parseContentLength(header: string | null | undefined): number | null {
if (typeof header !== "string") return null;
const n = Number(header);
return Number.isInteger(n) && n >= 0 ? n : null;
}

/** Read the relay-registration JSON with a small hard ceiling; returns null when the sender exceeds it. */
export async function readOrbRelayRegisterBody(request: Request, contentLengthHeader: string | null | undefined): Promise<string | null> {
const declared = parseContentLength(contentLengthHeader);
if (declared !== null && declared > MAX_ORB_RELAY_REGISTER_BODY_BYTES) return null;

const stream = request.body;
if (!stream) return "";
const reader = stream.getReader();
const decoder = new TextDecoder();
let total = 0;
let out = "";
for (;;) {
const { done, value } = await reader.read();
if (done) break;
total += value.byteLength;
if (total > MAX_ORB_RELAY_REGISTER_BODY_BYTES) {
await reader.cancel();
return null;
}
out += decoder.decode(value, { stream: true });
}
return out + decoder.decode();
}

export type RelayEnrollment = { enrollId: string; installationId: number };

export async function validateOrbRelayEnrollment(env: Env, secret: string): Promise<RelayEnrollment | { error: "invalid_enrollment" | "installation_not_eligible" }> {
const row = await env.DB
.prepare("SELECT enroll_id, installation_id, state, revoked_at FROM orb_enrollments WHERE secret_hash = ?")
.bind(await hashToken(secret))
Expand All @@ -70,16 +100,30 @@ export async function registerOrbRelay(env: Env, secret: string, relayUrl: strin
.bind(row.installation_id)
.first<{ registered: number; suspended_at: string | null; removed_at: string | null }>();
if (!install || install.registered !== 1 || install.suspended_at !== null || install.removed_at !== null) return { error: "installation_not_eligible" };
return { enrollId: row.enroll_id, installationId: row.installation_id };
}

export async function registerValidatedOrbRelay(env: Env, enrollment: RelayEnrollment, secret: string, relayUrl: string): Promise<RegisterResult> {
// SSRF guard: the Orb will POST events to this URL — it must be a public https endpoint (no loopback / private /
// link-local host), so a registered relay URL can never coerce the Orb into hitting an internal service.
if (!isSafeHttpUrl(relayUrl)) return { error: "invalid_relay_url" };
if (!env.TOKEN_ENCRYPTION_SECRET) return { error: "encryption_unavailable" };
const enc = await encryptSecret(secret, env.TOKEN_ENCRYPTION_SECRET);
await env.DB
.prepare("UPDATE orb_enrollments SET relay_url = ?, relay_secret_enc = ?, relay_secret_iv = ?, relay_secret_salt = ?, relay_registered_at = CURRENT_TIMESTAMP WHERE enroll_id = ?")
.bind(relayUrl, enc.ciphertext, enc.iv, enc.salt, row.enroll_id)
.bind(relayUrl, enc.ciphertext, enc.iv, enc.salt, enrollment.enrollId)
.run();
return { ok: true, installationId: row.installation_id };
return { ok: true, installationId: enrollment.installationId };
}

/** Register (or update) the container's relay target for a valid enrollment. Validates the secret (→ the bound,
* registered, non-suspended install — same gate as the token broker), SSRF-validates the relay URL, then stores
* the URL + the enrollment secret encrypted at rest (for the forward-time HMAC). The container presents its OWN
* plaintext enrollment secret as the Bearer, so this is self-service + bound to that install. */
export async function registerOrbRelay(env: Env, secret: string, relayUrl: string): Promise<RegisterResult> {
const enrollment = await validateOrbRelayEnrollment(env, secret);
if ("error" in enrollment) return enrollment;
return registerValidatedOrbRelay(env, enrollment, secret, relayUrl);
}

const RELAY_RETRY_MAX_ATTEMPTS = 5;
Expand Down
48 changes: 46 additions & 2 deletions test/integration/orb-relay.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { describe, expect, it } from "vitest";
import { createApp } from "../../src/api/routes";
import { issueOrbEnrollment } from "../../src/orb/broker";
import { forwardOrbEvent, registerOrbRelay, relaySignature, relayVerify, retryFailedRelays, storeRelayFailure } from "../../src/orb/relay";
import { forwardOrbEvent, MAX_ORB_RELAY_REGISTER_BODY_BYTES, readOrbRelayRegisterBody, registerOrbRelay, relaySignature, relayVerify, retryFailedRelays, storeRelayFailure } from "../../src/orb/relay";
import { createTestEnv, type TestD1Database } from "../helpers/d1";

const db = (e: Env) => e.DB as unknown as TestD1Database;
Expand Down Expand Up @@ -79,19 +79,63 @@ describe("POST /v1/orb/relay/register", () => {
expect(await ok.json()).toMatchObject({ ok: true, installationId: 710 });
});

it("maps each failure to its status: 401 bad secret, 403 ineligible, 400 SSRF, 500 no-encryption", async () => {
it("maps each failure to its status: 401 bad secret, 403 ineligible, 400 SSRF, 413 too-large body, 500 no-encryption", async () => {
const e = brokeredEnv();
const sBad = "Bearer orbsec_bad";
expect((await app.request("/v1/orb/relay/register", { method: "POST", headers: { authorization: sBad }, body: JSON.stringify({ relayUrl: "https://x.example" }) }, e)).status).toBe(401);
const s1 = await enroll(e, 711);
expect((await app.request("/v1/orb/relay/register", { method: "POST", headers: { authorization: `Bearer ${s1}` }, body: JSON.stringify({ relayUrl: "http://127.0.0.1" }) }, e)).status).toBe(400);
expect((await app.request("/v1/orb/relay/register", { method: "POST", headers: { authorization: `Bearer ${s1}` }, body: JSON.stringify({ relayUrl: `${"https://x.example/"}${"a".repeat(4096)}` }) }, e)).status).toBe(413);
const s2 = await enroll(e, 712);
await db(e).prepare("UPDATE orb_github_installations SET registered=0 WHERE installation_id=712").run();
expect((await app.request("/v1/orb/relay/register", { method: "POST", headers: { authorization: `Bearer ${s2}` }, body: JSON.stringify({ relayUrl: "https://x.example" }) }, e)).status).toBe(403);
const noEnc = createTestEnv({ ORB_BROKER_ENABLED: "true" });
const s3 = await enroll(noEnc, 713);
expect((await app.request("/v1/orb/relay/register", { method: "POST", headers: { authorization: `Bearer ${s3}` }, body: JSON.stringify({ relayUrl: "https://x.example/relay" }) }, noEnc)).status).toBe(500);
});

it("rejects an invalid enrollment before reading the registration body", async () => {
const e = brokeredEnv();
let bodyAccesses = 0;
const req = new Request("http://localhost/v1/orb/relay/register", { method: "POST", headers: { authorization: "Bearer orbsec_bad" } });
Object.defineProperty(req, "body", {
get() {
bodyAccesses += 1;
throw new Error("body should not be read before enrollment validation");
},
});
const res = await app.fetch(req, e);
expect(res.status).toBe(401);
expect(bodyAccesses).toBe(0);
});
});

describe("readOrbRelayRegisterBody", () => {
it("returns an empty string when the request has no body stream", async () => {
// A request without a body (e.g. a bodyless POST) has request.body === null — the empty-body path.
const req = new Request("http://localhost/v1/orb/relay/register", { method: "POST" });
expect(req.body).toBeNull();
expect(await readOrbRelayRegisterBody(req, null)).toBe(""); // null content-length → declared null → empty stream
expect(await readOrbRelayRegisterBody(req, undefined)).toBe(""); // undefined header → typeof !== "string" arm
});

it("rejects an oversized DECLARED content-length up front (before touching the stream)", async () => {
// Past the ceiling: parseContentLength returns a valid integer that exceeds MAX → short-circuit to null.
const req = new Request("http://localhost/v1/orb/relay/register", { method: "POST", body: "{}" });
expect(await readOrbRelayRegisterBody(req, String(MAX_ORB_RELAY_REGISTER_BODY_BYTES + 1))).toBeNull();
});

it("ignores a malformed or negative content-length and reads the actual body", async () => {
// Number("abc")=NaN and "-1"<0 → parseContentLength returns null → the declared-too-large guard is skipped.
expect(await readOrbRelayRegisterBody(new Request("http://localhost/r", { method: "POST", body: "{}" }), "abc")).toBe("{}"); // non-integer
expect(await readOrbRelayRegisterBody(new Request("http://localhost/r", { method: "POST", body: "{}" }), "-1")).toBe("{}"); // negative
});

it("returns null when the STREAMED body exceeds the ceiling regardless of the declared length", async () => {
// No content-length declared, but the stream itself runs past MAX → reader is cancelled, null returned.
const req = new Request("http://localhost/r", { method: "POST", body: "x".repeat(MAX_ORB_RELAY_REGISTER_BODY_BYTES + 1) });
expect(await readOrbRelayRegisterBody(req, null)).toBeNull();
});
});

describe("relaySignature", () => {
Expand Down
Loading