Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package elide.runtime.node.stream

import org.graalvm.polyglot.Value
import org.graalvm.polyglot.proxy.ProxyExecutable
import elide.annotations.Factory
import elide.annotations.Singleton
Expand All @@ -25,6 +26,8 @@ import elide.runtime.intrinsics.GuestIntrinsic.MutableIntrinsicBindings
import elide.runtime.intrinsics.js.node.StreamPromisesAPI
import elide.runtime.lang.javascript.NodeModuleName
import elide.runtime.lang.javascript.asJsSymbolString
import elide.runtime.intrinsics.js.CompletableJsPromise
import elide.runtime.intrinsics.js.JsPromise as JsPromiseFactory

// Internal symbol where the Node built-in module is installed.
private val STREAM_PROMISES_MODULE_SYMBOL = "node_${NodeModuleName.STREAM_PROMISES.asJsSymbolString()}"
Expand Down Expand Up @@ -55,12 +58,171 @@ private val ALL_PROMISES_PROPS = arrayOf(
*/
internal class NodeStreamPromises : ReadOnlyProxyObject, StreamPromisesAPI {
//
private fun valueBooleanOrNull(obj: Value, name: String): Boolean? {
return try {
if (obj.hasMembers() && obj.hasMember(name)) {
val v = obj.getMember(name)
if (v.isBoolean) v.asBoolean() else null
} else null
} catch (_: Throwable) { null }
}

private fun valueOrNull(obj: Value, name: String): Value? {
return try {
if (obj.hasMembers() && obj.hasMember(name)) obj.getMember(name) else null
} catch (_: Throwable) { null }
}

private fun finished(stream: Value): CompletableJsPromise<Unit> {
val promise: CompletableJsPromise<Unit> = JsPromiseFactory()

// If already errored, reject immediately.
valueOrNull(stream, "errored")?.let { err ->
if (!err.isNull) {
promise.reject(err)
return promise
}
}

// If already ended/finished, resolve immediately.
val readableEnded = valueBooleanOrNull(stream, "readableEnded") == true
val writableFinished = valueBooleanOrNull(stream, "writableFinished") == true
if (readableEnded || writableFinished) {
promise.resolve(Unit)
return promise
}

// Attach listeners.
val listeners = mutableListOf<Pair<String, Value>>()
fun addOnce(event: String, listener: ProxyExecutable) {
val v = Value.asValue(listener)
listeners += event to v
stream.invokeMember("once", event, v)
}
fun cleanup() {
if (stream.canInvokeMember("off")) {
listeners.forEach { (event, l) ->
try {
stream.invokeMember("off", event, l)
} catch (_: Throwable) { /* ignore */ }
}
}
listeners.clear()
}

addOnce(StreamEventName.END, ProxyExecutable {
cleanup()
promise.resolve(Unit)
})
addOnce(StreamEventName.FINISH, ProxyExecutable {
cleanup()
promise.resolve(Unit)
})
addOnce(StreamEventName.ERROR, ProxyExecutable { args ->
cleanup()
promise.reject(args.getOrNull(0))
})

return promise
}

private fun pipeline(vararg streams: Value): CompletableJsPromise<Unit> {
val promise: CompletableJsPromise<Unit> = JsPromiseFactory()

if (streams.isEmpty()) {
// Resolve immediately for empty pipeline.
promise.resolve(Unit)
return promise
}

// Connect streams via .pipe()
try {
for (i in 0 until streams.size - 1) {
val src = streams[i]
val dest = streams[i + 1]
if (src.canInvokeMember("pipe")) {
src.invokeMember("pipe", dest)
}
}
} catch (t: Throwable) {
promise.reject(t)
return promise
}

// Cleanup helpers
val listenerMap = HashMap<Value, MutableList<Pair<String, Value>>>(streams.size)
fun addOnce(target: Value, event: String, listener: ProxyExecutable) {
val v = Value.asValue(listener)
listenerMap.getOrPut(target) { mutableListOf() }.add(event to v)
target.invokeMember("once", event, v)
}
fun cleanupAll() {
streams.forEach { s ->
if (s.canInvokeMember("off")) {
listenerMap[s]?.forEach { (event, l) ->
try {
s.invokeMember("off", event, l)
} catch (_: Throwable) { /* ignore */ }
}
}
}
listenerMap.clear()
}

// Reject on first error from any stream.
streams.forEach { s ->
if (s.canInvokeMember("once")) {
addOnce(s, StreamEventName.ERROR, ProxyExecutable { args ->
cleanupAll()
promise.reject(args.getOrNull(0))
})
}
}

// Resolve when last stream finishes/ends.
val last = streams.last()
if (last.canInvokeMember("once")) {
addOnce(last, StreamEventName.END, ProxyExecutable {
cleanupAll()
promise.resolve(Unit)
})
addOnce(last, StreamEventName.FINISH, ProxyExecutable {
cleanupAll()
promise.resolve(Unit)
})
}

// Handle already-completed/errored cases.
valueOrNull(last, "errored")?.let { err ->
if (!err.isNull) {
promise.reject(err)
return promise
}
}
val lastEnded = valueBooleanOrNull(last, "readableEnded") == true ||
valueBooleanOrNull(last, "writableFinished") == true
if (lastEnded) {
promise.resolve(Unit)
return promise
}

return promise
}

override fun getMemberKeys(): Array<String> = ALL_PROMISES_PROPS

override fun getMember(key: String?): Any? = when (key) {
PIPELINE_FN -> ProxyExecutable { TODO("`stream/promises.pipeline` is not implemented yet") }
FINISHED_FN -> ProxyExecutable { TODO("`stream/promises.finished` is not implemented yet") }
PIPELINE_FN -> ProxyExecutable { args ->
@Suppress("SpreadOperator")
pipeline(*args)
}
FINISHED_FN -> ProxyExecutable { args ->
val stream = args.getOrNull(0)
?: return@ProxyExecutable elide.runtime.intrinsics.js.JsPromise.rejected<Unit>(
IllegalArgumentException("stream is required"),
)
finished(stream)
}
else -> null
}

Expand Down
Loading
Loading