diff --git a/src/cli/commands/daemon.ts b/src/cli/commands/daemon.ts index 0cfa717..47aafe5 100644 --- a/src/cli/commands/daemon.ts +++ b/src/cli/commands/daemon.ts @@ -51,7 +51,6 @@ export interface KeepKuboUpTickDeps { pkcRpcUrl: URL; tcpPortUsedCheck: (port: number, host: string) => Promise; pkcOptionsFromFlag: { kuboRpcClientsOptions?: unknown } | undefined; - usingDifferentProcessRpc: boolean; hasKuboProcess: boolean; hasPendingKuboStart: boolean; keepKuboUp: () => Promise; @@ -70,11 +69,11 @@ export interface KeepKuboUpTickDeps { export async function runKeepKuboUpTick(deps: KeepKuboUpTickDeps): Promise { let isRpcPortTaken = false; try { - isRpcPortTaken = await deps.tcpPortUsedCheck(Number(deps.pkcRpcUrl.port), deps.pkcRpcUrl.hostname); - if (!deps.pkcOptionsFromFlag?.kuboRpcClientsOptions && !isRpcPortTaken && !deps.usingDifferentProcessRpc) await deps.keepKuboUp(); - else if (deps.pkcOptionsFromFlag?.kuboRpcClientsOptions && !deps.usingDifferentProcessRpc) await deps.keepKuboUp(); + isRpcPortTaken = await deps.tcpPortUsedCheck(Number(deps.pkcRpcUrl.port), toConnectableHostname(deps.pkcRpcUrl.hostname)); + if (!deps.pkcOptionsFromFlag?.kuboRpcClientsOptions && !isRpcPortTaken) await deps.keepKuboUp(); + else if (deps.pkcOptionsFromFlag?.kuboRpcClientsOptions) await deps.keepKuboUp(); // Retry if kubo died and onKuboExit's restart attempt failed (e.g. transient port conflict) - else if (!deps.hasKuboProcess && !deps.hasPendingKuboStart && !deps.usingDifferentProcessRpc) await deps.keepKuboUp(); + else if (!deps.hasKuboProcess && !deps.hasPendingKuboStart) await deps.keepKuboUp(); } catch (error) { deps.onError(`keepKuboUp tick error (will retry): ${error instanceof Error ? error.message : String(error)}`); } @@ -284,6 +283,17 @@ export default class Daemon extends Command { if (pkcOptionsFromFlag?.ipfsGatewayUrls && pkcOptionsFromFlag.ipfsGatewayUrls.length !== 1) this.error("Can't provide pkcOptions.ipfsGatewayUrls as an array with more than 1 element, or as a non array"); + const rpcConnectHostname = toConnectableHostname(pkcRpcUrl.hostname); + const isRpcPortAlreadyTaken = await tcpPortUsed.check(Number(pkcRpcUrl.port), rpcConnectHostname); + if (isRpcPortAlreadyTaken) { + this.error( + `PKC RPC port is already in use at ${pkcRpcUrl} (another bitsocial daemon is likely running). ` + + `To talk to the running daemon, use other bitsocial commands with --pkcRpcUrl ${pkcRpcUrl} ` + + `(e.g. 'bitsocial community list --pkcRpcUrl ${pkcRpcUrl}'). ` + + `To run a second daemon, restart with a different port, e.g. --pkcRpcUrl ws://${pkcRpcUrl.hostname}:${Number(pkcRpcUrl.port) + 1}.` + ); + } + const ipfsConfig = await loadKuboConfigFile(pkcOptionsFromFlag?.dataPath || defaultPkcOptions.dataPath!); const kuboRpcEndpoint = pkcOptionsFromFlag?.kuboRpcClientsOptions ? new URL(pkcOptionsFromFlag.kuboRpcClientsOptions[0]!.toString()) @@ -329,7 +339,7 @@ export default class Daemon extends Command { const keepKuboUp = async () => { if (mainProcessExited) return; const kuboApiPort = Number(kuboRpcEndpoint.port); - if (kuboProcess || pendingKuboStart || usingDifferentProcessRpc) return; // already started, no need to intervene + if (kuboProcess || pendingKuboStart) return; // already started, no need to intervene const connectHostname = toConnectableHostname(kuboRpcEndpoint.hostname); const isKuboApiPortTaken = await tcpPortUsed.check(kuboApiPort, connectHostname); if (isKuboApiPortTaken) { @@ -418,26 +428,18 @@ export default class Daemon extends Command { }; let startedOwnRpc = false; - let usingDifferentProcessRpc = false; let daemonServer: Awaited> | undefined; const createOrConnectRpc = async () => { if (mainProcessExited) return; if (startedOwnRpc) return; - const isRpcPortTaken = await tcpPortUsed.check(Number(pkcRpcUrl.port), pkcRpcUrl.hostname); - if (isRpcPortTaken && usingDifferentProcessRpc) return; + // Re-check the port: the early fail-fast at startup is a few ms before this runs, + // so a TOCTOU race could let another process grab the port in between. If that + // happens we must fail rather than silently leaving the daemon without an RPC. + const isRpcPortTaken = await tcpPortUsed.check(Number(pkcRpcUrl.port), rpcConnectHostname); if (isRpcPortTaken) { - log( - `PKC RPC is already running (${pkcRpcUrl}) by another program. bitsocial-cli will use the running RPC server, and if shuts down, bitsocial-cli will start a new RPC instance` + throw new Error( + `PKC RPC port ${pkcRpcUrl.hostname}:${pkcRpcUrl.port} (${pkcRpcUrl}) became occupied before the daemon could bind it.` ); - console.log("Using the already started RPC server at:", pkcRpcUrl); - console.log("bitsocial-cli daemon will monitor the PKC RPC and kubo ipfs API to make sure they're always up"); - const PKC = await import("@pkcprotocol/pkc-js"); - const pkc = await PKC.default({ pkcRpcClientsOptions: [pkcRpcUrl.toString()] }); - await new Promise((resolve) => pkc.once("communitieschange", resolve)); - pkc.on("error", (error) => console.error("Error from pkc instance", error)); - console.log(`Communities in data path: `, pkc.communities); - usingDifferentProcessRpc = true; - return; } // Load installed challenge packages before starting the RPC server @@ -446,7 +448,6 @@ export default class Daemon extends Command { daemonServer = await startDaemonServer(pkcRpcUrl, ipfsGatewayEndpoint, mergedPkcOptions); - usingDifferentProcessRpc = false; startedOwnRpc = true; console.log(`pkc rpc: listening on ${pkcRpcUrl} (local connections only)`); console.log(`pkc rpc: listening on ${pkcRpcUrl}${daemonServer.rpcAuthKey} (secret auth key for remote connections)`); @@ -470,9 +471,8 @@ export default class Daemon extends Command { } }; - const isRpcPortTaken = await tcpPortUsed.check(Number(pkcRpcUrl.port), pkcRpcUrl.hostname); - - if (!pkcOptionsFromFlag?.kuboRpcClientsOptions && !isRpcPortTaken && !usingDifferentProcessRpc) await keepKuboUp(); + // RPC port was already verified free above (fail-fast); only the kuboRpcClientsOptions branch skips local kubo. + if (!pkcOptionsFromFlag?.kuboRpcClientsOptions) await keepKuboUp(); await createOrConnectRpc(); let keepKuboUpInterval: NodeJS.Timeout | undefined; @@ -576,7 +576,6 @@ export default class Daemon extends Command { pkcRpcUrl, tcpPortUsedCheck: (port, host) => tcpPortUsed.check(port, host), pkcOptionsFromFlag, - usingDifferentProcessRpc, hasKuboProcess: !!kuboProcess, hasPendingKuboStart: !!pendingKuboStart, keepKuboUp, diff --git a/src/ipfs/startIpfs.ts b/src/ipfs/startIpfs.ts index da9cb89..d50ee35 100644 --- a/src/ipfs/startIpfs.ts +++ b/src/ipfs/startIpfs.ts @@ -75,32 +75,38 @@ export async function mergeCliDefaultsIntoIpfsConfig(log: any, ipfsConfigPath: s // use this custom function instead of spawnSync for better logging // also spawnSync might have been causing crash on start on windows -function _spawnAsync(log: any, ...args: any[]) { +// Listens on 'close' (not 'exit') so all stderr 'data' events have been delivered +// before we read errorMessage — otherwise on macOS a fast-exiting child can deliver +// its exit signal before its stderr drains, producing a rejection with an empty +// message and breaking the "configuration file already exists" suppression upstream. +export function _gatherChildOutput(log: any, child: ChildProcessWithoutNullStreams): Promise { return new Promise((resolve, reject) => { - //@ts-ignore - const spawedProcess: ChildProcessWithoutNullStreams = spawn(...args); let errorMessage = ""; - spawedProcess.on("exit", (exitCode, signal) => { - if (exitCode === 0) resolve(null); - else { - const error = new Error(errorMessage); - Object.assign(error, { exitCode, pid: spawedProcess.pid, signal }); - reject(error); - } + child.on("close", (exitCode, signal) => { + if (exitCode === 0) return resolve(null); + const error = new Error(errorMessage); + Object.assign(error, { exitCode, pid: child.pid, signal }); + reject(error); }); - spawedProcess.stderr.on("data", (data) => { + child.stderr.on("data", (data) => { log.trace(data.toString()); errorMessage += data.toString(); }); - spawedProcess.stdin.on("data", (data) => log.trace(data.toString())); - spawedProcess.stdout.on("data", (data) => log.trace(data.toString())); - spawedProcess.on("error", (data) => { + child.stdin.on("data", (data) => log.trace(data.toString())); + child.stdout.on("data", (data) => log.trace(data.toString())); + child.on("error", (data) => { errorMessage += data.toString(); log.error(data.toString()); }); }); } +function _spawnAsync(log: any, ...args: any[]) { + //@ts-ignore + const child: ChildProcessWithoutNullStreams = spawn(...args); + return _gatherChildOutput(log, child); +} + type MultiaddrComponent = { name: string; value?: string }; type MultiaddrModule = { multiaddr: (multiAddr: string) => { diff --git a/test/cli/daemon.test.ts b/test/cli/daemon.test.ts index 1386f97..ef7d61a 100644 --- a/test/cli/daemon.test.ts +++ b/test/cli/daemon.test.ts @@ -362,6 +362,21 @@ describe("bitsocial daemon port availability validation", () => { () => true ); }); + + it("fails when PKC RPC port is already in use", { timeout: 60000 }, async () => { + const server = await occupyPort(validationRpcPort, "localhost"); + occupiedServers.push(server); + + const result = await runPkcDaemonExpectFailure( + ["--pkcOptions.dataPath", randomDirectory(), "--pkcRpcUrl", validationRpcUrl], + { KUBO_RPC_URL: validationKuboUrl, IPFS_GATEWAY_URL: validationGatewayUrl } + ); + expect(result.exitCode).not.toBe(0); + const combinedOutput = `${result.stdout}\n${result.stderr}`; + expect(combinedOutput).toContain("PKC RPC port is already in use"); + expect(combinedOutput).toContain(String(validationRpcPort)); + expect(combinedOutput).toContain("--pkcRpcUrl"); + }); }); describe("bitsocial daemon kubo restart cleanup", async () => { @@ -633,76 +648,6 @@ describe("bitsocial daemon survives transient port occupation after its own kubo }); }); -describe(`bitsocial daemon (relying on PKC RPC started by another process)`, async () => { - let rpcProcess: ManagedChildProcess; - const rpcRpcUrl = `ws://localhost:9368`; - const rpcKuboUrl = `http://0.0.0.0:50149/api/v0`; - const rpcGatewayUrl = `http://0.0.0.0:6603`; - - beforeAll(async () => { - await ensureKuboNodeStopped(`http://localhost:50149/api/v0`); - rpcProcess = await startPkcDaemon( - ["--pkcOptions.dataPath", randomDirectory(), "--pkcRpcUrl", rpcRpcUrl], - { KUBO_RPC_URL: rpcKuboUrl, IPFS_GATEWAY_URL: rpcGatewayUrl } - ); - await testConnectionToPkcRpc(9368); - }); - - afterAll(async () => { - await stopPkcDaemon(rpcProcess); - await waitForPortFree(9368, "localhost", 10000); - }); - - it(`bitsocial daemon detects and uses another process' PKC RPC`, async () => { - let anotherRpcProcess: ManagedChildProcess | undefined; - try { - anotherRpcProcess = await startPkcDaemon( - ["--pkcRpcUrl", rpcRpcUrl], - { KUBO_RPC_URL: rpcKuboUrl, IPFS_GATEWAY_URL: rpcGatewayUrl } - ); - await testConnectionToPkcRpc(9368); - } finally { - await stopPkcDaemon(anotherRpcProcess); // should not affect rpcProcess - } - await testConnectionToPkcRpc(9368); - }); - it(`bitsocial daemon is monitoring another process' PKC RPC and make sure it's always up`, async () => { - let anotherRpcProcess: ManagedChildProcess | undefined; - try { - anotherRpcProcess = await startPkcDaemon( - ["--pkcOptions.dataPath", randomDirectory(), "--pkcRpcUrl", rpcRpcUrl], - { KUBO_RPC_URL: rpcKuboUrl, IPFS_GATEWAY_URL: rpcGatewayUrl } - ); - await stopPkcDaemon(rpcProcess); - - // Wait for anotherRpcProcess to restart the RPC - const rpcRestarted = await waitForCondition(async () => { - try { - const ws = new WebSocket(rpcRpcUrl); - const opened = await new Promise((resolve) => { - const timer = setTimeout(() => resolve(false), 2000); - ws.once("open", () => { - clearTimeout(timer); - resolve(true); - }); - ws.once("error", () => { - clearTimeout(timer); - resolve(false); - }); - }); - ws.close(); - return opened; - } catch { - return false; - } - }, 30000, 1000); - expect(rpcRestarted).toBe(true); - } finally { - await stopPkcDaemon(anotherRpcProcess); - } - }); -}); - describe(`bitsocial daemon --pkcRpcUrl`, async () => { it(`A bitsocial daemon should be change where to listen URL`, async () => { const rpcUrl = new URL("ws://localhost:11138"); diff --git a/test/cli/keep-kubo-up-tick.test.ts b/test/cli/keep-kubo-up-tick.test.ts index 174c4c4..81f5625 100644 --- a/test/cli/keep-kubo-up-tick.test.ts +++ b/test/cli/keep-kubo-up-tick.test.ts @@ -33,7 +33,6 @@ describe("runKeepKuboUpTick", () => { return Promise.reject(err); }, pkcOptionsFromFlag: undefined, - usingDifferentProcessRpc: false, hasKuboProcess: false, hasPendingKuboStart: false, keepKuboUp: async () => {}, @@ -54,7 +53,6 @@ describe("runKeepKuboUpTick", () => { pkcRpcUrl: new URL("ws://localhost:9138"), tcpPortUsedCheck: async () => false, pkcOptionsFromFlag: undefined, - usingDifferentProcessRpc: false, hasKuboProcess: false, hasPendingKuboStart: false, keepKuboUp: async () => { @@ -76,7 +74,6 @@ describe("runKeepKuboUpTick", () => { pkcRpcUrl: new URL("ws://localhost:9138"), tcpPortUsedCheck: async () => true, pkcOptionsFromFlag: undefined, - usingDifferentProcessRpc: true, hasKuboProcess: true, hasPendingKuboStart: false, keepKuboUp: async () => {}, @@ -98,7 +95,6 @@ describe("runKeepKuboUpTick", () => { pkcRpcUrl: new URL("ws://localhost:9138"), tcpPortUsedCheck: async () => false, pkcOptionsFromFlag: undefined, - usingDifferentProcessRpc: false, hasKuboProcess: false, hasPendingKuboStart: false, keepKuboUp: async () => { diff --git a/test/kubo/gather-child-output.test.ts b/test/kubo/gather-child-output.test.ts new file mode 100644 index 0000000..7e88613 --- /dev/null +++ b/test/kubo/gather-child-output.test.ts @@ -0,0 +1,41 @@ +import { describe, it, expect } from "vitest"; +import { EventEmitter } from "node:events"; +import { _gatherChildOutput } from "../../dist/ipfs/startIpfs.js"; + +const noopLog = Object.assign(() => {}, { trace: () => {}, error: () => {} }); + +const makeFakeChild = () => + Object.assign(new EventEmitter(), { + stdout: new EventEmitter(), + stderr: new EventEmitter(), + stdin: new EventEmitter(), + pid: 12345 + }) as any; + +describe("_gatherChildOutput", () => { + it("captures stderr even when the child's 'exit' event fires before stderr 'data' (macOS race)", async () => { + const child = makeFakeChild(); + const promise = _gatherChildOutput(noopLog, child); + + // Simulate the macOS-style ordering: process exits first, stderr drains + // afterwards, 'close' fires last. The previous 'exit'-based listener would + // settle the promise with an empty errorMessage at the first emit and miss + // the stderr payload entirely. + queueMicrotask(() => { + child.emit("exit", 1, null); + setImmediate(() => { + child.stderr.emit("data", Buffer.from("Error: ipfs configuration file already exists!\n")); + child.emit("close", 1, null); + }); + }); + + await expect(promise).rejects.toThrow(/ipfs configuration file already exists!/); + }); + + it("resolves cleanly when the child exits with code 0", async () => { + const child = makeFakeChild(); + const promise = _gatherChildOutput(noopLog, child); + queueMicrotask(() => child.emit("close", 0, null)); + await expect(promise).resolves.toBeNull(); + }); +});