Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 30 additions & 7 deletions packages/cli/src/utils/outbox.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { mkdirSync, readFileSync, readdirSync, renameSync, writeFileSync } from "node:fs";
import { mkdirSync, readFileSync, readdirSync, renameSync, unlinkSync, writeFileSync } from "node:fs";
import { randomUUID } from "node:crypto";
import { join } from "node:path";
import { homedir } from "node:os";
Expand All @@ -21,7 +21,14 @@ export function queueOutboxMessage(to: string, body: string, from: string): void
const id = randomUUID();
const timestamp = new Date().toISOString();
const filename = `${timestamp.replace(/[:.]/g, "-")}-${id}.json`;
writeFileSync(join(dir, filename), JSON.stringify({ id, to, from, body, timestamp }, null, 2), "utf-8");
const content = JSON.stringify({ id, to, from, body, timestamp }, null, 2);
// Atomic write: stage to a dot-prefixed tmp file in the same directory, then
// rename into place. drainOutbox filters out dot-prefixed files so a reader
// running concurrently never sees a half-written file. rename(2) within the
// same filesystem is atomic on POSIX.
const tmp = join(dir, `.${filename}.tmp`);
writeFileSync(tmp, content, "utf-8");
renameSync(tmp, join(dir, filename));
}

export function drainOutbox(): OutboxMessage[] {
Expand All @@ -30,11 +37,27 @@ export function drainOutbox(): OutboxMessage[] {
mkdirSync(newDir, { recursive: true });
mkdirSync(sentDir, { recursive: true });

const files = readdirSync(newDir).filter((f) => f.endsWith(".json"));
return files.map((f) => {
const files = readdirSync(newDir).filter((f) => f.endsWith(".json") && !f.startsWith("."));
const out: OutboxMessage[] = [];
for (const f of files) {
const src = join(newDir, f);
const msg = JSON.parse(readFileSync(src, "utf-8")) as OutboxMessage;
let msg: OutboxMessage;
try {
msg = JSON.parse(readFileSync(src, "utf-8")) as OutboxMessage;
} catch (err) {
// Defense-in-depth: even with atomic writes, a partial file could appear
// (manual edit, crash mid-write before rename). Don't take the whole
// daemon down — log and quarantine the bad file.
console.error(`drainOutbox: failed to parse ${f}: ${(err as Error).message}; quarantining`);
try {
renameSync(src, join(sentDir, `.malformed-${f}`));
} catch {
try { unlinkSync(src); } catch {}
}
continue;
}
renameSync(src, join(sentDir, f));
return msg;
});
out.push(msg);
}
return out;
}
37 changes: 36 additions & 1 deletion packages/cli/test/outbox.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { describe, test, expect, beforeEach, afterEach } from "bun:test";
import { mkdtempSync, rmSync, readdirSync } from "node:fs";
import { mkdtempSync, rmSync, readdirSync, writeFileSync, existsSync } from "node:fs";
import { join } from "node:path";
import { tmpdir } from "node:os";
import { queueOutboxMessage, drainOutbox } from "../src/utils/outbox.js";
Expand All @@ -23,6 +23,12 @@ describe("outbox", () => {
expect(files.length).toBe(1);
});

test("queueOutboxMessage leaves no dot-prefixed tmp files after a successful write", () => {
queueOutboxMessage("host", "hello", "austin");
const all = readdirSync(join(root, ".tps", "outbox", "new"));
expect(all.every((f) => !f.startsWith("."))).toBe(true);
});

test("drainOutbox returns messages and moves files to sent", () => {
queueOutboxMessage("host", "hello", "austin");
const rows = drainOutbox();
Expand All @@ -34,4 +40,33 @@ describe("outbox", () => {
expect(newFiles.length).toBe(0);
expect(sentFiles.length).toBe(1);
});

test("drainOutbox ignores dot-prefixed in-flight tmp files (atomic-write race guard)", () => {
// Simulate a writer that's mid-write: a dot-tmp file exists but the
// final rename hasn't happened yet. drainOutbox must not try to parse it.
const newDir = join(root, ".tps", "outbox", "new");
queueOutboxMessage("host", "hello", "austin");
writeFileSync(join(newDir, ".pending.json.tmp"), "{ partial", "utf-8");
const rows = drainOutbox();
expect(rows.length).toBe(1);
expect(rows[0]?.body).toBe("hello");
// The in-flight tmp file is untouched
expect(existsSync(join(newDir, ".pending.json.tmp"))).toBe(true);
});

test("drainOutbox quarantines malformed JSON without throwing (defense in depth)", () => {
// Inject a fully-published-but-corrupt file (not dot-prefixed) and prove
// the daemon-equivalent loop doesn't crash. Pre-fix, this threw SyntaxError
// and killed the branch daemon on tps-reed 2026-05-16T17:17Z.
const newDir = join(root, ".tps", "outbox", "new");
const sentDir = join(root, ".tps", "outbox", "sent");
queueOutboxMessage("host", "good", "austin");
writeFileSync(join(newDir, "2026-05-16-corrupt.json"), "", "utf-8");
const rows = drainOutbox();
expect(rows.length).toBe(1);
expect(rows[0]?.body).toBe("good");
expect(readdirSync(newDir).length).toBe(0);
// Bad file lands in sent/ with a .malformed- prefix for forensics
expect(readdirSync(sentDir).some((f) => f.startsWith(".malformed-"))).toBe(true);
});
});
Loading