Skip to content

Commit 86b61e5

Browse files
committed
feat: add highlight.js for syntax highlighting and enhance cron module with new services
- Added highlight.js dependency for improved syntax highlighting in logs. - Updated CronModule to import AgentsModule and include new services: CronLogsStreamService and CronGateway. - Enhanced cron.vue to improve log viewing experience with updated UI elements and functionality for real-time log updates.
1 parent 1344aef commit 86b61e5

7 files changed

Lines changed: 781 additions & 127 deletions

File tree

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
import { Injectable, Logger } from '@nestjs/common';
2+
import { ConfigService } from '@nestjs/config';
3+
import { existsSync, FSWatcher, openSync, readSync, closeSync, statSync, watch } from 'fs';
4+
import path from 'path';
5+
import { toSafeHandlerName } from '~/_common/functions/handler-logger';
6+
import { CronService } from './cron.service';
7+
8+
export type CronLogsSnapshot = {
9+
type: 'snapshot';
10+
exists: boolean;
11+
content: string;
12+
updatedAt: string | null;
13+
};
14+
15+
export type CronLogsAppend = {
16+
type: 'append';
17+
content: string;
18+
};
19+
20+
export type CronLogsResync = {
21+
type: 'resync';
22+
};
23+
24+
export type CronLogsEvent = CronLogsSnapshot | CronLogsAppend | CronLogsResync;
25+
26+
type TaskWatcher = {
27+
watcher: FSWatcher;
28+
lastPosition: number;
29+
debounceTimer: ReturnType<typeof setTimeout> | null;
30+
};
31+
32+
@Injectable()
33+
export class CronLogsStreamService {
34+
private readonly logger = new Logger(CronLogsStreamService.name);
35+
private readonly watchers = new Map<string, TaskWatcher>();
36+
37+
public constructor(
38+
private readonly configService: ConfigService,
39+
private readonly cronService: CronService,
40+
) {}
41+
42+
public getRoomName(taskName: string): string {
43+
return `cron-logs:${toSafeHandlerName(taskName)}`;
44+
}
45+
46+
public getLogFilePath(taskName: string): string {
47+
const safeName = toSafeHandlerName(taskName);
48+
const logDir = this.configService.get('cron.logDirectory') || path.join(process.cwd(), 'logs', 'handlers');
49+
return path.join(logDir, `${safeName}.log`);
50+
}
51+
52+
public async readSnapshot(taskName: string, tail = 250): Promise<CronLogsSnapshot> {
53+
const logs = await this.cronService.readLogs(taskName, tail);
54+
return {
55+
type: 'snapshot',
56+
exists: logs.exists,
57+
content: logs.content,
58+
updatedAt: logs.updatedAt,
59+
};
60+
}
61+
62+
public startWatching(taskName: string, onEvent: (event: CronLogsEvent) => void): void {
63+
const roomKey = this.getRoomName(taskName);
64+
if (this.watchers.has(roomKey)) {
65+
return;
66+
}
67+
68+
const logFile = this.getLogFilePath(taskName);
69+
const logDir = path.dirname(logFile);
70+
const logFileName = path.basename(logFile);
71+
72+
if (!existsSync(logFile)) {
73+
let dirWatcher: FSWatcher;
74+
try {
75+
dirWatcher = watch(logDir, (event, filename) => {
76+
if (filename && filename !== logFileName) {
77+
return;
78+
}
79+
80+
if (!existsSync(logFile)) {
81+
return;
82+
}
83+
84+
dirWatcher.close();
85+
this.watchers.delete(roomKey);
86+
this.startWatching(taskName, onEvent);
87+
onEvent({ type: 'resync' });
88+
});
89+
} catch (err) {
90+
this.logger.warn(`Could not watch cron log directory for <${taskName}>: ${(err as Error).message}`);
91+
return;
92+
}
93+
94+
this.watchers.set(roomKey, {
95+
watcher: dirWatcher,
96+
lastPosition: 0,
97+
debounceTimer: null,
98+
});
99+
return;
100+
}
101+
102+
const lastPosition = statSync(logFile).size;
103+
104+
const emitFileChanges = () => {
105+
try {
106+
if (!existsSync(logFile)) {
107+
onEvent({ type: 'resync' });
108+
return;
109+
}
110+
111+
const stats = statSync(logFile);
112+
const watcherState = this.watchers.get(roomKey);
113+
if (!watcherState) {
114+
return;
115+
}
116+
117+
if (stats.size < watcherState.lastPosition) {
118+
watcherState.lastPosition = stats.size;
119+
onEvent({ type: 'resync' });
120+
return;
121+
}
122+
123+
if (stats.size === watcherState.lastPosition) {
124+
return;
125+
}
126+
127+
const fileDescriptor = openSync(logFile, 'r');
128+
try {
129+
const length = stats.size - watcherState.lastPosition;
130+
const chunkBuffer = Buffer.allocUnsafe(length);
131+
readSync(fileDescriptor, chunkBuffer, 0, length, watcherState.lastPosition);
132+
watcherState.lastPosition = stats.size;
133+
const content = chunkBuffer.toString('utf-8');
134+
if (content) {
135+
onEvent({ type: 'append', content });
136+
}
137+
} finally {
138+
closeSync(fileDescriptor);
139+
}
140+
} catch (err) {
141+
this.logger.error(`Failed to read cron log updates for <${taskName}>: ${(err as Error).message}`);
142+
}
143+
};
144+
145+
let watcher: FSWatcher;
146+
try {
147+
watcher = watch(logFile, () => {
148+
const watcherState = this.watchers.get(roomKey);
149+
if (!watcherState) {
150+
return;
151+
}
152+
153+
if (watcherState.debounceTimer) {
154+
clearTimeout(watcherState.debounceTimer);
155+
}
156+
157+
watcherState.debounceTimer = setTimeout(() => {
158+
watcherState.debounceTimer = null;
159+
emitFileChanges();
160+
}, 150);
161+
});
162+
} catch (err) {
163+
this.logger.warn(`Could not watch cron log file for <${taskName}>: ${(err as Error).message}`);
164+
return;
165+
}
166+
167+
watcher.on('error', (err) => {
168+
this.logger.warn(`Cron log watcher error for <${taskName}>: ${err.message}`);
169+
});
170+
171+
this.watchers.set(roomKey, {
172+
watcher,
173+
lastPosition,
174+
debounceTimer: null,
175+
});
176+
}
177+
178+
public stopWatching(taskName: string): void {
179+
const roomKey = this.getRoomName(taskName);
180+
const watcherState = this.watchers.get(roomKey);
181+
if (!watcherState) {
182+
return;
183+
}
184+
185+
if (watcherState.debounceTimer) {
186+
clearTimeout(watcherState.debounceTimer);
187+
}
188+
189+
watcherState.watcher.close();
190+
this.watchers.delete(roomKey);
191+
}
192+
}
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
import { Logger, UnauthorizedException } from '@nestjs/common';
2+
import {
3+
ConnectedSocket,
4+
MessageBody,
5+
OnGatewayConnection,
6+
OnGatewayDisconnect,
7+
SubscribeMessage,
8+
WebSocketGateway,
9+
WebSocketServer,
10+
} from '@nestjs/websockets';
11+
import { hash } from 'crypto';
12+
import { Server, Socket } from 'socket.io';
13+
import { Public } from '~/_common/decorators/public.decorator';
14+
import { Agents } from '../agents/_schemas/agents.schema';
15+
import { AgentsService } from '../agents/agents.service';
16+
import { CronLogsStreamService } from './cron-logs-stream.service';
17+
18+
type CronLogsSubscribePayload = {
19+
taskName?: string;
20+
tail?: number;
21+
};
22+
23+
@Public()
24+
@WebSocketGateway({
25+
namespace: '/core/cron',
26+
cors: { origin: true, credentials: true },
27+
})
28+
export class CronGateway implements OnGatewayConnection, OnGatewayDisconnect {
29+
private readonly logger = new Logger(CronGateway.name);
30+
private readonly clientSubscriptions = new Map<string, string>();
31+
32+
@WebSocketServer()
33+
server: Server;
34+
35+
public constructor(
36+
private readonly agentsService: AgentsService,
37+
private readonly cronLogsStreamService: CronLogsStreamService,
38+
) {}
39+
40+
public async handleConnection(client: Socket): Promise<void> {
41+
try {
42+
const id = `${client.handshake.query.id || ''}`;
43+
const key = `${client.handshake.query.key || ''}`;
44+
45+
if (!id || !key) {
46+
throw new UnauthorizedException();
47+
}
48+
49+
const user = await this.agentsService.findById<Agents>(id);
50+
if (!user) {
51+
throw new UnauthorizedException();
52+
}
53+
54+
const sseSeed = `${user.security?.secretKey || user._id || ''}`;
55+
if (key !== hash('sha256', sseSeed)) {
56+
throw new UnauthorizedException();
57+
}
58+
59+
this.logger.debug(`WebSocket connected: ${client.id}`);
60+
} catch {
61+
client.disconnect(true);
62+
}
63+
}
64+
65+
public handleDisconnect(client: Socket): void {
66+
const taskName = this.clientSubscriptions.get(client.id);
67+
if (taskName) {
68+
void this.unsubscribeClient(client, taskName);
69+
}
70+
71+
this.clientSubscriptions.delete(client.id);
72+
this.logger.debug(`WebSocket disconnected: ${client.id}`);
73+
}
74+
75+
@SubscribeMessage('subscribe')
76+
public async handleSubscribe(
77+
@ConnectedSocket() client: Socket,
78+
@MessageBody() payload: CronLogsSubscribePayload,
79+
): Promise<void> {
80+
await this.subscribeClient(client, payload?.taskName || '', payload?.tail);
81+
}
82+
83+
@SubscribeMessage('resync')
84+
public async handleResync(
85+
@ConnectedSocket() client: Socket,
86+
@MessageBody() payload: CronLogsSubscribePayload,
87+
): Promise<void> {
88+
const taskName = payload?.taskName || this.clientSubscriptions.get(client.id) || '';
89+
if (!taskName) {
90+
return;
91+
}
92+
93+
await this.sendSnapshot(client, taskName, payload?.tail);
94+
}
95+
96+
@SubscribeMessage('unsubscribe')
97+
public async handleUnsubscribe(@ConnectedSocket() client: Socket): Promise<void> {
98+
const taskName = this.clientSubscriptions.get(client.id);
99+
if (!taskName) {
100+
return;
101+
}
102+
103+
await this.unsubscribeClient(client, taskName);
104+
this.clientSubscriptions.delete(client.id);
105+
}
106+
107+
private async subscribeClient(client: Socket, taskName: string, tail?: number): Promise<void> {
108+
if (!taskName) {
109+
return;
110+
}
111+
112+
const previousTaskName = this.clientSubscriptions.get(client.id);
113+
if (previousTaskName && previousTaskName !== taskName) {
114+
await this.unsubscribeClient(client, previousTaskName);
115+
}
116+
117+
const roomName = this.cronLogsStreamService.getRoomName(taskName);
118+
await client.join(roomName);
119+
this.clientSubscriptions.set(client.id, taskName);
120+
121+
await this.sendSnapshot(client, taskName, tail);
122+
this.ensureRoomWatcher(taskName, roomName);
123+
}
124+
125+
private async sendSnapshot(client: Socket, taskName: string, tail?: number): Promise<void> {
126+
const snapshot = await this.cronLogsStreamService.readSnapshot(taskName, tail || 250);
127+
client.emit('logs', snapshot);
128+
}
129+
130+
private ensureRoomWatcher(taskName: string, roomName: string): void {
131+
this.cronLogsStreamService.startWatching(taskName, (event) => {
132+
this.server.to(roomName).emit('logs', event);
133+
});
134+
}
135+
136+
private async unsubscribeClient(client: Socket, taskName: string): Promise<void> {
137+
const roomName = this.cronLogsStreamService.getRoomName(taskName);
138+
await client.leave(roomName);
139+
140+
try {
141+
const remainingClients = await this.server.in(roomName).fetchSockets();
142+
if (remainingClients.length === 0) {
143+
this.cronLogsStreamService.stopWatching(taskName);
144+
}
145+
} catch (err) {
146+
this.logger.warn(`Could not inspect room ${roomName} after unsubscribe: ${(err as Error).message}`);
147+
this.cronLogsStreamService.stopWatching(taskName);
148+
}
149+
}
150+
}
Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1-
2-
import { Module } from '@nestjs/common'
3-
import './cron-console-handlers.bootstrap'
4-
import { CronService } from './cron.service'
5-
import { CronController } from './cron.controller'
6-
import { CronHooksService } from './cron-hooks.service'
1+
import { Module } from '@nestjs/common';
2+
import './cron-console-handlers.bootstrap';
3+
import { AgentsModule } from '~/core/agents/agents.module';
4+
import { CronService } from './cron.service';
5+
import { CronController } from './cron.controller';
6+
import { CronHooksService } from './cron-hooks.service';
7+
import { CronGateway } from './cron.gateway';
8+
import { CronLogsStreamService } from './cron-logs-stream.service';
79

810
/**
911
* Module Cron - Gestion des tâches planifiées (cron).
@@ -12,11 +14,9 @@ import { CronHooksService } from './cron-hooks.service'
1214
* en s'appuyant sur la configuration YAML et la persistance MongoDB.
1315
*/
1416
@Module({
17+
imports: [AgentsModule],
1518
controllers: [CronController],
16-
providers: [CronService, CronHooksService],
17-
exports: [
18-
CronService,
19-
CronHooksService,
20-
],
19+
providers: [CronService, CronHooksService, CronLogsStreamService, CronGateway],
20+
exports: [CronService, CronHooksService],
2121
})
22-
export class CronModule { }
22+
export class CronModule {}

apps/web/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
"echarts": "^6.0.0",
4646
"fast-password-entropy": "^1.1.1",
4747
"hibp": "^15.0.1",
48+
"highlight.js": "^11.11.1",
4849
"lodash": "^4.17.23",
4950
"lodash-es": "^4.17.23",
5051
"moment": "^2.30.1",

0 commit comments

Comments
 (0)