- 
                Notifications
    
You must be signed in to change notification settings  - Fork 138
 
Infinite Streams in SimpleReact
(Since :- SimpleReact v0.2 / deprecated SimpleReact doesn't support infinite Streaming, use LazyReact to build infinite LazyFutureStreams instead)
By default SimpleReact processes data flows eagerly. As soon as new SimpleReact().react(()->1,()->2) is executed, data is moving concurrently through the system. As of SimpleReact v0.2, it is possible to create a lazy data flow, that will only start when .run() is called or an eager method from the Streams API on the underlying Stream. (e.g. http://www.drdobbs.com/jvm/lambdas-and-streams-in-java-8-libraries/240166818)
To take advantage of SimpleReact’s support for infinite Streams, use :-
private volatile int count=0;
..
SimpleReact.lazy().reactInfinitely( ()-> count++).run();
This will construct a Lazy Stream [0,1,2,3,4,5,6 and so on].
You can also import Infinite Streams from the Stream API
Stream<String> infinite = Stream.generate(()->count++).map(it -> “*”+it);
SimpleReact.lazy()
                    .fromStreamWithoutFutures(infinite)
                    .then(it -> it +”!”)
                    .peek(it-> System.out.println(it))
                    .run();
This will write *0! *1! *2! .. etc ..
to the console
To limit the infinite Stream you can use limit
Stream<String> infinite = Stream.generate(()->count++).map(it -> “*”+it).limit(2);
SimpleReact.lazy()
                    .fromStreamWithoutFutures(infinite)
                    .then(it -> it +”!”)
                    .peek(it-> System.out.println(it))
                    .run(); //run blocks current thread
This will write *0! *1!
to the console.
You can use one of the Datastructures provided by SimpleReact to create an Infinite Stream of CompletableFutures and use it to seed a SimpleReact data flow. E.g.
Queue queue = new Queue();  //SimpleReact Queue
SimpleReact.lazy()
                    .fromStream(queue.streamOfCompletableFutures())
                    .then(data -> transformData(data))
                    .then(entity -> saveToDb(entity))
                    .run(new ForkJoinPool(1));  //run does not block current thread
As long as the Data structure is held open, SimpleReact will react to any data added to the Queue. E.g.
 queue.add(“hello world”);
or
 queue.fromStream(Stream.of(1,2,3,4,5,6));
This will allow you to join together 2 (or more!) potentially infinite Streams of data / processing.
oops - my bad