Skip to content

Commit 2ef9c12

Browse files
(fix) remove the noisystream packing for default dtls
1 parent bb57ad2 commit 2ef9c12

File tree

2 files changed

+27
-43
lines changed

2 files changed

+27
-43
lines changed

src/commands/recv.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,6 @@ export async function run(outDir, opts, ctx = {}) {
196196
{ label: "Bytes", value: humanBytes(written) },
197197
{ label: "Duration", value: formatDuration(durationMs) },
198198
{ label: "Throughput", value: throughputStr },
199-
{ label: "Announced", value: announcedValue },
200199
{ label: "Destination", value: destinationInfo },
201200
],
202201
});

src/transfer/default.js

Lines changed: 27 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,7 @@ import { createAuthSender, createAuthReceiver } from "@noisytransfer/noisyauth";
44
import { NoisyError } from "@noisytransfer/errors";
55
import { unb64u as b64uToBytes } from "@noisytransfer/util";
66
import { buildMetaHeader, stripMetaHeader } from "./meta-header.js";
7-
import {
8-
packStreamInit,
9-
packStreamData,
10-
packStreamFin,
11-
parseStreamInit,
12-
parseStreamData,
13-
parseStreamFin,
14-
} from "@noisytransfer/noisystream/frames";
7+
import { packStreamInit, packStreamFin, parseStreamInit, parseStreamFin } from "@noisytransfer/noisystream/frames";
158
import { getLogger } from "../util/logger.js";
169
import { confirmSAS } from "../core/sas-confirm.js";
1710
import { MAX_STREAM_CHUNK_BYTES } from "./constants.js";
@@ -196,28 +189,24 @@ export async function defaultSend(
196189

197190
// 1a) Optional filename: embed as first data frame (encrypted by DTLS)
198191
// [ 4 bytes magic = 'N' 'T' 'M' '1' ] [ 1 byte nameLen ] [ name UTF-8 bytes ]
199-
let seq = 0;
200192
if (name) {
201193
try {
202194
const header = buildMetaHeader(name);
203195
for (const slice of splitStreamChunk(header)) {
204-
rtc.send(packStreamData({ sessionId, seq, chunk: slice }));
196+
rtc.send(slice);
205197
await applyBackpressure(rtc);
206-
seq += 1;
207198
}
208199
} catch {}
209200
}
210201

211202
// 2) Stream data frames (ns_data)
212-
let sent = 0;
213203
for await (const chunk of toAsyncIterable(source)) {
214204
const u8 = chunk instanceof Uint8Array ? chunk : Buffer.from(chunk);
215205
if (!u8.byteLength) continue;
216206
for (const slice of splitStreamChunk(u8)) {
217-
rtc.send(packStreamData({ sessionId, seq, chunk: slice }));
207+
rtc.send(slice);
218208
await applyBackpressure(rtc);
219209
sent += slice.byteLength;
220-
seq += 1;
221210
onProgress?.(Math.min(sent, total), total);
222211
}
223212
onProgress?.(Math.min(sent, total), total);
@@ -280,46 +269,25 @@ export async function defaultRecv(rtc, { sessionId, sink, onProgress, assumeYes
280269
}
281270

282271
const offMsg = rtc.onMessage?.((raw) => {
283-
const m = toObjectMessage(raw);
284-
if (!m) return;
285-
286272
try {
287-
// INIT
288-
const init = safe(() => parseStreamInit(m));
289-
if (init && init.sessionId === sessionId) {
290-
const total = Number(init.totalBytes);
291-
if (Number.isFinite(total) && total >= 0) {
292-
announced = total;
293-
try {
294-
sink.info?.({ totalBytes: total });
295-
} catch {}
296-
} else {
297-
announced = null;
298-
}
299-
getLogger().debug(
300-
`DTLS: recv INIT totalBytes=${announced != null ? announced : "(none)"}`,
301-
);
302-
return;
303-
}
304-
305-
// DATA
306-
const data = safe(() => parseStreamData(m));
307-
if (data && data.sessionId === sessionId) {
308-
let u8 = data.chunk instanceof Uint8Array ? data.chunk : new Uint8Array(data.chunk);
273+
// Data frames are expected to arrive as binary payloads (Uint8Array or views).
274+
const maybeBinary = toUint8(raw);
275+
if (maybeBinary) {
276+
let u8 = maybeBinary;
309277
if (!metaSeen) {
310278
const info = stripMetaHeader(u8);
311279
if (info) {
312280
metaSeen = true;
313-
// announce filename without touching totalBytes
314281
try {
315282
sink.info?.({ name: info.name });
316283
} catch {}
317-
u8 = info.data; // write only payload portion
284+
u8 = info.data;
318285
getLogger().debug("DTLS: recv META name=", info.name);
319286
} else {
320-
metaSeen = true; // first data had no header; avoid re-checking later
287+
metaSeen = true;
321288
}
322289
}
290+
if (!u8?.byteLength) return;
323291
run(async () => {
324292
await sink.write(u8);
325293
written += u8.byteLength;
@@ -334,6 +302,16 @@ export async function defaultRecv(rtc, { sessionId, sink, onProgress, assumeYes
334302
});
335303
return;
336304
}
305+
const m = toObjectMessage(raw);
306+
if (!m) return;
307+
308+
// INIT
309+
const init = safe(() => parseStreamInit(m));
310+
if (init && init.sessionId === sessionId) {
311+
announced = Number(init.totalBytes) || 0;
312+
getLogger().debug(`DTLS: recv INIT totalBytes=${announced}`);
313+
return;
314+
}
337315

338316
// FIN
339317
const fin = safe(() => parseStreamFin(m));
@@ -415,3 +393,10 @@ function safe(fn) {
415393
return null;
416394
}
417395
}
396+
397+
function toUint8(raw) {
398+
if (raw instanceof Uint8Array) return raw;
399+
if (ArrayBuffer.isView(raw)) return new Uint8Array(raw.buffer, raw.byteOffset, raw.byteLength);
400+
if (raw instanceof ArrayBuffer) return new Uint8Array(raw);
401+
return null;
402+
}

0 commit comments

Comments
 (0)