Skip to content

Commit c7b6c1b

Browse files
additional fixes for synchronizing the receiver with sender
1 parent 5e9149e commit c7b6c1b

File tree

3 files changed

+50
-25
lines changed

3 files changed

+50
-25
lines changed

src/commands/recv.js

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -44,24 +44,39 @@ export async function run(outDir, opts, ctx = {}) {
4444
});
4545
await signal.waitOpen?.(100000);
4646

47+
let sawSenderReadySignal = false;
48+
let notifySenderReady = null;
49+
const offSenderReady = signal.onMessage?.((msg) => {
50+
if (!msg || msg.type !== "sender_ready") return;
51+
if (msg.sessionId && sessionId && msg.sessionId !== sessionId) return;
52+
sawSenderReadySignal = true;
53+
const cb = notifySenderReady;
54+
notifySenderReady = null;
55+
try {
56+
cb?.();
57+
} catch {}
58+
});
59+
4760
const waitForSenderReady = ({ fallbackMs = 2_000 } = {}) =>
4861
new Promise((resolve) => {
62+
if (sawSenderReadySignal) {
63+
resolve(true);
64+
return;
65+
}
4966
let settled = false;
67+
let timer = null;
5068
const finish = (ready) => {
5169
if (settled) return;
5270
settled = true;
5371
clearTimeout(timer);
54-
try {
55-
off?.();
56-
} catch {}
72+
if (notifySenderReady === onReady) {
73+
notifySenderReady = null;
74+
}
5775
resolve(ready);
5876
};
59-
const off = signal.onMessage?.((msg) => {
60-
if (!msg || msg.type !== "sender_ready") return;
61-
if (msg.sessionId && sessionId && msg.sessionId !== sessionId) return;
62-
finish(true);
63-
});
64-
const timer = setTimeout(() => finish(false), fallbackMs);
77+
const onReady = () => finish(true);
78+
notifySenderReady = onReady;
79+
timer = setTimeout(() => finish(false), fallbackMs);
6580
});
6681

6782
let totalBytes = 0;
@@ -161,6 +176,9 @@ export async function run(outDir, opts, ctx = {}) {
161176
process.stderr.write(`\nError: ${msg}\n`);
162177
throw e;
163178
} finally {
179+
try {
180+
offSenderReady?.();
181+
} catch {}
164182
await closeTransport(rtc, {
165183
signal,
166184
waitForCloseMs: 0,

src/commands/send.js

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -121,24 +121,23 @@ export async function run(paths, opts, ctx = {}) {
121121
getLogger().debug("send: failed to announce telemetry ice-connected", err);
122122
}
123123

124+
if (tarPromise) {
125+
const { pack, totalSizeTar } = await tarPromise;
126+
sourceStream = pack;
127+
totalBytes = totalSizeTar;
128+
}
124129
try {
125-
if (tarPromise) {
126-
const { pack, totalSizeTar } = await tarPromise;
127-
sourceStream = pack;
128-
totalBytes = totalSizeTar;
129-
}
130+
signal.send?.({ type: "sender_ready", sessionId });
131+
getLogger().debug("send: announced sender_ready");
132+
} catch (err) {
133+
getLogger().debug("send: failed to announce sender_ready", err);
134+
}
130135

131-
try {
132-
signal.send?.({ type: "sender_ready", sessionId });
133-
getLogger().debug("send: announced sender_ready");
134-
} catch (err) {
135-
getLogger().debug("send: failed to announce sender_ready", err);
136-
}
136+
if (!Number.isInteger(totalBytes) || totalBytes <= 0) {
137+
throw new Error(`internal: computed totalBytes invalid (${totalBytes})`);
138+
}
137139

138-
if (!Number.isInteger(totalBytes) || totalBytes <= 0) {
139-
throw new Error(`internal: computed totalBytes invalid (${totalBytes})`);
140-
}
141-
140+
try {
142141
if (opts.pq) {
143142
//const rtcAuth = wrapAuthDC(rtc, { sessionId, label: "pq-auth-sender" });
144143
await pqSend(rtc, {

test/integration/cli_auto_mode_e2e.test.mjs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,15 @@ test(
212212
async () => {
213213
// Reuse the same directory across runs so we can observe conflicts/overwrite.
214214
const outDir = await fsp.mkdtemp(path.join(os.tmpdir(), "ntcli-bundle-"));
215-
const paths = [path.join(ROOT, "README.md"), path.join(ROOT, "LICENSE")];
215+
const srcTmp = await fsp.mkdtemp(path.join(os.tmpdir(), "ntcli-bundle-src-"));
216+
const paths = [
217+
path.join(srcTmp, "one.txt"),
218+
path.join(srcTmp, "two.txt"),
219+
];
220+
await Promise.all([
221+
fsp.writeFile(paths[0], "hello bundle one\n".repeat(8)),
222+
fsp.writeFile(paths[1], "hello bundle two\n".repeat(6)),
223+
]);
216224
const name = "bundle.tar";
217225
const tarPath = path.join(outDir, name);
218226

0 commit comments

Comments
 (0)