Skip to content

Commit b6a271a

Browse files
committed
feat(node:stream/promises): implement finished and pipeline with immediate settle and listener cleanup
1 parent af2715f commit b6a271a

File tree

1 file changed

+164
-2
lines changed

1 file changed

+164
-2
lines changed

packages/graalvm/src/main/kotlin/elide/runtime/node/stream/NodeStreamPromises.kt

Lines changed: 164 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
*/
1313
package elide.runtime.node.stream
1414

15+
import org.graalvm.polyglot.Value
1516
import org.graalvm.polyglot.proxy.ProxyExecutable
1617
import elide.annotations.Factory
1718
import elide.annotations.Singleton
@@ -25,6 +26,8 @@ import elide.runtime.intrinsics.GuestIntrinsic.MutableIntrinsicBindings
2526
import elide.runtime.intrinsics.js.node.StreamPromisesAPI
2627
import elide.runtime.lang.javascript.NodeModuleName
2728
import elide.runtime.lang.javascript.asJsSymbolString
29+
import elide.runtime.intrinsics.js.CompletableJsPromise
30+
import elide.runtime.intrinsics.js.JsPromise as JsPromiseFactory
2831

2932
// Internal symbol where the Node built-in module is installed.
3033
private val STREAM_PROMISES_MODULE_SYMBOL = "node_${NodeModuleName.STREAM_PROMISES.asJsSymbolString()}"
@@ -55,12 +58,171 @@ private val ALL_PROMISES_PROPS = arrayOf(
5558
*/
5659
internal class NodeStreamPromises : ReadOnlyProxyObject, StreamPromisesAPI {
5760
//
61+
private fun valueBooleanOrNull(obj: Value, name: String): Boolean? {
62+
return try {
63+
if (obj.hasMembers() && obj.hasMember(name)) {
64+
val v = obj.getMember(name)
65+
if (v.isBoolean) v.asBoolean() else null
66+
} else null
67+
} catch (_: Throwable) { null }
68+
}
69+
70+
private fun valueOrNull(obj: Value, name: String): Value? {
71+
return try {
72+
if (obj.hasMembers() && obj.hasMember(name)) obj.getMember(name) else null
73+
} catch (_: Throwable) { null }
74+
}
75+
76+
private fun finished(stream: Value): CompletableJsPromise<Unit> {
77+
val promise: CompletableJsPromise<Unit> = JsPromiseFactory()
78+
79+
// If already errored, reject immediately.
80+
valueOrNull(stream, "errored")?.let { err ->
81+
if (!err.isNull) {
82+
promise.reject(err)
83+
return promise
84+
}
85+
}
86+
87+
// If already ended/finished, resolve immediately.
88+
val readableEnded = valueBooleanOrNull(stream, "readableEnded") == true
89+
val writableFinished = valueBooleanOrNull(stream, "writableFinished") == true
90+
if (readableEnded || writableFinished) {
91+
promise.resolve(Unit)
92+
return promise
93+
}
94+
95+
// Attach listeners.
96+
val listeners = mutableListOf<Pair<String, Value>>()
97+
fun addOnce(event: String, listener: ProxyExecutable) {
98+
val v = Value.asValue(listener)
99+
listeners += event to v
100+
stream.invokeMember("once", event, v)
101+
}
102+
fun cleanup() {
103+
if (stream.canInvokeMember("off")) {
104+
listeners.forEach { (event, l) ->
105+
try {
106+
stream.invokeMember("off", event, l)
107+
} catch (_: Throwable) { /* ignore */ }
108+
}
109+
}
110+
listeners.clear()
111+
}
112+
113+
addOnce(StreamEventName.END, ProxyExecutable {
114+
cleanup()
115+
promise.resolve(Unit)
116+
})
117+
addOnce(StreamEventName.FINISH, ProxyExecutable {
118+
cleanup()
119+
promise.resolve(Unit)
120+
})
121+
addOnce(StreamEventName.ERROR, ProxyExecutable { args ->
122+
cleanup()
123+
promise.reject(args.getOrNull(0))
124+
})
125+
126+
return promise
127+
}
128+
129+
private fun pipeline(vararg streams: Value): CompletableJsPromise<Unit> {
130+
val promise: CompletableJsPromise<Unit> = JsPromiseFactory()
131+
132+
if (streams.isEmpty()) {
133+
// Resolve immediately for empty pipeline.
134+
promise.resolve(Unit)
135+
return promise
136+
}
137+
138+
// Connect streams via .pipe()
139+
try {
140+
for (i in 0 until streams.size - 1) {
141+
val src = streams[i]
142+
val dest = streams[i + 1]
143+
if (src.canInvokeMember("pipe")) {
144+
src.invokeMember("pipe", dest)
145+
}
146+
}
147+
} catch (t: Throwable) {
148+
promise.reject(t)
149+
return promise
150+
}
151+
152+
// Cleanup helpers
153+
val listenerMap = HashMap<Value, MutableList<Pair<String, Value>>>(streams.size)
154+
fun addOnce(target: Value, event: String, listener: ProxyExecutable) {
155+
val v = Value.asValue(listener)
156+
listenerMap.getOrPut(target) { mutableListOf() }.add(event to v)
157+
target.invokeMember("once", event, v)
158+
}
159+
fun cleanupAll() {
160+
streams.forEach { s ->
161+
if (s.canInvokeMember("off")) {
162+
listenerMap[s]?.forEach { (event, l) ->
163+
try {
164+
s.invokeMember("off", event, l)
165+
} catch (_: Throwable) { /* ignore */ }
166+
}
167+
}
168+
}
169+
listenerMap.clear()
170+
}
171+
172+
// Reject on first error from any stream.
173+
streams.forEach { s ->
174+
if (s.canInvokeMember("once")) {
175+
addOnce(s, StreamEventName.ERROR, ProxyExecutable { args ->
176+
cleanupAll()
177+
promise.reject(args.getOrNull(0))
178+
})
179+
}
180+
}
181+
182+
// Resolve when last stream finishes/ends.
183+
val last = streams.last()
184+
if (last.canInvokeMember("once")) {
185+
addOnce(last, StreamEventName.END, ProxyExecutable {
186+
cleanupAll()
187+
promise.resolve(Unit)
188+
})
189+
addOnce(last, StreamEventName.FINISH, ProxyExecutable {
190+
cleanupAll()
191+
promise.resolve(Unit)
192+
})
193+
}
194+
195+
// Handle already-completed/errored cases.
196+
valueOrNull(last, "errored")?.let { err ->
197+
if (!err.isNull) {
198+
promise.reject(err)
199+
return promise
200+
}
201+
}
202+
val lastEnded = valueBooleanOrNull(last, "readableEnded") == true ||
203+
valueBooleanOrNull(last, "writableFinished") == true
204+
if (lastEnded) {
205+
promise.resolve(Unit)
206+
return promise
207+
}
208+
209+
return promise
210+
}
58211

59212
override fun getMemberKeys(): Array<String> = ALL_PROMISES_PROPS
60213

61214
override fun getMember(key: String?): Any? = when (key) {
62-
PIPELINE_FN -> ProxyExecutable { TODO("`stream/promises.pipeline` is not implemented yet") }
63-
FINISHED_FN -> ProxyExecutable { TODO("`stream/promises.finished` is not implemented yet") }
215+
PIPELINE_FN -> ProxyExecutable { args ->
216+
@Suppress("SpreadOperator")
217+
pipeline(*args)
218+
}
219+
FINISHED_FN -> ProxyExecutable { args ->
220+
val stream = args.getOrNull(0)
221+
?: return@ProxyExecutable elide.runtime.intrinsics.js.JsPromise.rejected<Unit>(
222+
IllegalArgumentException("stream is required"),
223+
)
224+
finished(stream)
225+
}
64226
else -> null
65227
}
66228

0 commit comments

Comments
 (0)