diff --git a/src/cli/commands/logs.ts b/src/cli/commands/logs.ts index 7637689..50dc3e5 100644 --- a/src/cli/commands/logs.ts +++ b/src/cli/commands/logs.ts @@ -198,40 +198,49 @@ export default class Logs extends Command { let position = stat.size; let pendingBuffer = ""; - // Watch for new data using polling (works across filesystems including Docker volumes) + // Watch for new data by reading directly from `position`. We intentionally do + // NOT gate on fsPromise.stat().size — on Windows + NTFS, stat() returns a stale + // size for a short window after another process appends, which causes the gate + // to miss new bytes. read() sees the true file end at syscall time. + const READ_BUF_SIZE = 64 * 1024; + const readBuf = Buffer.alloc(READ_BUF_SIZE); const readNewData = async () => { + let fd: fsPromise.FileHandle | undefined; try { - const currentStat = await fsPromise.stat(currentLogFile); - if (currentStat.size > position) { - const fd = await fsPromise.open(currentLogFile, "r"); - const buf = new Uint8Array(currentStat.size - position); - const { bytesRead } = await fd.read(buf, 0, buf.length, position); - await fd.close(); + fd = await fsPromise.open(currentLogFile, "r"); + let chunk = ""; + while (true) { + const { bytesRead } = await fd.read(readBuf, 0, readBuf.length, position); + if (bytesRead === 0) break; position += bytesRead; + chunk += readBuf.subarray(0, bytesRead).toString("utf-8"); + if (bytesRead < readBuf.length) break; + } + if (!chunk) return; - const chunk = pendingBuffer + new TextDecoder().decode(buf.subarray(0, bytesRead)); - // Split into complete lines; keep any incomplete trailing line in the buffer - const lastNewline = chunk.lastIndexOf("\n"); - if (lastNewline === -1) { - pendingBuffer = chunk; - return; - } - pendingBuffer = chunk.slice(lastNewline + 1); - const completeText = chunk.slice(0, lastNewline + 1); - - if (!since && !until && !streamFilter) { - // No filtering — pass through directly - process.stdout.write(completeText); - } else { - const newEntries = this._parseLogEntries(completeText.replace(/\n$/, "")); - const filteredNew = this._filterEntries(newEntries, since, until); - const streamFilteredNew = streamFilter ? this._filterByStream(filteredNew, streamFilter) : filteredNew; - const output = streamFilteredNew.map((e) => e.lines.join("\n")).join("\n"); - if (output) process.stdout.write(output + "\n"); - } + const combined = pendingBuffer + chunk; + const lastNewline = combined.lastIndexOf("\n"); + if (lastNewline === -1) { + pendingBuffer = combined; + return; + } + pendingBuffer = combined.slice(lastNewline + 1); + const completeText = combined.slice(0, lastNewline + 1); + + if (!since && !until && !streamFilter) { + // No filtering — pass through directly + process.stdout.write(completeText); + } else { + const newEntries = this._parseLogEntries(completeText.replace(/\n$/, "")); + const filteredNew = this._filterEntries(newEntries, since, until); + const streamFilteredNew = streamFilter ? this._filterByStream(filteredNew, streamFilter) : filteredNew; + const output = streamFilteredNew.map((e) => e.lines.join("\n")).join("\n"); + if (output) process.stdout.write(output + "\n"); } } catch { // File may have been rotated or deleted + } finally { + if (fd) await fd.close().catch(() => {}); } };