@@ -4,100 +4,16 @@ import { createReadStream, FSWatcher, watch } from 'fs';
4
4
import { handleRequest , handleSyncServerRequest } from '@/api-helpers/axios' ;
5
5
import { ServiceNames } from '@/constants/service' ;
6
6
import {
7
- ServiceStatus ,
8
7
UPDATE_INTERVAL ,
9
8
LogFile ,
10
9
LOG_FILES ,
11
10
StreamEventType ,
12
11
FileEvent
13
12
} from '@/constants/stream' ;
14
13
15
- let watchers : FSWatcher [ ] = [ ] ;
16
- let lastPositions : { [ key : string ] : number } = { } ;
17
-
18
- const sendFileContent = async ( filePath : string , serviceName : string ) => {
19
- if ( streamClosed ) return ; // Prevent sending if the stream is closed
20
- console . log ( `Sending file content for ${ serviceName } ` ) ;
21
-
22
- return new Promise < void > ( ( resolve , reject ) => {
23
- const stream = createReadStream ( filePath , {
24
- start : lastPositions [ filePath ] || 0 ,
25
- encoding : 'utf8'
26
- } ) ;
27
-
28
- stream . on ( 'data' , ( chunk ) => {
29
- if ( streamClosed ) return ;
30
- const data = {
31
- type : 'log-update' ,
32
- serviceName,
33
- content : chunk
34
- } ;
35
-
36
- writer . write ( encoder . encode ( `data: ${ JSON . stringify ( data ) } \n\n` ) ) ;
37
- lastPositions [ filePath ] =
38
- ( lastPositions [ filePath ] || 0 ) + Buffer . byteLength ( chunk ) ;
39
- } ) ;
40
-
41
- stream . on ( 'end' , ( ) => resolve ( ) ) ;
42
- stream . on ( 'error' , ( error ) => reject ( error ) ) ;
43
- } ) ;
44
- } ;
45
-
46
- const startWatchers = ( ) => {
47
- logFiles . forEach ( async ( { path, serviceName } ) => {
48
- await sendFileContent ( path , serviceName ) ;
49
-
50
- const watcher = watch ( path , async ( eventType ) => {
51
- if ( eventType === 'change' ) {
52
- console . log ( `File ${ path } (${ serviceName } ) has been changed` ) ;
53
- await sendFileContent ( path , serviceName ) ;
54
- }
55
- } ) ;
56
-
57
- watchers . push ( watcher ) ;
58
- console . log ( `Watcher created for ${ path } ` ) ;
59
- } ) ;
60
- } ;
61
-
62
- const cleanupWatchers = ( ) => {
63
- watchers . forEach ( ( watcher ) => watcher . close ( ) ) ;
64
- watchers = [ ] ;
65
- } ;
66
-
67
- sendStatuses ( ) ;
68
- startWatchers ( ) ;
69
-
70
- const closeStream = ( ) => {
71
- console . log ( 'Client Disconnected' ) ;
72
- if ( ! streamClosed ) {
73
- streamClosed = true ;
74
- writer
75
- . close ( )
76
- . catch ( ( error ) => console . error ( 'Error closing writer:' , error ) ) ;
77
- if ( timeoutId ) {
78
- clearTimeout ( timeoutId ) ;
79
- }
80
- }
81
- cleanupWatchers ( ) ;
82
- } ;
83
-
84
- request . signal . onabort = closeStream ;
85
-
86
- return new Response ( responseStream . readable , {
87
- headers : {
88
- 'Content-Type' : 'text/event-stream' ,
89
- Connection : 'keep-alive' ,
90
- 'Cache-Control' : 'no-cache, no-transform'
91
- }
92
- } ) ;
93
- }
94
-
95
- process . setMaxListeners ( 20 ) ;
96
-
97
- // Utility function to execute shell commands as promises
98
14
const execPromise = ( command : string ) : Promise < string > => {
99
15
return new Promise ( ( resolve , reject ) => {
100
- exec ( command , ( error , stdout , stderr ) => {
16
+ exec ( command , ( error , stdout ) => {
101
17
if ( error ) {
102
18
return reject ( error ) ;
103
19
}
@@ -144,7 +60,7 @@ const checkServiceStatus = async (serviceName: string): Promise<boolean> => {
144
60
}
145
61
} ;
146
62
147
- = const getStatus = async ( ) : Promise < {
63
+ const getStatus = async ( ) : Promise < {
148
64
[ key in ServiceNames ] : { isUp : boolean } ;
149
65
} > => {
150
66
const services = Object . values ( ServiceNames ) ;
@@ -165,11 +81,7 @@ const checkServiceStatus = async (serviceName: string): Promise<boolean> => {
165
81
return statuses ;
166
82
} ;
167
83
168
- // Stream handling function
169
- export async function GET ( request : NextRequest ) : Promise < Response > {
170
- const responseStream = new TransformStream ( ) ;
171
-
172
- export async function GET ( request : NextRequest ) : Promise < Response > {
84
+ export async function GET ( ) : Promise < Response > {
173
85
const encoder = new TextEncoder ( ) ;
174
86
let streamClosed = false ;
175
87
let lastPositions : { [ key : string ] : number } = { } ;
@@ -184,6 +96,7 @@ export async function GET(request: NextRequest): Promise<Response> {
184
96
const stream = new ReadableStream ( {
185
97
start ( controller ) {
186
98
const pushStatus = async ( ) => {
99
+ console . log ( 'push status' ) ;
187
100
if ( streamClosed ) return ;
188
101
try {
189
102
const statuses = await getStatus ( ) ;
@@ -201,6 +114,8 @@ export async function GET(request: NextRequest): Promise<Response> {
201
114
} ;
202
115
203
116
const pushFileContent = async ( { path, serviceName } : LogFile ) => {
117
+ console . log ( 'push content' ) ;
118
+
204
119
if ( streamClosed ) return ;
205
120
try {
206
121
const fileStream = createReadStream ( path , {
@@ -240,6 +155,7 @@ export async function GET(request: NextRequest): Promise<Response> {
240
155
startWatchers ( ) ;
241
156
} ,
242
157
cancel ( ) {
158
+ console . log ( 'CLOSE ' ) ;
243
159
streamClosed = true ;
244
160
245
161
if ( statusTimer ) {
@@ -260,127 +176,3 @@ export async function GET(request: NextRequest): Promise<Response> {
260
176
}
261
177
} ) ;
262
178
}
263
-
264
- const execPromise = ( command : string ) : Promise < string > => {
265
- return new Promise ( ( resolve , reject ) => {
266
- exec ( command , ( error , stdout , _stderr ) => {
267
- if ( error ) {
268
- return reject ( error ) ;
269
- }
270
- resolve ( stdout ) ;
271
- } ) ;
272
- } ) ;
273
- } ;
274
-
275
- const checkServiceStatus = async ( serviceName : string ) : Promise < boolean > => {
276
- try {
277
- switch ( serviceName ) {
278
- case ServiceNames . API_SERVER : {
279
- const response = await handleRequest ( '' ) ;
280
- return response . message . includes ( 'hello world' ) ;
281
- }
282
-
283
- case ServiceNames . REDIS : {
284
- const REDIS_PORT = process . env . REDIS_PORT ;
285
- const response = await execPromise ( `redis-cli -p ${ REDIS_PORT } ping` ) ;
286
- return response . trim ( ) . includes ( 'PONG' ) ;
287
- }
288
-
289
- case ServiceNames . POSTGRES : {
290
- const POSTGRES_PORT = process . env . DB_PORT ;
291
- const POSTGRES_HOST = process . env . DB_HOST ;
292
- const response = await execPromise (
293
- `pg_isready -h ${ POSTGRES_HOST } -p ${ POSTGRES_PORT } `
294
- ) ;
295
- return response . includes ( 'accepting connections' ) ;
296
- }
297
-
298
- case ServiceNames . SYNC_SERVER : {
299
- const response = await handleSyncServerRequest ( '' ) ;
300
- return response . message . includes ( 'hello world' ) ;
301
- }
302
-
303
- default :
304
- console . warn ( `Service ${ serviceName } not recognized.` ) ;
305
- return false ;
306
- }
307
- } catch ( error ) {
308
- console . error ( `${ serviceName } service is down:` , error ) ;
309
- return false ;
310
- }
311
- } ;
312
-
313
- const getStatus = async ( ) : Promise < {
314
- [ key in ServiceNames ] : { isUp : boolean } ;
315
- } > => {
316
- const services = Object . values ( ServiceNames ) ;
317
- const statuses : { [ key in ServiceNames ] : { isUp : boolean } } = {
318
- [ ServiceNames . API_SERVER ] : { isUp : false } ,
319
- [ ServiceNames . REDIS ] : { isUp : false } ,
320
- [ ServiceNames . POSTGRES ] : { isUp : false } ,
321
- [ ServiceNames . SYNC_SERVER ] : { isUp : false }
322
- } ;
323
-
324
- await Promise . all (
325
- services . map ( async ( service ) => {
326
- const isUp = await checkServiceStatus ( service ) ;
327
- statuses [ service ] = { isUp } ;
328
- } )
329
- ) ;
330
-
331
- return statuses ;
332
- } ;
333
-
334
- // Stream handling function
335
- export async function GET ( request : NextRequest ) : Promise < Response > {
336
- const responseStream = new TransformStream ( ) ;
337
- const writer = responseStream . writable . getWriter ( ) ;
338
- const encoder = new TextEncoder ( ) ;
339
- let timeoutId : NodeJS . Timeout | null = null ;
340
- let streamClosed = false ;
341
-
342
- // Function to send statuses to the client
343
- const sendStatuses = async ( ) => {
344
- if ( streamClosed ) return ; // Prevent sending if the stream is closed
345
-
346
- try {
347
- console . log ( 'Fetching service statuses...' ) ;
348
- const statuses = await getStatus ( ) ;
349
- const statusData = { type : 'status-update' , statuses } ;
350
- await writer . write (
351
- encoder . encode ( `data: ${ JSON . stringify ( statusData ) } \n\n` )
352
- ) ;
353
- } catch ( error ) {
354
- console . error ( 'Error sending statuses:' , error ) ;
355
- }
356
-
357
- // Schedule the next status update
358
- timeoutId = setTimeout ( sendStatuses , 15000 ) ;
359
- } ;
360
-
361
- // Start the initial status send
362
- sendStatuses ( ) ;
363
-
364
- // Function to close the stream and clear timeout
365
- const closeStream = ( ) => {
366
- console . log ( 'CLIENT DISCONNECTED' ) ;
367
- if ( ! streamClosed ) {
368
- streamClosed = true ;
369
- writer
370
- . close ( )
371
- . catch ( ( error ) => console . error ( 'Error closing writer:' , error ) ) ;
372
- if ( timeoutId ) {
373
- clearTimeout ( timeoutId ) ;
374
- }
375
- }
376
- } ;
377
-
378
- // Return the response stream
379
- return new Response ( responseStream . readable , {
380
- headers : {
381
- 'Content-Type' : 'text/event-stream' ,
382
- Connection : 'keep-alive' ,
383
- 'Cache-Control' : 'no-cache, no-transform'
384
- }
385
- } ) ;
386
- }
0 commit comments