Skip to content

Commit 5e9149e

Browse files
(fix) fix race condition when a sender was receiving messages uninitialized from receiver
1 parent bc8c090 commit 5e9149e

File tree

3 files changed

+82
-5
lines changed

3 files changed

+82
-5
lines changed

src/commands/recv.js

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,26 @@ export async function run(outDir, opts, ctx = {}) {
4444
});
4545
await signal.waitOpen?.(100000);
4646

47+
const waitForSenderReady = ({ fallbackMs = 2_000 } = {}) =>
48+
new Promise((resolve) => {
49+
let settled = false;
50+
const finish = (ready) => {
51+
if (settled) return;
52+
settled = true;
53+
clearTimeout(timer);
54+
try {
55+
off?.();
56+
} catch {}
57+
resolve(ready);
58+
};
59+
const off = signal.onMessage?.((msg) => {
60+
if (!msg || msg.type !== "sender_ready") return;
61+
if (msg.sessionId && sessionId && msg.sessionId !== sessionId) return;
62+
finish(true);
63+
});
64+
const timer = setTimeout(() => finish(false), fallbackMs);
65+
});
66+
4767
let totalBytes = 0;
4868
let written = 0;
4969
let startedAt = 0;
@@ -92,7 +112,13 @@ export async function run(outDir, opts, ctx = {}) {
92112
} else {
93113
mode = "dtls";
94114
}
95-
await sleep(2_000); // give the sender some headstart after seeing room_full
115+
const sawReadySignal = await waitForSenderReady();
116+
if (sawReadySignal) {
117+
getLogger().debug("recv: sender_ready signal received; skipping headstart delay");
118+
} else {
119+
getLogger().debug("recv: sender_ready signal missing; continuing after fallback delay");
120+
}
121+
96122
getLogger().debug("recv: selected mode =", mode);
97123

98124
try {
@@ -172,10 +198,24 @@ function makeSniffingSink({ outToStdout, outPath, appID, overwrite, onStart, onP
172198
// baseName is already sanitized to a leaf; join is fine and clearer.
173199
let p = path.join(targetDir, baseName);
174200
if (!overwrite && fs.existsSync(p)) {
175-
const stem = path.basename(baseName, path.extname(baseName));
176201
const ext = path.extname(baseName);
202+
const stem = path.basename(baseName, ext);
177203
let i = 1;
178-
while (fs.existsSync(p)) p = path.join(targetDir, `${stem}-${i++}${ext}`);
204+
let dedup;
205+
do {
206+
dedup = path.join(targetDir, `${stem}-${i++}${ext}`);
207+
} while (fs.existsSync(dedup));
208+
try {
209+
fs.renameSync(p, dedup);
210+
getLogger().debug(
211+
`resolveTargetPath: renamed existing ${baseName} -> ${path.basename(dedup)}`,
212+
);
213+
} catch (err) {
214+
getLogger().info(
215+
`resolveTargetPath: failed to rename existing ${baseName}: ${err?.message || err}`,
216+
);
217+
p = dedup;
218+
}
179219
}
180220
return p;
181221
}

src/commands/send.js

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,15 +114,27 @@ export async function run(paths, opts, ctx = {}) {
114114
// Attach low-level logger BEFORE wiring auth/stream
115115
const detachTap = attachRawLogger(rtc, { label: "low" });
116116
getLogger().debug("send: RTC connected");
117-
118117
try {
118+
signal.send?.({ type: "telemetry", event: "ice-connected", sessionId });
119+
getLogger().debug("send: announced telemetry ice-connected");
120+
} catch (err) {
121+
getLogger().debug("send: failed to announce telemetry ice-connected", err);
122+
}
119123

124+
try {
120125
if (tarPromise) {
121126
const { pack, totalSizeTar } = await tarPromise;
122127
sourceStream = pack;
123128
totalBytes = totalSizeTar;
124129
}
125130

131+
try {
132+
signal.send?.({ type: "sender_ready", sessionId });
133+
getLogger().debug("send: announced sender_ready");
134+
} catch (err) {
135+
getLogger().debug("send: failed to announce sender_ready", err);
136+
}
137+
126138
if (!Number.isInteger(totalBytes) || totalBytes <= 0) {
127139
throw new Error(`internal: computed totalBytes invalid (${totalBytes})`);
128140
}

test/integration/cli_auto_mode_e2e.test.mjs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,19 +32,43 @@ const WAIT_FILE_MS = 12_000;
3232
async function waitForNewOrUpdatedFile(
3333
dir,
3434
prev = new Map(),
35-
{ timeoutMs = WAIT_FILE_MS, stableMs = 300 } = {},
35+
{ timeoutMs = WAIT_FILE_MS, stableMs = 300, preferredBasename } = {},
3636
) {
3737
const deadline = Date.now() + timeoutMs;
3838
let lastName = null,
3939
lastSize = -1,
4040
lastT = 0;
41+
const preferredName = preferredBasename ? String(preferredBasename) : null;
42+
const preferredPath = preferredName ? path.join(dir, preferredName) : null;
43+
let preferredSize = -1;
44+
let preferredT = 0;
45+
let preferredActive = false;
4146
while (Date.now() < deadline) {
4247
let names;
4348
try {
4449
names = await fsp.readdir(dir);
4550
} catch {
4651
names = [];
4752
}
53+
if (preferredPath) {
54+
let st;
55+
try {
56+
st = await fsp.stat(preferredPath);
57+
} catch {}
58+
if (st) {
59+
const prevInfo = prev.get(preferredName);
60+
const changed = !prevInfo || st.mtimeMs !== prevInfo.mtimeMs || st.size !== prevInfo.size;
61+
if (changed) {
62+
if (preferredActive && st.size === preferredSize) {
63+
if (Date.now() - preferredT >= stableMs) return preferredPath;
64+
} else {
65+
preferredActive = true;
66+
preferredSize = st.size;
67+
preferredT = Date.now();
68+
}
69+
}
70+
}
71+
}
4872
for (const name of names) {
4973
const p = path.join(dir, name);
5074
let st;
@@ -126,6 +150,7 @@ async function runCase({
126150
assert.equal(rcode, 0, "receiver should exit cleanly (code 0)");
127151
const outPath = await waitForNewOrUpdatedFile(outDir, prev, {
128152
timeoutMs: 2000, // grace period after clean exit
153+
preferredBasename: expectedBasename,
129154
});
130155

131156
if (expectedBasename) {

0 commit comments

Comments
 (0)