Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 36 additions & 27 deletions src/cli/commands/logs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,40 +198,49 @@ export default class Logs extends Command {
let position = stat.size;
let pendingBuffer = "";

// Watch for new data using polling (works across filesystems including Docker volumes)
// Watch for new data by reading directly from `position`. We intentionally do
// NOT gate on fsPromise.stat().size — on Windows + NTFS, stat() returns a stale
// size for a short window after another process appends, which causes the gate
// to miss new bytes. read() sees the true file end at syscall time.
const READ_BUF_SIZE = 64 * 1024;
const readBuf = Buffer.alloc(READ_BUF_SIZE);
const readNewData = async () => {
let fd: fsPromise.FileHandle | undefined;
try {
const currentStat = await fsPromise.stat(currentLogFile);
if (currentStat.size > position) {
const fd = await fsPromise.open(currentLogFile, "r");
const buf = new Uint8Array(currentStat.size - position);
const { bytesRead } = await fd.read(buf, 0, buf.length, position);
await fd.close();
fd = await fsPromise.open(currentLogFile, "r");
let chunk = "";
while (true) {
const { bytesRead } = await fd.read(readBuf, 0, readBuf.length, position);
if (bytesRead === 0) break;
position += bytesRead;
chunk += readBuf.subarray(0, bytesRead).toString("utf-8");
if (bytesRead < readBuf.length) break;
}
if (!chunk) return;

const chunk = pendingBuffer + new TextDecoder().decode(buf.subarray(0, bytesRead));
// Split into complete lines; keep any incomplete trailing line in the buffer
const lastNewline = chunk.lastIndexOf("\n");
if (lastNewline === -1) {
pendingBuffer = chunk;
return;
}
pendingBuffer = chunk.slice(lastNewline + 1);
const completeText = chunk.slice(0, lastNewline + 1);

if (!since && !until && !streamFilter) {
// No filtering — pass through directly
process.stdout.write(completeText);
} else {
const newEntries = this._parseLogEntries(completeText.replace(/\n$/, ""));
const filteredNew = this._filterEntries(newEntries, since, until);
const streamFilteredNew = streamFilter ? this._filterByStream(filteredNew, streamFilter) : filteredNew;
const output = streamFilteredNew.map((e) => e.lines.join("\n")).join("\n");
if (output) process.stdout.write(output + "\n");
}
const combined = pendingBuffer + chunk;
const lastNewline = combined.lastIndexOf("\n");
if (lastNewline === -1) {
pendingBuffer = combined;
return;
}
pendingBuffer = combined.slice(lastNewline + 1);
const completeText = combined.slice(0, lastNewline + 1);

if (!since && !until && !streamFilter) {
// No filtering — pass through directly
process.stdout.write(completeText);
} else {
const newEntries = this._parseLogEntries(completeText.replace(/\n$/, ""));
const filteredNew = this._filterEntries(newEntries, since, until);
const streamFilteredNew = streamFilter ? this._filterByStream(filteredNew, streamFilter) : filteredNew;
const output = streamFilteredNew.map((e) => e.lines.join("\n")).join("\n");
if (output) process.stdout.write(output + "\n");
}
} catch {
// File may have been rotated or deleted
} finally {
if (fd) await fd.close().catch(() => {});
}
};

Expand Down
Loading