Skip to content

Commit 86ca58e

Browse files
chore(hotfix): apply DTLS SCTP backpressure to the datachannel
1 parent 2f3c4d3 commit 86ca58e

File tree

1 file changed

+20
-0
lines changed

1 file changed

+20
-0
lines changed

src/transfer/default.js

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,22 @@ import {
1515
import { getLogger } from "../util/logger.js";
1616
import { confirmSAS } from "../core/sas-confirm.js";
1717

18+
// --- Back-pressure (congestion-cooperation) settings ---
19+
// Pause sending whenever the underlying RTC DataChannel queue grows too large.
20+
// Tune via env NT_SEND_HIGH_WATER (bytes). Default: 1 MiB.
21+
const SEND_HIGH_WATER = Number(process.env.NT_SEND_HIGH_WATER || (1 * 1024 * 1024));
22+
23+
async function applyBackpressure(rtc) {
24+
try {
25+
if (typeof rtc?.bufferedAmount === "number" && typeof rtc?.flush === "function") {
26+
if (rtc.bufferedAmount > SEND_HIGH_WATER) {
27+
await rtc.flush();
28+
}
29+
}
30+
} catch {}
31+
}
32+
33+
1834
/**
1935
* Try to read DTLS fingerprints for a short window; return null if not ready.
2036
* @returns {{local:{alg:string,bytes:Uint8Array},remote:{alg:string,bytes:Uint8Array}}|null}
@@ -157,6 +173,7 @@ export async function defaultSend(
157173
throw new Error("defaultSend: totalBytes must be a positive integer");
158174
const init = packStreamInit({ sessionId, totalBytes: total });
159175
rtc.send(init);
176+
await applyBackpressure(rtc);
160177

161178
// 1a) Optional filename: embed as first data frame (encrypted by DTLS)
162179
// [ 4 bytes magic = 'N' 'T' 'M' '1' ] [ 1 byte nameLen ] [ name UTF-8 bytes ]
@@ -165,6 +182,7 @@ export async function defaultSend(
165182
try {
166183
const header = buildMetaHeader(name);
167184
rtc.send(packStreamData({ sessionId, seq, chunk: header }));
185+
await applyBackpressure(rtc);
168186
seq += 1;
169187
} catch {}
170188
}
@@ -175,6 +193,7 @@ export async function defaultSend(
175193
const u8 = chunk instanceof Uint8Array ? chunk : Buffer.from(chunk);
176194
if (!u8.byteLength) continue;
177195
rtc.send(packStreamData({ sessionId, seq, chunk: u8 }));
196+
await applyBackpressure(rtc);
178197
sent += u8.byteLength;
179198
seq += 1;
180199
onProgress?.(Math.min(sent, total), total);
@@ -183,6 +202,7 @@ export async function defaultSend(
183202
// 3) FIN with ok=true if sizes match, else ok=false
184203
const ok = sent === total;
185204
rtc.send(packStreamFin({ sessionId, ok }));
205+
await applyBackpressure(rtc);
186206

187207
// 4) Flush if supported (mirrors tests' "flush" semantics)
188208
if (typeof rtc.flush === "function") {

0 commit comments

Comments
 (0)