Description
The FiberInterop library allows creating an RxJava Flowable stream easily, as demonstrated in the following example:
var executor = Executors.newVirtualThreadPerTaskExecutor();
var flow = FiberInterop.create(emitter -> {
    for (var i = 0; i < 10; i++) {
        Thread.sleep(500);
        emitter.emit(i);
    }
}, executor);
flow.blockingForEach(value -> {
    System.out.println(value);
});
This example works well for simple sequential flow generation.
However, issues arise when concurrency is introduced into the flow generation process. For example:
var executor = Executors.newVirtualThreadPerTaskExecutor();
var taskCount = 10;
while (true) {
    var flow = FiberInterop.create(emitter -> {
        try (var scope = Executors.newVirtualThreadPerTaskExecutor()) {
            var latch = new CountDownLatch(1);
            for (var i = 0; i < taskCount; i++) {
                final var taskId = i;
                scope.submit(() -> {
                    try {
                        latch.await();
                        emitter.emit(taskId);
                    } catch (Throwable e) {
                        throw new RuntimeException(e);
                    }
                });
            }
            latch.countDown();
        }
    }, executor);
    var size = flow.toList().blockingGet().size();
    if (size < taskCount) {
        // Plain concatenation instead of the STR template, which needs JDK 21/22 preview features
        System.out.println("Expected " + taskCount + " but got " + size);
        break;
    }
}
As the example shows, the create method does not handle concurrent emissions: when several threads call emitter.emit at the same time, the resulting flow can contain fewer than taskCount items. This behavior is not documented, which can confuse users who expect concurrent generation to work seamlessly.
To address this issue, it would be beneficial to differentiate between methods designed for simple non-concurrent workflows and those that handle concurrency by default. Specifically:
- Introduce a generate method. This method would be intended for simple, non-concurrent workflows. It would clearly indicate to users that it is not designed for concurrent flow generation.
- Enhance the create method. Modify the create method to handle concurrency properly by default. This would involve ensuring that concurrent tasks are managed and their emissions are correctly handled to prevent missing items.
With a generate method for sequential generation and a create method that handles concurrency, users could choose the approach that fits their task.
This would mirror the distinction between flow and channelFlow in Kotlin, where flow expects emissions to happen sequentially and channelFlow supports concurrent emissions from multiple coroutines.
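
One way a concurrency-aware create could avoid dropped items is to serialize calls to emit, assuming the missing items come from unsynchronized concurrent emissions. The sketch below illustrates the idea only and does not use FiberInterop: the Emitter interface, the serialized wrapper, and emitConcurrently are hypothetical names, and a fixed thread pool stands in for the virtual-thread executor so the example runs on older JDKs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;

public class SerializedEmitterSketch {

    // Hypothetical stand-in for the library's emitter type (not FiberInterop's real API).
    interface Emitter<T> {
        void emit(T item);
    }

    // Wraps a non-thread-safe emitter so only one thread is inside emit() at a time.
    static <T> Emitter<T> serialized(Emitter<T> downstream) {
        var lock = new ReentrantLock();
        return item -> {
            lock.lock();
            try {
                downstream.emit(item);
            } finally {
                lock.unlock();
            }
        };
    }

    // Starts taskCount producers that emit at the same moment; returns how many items arrived.
    public static int emitConcurrently(int taskCount) {
        List<Integer> received = new ArrayList<>(); // deliberately not thread-safe
        Emitter<Integer> emitter = serialized(received::add);

        var pool = Executors.newFixedThreadPool(taskCount);
        var start = new CountDownLatch(1);          // releases all producers together
        var done = new CountDownLatch(taskCount);   // counts finished producers
        for (int i = 0; i < taskCount; i++) {
            final int taskId = i;
            pool.execute(() -> {
                try {
                    start.await();
                    emitter.emit(taskId);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            });
        }
        start.countDown();
        try {
            done.await(); // also makes the producers' writes visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return received.size();
    }

    public static void main(String[] args) {
        System.out.println("Received " + emitConcurrently(10) + " items");
    }
}
```

A lock is the simplest option here; a queue drained by a single consumer would be an alternative that keeps producers from blocking on slow downstream processing, which is closer to how channelFlow is built on a channel.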