diff --git a/.changeset/shy-rivers-punch.md b/.changeset/shy-rivers-punch.md new file mode 100644 index 00000000..affdcb6a --- /dev/null +++ b/.changeset/shy-rivers-punch.md @@ -0,0 +1,5 @@ +--- +'@livekit/rtc-node': patch +--- + +Close in-progress stream controllers on room disconnect to prevent FD leaks diff --git a/packages/livekit-rtc/src/room.ts b/packages/livekit-rtc/src/room.ts index b228f1d8..e5358d57 100644 --- a/packages/livekit-rtc/src/room.ts +++ b/packages/livekit-rtc/src/room.ts @@ -303,6 +303,36 @@ export class Room extends (EventEmitter as new () => TypedEmitter return ev.message.case == 'disconnect' && ev.message.value.asyncId == res.asyncId; }); + this.cleanupOnDisconnect(); + + FfiClient.instance.removeListener(FfiClientEvent.FfiEvent, this.onFfiEvent); + this.removeAllListeners(); + } + + private cleanupOnDisconnect() { + // Error all in-progress stream controllers to prevent FD leaks. + // Streams that were receiving data but never got a trailer (e.g. the sender + // disconnected mid-transfer) would otherwise keep their ReadableStream open + // indefinitely, leaking the underlying controller and any buffered chunks. + // Using error() instead of close() signals an abnormal termination to consumers. + for (const [, streamController] of this.byteStreamControllers) { + try { + streamController.controller.error(new Error('Disconnected while receiving')); + } catch { + // controller may already be closed or errored + } + } + this.byteStreamControllers.clear(); + + for (const [, streamController] of this.textStreamControllers) { + try { + streamController.controller.error(new Error('Disconnected while receiving')); + } catch { + // controller may already be closed or errored + } + } + this.textStreamControllers.clear(); + // Clear sidPromise before removing listeners so that a reconnect // doesn't return a stale, permanently-pending promise. this.sidPromise = undefined; @@ -310,9 +340,6 @@ export class Room extends (EventEmitter as new () => TypedEmitter // This causes any in-flight operations (publishData, publishTrack, etc.) // to reject and clean up their event listeners. this.disconnectController.abort(); - - FfiClient.instance.removeListener(FfiClientEvent.FfiEvent, this.onFfiEvent); - this.removeAllListeners(); } /** @@ -630,9 +657,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter /*} else if (ev.case == 'connected') { this.emit(RoomEvent.Connected);*/ } else if (ev.case == 'disconnected') { - // Abort pending waitFor() listeners on server-initiated disconnect too, - // not just on explicit disconnect() calls. - this.disconnectController.abort(); + this.cleanupOnDisconnect(); this.emit(RoomEvent.Disconnected, ev.value.reason!); } else if (ev.case == 'reconnecting') { this.emit(RoomEvent.Reconnecting); diff --git a/packages/livekit-rtc/src/tests/e2e.test.ts b/packages/livekit-rtc/src/tests/e2e.test.ts index dd234ae7..5d765135 100644 --- a/packages/livekit-rtc/src/tests/e2e.test.ts +++ b/packages/livekit-rtc/src/tests/e2e.test.ts @@ -515,6 +515,46 @@ describeE2E('livekit-rtc e2e', () => { testTimeoutMs * 2, ); + it( + 'cleans up stream controllers when disconnecting during an active stream', + async () => { + const { rooms } = await connectTestRooms(2); + const [receivingRoom, sendingRoom] = rooms; + const topic = 'cleanup-stream-topic'; + + // Register a handler on the receiving side that will intentionally + // NOT fully consume the stream — simulating an abandoned transfer. + let readerReceived = false; + // eslint-disable-next-line @typescript-eslint/no-unused-vars + receivingRoom!.registerTextStreamHandler(topic, async (_reader, _sender) => { + readerReceived = true; + // Deliberately do not call reader.readAll() so the stream stays open + }); + + // Start sending a text stream but don't close it + const writer = await sendingRoom!.localParticipant!.streamText({ topic }); + await writer.write('partial data'); + + // Wait for the receiving side to get the stream header + await waitFor(() => readerReceived, { + timeoutMs: 5000, + debugName: 'text stream header received', + }); + + // Disconnect the receiving room while the stream is still open. + // This should close the stream controller without throwing. + await receivingRoom!.disconnect(); + + // Also close the writer and disconnect the sender + await writer.close(); + await sendingRoom!.disconnect(); + + // If we got here without hanging or throwing, the stream controller + // was properly cleaned up on disconnect. + }, + testTimeoutMs, + ); + it( 'cleans up track publications when a remote participant disconnects', async () => {