Skip to content

Commit 16c27b5

Browse files
committed
feat: enhance backend and frontend integration with WebSocket support
- Added support for WebSocket communication in the backend, allowing real-time job updates via the new BackendsGateway. - Implemented a new daemon status endpoint in the BackendsController to check the health of the daemon. - Updated the BackendsService to handle job execution and status reporting, including handling for the SESAME_MS_BETA environment variable. - Refactored the frontend to utilize Socket.IO for real-time updates, replacing the previous SSE implementation. - Enhanced the Nuxt.js configuration to support WebSocket connections and ensure client manifest stubs are created as needed. - Updated various components to reflect the new WebSocket integration, improving user experience with real-time feedback on job statuses.
1 parent c9ef0c3 commit 16c27b5

27 files changed

Lines changed: 1296 additions & 291 deletions

apps/api/.env.example

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
# SESAME_LOG_LEVEL="info"
22
# SESAME_NAME_QUEUE="sesame"
3+
# SESAME_MS_BETA=1
34
SESAME_JWT_SECRET="zeaezazeaezazaeeazrftrqezfqfqszewfsqddfqsqsqsdqdsqdsqdzsdqzsdqzs"
4-
SESAME_REDIS_URI="redis://sesame-redis:6379"
5+
# Docker (make dev) : hostname du conteneur Redis sur le réseau dev
6+
SESAME_REDIS_URI="redis://sesame-redis:6379/0"
7+
# Local hors Docker : redis://localhost:6379/0
58
SESAME_MONGO_URI="mongodb://sesame-mongodb:27017/sesame"
69
# Adresse du frontal de changement de mot de passe
710
SESAME_FRONT_MDP="http://localhost:3000"

apps/api/package.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,15 @@
2828
"@nestjs/core": "^10.4.8",
2929
"@nestjs/event-emitter": "^3.0.1",
3030
"@nestjs/jwt": "^10.2.0",
31+
"@nestjs/microservices": "^10.4.16",
3132
"@nestjs/mongoose": "^10.1.0",
3233
"@nestjs/passport": "^10.0.3",
3334
"@nestjs/platform-express": "^10.4.8",
35+
"@nestjs/platform-socket.io": "^10.4.8",
3436
"@nestjs/schedule": "^6.0.0",
3537
"@nestjs/swagger": "^8.0.7",
3638
"@nestjs/terminus": "^11.0.0",
39+
"@nestjs/websockets": "^10.4.8",
3740
"@sentry/nestjs": "^10.25.0",
3841
"@sentry/profiling-node": "^10.25.0",
3942
"@simplewebauthn/server": "^13.3.0",
@@ -83,6 +86,7 @@
8386
"rxjs": "^7.8.1",
8487
"schema-to-yup": "^1.12.18",
8588
"smpp": "^0.6.0-rc.4",
89+
"socket.io": "^4.8.3",
8690
"speakeasy": "^2.0.0",
8791
"swagger-themes": "^1.4.3",
8892
"types-package-json": "^2.0.39",
Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
1-
import { Queue, QueueEvents, ConnectionOptions } from 'bullmq';
21
import { AbstractService, AbstractServiceContext } from './abstract.service';
3-
import { getRedisConnectionToken } from '@nestjs-modules/ioredis';
4-
import { Redis } from 'ioredis';
52
import { OnModuleInit } from '@nestjs/common';
63
import { ConfigService } from '@nestjs/config';
4+
import { SesameQueueAdapter } from '~/_common/interfaces/sesame-job.interface';
5+
import { SESAME_QUEUE } from '~/_common/queue/sesame-queue.constants';
76

87
export abstract class AbstractQueueProcessor extends AbstractService implements OnModuleInit {
98
protected config: ConfigService;
9+
protected sesameQueue: SesameQueueAdapter;
1010

11-
private redis: Redis;
12-
protected _queue: Queue;
13-
public queueEvents: QueueEvents;
11+
public get queue(): SesameQueueAdapter {
12+
return this.sesameQueue;
13+
}
1414

15-
public get queue(): Queue {
16-
return this._queue;
15+
public get queueEvents(): SesameQueueAdapter['events'] {
16+
return this.sesameQueue.events;
1717
}
1818

1919
public constructor(context?: AbstractServiceContext) {
@@ -23,13 +23,7 @@ export abstract class AbstractQueueProcessor extends AbstractService implements
2323

2424
public async onModuleInit() {
2525
this.config = this.moduleRef.get<ConfigService>(ConfigService, { strict: false });
26-
this.redis = this.moduleRef.get<Redis>(getRedisConnectionToken(), { strict: false });
27-
28-
this._queue = new Queue(this.config.get<string>('application.nameQueue'), {
29-
connection: this.redis as unknown as ConnectionOptions,
30-
});
31-
this.queueEvents = new QueueEvents(this.config.get<string>('application.nameQueue'), {
32-
connection: this.redis as unknown as ConnectionOptions,
33-
});
26+
this.sesameQueue = this.moduleRef.get<SesameQueueAdapter>(SESAME_QUEUE, { strict: false });
27+
await this.sesameQueue.connect();
3428
}
3529
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import { IoAdapter } from '@nestjs/platform-socket.io';
2+
import { Server as HttpServer } from 'http';
3+
import { Server as HttpsServer } from 'https';
4+
import { ServerOptions } from 'socket.io';
5+
6+
export class SesameIoAdapter extends IoAdapter {
7+
private ioServer: ReturnType<IoAdapter['createIOServer']> | undefined;
8+
9+
public constructor(httpServer: HttpServer) {
10+
super(httpServer);
11+
}
12+
13+
public createIOServer(port: number, options?: ServerOptions) {
14+
if (!this.ioServer) {
15+
this.ioServer = super.createIOServer(port, options);
16+
}
17+
18+
return this.ioServer;
19+
}
20+
21+
public attachToServer(server: HttpServer | HttpsServer): void {
22+
this.ioServer?.attach(server);
23+
}
24+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import { RedisOptions } from 'ioredis';
2+
3+
/** Options NestJS Redis transport (ioredis + retryAttempts/retryDelay). */
4+
export type RedisMicroserviceTransportOptions = RedisOptions & {
5+
retryAttempts?: number;
6+
retryDelay?: number;
7+
};
8+
9+
/**
10+
* Convertit SESAME_REDIS_URI en options ioredis (host/port/db) pour le transport Redis NestJS.
11+
* Plus fiable que `url` seul avec ClientsModule / createMicroservice.
12+
*/
13+
export function redisOptionsFromUri(uri: string, extra?: RedisOptions): RedisOptions {
14+
const parsed = new URL(uri);
15+
const db = parsed.pathname?.replace(/^\//, '');
16+
const port = parsed.port ? parseInt(parsed.port, 10) : 6379;
17+
18+
return {
19+
host: parsed.hostname,
20+
port,
21+
...(parsed.username ? { username: decodeURIComponent(parsed.username) } : {}),
22+
...(parsed.password ? { password: decodeURIComponent(parsed.password) } : {}),
23+
...(db !== '' && !Number.isNaN(Number(db)) ? { db: Number(db) } : {}),
24+
...extra,
25+
};
26+
}
27+
28+
/** Options recommandées pour @nestjs/microservices (pub/sub Redis). */
29+
export function redisMicroserviceTransportOptions(uri: string): RedisMicroserviceTransportOptions {
30+
return {
31+
...redisOptionsFromUri(uri, {
32+
maxRetriesPerRequest: null,
33+
enableReadyCheck: true,
34+
lazyConnect: false,
35+
}),
36+
retryAttempts: 0,
37+
retryDelay: 0,
38+
};
39+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
export type SesameJobEventName = 'active' | 'progress' | 'completed' | 'failed';
2+
3+
export interface SesameJobEvent {
4+
event: SesameJobEventName;
5+
jobId: string;
6+
name?: string;
7+
progress?: number;
8+
returnvalue?: unknown;
9+
failedReason?: string;
10+
}
11+
12+
export function sesameJobEventsChannel(queueName: string): string {
13+
return `${queueName}:events`;
14+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
export interface SesameJobMessagePayload {
2+
id?: string;
3+
name: string;
4+
data?: Record<string, unknown>;
5+
options?: Record<string, unknown>;
6+
}
7+
8+
export interface SesameSubmittedJob {
9+
id: string;
10+
waitUntilFinished(timeoutMs: number): Promise<unknown>;
11+
getState(): Promise<string>;
12+
discard(): Promise<void>;
13+
}
14+
15+
export interface SesameQueueEventsEmitter {
16+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
17+
on(event: string, handler: (...args: any[]) => void): void;
18+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
19+
off(event: string, handler: (...args: any[]) => void): void;
20+
}
21+
22+
export interface SesameQueueAdapter {
23+
add(
24+
name: string,
25+
data: Record<string, unknown>,
26+
options?: { jobId?: string; attempts?: number },
27+
isAsync?: boolean,
28+
): Promise<SesameSubmittedJob>;
29+
getCompleted(): Promise<Array<{ id: string; name: string; returnvalue: unknown }>>;
30+
readonly events: SesameQueueEventsEmitter;
31+
connect(): Promise<void>;
32+
close(): Promise<void>;
33+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
import { Logger } from '@nestjs/common';
2+
import { ConfigService } from '@nestjs/config';
3+
import { ConnectionOptions, Job, Queue, QueueEvents } from 'bullmq';
4+
import Redis from 'ioredis';
5+
import {
6+
SesameQueueAdapter,
7+
SesameQueueEventsEmitter,
8+
SesameSubmittedJob,
9+
} from '~/_common/interfaces/sesame-job.interface';
10+
11+
class BullmqQueueEventsEmitter implements SesameQueueEventsEmitter {
12+
public constructor(private readonly queueEvents: QueueEvents) {}
13+
14+
public on(event: string, handler: (...args: any[]) => void): void {
15+
// BullMQ event names are validated at runtime
16+
(this.queueEvents as any).on(event, handler);
17+
}
18+
19+
public off(event: string, handler: (...args: any[]) => void): void {
20+
(this.queueEvents as any).off(event, handler);
21+
}
22+
}
23+
24+
export class BullmqQueueAdapter implements SesameQueueAdapter {
25+
private readonly _logger = new Logger(BullmqQueueAdapter.name);
26+
private _queue: Queue;
27+
private _queueEvents: QueueEvents;
28+
private _eventsEmitter: BullmqQueueEventsEmitter;
29+
30+
public constructor(
31+
private readonly redis: Redis,
32+
private readonly config: ConfigService,
33+
) {}
34+
35+
public get events(): SesameQueueEventsEmitter {
36+
return this._eventsEmitter;
37+
}
38+
39+
public async connect(): Promise<void> {
40+
const queueName = this.config.get<string>('application.nameQueue');
41+
const connection = this.redis as unknown as ConnectionOptions;
42+
43+
this._queue = new Queue(queueName, { connection });
44+
this._queueEvents = new QueueEvents(queueName, { connection });
45+
this._eventsEmitter = new BullmqQueueEventsEmitter(this._queueEvents);
46+
this._logger.log(`BullMQ queue "${queueName}" ready`);
47+
}
48+
49+
public async close(): Promise<void> {
50+
await this._queueEvents?.close();
51+
await this._queue?.close();
52+
}
53+
54+
public async add(
55+
name: string,
56+
data: Record<string, unknown>,
57+
options?: { jobId?: string; attempts?: number },
58+
): Promise<SesameSubmittedJob> {
59+
const job = await this._queue.add(name, data, {
60+
jobId: options?.jobId,
61+
attempts: options?.attempts ?? 1,
62+
});
63+
return this.toSubmittedJob(job);
64+
}
65+
66+
public async getCompleted(): Promise<Array<{ id: string; name: string; returnvalue: unknown }>> {
67+
const jobs = await this._queue.getCompleted();
68+
return jobs.map((job) => ({
69+
id: String(job.id),
70+
name: job.name,
71+
returnvalue: job.returnvalue,
72+
}));
73+
}
74+
75+
private toSubmittedJob(job: Job): SesameSubmittedJob {
76+
return {
77+
id: String(job.id),
78+
waitUntilFinished: (timeoutMs) => job.waitUntilFinished(this._queueEvents, timeoutMs),
79+
getState: () => job.getState(),
80+
discard: async () => {
81+
await job.discard();
82+
},
83+
};
84+
}
85+
}

0 commit comments

Comments
 (0)