From ada6916da8066969d9de1f720b70cead20778d7d Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 6 Jan 2026 19:39:47 +0100 Subject: [PATCH] stream: sync resume on drain Inside drain context we are already async and don't need to defer resume. --- benchmark/streams/pipe-backpressure.js | 24 ++++++++++++++++++++++++ lib/internal/streams/readable.js | 25 ++++++++++++++----------- 2 files changed, 38 insertions(+), 11 deletions(-) create mode 100644 benchmark/streams/pipe-backpressure.js diff --git a/benchmark/streams/pipe-backpressure.js b/benchmark/streams/pipe-backpressure.js new file mode 100644 index 00000000000000..07d87ec7565545 --- /dev/null +++ b/benchmark/streams/pipe-backpressure.js @@ -0,0 +1,24 @@ +'use strict'; + +const common = require('../common'); +const { Readable, Writable } = require('stream'); + +const bench = common.createBenchmark(main, { + n: [5e6], +}); + +function main({ n }) { + const b = Buffer.alloc(1024); + const r = new Readable(); + const w = new Writable(); + + let i = 0; + + r._read = () => r.push(i++ === n ? null : b); + w._write = (data, enc, cb) => queueMicrotask(cb); + + bench.start(); + + r.pipe(w); + w.on('finish', () => bench.end(n)); +} diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 97fe9dc6f60c2f..d76a0e191def62 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -1091,7 +1091,7 @@ function pipeOnDrain(src, dest) { if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) && (state[kState] & kDataListening) !== 0) { - src.resume(); + resume(stream, state, dest.listenerCount('drain') <= 1); } }; } @@ -1232,7 +1232,10 @@ function nReadingNextTick(self) { // pause() and resume() are remnants of the legacy readable stream API // If the user uses them, then switch into old mode. Readable.prototype.resume = function() { - const state = this._readableState; + return resume(this, this._readableState, false); +}; + +function resume(stream, state, sync) { if ((state[kState] & kFlowing) === 0) { debug('resume'); // We flow only if there is no one listening @@ -1244,18 +1247,18 @@ Readable.prototype.resume = function() { } else { state[kState] &= ~kFlowing; } - resume(this, state); + if ((state[kState] & kResumeScheduled) === 0) { + if (sync) { + resume_(stream, state, false); + } else { + state[kState] |= kResumeScheduled; + process.nextTick(resume_, stream, state); + } + } } state[kState] |= kHasPaused; state[kState] &= ~kPaused; - return this; -}; - -function resume(stream, state) { - if ((state[kState] & kResumeScheduled) === 0) { - state[kState] |= kResumeScheduled; - process.nextTick(resume_, stream, state); - } + return stream; } function resume_(stream, state) {