Skip to content

Commit 922b6ae

Browse files
(fix): stop the sender closing connection without the receiver finishing
1 parent c02600d commit 922b6ae

File tree

2 files changed

+113
-53
lines changed

2 files changed

+113
-53
lines changed

src/commands/send.js

Lines changed: 94 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import { formatProgressLine } from "../util/progress.js";
2020
import { closeTransport, waitForPeerClose } from "../util/rtc.js";
2121

2222
const CHUNK = 64 * 1024;
23+
const DEFAULT_FIN_ACK_TIMEOUT_MS = 45000;
24+
const DEFAULT_PEER_CLOSE_TIMEOUT_MS = 5000;
2325

2426
function deriveSendName(srcPath, opts) {
2527
if (opts?.name) return String(opts.name);
@@ -120,7 +122,7 @@ export async function run(paths, opts, ctx = {}) {
120122
} catch (err) {
121123
getLogger().debug("send: failed to announce telemetry ice-connected", err);
122124
}
123-
getLogger().debug("send: Zipping the contents...");
125+
getLogger().debug("send: Packing the contents...");
124126
if (tarPromise) {
125127
const { pack, totalSizeTar } = await tarPromise;
126128
sourceStream = pack;
@@ -167,48 +169,42 @@ export async function run(paths, opts, ctx = {}) {
167169

168170

169171
// Wait for FIN/OK from receiver or peer close
170-
const waitForFinAck = () =>
171-
new Promise((resolve) => {
172-
let settled = false;
173-
const offMsg = rtc.onMessage?.((m) => {
174-
if (settled) return;
175-
try {
176-
let msg = m;
177-
if (typeof m === "string") msg = JSON.parse(m);
178-
if (m instanceof Uint8Array || ArrayBuffer.isView(m)) {
179-
try {
180-
msg = JSON.parse(new TextDecoder().decode(m));
181-
} catch {}
182-
}
183-
const fin = msg ? parseStreamFin(msg) : null;
184-
if (fin && fin.sessionId === sessionId && fin.ok === true) {
185-
settled = true;
186-
offMsg?.();
187-
offClose?.();
188-
resolve();
189-
}
190-
} catch {}
191-
});
192-
const offClose = rtc.onClose?.(() => {
193-
if (settled) return;
194-
settled = true;
195-
offMsg?.();
196-
resolve();
197-
});
198-
setTimeout(() => {
199-
if (!settled) {
200-
settled = true;
201-
offMsg?.();
202-
offClose?.();
203-
resolve();
204-
}
205-
}, 3000);
206-
});
172+
const finAckTimeoutMs = normalizeTimeout(
173+
opts?.finAckTimeoutMs,
174+
DEFAULT_FIN_ACK_TIMEOUT_MS,
175+
);
176+
const peerCloseTimeoutMs = normalizeTimeout(
177+
opts?.peerCloseTimeoutMs,
178+
DEFAULT_PEER_CLOSE_TIMEOUT_MS,
179+
);
207180

208-
await waitForFinAck();
209-
// Drain bufferedAmount so wrtc doesn’t die on teardown
210-
// Let the peer close first to avoid races
211-
await waitForPeerClose(rtc, 1500);
181+
const finAckResult = await waitForFinAck({
182+
rtc,
183+
sessionId,
184+
timeoutMs: finAckTimeoutMs,
185+
});
186+
if (finAckResult === "timeout" && Number.isFinite(finAckTimeoutMs) && finAckTimeoutMs > 0) {
187+
getLogger().info(
188+
`send: timed out waiting for receiver FIN acknowledgement after ${finAckTimeoutMs} ms`,
189+
);
190+
} else if (finAckResult === "ack") {
191+
getLogger().debug("send: received receiver FIN acknowledgement");
192+
} else if (finAckResult === "peer-close") {
193+
getLogger().debug("send: data channel closed before FIN acknowledgement was observed");
194+
}
195+
196+
if (finAckResult !== "peer-close") {
197+
const peerCloseResult = await waitForPeerClose(rtc, peerCloseTimeoutMs);
198+
if (
199+
peerCloseResult === "timeout" &&
200+
Number.isFinite(peerCloseTimeoutMs) &&
201+
peerCloseTimeoutMs > 0
202+
) {
203+
getLogger().info(
204+
`send: timed out waiting for peer connection to close after ${peerCloseTimeoutMs} ms`,
205+
);
206+
}
207+
}
212208
process.stderr.write("\nDone • " + humanBytes(totalBytes) + "\n");
213209
// --- Silent workaround on success
214210
} finally {
@@ -225,6 +221,62 @@ export async function run(paths, opts, ctx = {}) {
225221
/* ------------------------------- utilities ------------------------------- */
226222

227223
const sleep = (ms) => new Promise((r) => setTimeout(r, ms));
224+
225+
function waitForFinAck({ rtc, sessionId, timeoutMs }) {
226+
return new Promise((resolve) => {
227+
let settled = false;
228+
let timer = null;
229+
let offMsg;
230+
let offClose;
231+
const finish = (reason) => {
232+
if (settled) return;
233+
settled = true;
234+
try {
235+
offMsg?.();
236+
} catch {}
237+
try {
238+
offClose?.();
239+
} catch {}
240+
if (timer) {
241+
clearTimeout(timer);
242+
}
243+
resolve(reason);
244+
};
245+
offMsg = rtc.onMessage?.((m) => {
246+
if (settled) return;
247+
try {
248+
let msg = m;
249+
if (typeof m === "string") msg = JSON.parse(m);
250+
if (m instanceof Uint8Array || ArrayBuffer.isView(m)) {
251+
try {
252+
msg = JSON.parse(new TextDecoder().decode(m));
253+
} catch {}
254+
}
255+
const fin = msg ? parseStreamFin(msg) : null;
256+
if (fin && fin.sessionId === sessionId && fin.ok === true) {
257+
finish("ack");
258+
}
259+
} catch {}
260+
});
261+
offClose = rtc.onClose?.(() => {
262+
finish("peer-close");
263+
});
264+
timer = Number.isFinite(timeoutMs) && timeoutMs > 0
265+
? setTimeout(() => {
266+
finish("timeout");
267+
}, timeoutMs)
268+
: null;
269+
});
270+
}
271+
272+
function normalizeTimeout(value, fallback) {
273+
if (value == null) return fallback;
274+
const ms = Number(value);
275+
if (!Number.isFinite(ms)) return fallback;
276+
if (ms < 0) return fallback;
277+
return ms;
278+
}
279+
228280
async function isRegularFile(p) {
229281
try {
230282
const st = await fsp.stat(p);

src/util/rtc.js

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,27 @@ import { flush, forceCloseNoFlush, scrubTransport } from "@noisytransfer/transpo
33
export function waitForPeerClose(rtc, ms = 1500) {
44
return new Promise((resolve) => {
55
let done = false;
6-
const off = rtc.onClose?.(() => {
7-
if (!done) {
8-
done = true;
9-
resolve();
10-
}
11-
});
12-
setTimeout(() => {
13-
if (!done) {
14-
done = true;
6+
let timer = null;
7+
let off;
8+
const finish = (reason) => {
9+
if (done) return;
10+
done = true;
11+
try {
1512
off?.();
16-
resolve();
13+
} catch {}
14+
if (timer) {
15+
clearTimeout(timer);
1716
}
18-
}, ms);
17+
resolve(reason);
18+
};
19+
off = rtc.onClose?.(() => {
20+
finish("closed");
21+
});
22+
timer = Number.isFinite(ms) && ms > 0
23+
? setTimeout(() => {
24+
finish("timeout");
25+
}, ms)
26+
: null;
1927
});
2028
}
2129

0 commit comments

Comments
 (0)