Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 24 additions & 25 deletions src/cli/commands/daemon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ export interface KeepKuboUpTickDeps {
pkcRpcUrl: URL;
tcpPortUsedCheck: (port: number, host: string) => Promise<boolean>;
pkcOptionsFromFlag: { kuboRpcClientsOptions?: unknown } | undefined;
usingDifferentProcessRpc: boolean;
hasKuboProcess: boolean;
hasPendingKuboStart: boolean;
keepKuboUp: () => Promise<void>;
Expand All @@ -70,11 +69,11 @@ export interface KeepKuboUpTickDeps {
export async function runKeepKuboUpTick(deps: KeepKuboUpTickDeps): Promise<void> {
let isRpcPortTaken = false;
try {
isRpcPortTaken = await deps.tcpPortUsedCheck(Number(deps.pkcRpcUrl.port), deps.pkcRpcUrl.hostname);
if (!deps.pkcOptionsFromFlag?.kuboRpcClientsOptions && !isRpcPortTaken && !deps.usingDifferentProcessRpc) await deps.keepKuboUp();
else if (deps.pkcOptionsFromFlag?.kuboRpcClientsOptions && !deps.usingDifferentProcessRpc) await deps.keepKuboUp();
isRpcPortTaken = await deps.tcpPortUsedCheck(Number(deps.pkcRpcUrl.port), toConnectableHostname(deps.pkcRpcUrl.hostname));
if (!deps.pkcOptionsFromFlag?.kuboRpcClientsOptions && !isRpcPortTaken) await deps.keepKuboUp();
else if (deps.pkcOptionsFromFlag?.kuboRpcClientsOptions) await deps.keepKuboUp();
// Retry if kubo died and onKuboExit's restart attempt failed (e.g. transient port conflict)
else if (!deps.hasKuboProcess && !deps.hasPendingKuboStart && !deps.usingDifferentProcessRpc) await deps.keepKuboUp();
else if (!deps.hasKuboProcess && !deps.hasPendingKuboStart) await deps.keepKuboUp();
} catch (error) {
deps.onError(`keepKuboUp tick error (will retry): ${error instanceof Error ? error.message : String(error)}`);
}
Expand Down Expand Up @@ -284,6 +283,17 @@ export default class Daemon extends Command {
if (pkcOptionsFromFlag?.ipfsGatewayUrls && pkcOptionsFromFlag.ipfsGatewayUrls.length !== 1)
this.error("Can't provide pkcOptions.ipfsGatewayUrls as an array with more than 1 element, or as a non array");

const rpcConnectHostname = toConnectableHostname(pkcRpcUrl.hostname);
const isRpcPortAlreadyTaken = await tcpPortUsed.check(Number(pkcRpcUrl.port), rpcConnectHostname);
if (isRpcPortAlreadyTaken) {
this.error(
`PKC RPC port is already in use at ${pkcRpcUrl} (another bitsocial daemon is likely running). ` +
`To talk to the running daemon, use other bitsocial commands with --pkcRpcUrl ${pkcRpcUrl} ` +
`(e.g. 'bitsocial community list --pkcRpcUrl ${pkcRpcUrl}'). ` +
`To run a second daemon, restart with a different port, e.g. --pkcRpcUrl ws://${pkcRpcUrl.hostname}:${Number(pkcRpcUrl.port) + 1}.`
);
}

const ipfsConfig = await loadKuboConfigFile(pkcOptionsFromFlag?.dataPath || defaultPkcOptions.dataPath!);
const kuboRpcEndpoint = pkcOptionsFromFlag?.kuboRpcClientsOptions
? new URL(pkcOptionsFromFlag.kuboRpcClientsOptions[0]!.toString())
Expand Down Expand Up @@ -329,7 +339,7 @@ export default class Daemon extends Command {
const keepKuboUp = async () => {
if (mainProcessExited) return;
const kuboApiPort = Number(kuboRpcEndpoint.port);
if (kuboProcess || pendingKuboStart || usingDifferentProcessRpc) return; // already started, no need to intervene
if (kuboProcess || pendingKuboStart) return; // already started, no need to intervene
const connectHostname = toConnectableHostname(kuboRpcEndpoint.hostname);
const isKuboApiPortTaken = await tcpPortUsed.check(kuboApiPort, connectHostname);
if (isKuboApiPortTaken) {
Expand Down Expand Up @@ -418,26 +428,18 @@ export default class Daemon extends Command {
};

let startedOwnRpc = false;
let usingDifferentProcessRpc = false;
let daemonServer: Awaited<ReturnType<typeof startDaemonServer>> | undefined;
const createOrConnectRpc = async () => {
if (mainProcessExited) return;
if (startedOwnRpc) return;
const isRpcPortTaken = await tcpPortUsed.check(Number(pkcRpcUrl.port), pkcRpcUrl.hostname);
if (isRpcPortTaken && usingDifferentProcessRpc) return;
// Re-check the port: the early fail-fast at startup is a few ms before this runs,
// so a TOCTOU race could let another process grab the port in between. If that
// happens we must fail rather than silently leaving the daemon without an RPC.
const isRpcPortTaken = await tcpPortUsed.check(Number(pkcRpcUrl.port), rpcConnectHostname);
if (isRpcPortTaken) {
log(
`PKC RPC is already running (${pkcRpcUrl}) by another program. bitsocial-cli will use the running RPC server, and if shuts down, bitsocial-cli will start a new RPC instance`
throw new Error(
`PKC RPC port ${pkcRpcUrl.hostname}:${pkcRpcUrl.port} (${pkcRpcUrl}) became occupied before the daemon could bind it.`
);
console.log("Using the already started RPC server at:", pkcRpcUrl);
console.log("bitsocial-cli daemon will monitor the PKC RPC and kubo ipfs API to make sure they're always up");
const PKC = await import("@pkcprotocol/pkc-js");
const pkc = await PKC.default({ pkcRpcClientsOptions: [pkcRpcUrl.toString()] });
await new Promise((resolve) => pkc.once("communitieschange", resolve));
pkc.on("error", (error) => console.error("Error from pkc instance", error));
console.log(`Communities in data path: `, pkc.communities);
usingDifferentProcessRpc = true;
return;
}

// Load installed challenge packages before starting the RPC server
Expand All @@ -446,7 +448,6 @@ export default class Daemon extends Command {

daemonServer = await startDaemonServer(pkcRpcUrl, ipfsGatewayEndpoint, mergedPkcOptions);

usingDifferentProcessRpc = false;
startedOwnRpc = true;
console.log(`pkc rpc: listening on ${pkcRpcUrl} (local connections only)`);
console.log(`pkc rpc: listening on ${pkcRpcUrl}${daemonServer.rpcAuthKey} (secret auth key for remote connections)`);
Expand All @@ -470,9 +471,8 @@ export default class Daemon extends Command {
}
};

const isRpcPortTaken = await tcpPortUsed.check(Number(pkcRpcUrl.port), pkcRpcUrl.hostname);

if (!pkcOptionsFromFlag?.kuboRpcClientsOptions && !isRpcPortTaken && !usingDifferentProcessRpc) await keepKuboUp();
// RPC port was already verified free above (fail-fast); only the kuboRpcClientsOptions branch skips local kubo.
if (!pkcOptionsFromFlag?.kuboRpcClientsOptions) await keepKuboUp();
await createOrConnectRpc();

let keepKuboUpInterval: NodeJS.Timeout | undefined;
Expand Down Expand Up @@ -576,7 +576,6 @@ export default class Daemon extends Command {
pkcRpcUrl,
tcpPortUsedCheck: (port, host) => tcpPortUsed.check(port, host),
pkcOptionsFromFlag,
usingDifferentProcessRpc,
hasKuboProcess: !!kuboProcess,
hasPendingKuboStart: !!pendingKuboStart,
keepKuboUp,
Expand Down
34 changes: 20 additions & 14 deletions src/ipfs/startIpfs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,32 +75,38 @@ export async function mergeCliDefaultsIntoIpfsConfig(log: any, ipfsConfigPath: s
// use this custom function instead of spawnSync for better logging
// also spawnSync might have been causing crash on start on windows

function _spawnAsync(log: any, ...args: any[]) {
// Listens on 'close' (not 'exit') so all stderr 'data' events have been delivered
// before we read errorMessage — otherwise on macOS a fast-exiting child can deliver
// its exit signal before its stderr drains, producing a rejection with an empty
// message and breaking the "configuration file already exists" suppression upstream.
export function _gatherChildOutput(log: any, child: ChildProcessWithoutNullStreams): Promise<null> {
return new Promise((resolve, reject) => {
//@ts-ignore
const spawedProcess: ChildProcessWithoutNullStreams = spawn(...args);
let errorMessage = "";
spawedProcess.on("exit", (exitCode, signal) => {
if (exitCode === 0) resolve(null);
else {
const error = new Error(errorMessage);
Object.assign(error, { exitCode, pid: spawedProcess.pid, signal });
reject(error);
}
child.on("close", (exitCode, signal) => {
if (exitCode === 0) return resolve(null);
const error = new Error(errorMessage);
Object.assign(error, { exitCode, pid: child.pid, signal });
reject(error);
});
spawedProcess.stderr.on("data", (data) => {
child.stderr.on("data", (data) => {
log.trace(data.toString());
errorMessage += data.toString();
});
spawedProcess.stdin.on("data", (data) => log.trace(data.toString()));
spawedProcess.stdout.on("data", (data) => log.trace(data.toString()));
spawedProcess.on("error", (data) => {
child.stdin.on("data", (data) => log.trace(data.toString()));
child.stdout.on("data", (data) => log.trace(data.toString()));
child.on("error", (data) => {
errorMessage += data.toString();
log.error(data.toString());
});
});
}

function _spawnAsync(log: any, ...args: any[]) {
//@ts-ignore
const child: ChildProcessWithoutNullStreams = spawn(...args);
return _gatherChildOutput(log, child);
}

type MultiaddrComponent = { name: string; value?: string };
type MultiaddrModule = {
multiaddr: (multiAddr: string) => {
Expand Down
85 changes: 15 additions & 70 deletions test/cli/daemon.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,21 @@ describe("bitsocial daemon port availability validation", () => {
() => true
);
});

it("fails when PKC RPC port is already in use", { timeout: 60000 }, async () => {
const server = await occupyPort(validationRpcPort, "localhost");
occupiedServers.push(server);

const result = await runPkcDaemonExpectFailure(
["--pkcOptions.dataPath", randomDirectory(), "--pkcRpcUrl", validationRpcUrl],
{ KUBO_RPC_URL: validationKuboUrl, IPFS_GATEWAY_URL: validationGatewayUrl }
);
expect(result.exitCode).not.toBe(0);
const combinedOutput = `${result.stdout}\n${result.stderr}`;
expect(combinedOutput).toContain("PKC RPC port is already in use");
expect(combinedOutput).toContain(String(validationRpcPort));
expect(combinedOutput).toContain("--pkcRpcUrl");
});
});

describe("bitsocial daemon kubo restart cleanup", async () => {
Expand Down Expand Up @@ -633,76 +648,6 @@ describe("bitsocial daemon survives transient port occupation after its own kubo
});
});

describe(`bitsocial daemon (relying on PKC RPC started by another process)`, async () => {
let rpcProcess: ManagedChildProcess;
const rpcRpcUrl = `ws://localhost:9368`;
const rpcKuboUrl = `http://0.0.0.0:50149/api/v0`;
const rpcGatewayUrl = `http://0.0.0.0:6603`;

beforeAll(async () => {
await ensureKuboNodeStopped(`http://localhost:50149/api/v0`);
rpcProcess = await startPkcDaemon(
["--pkcOptions.dataPath", randomDirectory(), "--pkcRpcUrl", rpcRpcUrl],
{ KUBO_RPC_URL: rpcKuboUrl, IPFS_GATEWAY_URL: rpcGatewayUrl }
);
await testConnectionToPkcRpc(9368);
});

afterAll(async () => {
await stopPkcDaemon(rpcProcess);
await waitForPortFree(9368, "localhost", 10000);
});

it(`bitsocial daemon detects and uses another process' PKC RPC`, async () => {
let anotherRpcProcess: ManagedChildProcess | undefined;
try {
anotherRpcProcess = await startPkcDaemon(
["--pkcRpcUrl", rpcRpcUrl],
{ KUBO_RPC_URL: rpcKuboUrl, IPFS_GATEWAY_URL: rpcGatewayUrl }
);
await testConnectionToPkcRpc(9368);
} finally {
await stopPkcDaemon(anotherRpcProcess); // should not affect rpcProcess
}
await testConnectionToPkcRpc(9368);
});
it(`bitsocial daemon is monitoring another process' PKC RPC and make sure it's always up`, async () => {
let anotherRpcProcess: ManagedChildProcess | undefined;
try {
anotherRpcProcess = await startPkcDaemon(
["--pkcOptions.dataPath", randomDirectory(), "--pkcRpcUrl", rpcRpcUrl],
{ KUBO_RPC_URL: rpcKuboUrl, IPFS_GATEWAY_URL: rpcGatewayUrl }
);
await stopPkcDaemon(rpcProcess);

// Wait for anotherRpcProcess to restart the RPC
const rpcRestarted = await waitForCondition(async () => {
try {
const ws = new WebSocket(rpcRpcUrl);
const opened = await new Promise<boolean>((resolve) => {
const timer = setTimeout(() => resolve(false), 2000);
ws.once("open", () => {
clearTimeout(timer);
resolve(true);
});
ws.once("error", () => {
clearTimeout(timer);
resolve(false);
});
});
ws.close();
return opened;
} catch {
return false;
}
}, 30000, 1000);
expect(rpcRestarted).toBe(true);
} finally {
await stopPkcDaemon(anotherRpcProcess);
}
});
});

describe(`bitsocial daemon --pkcRpcUrl`, async () => {
it(`A bitsocial daemon should be change where to listen URL`, async () => {
const rpcUrl = new URL("ws://localhost:11138");
Expand Down
4 changes: 0 additions & 4 deletions test/cli/keep-kubo-up-tick.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ describe("runKeepKuboUpTick", () => {
return Promise.reject(err);
},
pkcOptionsFromFlag: undefined,
usingDifferentProcessRpc: false,
hasKuboProcess: false,
hasPendingKuboStart: false,
keepKuboUp: async () => {},
Expand All @@ -54,7 +53,6 @@ describe("runKeepKuboUpTick", () => {
pkcRpcUrl: new URL("ws://localhost:9138"),
tcpPortUsedCheck: async () => false,
pkcOptionsFromFlag: undefined,
usingDifferentProcessRpc: false,
hasKuboProcess: false,
hasPendingKuboStart: false,
keepKuboUp: async () => {
Expand All @@ -76,7 +74,6 @@ describe("runKeepKuboUpTick", () => {
pkcRpcUrl: new URL("ws://localhost:9138"),
tcpPortUsedCheck: async () => true,
pkcOptionsFromFlag: undefined,
usingDifferentProcessRpc: true,
hasKuboProcess: true,
hasPendingKuboStart: false,
keepKuboUp: async () => {},
Expand All @@ -98,7 +95,6 @@ describe("runKeepKuboUpTick", () => {
pkcRpcUrl: new URL("ws://localhost:9138"),
tcpPortUsedCheck: async () => false,
pkcOptionsFromFlag: undefined,
usingDifferentProcessRpc: false,
hasKuboProcess: false,
hasPendingKuboStart: false,
keepKuboUp: async () => {
Expand Down
41 changes: 41 additions & 0 deletions test/kubo/gather-child-output.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { describe, it, expect } from "vitest";
import { EventEmitter } from "node:events";
import { _gatherChildOutput } from "../../dist/ipfs/startIpfs.js";

const noopLog = Object.assign(() => {}, { trace: () => {}, error: () => {} });

const makeFakeChild = () =>
Object.assign(new EventEmitter(), {
stdout: new EventEmitter(),
stderr: new EventEmitter(),
stdin: new EventEmitter(),
pid: 12345
}) as any;

describe("_gatherChildOutput", () => {
it("captures stderr even when the child's 'exit' event fires before stderr 'data' (macOS race)", async () => {
const child = makeFakeChild();
const promise = _gatherChildOutput(noopLog, child);

// Simulate the macOS-style ordering: process exits first, stderr drains
// afterwards, 'close' fires last. The previous 'exit'-based listener would
// settle the promise with an empty errorMessage at the first emit and miss
// the stderr payload entirely.
queueMicrotask(() => {
child.emit("exit", 1, null);
setImmediate(() => {
child.stderr.emit("data", Buffer.from("Error: ipfs configuration file already exists!\n"));
child.emit("close", 1, null);
});
});

await expect(promise).rejects.toThrow(/ipfs configuration file already exists!/);
});

it("resolves cleanly when the child exits with code 0", async () => {
const child = makeFakeChild();
const promise = _gatherChildOutput(noopLog, child);
queueMicrotask(() => child.emit("close", 0, null));
await expect(promise).resolves.toBeNull();
});
});
Loading