diff --git a/packages/graalvm/src/main/kotlin/elide/runtime/node/stream/NodeStreamPromises.kt b/packages/graalvm/src/main/kotlin/elide/runtime/node/stream/NodeStreamPromises.kt index 000639a864..349587b948 100644 --- a/packages/graalvm/src/main/kotlin/elide/runtime/node/stream/NodeStreamPromises.kt +++ b/packages/graalvm/src/main/kotlin/elide/runtime/node/stream/NodeStreamPromises.kt @@ -12,6 +12,7 @@ */ package elide.runtime.node.stream +import org.graalvm.polyglot.Value import org.graalvm.polyglot.proxy.ProxyExecutable import elide.annotations.Factory import elide.annotations.Singleton @@ -25,6 +26,8 @@ import elide.runtime.intrinsics.GuestIntrinsic.MutableIntrinsicBindings import elide.runtime.intrinsics.js.node.StreamPromisesAPI import elide.runtime.lang.javascript.NodeModuleName import elide.runtime.lang.javascript.asJsSymbolString +import elide.runtime.intrinsics.js.CompletableJsPromise +import elide.runtime.intrinsics.js.JsPromise as JsPromiseFactory // Internal symbol where the Node built-in module is installed. private val STREAM_PROMISES_MODULE_SYMBOL = "node_${NodeModuleName.STREAM_PROMISES.asJsSymbolString()}" @@ -55,12 +58,171 @@ private val ALL_PROMISES_PROPS = arrayOf( */ internal class NodeStreamPromises : ReadOnlyProxyObject, StreamPromisesAPI { // + private fun valueBooleanOrNull(obj: Value, name: String): Boolean? { + return try { + if (obj.hasMembers() && obj.hasMember(name)) { + val v = obj.getMember(name) + if (v.isBoolean) v.asBoolean() else null + } else null + } catch (_: Throwable) { null } + } + + private fun valueOrNull(obj: Value, name: String): Value? { + return try { + if (obj.hasMembers() && obj.hasMember(name)) obj.getMember(name) else null + } catch (_: Throwable) { null } + } + + private fun finished(stream: Value): CompletableJsPromise { + val promise: CompletableJsPromise = JsPromiseFactory() + + // If already errored, reject immediately. + valueOrNull(stream, "errored")?.let { err -> + if (!err.isNull) { + promise.reject(err) + return promise + } + } + + // If already ended/finished, resolve immediately. + val readableEnded = valueBooleanOrNull(stream, "readableEnded") == true + val writableFinished = valueBooleanOrNull(stream, "writableFinished") == true + if (readableEnded || writableFinished) { + promise.resolve(Unit) + return promise + } + + // Attach listeners. + val listeners = mutableListOf>() + fun addOnce(event: String, listener: ProxyExecutable) { + val v = Value.asValue(listener) + listeners += event to v + stream.invokeMember("once", event, v) + } + fun cleanup() { + if (stream.canInvokeMember("off")) { + listeners.forEach { (event, l) -> + try { + stream.invokeMember("off", event, l) + } catch (_: Throwable) { /* ignore */ } + } + } + listeners.clear() + } + + addOnce(StreamEventName.END, ProxyExecutable { + cleanup() + promise.resolve(Unit) + }) + addOnce(StreamEventName.FINISH, ProxyExecutable { + cleanup() + promise.resolve(Unit) + }) + addOnce(StreamEventName.ERROR, ProxyExecutable { args -> + cleanup() + promise.reject(args.getOrNull(0)) + }) + + return promise + } + + private fun pipeline(vararg streams: Value): CompletableJsPromise { + val promise: CompletableJsPromise = JsPromiseFactory() + + if (streams.isEmpty()) { + // Resolve immediately for empty pipeline. + promise.resolve(Unit) + return promise + } + + // Connect streams via .pipe() + try { + for (i in 0 until streams.size - 1) { + val src = streams[i] + val dest = streams[i + 1] + if (src.canInvokeMember("pipe")) { + src.invokeMember("pipe", dest) + } + } + } catch (t: Throwable) { + promise.reject(t) + return promise + } + + // Cleanup helpers + val listenerMap = HashMap>>(streams.size) + fun addOnce(target: Value, event: String, listener: ProxyExecutable) { + val v = Value.asValue(listener) + listenerMap.getOrPut(target) { mutableListOf() }.add(event to v) + target.invokeMember("once", event, v) + } + fun cleanupAll() { + streams.forEach { s -> + if (s.canInvokeMember("off")) { + listenerMap[s]?.forEach { (event, l) -> + try { + s.invokeMember("off", event, l) + } catch (_: Throwable) { /* ignore */ } + } + } + } + listenerMap.clear() + } + + // Reject on first error from any stream. + streams.forEach { s -> + if (s.canInvokeMember("once")) { + addOnce(s, StreamEventName.ERROR, ProxyExecutable { args -> + cleanupAll() + promise.reject(args.getOrNull(0)) + }) + } + } + + // Resolve when last stream finishes/ends. + val last = streams.last() + if (last.canInvokeMember("once")) { + addOnce(last, StreamEventName.END, ProxyExecutable { + cleanupAll() + promise.resolve(Unit) + }) + addOnce(last, StreamEventName.FINISH, ProxyExecutable { + cleanupAll() + promise.resolve(Unit) + }) + } + + // Handle already-completed/errored cases. + valueOrNull(last, "errored")?.let { err -> + if (!err.isNull) { + promise.reject(err) + return promise + } + } + val lastEnded = valueBooleanOrNull(last, "readableEnded") == true || + valueBooleanOrNull(last, "writableFinished") == true + if (lastEnded) { + promise.resolve(Unit) + return promise + } + + return promise + } override fun getMemberKeys(): Array = ALL_PROMISES_PROPS override fun getMember(key: String?): Any? = when (key) { - PIPELINE_FN -> ProxyExecutable { TODO("`stream/promises.pipeline` is not implemented yet") } - FINISHED_FN -> ProxyExecutable { TODO("`stream/promises.finished` is not implemented yet") } + PIPELINE_FN -> ProxyExecutable { args -> + @Suppress("SpreadOperator") + pipeline(*args) + } + FINISHED_FN -> ProxyExecutable { args -> + val stream = args.getOrNull(0) + ?: return@ProxyExecutable elide.runtime.intrinsics.js.JsPromise.rejected( + IllegalArgumentException("stream is required"), + ) + finished(stream) + } else -> null } diff --git a/packages/graalvm/src/test/kotlin/elide/runtime/node/NodeStreamPromisesTest.kt b/packages/graalvm/src/test/kotlin/elide/runtime/node/NodeStreamPromisesTest.kt index 2537c6492d..774612bb73 100644 --- a/packages/graalvm/src/test/kotlin/elide/runtime/node/NodeStreamPromisesTest.kt +++ b/packages/graalvm/src/test/kotlin/elide/runtime/node/NodeStreamPromisesTest.kt @@ -35,4 +35,606 @@ import elide.testing.annotations.TestCase @Test override fun testInjectable() { assertNotNull(stream) } + + @Test fun `finished resolves on end and cleans listeners`() { + executeGuest { + """ + const { finished } = require('node:stream/promises'); + + function mkStream() { + const events = Object.create(null); + return { + _events: events, + readableEnded: false, + writableFinished: false, + errored: null, + once(event, listener) { + (events[event] || (events[event] = [])).push({ listener, once: true }); + return this; + }, + off(event, listener) { + const arr = events[event]; + if (!arr) return this; + events[event] = arr.filter(e => e.listener !== listener); + return this; + }, + emit(event, ...args) { + const arr = events[event]; + if (!arr) return false; + for (const e of [...arr]) { + try { e.listener(...args); } finally { + if (e.once) this.off(event, e.listener); + } + } + return true; + }, + listenerCount(event) { return (events[event] ? events[event].length : 0); }, + pipe(dest) { this._dest = dest; return dest; }, + }; + } + + const s = mkStream(); + let resolved = false; + const p = finished(s); + test(typeof p?.then === 'function').shouldBeTrue('finished returns a promise'); + p.then(() => { + resolved = true; + test(s.listenerCount('end')).isEqualTo(0, 'end listener cleaned'); + test(s.listenerCount('finish')).isEqualTo(0, 'finish listener cleaned'); + test(s.listenerCount('error')).isEqualTo(0, 'error listener cleaned'); + }); + s.emit('end'); + Promise.resolve().then(() => { + test(resolved).shouldBeTrue('promise resolved after end'); + }); + """ + }.doesNotFail() + } + + @Test fun `finished resolves on finish and cleans listeners`() { + executeGuest { + """ + const { finished } = require('node:stream/promises'); + + function mkStream() { + const events = Object.create(null); + return { + _events: events, + readableEnded: false, + writableFinished: false, + errored: null, + once(event, listener) { + (events[event] || (events[event] = [])).push({ listener, once: true }); + return this; + }, + off(event, listener) { + const arr = events[event]; + if (!arr) return this; + events[event] = arr.filter(e => e.listener !== listener); + return this; + }, + emit(event, ...args) { + const arr = events[event]; + if (!arr) return false; + for (const e of [...arr]) { + try { e.listener(...args); } finally { + if (e.once) this.off(event, e.listener); + } + } + return true; + }, + listenerCount(event) { return (events[event] ? events[event].length : 0); }, + pipe(dest) { this._dest = dest; return dest; }, + }; + } + + const s = mkStream(); + let resolved = false; + const p = finished(s); + p.then(() => { + resolved = true; + test(s.listenerCount('end')).isEqualTo(0); + test(s.listenerCount('finish')).isEqualTo(0); + test(s.listenerCount('error')).isEqualTo(0); + }); + s.emit('finish'); + Promise.resolve().then(() => { + test(resolved).shouldBeTrue('promise resolved after finish'); + }); + """ + }.doesNotFail() + } + + @Test fun `finished rejects on error and cleans listeners`() { + executeGuest { + """ + const { finished } = require('node:stream/promises'); + + function mkStream() { + const events = Object.create(null); + return { + _events: events, + readableEnded: false, + writableFinished: false, + errored: null, + once(event, listener) { + (events[event] || (events[event] = [])).push({ listener, once: true }); + return this; + }, + off(event, listener) { + const arr = events[event]; + if (!arr) return this; + events[event] = arr.filter(e => e.listener !== listener); + return this; + }, + emit(event, ...args) { + const arr = events[event]; + if (!arr) return false; + for (const e of [...arr]) { + try { e.listener(...args); } finally { + if (e.once) this.off(event, e.listener); + } + } + return true; + }, + listenerCount(event) { return (events[event] ? events[event].length : 0); }, + pipe(dest) { this._dest = dest; return dest; }, + }; + } + + const s = mkStream(); + let rejected = false; + finished(s).then( + () => { throw new Error('should not resolve'); }, + (err) => { + rejected = true; + test(!!err).shouldBeTrue('error provided'); + test(s.listenerCount('end')).isEqualTo(0); + test(s.listenerCount('finish')).isEqualTo(0); + test(s.listenerCount('error')).isEqualTo(0); + } + ); + s.emit('error', new Error('boom')); + Promise.resolve().then(() => { + test(rejected).shouldBeTrue('promise rejected after error'); + }); + """ + }.doesNotFail() + } + + @Test fun `finished settles immediately if already ended`() { + executeGuest { + """ + const { finished } = require('node:stream/promises'); + + const s = { + readableEnded: true, + writableFinished: false, + errored: null, + once() { throw new Error('should not add listeners when already ended'); }, + off() { /* no-op */ }, + listenerCount() { return 0; }, + }; + let resolved = false; + finished(s).then(() => { resolved = true; }); + Promise.resolve().then(() => { + test(resolved).shouldBeTrue('resolved immediately'); + }); + """ + }.doesNotFail() + } + + @Test fun `finished rejects immediately if already errored`() { + executeGuest { + """ + const { finished } = require('node:stream/promises'); + + const s = { + readableEnded: false, + writableFinished: false, + errored: new Error('pre-error'), + once() { throw new Error('should not add listeners when already errored'); }, + off() { /* no-op */ }, + listenerCount() { return 0; }, + }; + let rejected = false; + finished(s).then( + () => { throw new Error('should not resolve'); }, + (err) => { rejected = true; test(!!err).shouldBeTrue('has error'); } + ); + Promise.resolve().then(() => { + test(rejected).shouldBeTrue('rejected immediately'); + }); + """ + }.doesNotFail() + } + + @Test fun `pipeline resolves on last end and cleans listeners`() { + executeGuest { + """ + const { pipeline } = require('node:stream/promises'); + + function mkStream() { + const events = Object.create(null); + return { + _events: events, + readableEnded: false, + writableFinished: false, + errored: null, + once(event, listener) { + (events[event] || (events[event] = [])).push({ listener, once: true }); + return this; + }, + off(event, listener) { + const arr = events[event]; + if (!arr) return this; + events[event] = arr.filter(e => e.listener !== listener); + return this; + }, + emit(event, ...args) { + const arr = events[event]; + if (!arr) return false; + for (const e of [...arr]) { + try { e.listener(...args); } finally { + if (e.once) this.off(event, e.listener); + } + } + return true; + }, + listenerCount(event) { return (events[event] ? events[event].length : 0); }, + pipe(dest) { this._dest = dest; return dest; }, + }; + } + + const s1 = mkStream(), s2 = mkStream(), s3 = mkStream(); + let resolved = false; + const p = pipeline(s1, s2, s3); + test(typeof p?.then === 'function').shouldBeTrue('pipeline returns a promise'); + // verify pipe chaining occurred + test(s1._dest === s2).shouldBeTrue('s1 piped to s2'); + test(s2._dest === s3).shouldBeTrue('s2 piped to s3'); + p.then(() => { + resolved = true; + [s1, s2, s3].forEach(s => test(s.listenerCount('error')).isEqualTo(0)); + test(s3.listenerCount('end')).isEqualTo(0); + test(s3.listenerCount('finish')).isEqualTo(0); + }); + s3.emit('end'); + Promise.resolve().then(() => { + test(resolved).shouldBeTrue('pipeline resolved on last end'); + }); + """ + }.doesNotFail() + } + + @Test fun `pipeline rejects on intermediate error and cleans listeners`() { + executeGuest { + """ + const { pipeline } = require('node:stream/promises'); + + function mkStream() { + const events = Object.create(null); + return { + _events: events, + readableEnded: false, + writableFinished: false, + errored: null, + once(event, listener) { + (events[event] || (events[event] = [])).push({ listener, once: true }); + return this; + }, + off(event, listener) { + const arr = events[event]; + if (!arr) return this; + events[event] = arr.filter(e => e.listener !== listener); + return this; + }, + emit(event, ...args) { + const arr = events[event]; + if (!arr) return false; + for (const e of [...arr]) { + try { e.listener(...args); } finally { + if (e.once) this.off(event, e.listener); + } + } + return true; + }, + listenerCount(event) { return (events[event] ? events[event].length : 0); }, + pipe(dest) { this._dest = dest; return dest; }, + }; + } + + const s1 = mkStream(), s2 = mkStream(), s3 = mkStream(); + let rejected = false; + pipeline(s1, s2, s3).then( + () => { throw new Error('should not resolve'); }, + (err) => { + rejected = true; + test(!!err).shouldBeTrue('error provided'); + [s1, s2, s3].forEach(s => test(s.listenerCount('error')).isEqualTo(0)); + test(s3.listenerCount('end')).isEqualTo(0); + test(s3.listenerCount('finish')).isEqualTo(0); + } + ); + s2.emit('error', new Error('EPIPE')); + Promise.resolve().then(() => { + test(rejected).shouldBeTrue('pipeline rejected on error'); + }); + """ + }.doesNotFail() + } + + @Test fun `pipeline settles immediately if last already ended`() { + executeGuest { + """ + const { pipeline } = require('node:stream/promises'); + + function mkStream() { + const events = Object.create(null); + return { + _events: events, + readableEnded: false, + writableFinished: false, + errored: null, + once(event, listener) { + (events[event] || (events[event] = [])).push({ listener, once: true }); + return this; + }, + off(event, listener) { + const arr = events[event]; + if (!arr) return this; + events[event] = arr.filter(e => e.listener !== listener); + return this; + }, + emit(event, ...args) { + const arr = events[event]; + if (!arr) return false; + for (const e of [...arr]) { + try { e.listener(...args); } finally { + if (e.once) this.off(event, e.listener); + } + } + return true; + }, + listenerCount(event) { return (events[event] ? events[event].length : 0); }, + pipe(dest) { this._dest = dest; return dest; }, + }; + } + + const s1 = mkStream(), s2 = mkStream(), s3 = mkStream(); + s3.readableEnded = true; + let resolved = false; + pipeline(s1, s2, s3).then(() => { resolved = true; }); + Promise.resolve().then(() => { + test(resolved).shouldBeTrue('pipeline resolved immediately on last ended'); + }); + """ + }.doesNotFail() + } + + @Test fun `pipeline rejects immediately if last already errored`() { + executeGuest { + """ + const { pipeline } = require('node:stream/promises'); + + function mkStream() { + const events = Object.create(null); + return { + _events: events, + readableEnded: false, + writableFinished: false, + errored: null, + once(event, listener) { + (events[event] || (events[event] = [])).push({ listener, once: true }); + return this; + }, + off(event, listener) { + const arr = events[event]; + if (!arr) return this; + events[event] = arr.filter(e => e.listener !== listener); + return this; + }, + emit(event, ...args) { + const arr = events[event]; + if (!arr) return false; + for (const e of [...arr]) { + try { e.listener(...args); } finally { + if (e.once) this.off(event, e.listener); + } + } + return true; + }, + listenerCount(event) { return (events[event] ? events[event].length : 0); }, + pipe(dest) { this._dest = dest; return dest; }, + }; + } + + const s1 = mkStream(), s2 = mkStream(), s3 = mkStream(); + s3.errored = new Error('pre-error'); + let rejected = false; + pipeline(s1, s2, s3).then( + () => { throw new Error('should not resolve'); }, + (err) => { rejected = true; test(!!err).shouldBeTrue('has error'); } + ); + Promise.resolve().then(() => { + test(rejected).shouldBeTrue('pipeline rejected immediately on last errored'); + }); + """ + }.doesNotFail() + } + + @Test fun `finished settles immediately if already finished (writable)`() { + executeGuest { + """ + const { finished } = require('node:stream/promises'); + const s = { + readableEnded: false, + writableFinished: true, + errored: null, + once() { throw new Error('should not add listeners when already finished'); }, + off() { /* no-op */ }, + listenerCount() { return 0; }, + }; + let resolved = false; + finished(s).then(() => { resolved = true; }); + Promise.resolve().then(() => { + test(resolved).shouldBeTrue('resolved immediately'); + }); + """ + }.doesNotFail() + } + + @Test fun `finished rejects immediately if errored even when ended`() { + executeGuest { + """ + const { finished } = require('node:stream/promises'); + const s = { + readableEnded: true, + writableFinished: false, + errored: new Error('pre-error'), + once() { throw new Error('should not add listeners when already errored'); }, + off() { /* no-op */ }, + listenerCount() { return 0; }, + }; + let rejected = false; + finished(s).then( + () => { throw new Error('should not resolve'); }, + (err) => { rejected = true; test(!!err).shouldBeTrue('has error'); }, + ); + Promise.resolve().then(() => { + test(rejected).shouldBeTrue('rejected immediately'); + }); + """ + }.doesNotFail() + } + + @Test fun `pipeline resolves with no streams`() { + executeGuest { + """ + const { pipeline } = require('node:stream/promises'); + let resolved = false; + const p = pipeline(); + test(typeof p?.then === 'function').shouldBeTrue('pipeline returns a promise'); + p.then(() => { resolved = true; }); + Promise.resolve().then(() => { + test(resolved).shouldBeTrue('resolved immediately for empty pipeline'); + }); + """ + }.doesNotFail() + } + + @Test fun `pipeline with single stream resolves on end and cleans listeners`() { + executeGuest { + """ + const { pipeline } = require('node:stream/promises'); + + function mkStream() { + const events = Object.create(null); + return { + _events: events, + readableEnded: false, + writableFinished: false, + errored: null, + once(event, listener) { + (events[event] || (events[event] = [])).push({ listener, once: true }); + return this; + }, + off(event, listener) { + const arr = events[event]; + if (!arr) return this; + events[event] = arr.filter(e => e.listener !== listener); + return this; + }, + emit(event, ...args) { + const arr = events[event]; + if (!arr) return false; + for (const e of [...arr]) { + try { e.listener(...args); } finally { + if (e.once) this.off(event, e.listener); + } + } + return true; + }, + listenerCount(event) { return (events[event] ? events[event].length : 0); }, + pipe(dest) { this._dest = dest; return dest; }, + }; + } + + const s = mkStream(); + let resolved = false; + const p = pipeline(s); + test(typeof p?.then === 'function').shouldBeTrue('pipeline returns a promise'); + p.then(() => { + resolved = true; + test(s.listenerCount('error')).isEqualTo(0); + test(s.listenerCount('end')).isEqualTo(0); + test(s.listenerCount('finish')).isEqualTo(0); + }); + s.emit('end'); + Promise.resolve().then(() => { + test(resolved).shouldBeTrue('pipeline resolved on single stream end'); + }); + """ + }.doesNotFail() + } + + @Test fun `pipeline rejects if a pipe call throws and does not attach listeners`() { + executeGuest { + """ + const { pipeline } = require('node:stream/promises'); + + function mkStream() { + const events = Object.create(null); + return { + _events: events, + readableEnded: false, + writableFinished: false, + errored: null, + once(event, listener) { + (events[event] || (events[event] = [])).push({ listener, once: true }); + return this; + }, + off(event, listener) { + const arr = events[event]; + if (!arr) return this; + events[event] = arr.filter(e => e.listener !== listener); + return this; + }, + emit(event, ...args) { + const arr = events[event]; + if (!arr) return false; + for (const e of [...arr]) { + try { e.listener(...args); } finally { + if (e.once) this.off(event, e.listener); + } + } + return true; + }, + listenerCount(event) { return (events[event] ? events[event].length : 0); }, + pipe(dest) { this._dest = dest; return dest; }, + }; + } + + const s1 = mkStream(); + const s2 = mkStream(); + // Force pipe() to throw on the first stream + s1.pipe = () => { throw new Error('fail'); }; + + let rejected = false; + const p = pipeline(s1, s2).then( + () => { throw new Error('should not resolve'); }, + () => { rejected = true; }, + ); + + Promise.resolve().then(() => { + test(rejected).shouldBeTrue('pipeline rejected when pipe throws'); + // No listeners should have been attached since rejection happened during piping + test(s1.listenerCount('error')).isEqualTo(0); + test(s2.listenerCount('error')).isEqualTo(0); + test(s2.listenerCount('end')).isEqualTo(0); + test(s2.listenerCount('finish')).isEqualTo(0); + }); + """ + }.doesNotFail() + } }