11import { Logger , UnauthorizedException } from '@nestjs/common' ;
2- import { OnGatewayConnection , OnGatewayDisconnect , WebSocketGateway , WebSocketServer } from '@nestjs/websockets' ;
2+ import {
3+ ConnectedSocket ,
4+ OnGatewayConnection ,
5+ OnGatewayDisconnect ,
6+ SubscribeMessage ,
7+ WebSocketGateway ,
8+ WebSocketServer ,
9+ } from '@nestjs/websockets' ;
310import { hash } from 'crypto' ;
411import { Server , Socket } from 'socket.io' ;
512import { Public } from '~/_common/decorators/public.decorator' ;
@@ -10,7 +17,15 @@ import { BackendsService } from './backends.service';
1017
1118type JobChannel = 'job:added' | 'job:completed' | 'job:failed' | 'job:progress' | 'job:active' ;
1219
20+ type DaemonStatusPayload = {
21+ online : boolean ;
22+ pingMs : number | null ;
23+ error ?: string ;
24+ version ?: string ;
25+ } ;
26+
1327const IDENTITY_JOB_TYPES = [ ActionType . IDENTITY_UPDATE , ActionType . IDENTITY_CREATE , ActionType . IDENTITY_DELETE ] ;
28+ const DAEMON_STATUS_INTERVAL_MS = 20_000 ;
1429
1530@Public ( )
1631@WebSocketGateway ( {
@@ -20,6 +35,9 @@ const IDENTITY_JOB_TYPES = [ActionType.IDENTITY_UPDATE, ActionType.IDENTITY_CREA
2035export class BackendsGateway implements OnGatewayConnection , OnGatewayDisconnect {
2136 private readonly logger = new Logger ( BackendsGateway . name ) ;
2237 private readonly clientSubscriptions = new Map < string , ( ) => void > ( ) ;
38+ private lastDaemonStatus : DaemonStatusPayload | null = null ;
39+ private daemonStatusInterval : NodeJS . Timeout | null = null ;
40+ private daemonStatusPingInFlight = false ;
2341
2442 @WebSocketServer ( )
2543 server : Server ;
@@ -50,6 +68,10 @@ export class BackendsGateway implements OnGatewayConnection, OnGatewayDisconnect
5068
5169 const cleanup = this . subscribeClient ( client ) ;
5270 this . clientSubscriptions . set ( client . id , cleanup ) ;
71+ this . ensureDaemonStatusWatcher ( ) ;
72+ if ( this . lastDaemonStatus ) {
73+ this . emitDaemonStatus ( client , this . lastDaemonStatus ) ;
74+ }
5375 this . logger . debug ( `WebSocket connected: ${ client . id } ` ) ;
5476 } catch {
5577 client . disconnect ( true ) ;
@@ -60,9 +82,20 @@ export class BackendsGateway implements OnGatewayConnection, OnGatewayDisconnect
6082 const cleanup = this . clientSubscriptions . get ( client . id ) ;
6183 cleanup ?.( ) ;
6284 this . clientSubscriptions . delete ( client . id ) ;
85+ this . stopDaemonStatusWatcherIfIdle ( ) ;
6386 this . logger . debug ( `WebSocket disconnected: ${ client . id } ` ) ;
6487 }
6588
89+ @SubscribeMessage ( 'daemon:status' )
90+ public async handleDaemonStatusRequest ( @ConnectedSocket ( ) client : Socket ) : Promise < void > {
91+ if ( this . lastDaemonStatus ) {
92+ this . emitDaemonStatus ( client , this . lastDaemonStatus ) ;
93+ return ;
94+ }
95+
96+ await this . refreshDaemonStatus ( ) ;
97+ }
98+
6699 private subscribeClient ( client : Socket ) : ( ) => void {
67100 const fireMessage = ( channel : JobChannel , message : unknown ) => {
68101 try {
@@ -122,4 +155,66 @@ export class BackendsGateway implements OnGatewayConnection, OnGatewayDisconnect
122155 this . backendsService . queueEvents . off ( 'failed' , onFailed ) ;
123156 } ;
124157 }
158+
159+ private ensureDaemonStatusWatcher ( ) : void {
160+ if ( this . daemonStatusInterval ) {
161+ return ;
162+ }
163+
164+ void this . refreshDaemonStatus ( ) ;
165+ this . daemonStatusInterval = setInterval ( ( ) => void this . refreshDaemonStatus ( ) , DAEMON_STATUS_INTERVAL_MS ) ;
166+ }
167+
168+ private stopDaemonStatusWatcherIfIdle ( ) : void {
169+ const connectedClients = this . server ?. sockets ?. sockets ?. size ?? 0 ;
170+ if ( connectedClients > 0 ) {
171+ return ;
172+ }
173+
174+ if ( this . daemonStatusInterval ) {
175+ clearInterval ( this . daemonStatusInterval ) ;
176+ this . daemonStatusInterval = null ;
177+ }
178+ }
179+
180+ private async refreshDaemonStatus ( ) : Promise < void > {
181+ if ( this . daemonStatusPingInFlight ) {
182+ return ;
183+ }
184+
185+ this . daemonStatusPingInFlight = true ;
186+ try {
187+ const status = await this . backendsService . pingDaemon ( ) ;
188+ this . lastDaemonStatus = status ;
189+ this . broadcastDaemonStatus ( status ) ;
190+ } catch ( err ) {
191+ this . logger . error ( `Daemon status refresh failed: ${ err } ` , BackendsGateway . name ) ;
192+ } finally {
193+ this . daemonStatusPingInFlight = false ;
194+ }
195+ }
196+
197+ private emitDaemonStatus ( client : Socket , status : DaemonStatusPayload ) : void {
198+ try {
199+ client . emit ( 'message' , { channel : 'daemon:status' , payload : status } ) ;
200+ this . logger . debug ( `Emit to <daemon:status> with data <${ JSON . stringify ( status ) } >` , BackendsGateway . name ) ;
201+ } catch ( err ) {
202+ this . logger . error (
203+ `Emit error from <daemon:status> with data <${ JSON . stringify ( status ) } >. Error: ${ err } ` ,
204+ BackendsGateway . name ,
205+ ) ;
206+ }
207+ }
208+
209+ private broadcastDaemonStatus ( status : DaemonStatusPayload ) : void {
210+ try {
211+ this . server . emit ( 'message' , { channel : 'daemon:status' , payload : status } ) ;
212+ this . logger . debug ( `Broadcast <daemon:status> with data <${ JSON . stringify ( status ) } >` , BackendsGateway . name ) ;
213+ } catch ( err ) {
214+ this . logger . error (
215+ `Broadcast error from <daemon:status> with data <${ JSON . stringify ( status ) } >. Error: ${ err } ` ,
216+ BackendsGateway . name ,
217+ ) ;
218+ }
219+ }
125220}
0 commit comments