Skip to content
5 changes: 5 additions & 0 deletions .changeset/shy-rivers-punch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@livekit/rtc-node': patch
---

Close in-progress stream controllers on room disconnect to prevent FD leaks
37 changes: 31 additions & 6 deletions packages/livekit-rtc/src/room.ts
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -303,16 +303,43 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
return ev.message.case == 'disconnect' && ev.message.value.asyncId == res.asyncId;
});

this.cleanupOnDisconnect();

FfiClient.instance.removeListener(FfiClientEvent.FfiEvent, this.onFfiEvent);
this.removeAllListeners();
}

private cleanupOnDisconnect() {
// Error all in-progress stream controllers to prevent FD leaks.
// Streams that were receiving data but never got a trailer (e.g. the sender
// disconnected mid-transfer) would otherwise keep their ReadableStream open
// indefinitely, leaking the underlying controller and any buffered chunks.
// Using error() instead of close() signals an abnormal termination to consumers.
for (const [, streamController] of this.byteStreamControllers) {
try {
streamController.controller.error(new Error('Disconnected while receiving'));
} catch {
// controller may already be closed or errored
}
}
this.byteStreamControllers.clear();

for (const [, streamController] of this.textStreamControllers) {
try {
streamController.controller.error(new Error('Disconnected while receiving'));
} catch {
// controller may already be closed or errored
}
}
this.textStreamControllers.clear();

// Clear sidPromise before removing listeners so that a reconnect
// doesn't return a stale, permanently-pending promise.
this.sidPromise = undefined;
// Abort all pending FfiClient.waitFor() listeners so they don't leak.
// This causes any in-flight operations (publishData, publishTrack, etc.)
// to reject and clean up their event listeners.
this.disconnectController.abort();

FfiClient.instance.removeListener(FfiClientEvent.FfiEvent, this.onFfiEvent);
this.removeAllListeners();
}

/**
Expand Down Expand Up @@ -630,9 +657,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
/*} else if (ev.case == 'connected') {
this.emit(RoomEvent.Connected);*/
} else if (ev.case == 'disconnected') {
// Abort pending waitFor() listeners on server-initiated disconnect too,
// not just on explicit disconnect() calls.
this.disconnectController.abort();
this.cleanupOnDisconnect();
this.emit(RoomEvent.Disconnected, ev.value.reason!);
} else if (ev.case == 'reconnecting') {
this.emit(RoomEvent.Reconnecting);
Expand Down
40 changes: 40 additions & 0 deletions packages/livekit-rtc/src/tests/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,46 @@ describeE2E('livekit-rtc e2e', () => {
testTimeoutMs * 2,
);

it(
'cleans up stream controllers when disconnecting during an active stream',
async () => {
const { rooms } = await connectTestRooms(2);
const [receivingRoom, sendingRoom] = rooms;
const topic = 'cleanup-stream-topic';

// Register a handler on the receiving side that will intentionally
// NOT fully consume the stream — simulating an abandoned transfer.
let readerReceived = false;
// eslint-disable-next-line @typescript-eslint/no-unused-vars
receivingRoom!.registerTextStreamHandler(topic, async (_reader, _sender) => {
readerReceived = true;
// Deliberately do not call reader.readAll() so the stream stays open
});

// Start sending a text stream but don't close it
const writer = await sendingRoom!.localParticipant!.streamText({ topic });
await writer.write('partial data');

// Wait for the receiving side to get the stream header
await waitFor(() => readerReceived, {
timeoutMs: 5000,
debugName: 'text stream header received',
});

// Disconnect the receiving room while the stream is still open.
// This should close the stream controller without throwing.
await receivingRoom!.disconnect();

// Also close the writer and disconnect the sender
await writer.close();
await sendingRoom!.disconnect();

// If we got here without hanging or throwing, the stream controller
// was properly cleaned up on disconnect.
},
testTimeoutMs,
);

it(
'cleans up track publications when a remote participant disconnects',
async () => {
Expand Down
Loading