Skip to content

Commit 5a15eb7

Browse files
committed
cleaned up signal handling and resolved some valid 🐇 issues
1 parent bcdb99c commit 5a15eb7

File tree

5 files changed

+45
-12
lines changed

5 files changed

+45
-12
lines changed

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,8 @@ export class RunsReplicationService {
204204
}
205205

206206
public async shutdown() {
207+
if (this._isShuttingDown) return;
208+
207209
this._isShuttingDown = true;
208210

209211
this.logger.info("Initiating shutdown of runs replication service");

apps/webapp/app/v3/dynamicFlushScheduler.server.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,20 @@ export class DynamicFlushScheduler<T> {
142142
}
143143

144144
private setupShutdownHandlers(): void {
145-
signalsEmitter.on("SIGTERM", () => this.shutdown());
146-
signalsEmitter.on("SIGINT", () => this.shutdown());
145+
signalsEmitter.on("SIGTERM", () =>
146+
this.shutdown().catch((error) => {
147+
this.logger.error("Error shutting down dynamic flush scheduler", {
148+
error,
149+
});
150+
})
151+
);
152+
signalsEmitter.on("SIGINT", () =>
153+
this.shutdown().catch((error) => {
154+
this.logger.error("Error shutting down dynamic flush scheduler", {
155+
error,
156+
});
157+
})
158+
);
147159
}
148160

149161
private startFlushTimer(): void {

apps/webapp/app/v3/marqs/index.server.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ export class MarQS {
113113
private queueDequeueCooloffPeriod: Map<string, number> = new Map();
114114
private queueDequeueCooloffCounts: Map<string, number> = new Map();
115115
private clearCooloffPeriodInterval: NodeJS.Timeout;
116+
isShuttingDown: boolean = false;
116117

117118
constructor(private readonly options: MarQSOptions) {
118119
this.redis = options.redis;
@@ -157,6 +158,9 @@ export class MarQS {
157158
}
158159

159160
async shutdown(signal: NodeJS.Signals) {
161+
if (this.isShuttingDown) return;
162+
this.isShuttingDown = true;
163+
160164
console.log("👇 Shutting down marqs", this.name, signal);
161165
clearInterval(this.clearCooloffPeriodInterval);
162166
this.#rebalanceWorkers.forEach((worker) => worker.stop());

apps/webapp/app/v3/tracing.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,12 @@ export async function startSpanWithEnv<T>(
4242
options?: SpanOptions
4343
): Promise<T> {
4444
return startSpan(tracer, name, fn, {
45+
...options,
4546
attributes: {
4647
...attributesFromAuthenticatedEnv(env),
4748
...options?.attributes,
4849
},
4950
kind: SpanKind.SERVER,
50-
...options,
5151
});
5252
}
5353

apps/webapp/server.ts

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ function forkWorkers() {
2727
}
2828

2929
function installPrimarySignalHandlers() {
30+
let didHandleSigterm = false;
31+
let didHandleSigint = false;
32+
let didGracefulExit = false;
33+
3034
const forward = (signal: NodeJS.Signals) => {
3135
for (const id in cluster.workers) {
3236
const w = cluster.workers[id];
@@ -39,21 +43,28 @@ function installPrimarySignalHandlers() {
3943
};
4044

4145
const gracefulExit = () => {
42-
const timeoutMs = Number(process.env.GRACEFUL_SHUTDOWN_TIMEOUT || 30_000) * 1000;
46+
if (didGracefulExit) return;
47+
didGracefulExit = true;
48+
49+
const timeoutMs = Number(process.env.GRACEFUL_SHUTDOWN_TIMEOUT || 30_000);
4350
// wait for workers to exit, then exit the primary too
4451
const maybeExit = () => {
4552
const alive = Object.values(cluster.workers || {}).some((w) => w && !w.isDead());
4653
if (!alive) process.exit(0);
4754
};
4855
setInterval(maybeExit, 1000);
49-
setTimeout(() => process.exit(0), timeoutMs).unref();
56+
setTimeout(() => process.exit(0), timeoutMs);
5057
};
5158

5259
process.on("SIGTERM", () => {
60+
if (didHandleSigterm) return;
61+
didHandleSigterm = true;
5362
forward("SIGTERM");
5463
gracefulExit();
5564
});
5665
process.on("SIGINT", () => {
66+
if (didHandleSigint) return;
67+
didHandleSigint = true;
5768
forward("SIGINT");
5869
gracefulExit();
5970
});
@@ -193,25 +204,29 @@ if (ENABLE_CLUSTER && cluster.isPrimary) {
193204
// headers will instead be limited by the maxHeaderSize
194205
server.maxHeadersCount = 0;
195206

196-
process.on("SIGTERM", () => {
207+
let didCloseServer = false;
208+
209+
function closeServer(signal: NodeJS.Signals) {
210+
if (didCloseServer) return;
211+
didCloseServer = true;
212+
197213
server.close((err) => {
198214
if (err) {
199215
console.error("Error closing express server:", err);
200216
} else {
201217
console.log("Express server closed gracefully.");
202218
}
203219
});
204-
});
220+
}
221+
222+
process.on("SIGTERM", closeServer);
223+
process.on("SIGINT", closeServer);
205224

206225
socketIo?.io.attach(server);
207226
server.removeAllListeners("upgrade"); // prevent duplicate upgrades from listeners created by io.attach()
208227

209228
server.on("upgrade", async (req, socket, head) => {
210-
console.log(
211-
`Attemping to upgrade connection at url ${req.url} with headers: ${JSON.stringify(
212-
req.headers
213-
)}`
214-
);
229+
console.log(`Attemping to upgrade connection at url ${req.url}`);
215230

216231
socket.on("error", (err) => {
217232
console.error("Connection upgrade error:", err);

0 commit comments

Comments
 (0)