Skip to content

Commit 2a34df2

Browse files
committed
feat(node): merge PR elide-dev#1618 content; resolve stream/promises conflicts favoring completed finished/pipeline semantics
2 parents 47363b1 + b6a271a commit 2a34df2

File tree

2 files changed

+765
-1
lines changed

2 files changed

+765
-1
lines changed

packages/graalvm/src/main/kotlin/elide/runtime/node/stream/NodeStreamPromises.kt

Lines changed: 163 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
*/
1313
package elide.runtime.node.stream
1414

15+
import org.graalvm.polyglot.Value
1516
import org.graalvm.polyglot.proxy.ProxyExecutable
1617
import elide.annotations.Factory
1718
import elide.annotations.Singleton
@@ -26,7 +27,8 @@ import elide.runtime.intrinsics.js.JsPromise
2627
import elide.runtime.intrinsics.js.node.StreamPromisesAPI
2728
import elide.runtime.lang.javascript.NodeModuleName
2829
import elide.runtime.lang.javascript.asJsSymbolString
29-
import org.graalvm.polyglot.Value
30+
import elide.runtime.intrinsics.js.CompletableJsPromise
31+
import elide.runtime.intrinsics.js.JsPromise as JsPromiseFactory
3032

3133
// Internal symbol where the Node built-in module is installed.
3234
private val STREAM_PROMISES_MODULE_SYMBOL = "node_${NodeModuleName.STREAM_PROMISES.asJsSymbolString()}"
@@ -57,11 +59,162 @@ private val ALL_PROMISES_PROPS = arrayOf(
5759
*/
5860
internal class NodeStreamPromises : ReadOnlyProxyObject, StreamPromisesAPI {
5961
//
62+
private fun valueBooleanOrNull(obj: Value, name: String): Boolean? {
63+
return try {
64+
if (obj.hasMembers() && obj.hasMember(name)) {
65+
val v = obj.getMember(name)
66+
if (v.isBoolean) v.asBoolean() else null
67+
} else null
68+
} catch (_: Throwable) { null }
69+
}
70+
71+
private fun valueOrNull(obj: Value, name: String): Value? {
72+
return try {
73+
if (obj.hasMembers() && obj.hasMember(name)) obj.getMember(name) else null
74+
} catch (_: Throwable) { null }
75+
}
76+
77+
private fun finished(stream: Value): CompletableJsPromise<Unit> {
78+
val promise: CompletableJsPromise<Unit> = JsPromiseFactory()
79+
80+
// If already errored, reject immediately.
81+
valueOrNull(stream, "errored")?.let { err ->
82+
if (!err.isNull) {
83+
promise.reject(err)
84+
return promise
85+
}
86+
}
87+
88+
// If already ended/finished, resolve immediately.
89+
val readableEnded = valueBooleanOrNull(stream, "readableEnded") == true
90+
val writableFinished = valueBooleanOrNull(stream, "writableFinished") == true
91+
if (readableEnded || writableFinished) {
92+
promise.resolve(Unit)
93+
return promise
94+
}
95+
96+
// Attach listeners.
97+
val listeners = mutableListOf<Pair<String, Value>>()
98+
fun addOnce(event: String, listener: ProxyExecutable) {
99+
val v = Value.asValue(listener)
100+
listeners += event to v
101+
stream.invokeMember("once", event, v)
102+
}
103+
fun cleanup() {
104+
if (stream.canInvokeMember("off")) {
105+
listeners.forEach { (event, l) ->
106+
try {
107+
stream.invokeMember("off", event, l)
108+
} catch (_: Throwable) { /* ignore */ }
109+
}
110+
}
111+
listeners.clear()
112+
}
113+
114+
addOnce(StreamEventName.END, ProxyExecutable {
115+
cleanup()
116+
promise.resolve(Unit)
117+
})
118+
addOnce(StreamEventName.FINISH, ProxyExecutable {
119+
cleanup()
120+
promise.resolve(Unit)
121+
})
122+
addOnce(StreamEventName.ERROR, ProxyExecutable { args ->
123+
cleanup()
124+
promise.reject(args.getOrNull(0))
125+
})
126+
127+
return promise
128+
}
129+
130+
private fun pipeline(vararg streams: Value): CompletableJsPromise<Unit> {
131+
val promise: CompletableJsPromise<Unit> = JsPromiseFactory()
132+
133+
if (streams.isEmpty()) {
134+
// Resolve immediately for empty pipeline.
135+
promise.resolve(Unit)
136+
return promise
137+
}
138+
139+
// Connect streams via .pipe()
140+
try {
141+
for (i in 0 until streams.size - 1) {
142+
val src = streams[i]
143+
val dest = streams[i + 1]
144+
if (src.canInvokeMember("pipe")) {
145+
src.invokeMember("pipe", dest)
146+
}
147+
}
148+
} catch (t: Throwable) {
149+
promise.reject(t)
150+
return promise
151+
}
152+
153+
// Cleanup helpers
154+
val listenerMap = HashMap<Value, MutableList<Pair<String, Value>>>(streams.size)
155+
fun addOnce(target: Value, event: String, listener: ProxyExecutable) {
156+
val v = Value.asValue(listener)
157+
listenerMap.getOrPut(target) { mutableListOf() }.add(event to v)
158+
target.invokeMember("once", event, v)
159+
}
160+
fun cleanupAll() {
161+
streams.forEach { s ->
162+
if (s.canInvokeMember("off")) {
163+
listenerMap[s]?.forEach { (event, l) ->
164+
try {
165+
s.invokeMember("off", event, l)
166+
} catch (_: Throwable) { /* ignore */ }
167+
}
168+
}
169+
}
170+
listenerMap.clear()
171+
}
172+
173+
// Reject on first error from any stream.
174+
streams.forEach { s ->
175+
if (s.canInvokeMember("once")) {
176+
addOnce(s, StreamEventName.ERROR, ProxyExecutable { args ->
177+
cleanupAll()
178+
promise.reject(args.getOrNull(0))
179+
})
180+
}
181+
}
182+
183+
// Resolve when last stream finishes/ends.
184+
val last = streams.last()
185+
if (last.canInvokeMember("once")) {
186+
addOnce(last, StreamEventName.END, ProxyExecutable {
187+
cleanupAll()
188+
promise.resolve(Unit)
189+
})
190+
addOnce(last, StreamEventName.FINISH, ProxyExecutable {
191+
cleanupAll()
192+
promise.resolve(Unit)
193+
})
194+
}
195+
196+
// Handle already-completed/errored cases.
197+
valueOrNull(last, "errored")?.let { err ->
198+
if (!err.isNull) {
199+
promise.reject(err)
200+
return promise
201+
}
202+
}
203+
val lastEnded = valueBooleanOrNull(last, "readableEnded") == true ||
204+
valueBooleanOrNull(last, "writableFinished") == true
205+
if (lastEnded) {
206+
promise.resolve(Unit)
207+
return promise
208+
}
209+
210+
return promise
211+
}
60212

61213
override fun getMemberKeys(): Array<String> = ALL_PROMISES_PROPS
62214

63215
override fun getMember(key: String?): Any? = when (key) {
64216
PIPELINE_FN -> ProxyExecutable { args ->
217+
65218
val streams = args.toList()
66219
val promise = JsPromise<Unit>()
67220
if (streams.isEmpty()) return@ProxyExecutable promise.also { it.resolve(Unit) }
@@ -134,6 +287,15 @@ internal class NodeStreamPromises : ReadOnlyProxyObject, StreamPromisesAPI {
134287
on.execute("finish", doneCb)
135288
on.execute("error", errCb)
136289
promise
290+
@Suppress("SpreadOperator")
291+
pipeline(*args)
292+
}
293+
FINISHED_FN -> ProxyExecutable { args ->
294+
val stream = args.getOrNull(0)
295+
?: return@ProxyExecutable elide.runtime.intrinsics.js.JsPromise.rejected<Unit>(
296+
IllegalArgumentException("stream is required"),
297+
)
298+
finished(stream)
137299
}
138300
else -> null
139301
}

0 commit comments

Comments
 (0)