Skip to content

Commit 65d42ae

Browse files
(chore): remove unneccesary code and add mode tracking to the telemetry.
1 parent 3b4f033 commit 65d42ae

File tree

4 files changed

+42
-57
lines changed

4 files changed

+42
-57
lines changed

src/commands/recv.js

Lines changed: 5 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,10 @@ export async function run(outDir, opts, ctx = {}) {
165165
const throughputStr = humanThroughput(written, durationMs);
166166
const stats = sink.getStats?.() || {};
167167
const announced = Number.isFinite(stats.announced) ? stats.announced : null;
168+
const hasAnnounced =
169+
announced != null && (announced > 0 || (announced === 0 && written === 0));
168170
let announcedValue = "—";
169-
if (announced != null) {
171+
if (hasAnnounced) {
170172
const delta = announced - written;
171173
const deltaSuffix =
172174
delta === 0
@@ -202,7 +204,6 @@ export async function run(outDir, opts, ctx = {}) {
202204
// Return a structured result for programmatic use
203205
return {
204206
bytesWritten: written,
205-
announcedBytes: stats.announced ?? 0,
206207
label: stats.label ?? null,
207208
path: stats.filePath ?? null,
208209
mode,
@@ -242,7 +243,7 @@ function makeSniffingSink({ outToStdout, outPath, appID, overwrite, onStart, onP
242243
filePath,
243244
started = false,
244245
written = 0,
245-
announced = 0,
246+
announced = null,
246247
label = null,
247248
desiredName = null; // <- set by sink.info({ name }) (sanitize it)
248249

@@ -343,50 +344,4 @@ function makeSniffingSink({ outToStdout, outPath, appID, overwrite, onStart, onP
343344

344345
onProgress,
345346
};
346-
}
347-
348-
/**
349-
* Dumb sink: write everything to a single file (or stdout).
350-
* - If outToStdout is true, all data goes to process.stdout.
351-
* - Otherwise, if outPath is given it's treated as a FILE path.
352-
* - Otherwise we write ./nt-${appID || 'transfer'}.bin
353-
*/
354-
export function makeSniffingDumbSink({ outToStdout = false, outPath, appID = "transfer", overwrite = false, onProgress } = {}) {
355-
let stream;
356-
let filePath = outToStdout ? null : (outPath
357-
? path.resolve(outPath)
358-
: path.resolve(process.cwd(), `nt-${appID}.bin`)
359-
);
360-
let written = 0;
361-
362-
if (outToStdout) {
363-
stream = process.stdout;
364-
} else {
365-
// ensure the directory exists
366-
fs.mkdirSync(path.dirname(filePath), { recursive: true });
367-
stream = fs.createWriteStream(filePath, {
368-
flags: overwrite ? "w" : "wx", // "wx" -> error if exists
369-
});
370-
}
371-
372-
return {
373-
async write(chunk) {
374-
const buf = chunk instanceof Uint8Array ? chunk : Buffer.from(chunk);
375-
if (buf.byteLength === 0) return;
376-
377-
await new Promise((res, rej) => stream.write(buf, (e) => (e ? rej(e) : res())));
378-
written += buf.byteLength;
379-
if (onProgress) onProgress({ w: written, t: 0 });
380-
},
381-
382-
async close() {
383-
if (!stream || stream === process.stdout) return;
384-
await new Promise((res) => stream.end(res));
385-
},
386-
387-
// Optional helper for callers
388-
getStats() {
389-
return { written, filePath };
390-
},
391-
};
392-
}
347+
}

src/commands/send.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,8 @@ export async function run(paths, opts, ctx = {}) {
130130
getLogger().debug("send: RTC connected");
131131
let peerCloseObservedAt = null;
132132
try {
133-
signal.send?.({ type: "telemetry", event: "ice-connected", sessionId });
133+
const mode = opts.pq === true ? "pq" : "dtls";
134+
signal.send?.({ type: "telemetry", event: "ice-connected", sessionId, mode });
134135
getLogger().debug("send: announced telemetry ice-connected");
135136
} catch (err) {
136137
getLogger().debug("send: failed to announce telemetry ice-connected", err);

src/transfer/default.js

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -287,8 +287,18 @@ export async function defaultRecv(rtc, { sessionId, sink, onProgress, assumeYes
287287
// INIT
288288
const init = safe(() => parseStreamInit(m));
289289
if (init && init.sessionId === sessionId) {
290-
announced = Number(init.totalBytes) || 0;
291-
getLogger().debug(`DTLS: recv INIT totalBytes=${announced}`);
290+
const total = Number(init.totalBytes);
291+
if (Number.isFinite(total) && total >= 0) {
292+
announced = total;
293+
try {
294+
sink.info?.({ totalBytes: total });
295+
} catch {}
296+
} else {
297+
announced = null;
298+
}
299+
getLogger().debug(
300+
`DTLS: recv INIT totalBytes=${announced != null ? announced : "(none)"}`,
301+
);
292302
return;
293303
}
294304

@@ -313,10 +323,13 @@ export async function defaultRecv(rtc, { sessionId, sink, onProgress, assumeYes
313323
run(async () => {
314324
await sink.write(u8);
315325
written += u8.byteLength;
316-
if (written % 4096 === 0 || (announced && written === announced))
326+
if (
327+
written % 4096 === 0 ||
328+
(Number.isFinite(announced) && announced > 0 && written === announced)
329+
)
317330
getLogger().debug(`DTLS: recv DATA written=${written}`);
318331
try {
319-
onProgress?.(written, announced || 0);
332+
onProgress?.(written, announced ?? 0);
320333
} catch {}
321334
});
322335
return;
@@ -331,7 +344,7 @@ export async function defaultRecv(rtc, { sessionId, sink, onProgress, assumeYes
331344
if (queueErr) throw queueErr;
332345
}).finally(() => {
333346
offMsg?.();
334-
if (announced != null && announced !== 0 && written !== announced) {
347+
if (Number.isFinite(announced) && announced >= 0 && written !== announced) {
335348
getLogger().debug(`recv FIN mismatch written=${written} announced=${announced}`);
336349
rejectDone(
337350
new NoisyError({

src/transfer/pq.js

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,12 +285,28 @@ export async function pqRecv(rtc, { sessionId, sink, onProgress, assumeYes }) {
285285
// Import SPKI bytes to a CryptoKey for verification
286286
const verifyKey = await importVerifyKey(spkiBytes);
287287

288+
let announcedTotal = null;
289+
const handleProgress = (bytes, total) => {
290+
if (
291+
announcedTotal == null &&
292+
Number.isFinite(total) &&
293+
total >= 0
294+
) {
295+
announcedTotal = total;
296+
try {
297+
sinkStripping.info?.({ totalBytes: total });
298+
} catch {}
299+
}
300+
try {
301+
onProgress?.(bytes, total);
302+
} catch {}
303+
};
288304

289305
await recvFileWithAuth({
290306
tx: rtc,
291307
sessionId: streamSid(sessionId),
292308
sink: sinkStripping, // <- strip the meta header
293-
onProgress,
309+
onProgress: handleProgress,
294310
hpke: { ownPriv: kp.privateKey },
295311
sign: {
296312
// Prefer explicit verify key from noisyauth if present; otherwise the FIN carries SPKI

0 commit comments

Comments
 (0)