Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
class CommandsProcessPerfTest {

private static final String COMMAND_NAME = "app.command.test";
private static final int messageCount = 40000;
private static final int MESSAGE_COUNT = 40000;
private static final Semaphore semaphore = new Semaphore(0);
private static final CountDownLatch latch = new CountDownLatch(12 + 1);

Expand All @@ -41,18 +41,18 @@ class CommandsProcessPerfTest {
@Test
void commandShouldArrive() throws InterruptedException {
final long init_p = System.currentTimeMillis();
createMessages(messageCount);
createMessages(MESSAGE_COUNT);
final long end_p = System.currentTimeMillis() - init_p;
System.out.println("Total Publication Time: " + end_p + "ms");

latch.countDown();
final long init = System.currentTimeMillis();
semaphore.acquire(messageCount);
semaphore.acquire(MESSAGE_COUNT);
final long end = System.currentTimeMillis();

final long total = end - init;
final double microsPerMessage = ((total + 0.0) / messageCount) * 1000;
System.out.println("Message count: " + messageCount);
final double microsPerMessage = ((total + 0.0) / MESSAGE_COUNT) * 1000;
System.out.println("Message count: " + MESSAGE_COUNT);
System.out.println("Total Execution Time: " + total + "ms");
System.out.println("Microseconds per message: " + microsPerMessage + "us");
if (System.getProperty("env.ci") == null) {
Expand Down Expand Up @@ -82,7 +82,10 @@ public static void main(String[] args) {

@Bean
public HandlerRegistry registry() {
final HandlerRegistry registry = range(0, 20).reduce(HandlerRegistry.register(), (r, i) -> r.handleCommand("app.command.name" + i, message -> Mono.empty(), Map.class)).block();
final HandlerRegistry registry = range(0, 20)
.reduce(HandlerRegistry.register(), (r, i) ->
r.handleCommand("app.command.name" + i, message -> Mono.empty(), Map.class))
.block();
return registry
.handleCommand(COMMAND_NAME, this::handleSimple, DummyMessage.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
class DirectGatewayPerfTest {

private static final String COMMAND_NAME = "app.command.test";
private static final int messageCount = 40000;
private static final int MESSAGE_COUNT = 40000;
private static final Semaphore semaphore = new Semaphore(0);

@Autowired
Expand All @@ -36,17 +36,17 @@ class DirectGatewayPerfTest {

@Test
void shouldSendInOptimalTime() throws InterruptedException {
final Flux<Command<DummyMessage>> messages = createMessages(messageCount);
final Flux<Command<DummyMessage>> messages = createMessages(MESSAGE_COUNT);
final Flux<Void> target = messages.flatMap(dummyMessageCommand ->
gateway.sendCommand(dummyMessageCommand, appName)
.doOnSuccess(aVoid -> semaphore.release()));

final long init = System.currentTimeMillis();
target.subscribe();
semaphore.acquire(messageCount);
semaphore.acquire(MESSAGE_COUNT);
final long end = System.currentTimeMillis();

assertMessageThroughput(end - init, messageCount, 200);
assertMessageThroughput(end - init, MESSAGE_COUNT, 200);
}

@Test
Expand All @@ -67,8 +67,10 @@ void shouldSendBatchInOptimalTime1Channel() throws InterruptedException {
private void shouldSendBatchInOptimalTimeNChannels(int channels) throws InterruptedException {
List<Mono<Void>> subs = new ArrayList<>(channels);
for (int i = 0; i < channels; ++i) {
final Flux<Command<DummyMessage>> messages = createMessages(messageCount / channels);
final Mono<Void> target = gateway.sendCommands(messages, appName).then().doOnSuccess(_v -> semaphore.release());
final Flux<Command<DummyMessage>> messages = createMessages(MESSAGE_COUNT / channels);
final Mono<Void> target = gateway.sendCommands(messages, appName)
.then()
.doOnSuccess(_v -> semaphore.release());
subs.add(target);
}

Expand All @@ -79,7 +81,7 @@ private void shouldSendBatchInOptimalTimeNChannels(int channels) throws Interrup
final long end = System.currentTimeMillis();

final long total = end - init;
assertMessageThroughput(total, messageCount, 230);
assertMessageThroughput(total, MESSAGE_COUNT, 230);
}

private void assertMessageThroughput(long total, long messageCount, int reqMicrosPerMessage) {
Expand All @@ -94,7 +96,9 @@ private void assertMessageThroughput(long total, long messageCount, int reqMicro
}

private Flux<Command<DummyMessage>> createMessages(int count) {
final List<Command<DummyMessage>> commands = IntStream.range(0, count).mapToObj(value -> new Command<>(COMMAND_NAME, UUID.randomUUID().toString(), new DummyMessage())).collect(Collectors.toList());
final List<Command<DummyMessage>> commands = IntStream.range(0, count)
.mapToObj(value -> new Command<>(COMMAND_NAME, UUID.randomUUID().toString(), new DummyMessage()))
.collect(Collectors.toList());
return Flux.fromIterable(commands);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import reactor.core.publisher.UnicastProcessor;
import reactor.core.publisher.Sinks;
import reactor.test.StepVerifier;

import java.time.Duration;
Expand All @@ -35,28 +35,26 @@ class DynamicRegistryTest {

@Test
void shouldReceiveResponse() {
UnicastProcessor<String> result = UnicastProcessor.create();
DomainEventHandler<String> fn = message -> fromRunnable(() -> result.onNext(message.getData()));
Sinks.Many<String> result = Sinks.many().unicast().onBackpressureBuffer();
DomainEventHandler<String> fn = message -> fromRunnable(
() -> result.emitNext(message.getData(), Sinks.EmitFailureHandler.FAIL_FAST)
);

dynamicRegistry.listenEvent("test.event", fn, String.class).block();
final Publisher<Void> emit = eventBus.emit(new DomainEvent<>("test.event", "42", "Hello"));
from(emit).block();

StepVerifier.create(result.next().timeout(Duration.ofSeconds(10)))
StepVerifier.create(result.asFlux().next().timeout(Duration.ofSeconds(10)))
.expectNext("Hello")
.verifyComplete();


}


@SpringBootApplication
@EnableMessageListeners
@EnableDomainEventBus
static class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
class QueryProcessPerfTest {

private static final String QUERY_NAME = "app.command.test";
private static final int messageCount = 40000;
private static final int MESSAGE_COUNT = 40000;
private static final Semaphore semaphore = new Semaphore(0);
private static final AtomicLong atomicLong = new AtomicLong(0);
private static final CountDownLatch latch = new CountDownLatch(12 + 1);
Expand All @@ -44,19 +44,20 @@ class QueryProcessPerfTest {

@Test
void serveQueryPerformanceTest() throws InterruptedException {
final Flux<AsyncQuery<DummyMessage>> messages = createMessages(messageCount);
final Flux<AsyncQuery<DummyMessage>> messages = createMessages(MESSAGE_COUNT);

final long init = System.currentTimeMillis();
messages
.flatMap(dummyMessageAsyncQuery -> gateway.requestReply(dummyMessageAsyncQuery, appName, DummyMessage.class)
.doOnNext(s -> semaphore.release())
.flatMap(dummyMessageAsyncQuery ->
gateway.requestReply(dummyMessageAsyncQuery, appName, DummyMessage.class)
.doOnNext(s -> semaphore.release())
)
.subscribe();
semaphore.acquire(messageCount);
semaphore.acquire(MESSAGE_COUNT);
final long end = System.currentTimeMillis();

final long total = end - init;
assertMessageThroughput(total, messageCount, 200);
assertMessageThroughput(total, MESSAGE_COUNT, 200);
}

private void assertMessageThroughput(long total, long messageCount, int reqMicrosPerMessage) {
Expand All @@ -72,7 +73,9 @@ private void assertMessageThroughput(long total, long messageCount, int reqMicro


private Flux<AsyncQuery<DummyMessage>> createMessages(int count) {
final List<AsyncQuery<DummyMessage>> queryList = IntStream.range(0, count).mapToObj(_v -> new AsyncQuery<>(QUERY_NAME, new DummyMessage())).collect(Collectors.toList());
final List<AsyncQuery<DummyMessage>> queryList = IntStream.range(0, count)
.mapToObj(_v -> new AsyncQuery<>(QUERY_NAME, new DummyMessage()))
.collect(Collectors.toList());
return Flux.fromIterable(queryList);
}

Expand All @@ -87,7 +90,11 @@ public static void main(String[] args) {

@Bean
public HandlerRegistry registry() {
final HandlerRegistry registry = range(0, 20).reduce(HandlerRegistry.register(), (r, i) -> r.handleCommand("app.command.name" + i, message -> Mono.empty(), Map.class)).block();
final HandlerRegistry registry = range(0, 20)
.reduce(HandlerRegistry.register(), (r, i) -> r.handleCommand(
"app.command.name" + i, message -> Mono.empty(), Map.class
))
.block();
return registry
.serveQuery(QUERY_NAME, this::handleSimple, DummyMessage.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import reactor.core.publisher.Mono;
import reactor.core.publisher.UnicastProcessor;
import reactor.core.publisher.Sinks;
import reactor.test.StepVerifier;

import java.time.Duration;
Expand All @@ -36,55 +36,52 @@ class SimpleDirectCommunicationTest {
@Value("${spring.application.name}")
private String appName;

@Autowired
private UnicastProcessor<Command<Long>> listener;

private String commandId = ThreadLocalRandom.current().nextInt() + "";
private Long data = ThreadLocalRandom.current().nextLong();
private final String commandId = ThreadLocalRandom.current().nextInt() + "";
private final Long data = ThreadLocalRandom.current().nextLong();

@Test
void commandShouldArrive() {
Command<Long> command = new Command<>(COMMAND_NAME, commandId, data);
gateway.sendCommand(command, appName).subscribe();
Sinks.Many<Command<Long>> listener = Sinks.many().unicast().onBackpressureBuffer();

StepVerifier.create(listener.next()).assertNext(cmd -> {
StepVerifier.create(listener.asFlux().next()).assertNext(cmd -> {
assertThat(cmd).extracting(Command::getCommandId, Command::getData, Command::getName)
.containsExactly(commandId, data, COMMAND_NAME);
.containsExactly(commandId, data, COMMAND_NAME);
}).verifyComplete();
}

@Test
void shouldReceiveResponse() {
final Mono<Integer> reply = gateway.requestReply(new AsyncQuery<>("double", 42), appName, Integer.class);
StepVerifier.create(reply.timeout(Duration.ofSeconds(15)))
.expectNext(42*2)
.verifyComplete();
.expectNext(42 * 2)
.verifyComplete();
}


@SpringBootApplication
@EnableDirectAsyncGateway
@EnableMessageListeners
static class App{
static class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}

@Bean
public HandlerRegistry registry(UnicastProcessor<Command<Long>> listener) {
public HandlerRegistry registry(Sinks.Many<Command<Long>> listener) {
return HandlerRegistry.register()
.serveQuery("double", rqt -> just(rqt*2), Long.class)
.handleCommand(COMMAND_NAME, handle(listener), Long.class);
.serveQuery("double", rqt -> just(rqt * 2), Long.class)
.handleCommand(COMMAND_NAME, handle(listener), Long.class);
}

@Bean
public UnicastProcessor<Command<Long>> listener() {
return UnicastProcessor.create();
public Sinks.Many<Command<Long>> listener() {
return Sinks.many().unicast().onBackpressureBuffer();
}

private DomainCommandHandler<Long> handle(UnicastProcessor<Command<Long>> listener) {
private DomainCommandHandler<Long> handle(Sinks.Many<Command<Long>> listener) {
return command -> {
listener.onNext(command);
listener.emitNext(command, Sinks.EmitFailureHandler.FAIL_FAST);
return empty();
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import reactor.core.publisher.UnicastProcessor;
import reactor.core.publisher.Sinks;
import reactor.test.StepVerifier;

import java.util.concurrent.ThreadLocalRandom;
Expand All @@ -30,17 +30,15 @@ class SimpleEventNotificationTest {
@Autowired
private DomainEventBus eventBus;

@Autowired
private UnicastProcessor<DomainEvent<Long>> listener;

private String eventId = ThreadLocalRandom.current().nextInt() + "";
private Long data = ThreadLocalRandom.current().nextLong();
private final String eventId = ThreadLocalRandom.current().nextInt() + "";
private final Long data = ThreadLocalRandom.current().nextLong();

@Test
void shouldReceiveEvent() throws InterruptedException {
DomainEvent<?> event = new DomainEvent<>(EVENT_NAME, eventId, data);
Sinks.Many<DomainEvent<Long>> listener = Sinks.many().unicast().onBackpressureBuffer();
from(eventBus.emit(event)).subscribe();
StepVerifier.create(listener.take(1)).assertNext(evt ->
StepVerifier.create(listener.asFlux().take(1)).assertNext(evt ->
assertThat(evt).extracting(DomainEvent::getName, DomainEvent::getEventId, DomainEvent::getData)
.containsExactly(EVENT_NAME, eventId, data)
).verifyComplete();
Expand All @@ -56,20 +54,20 @@ public static void main(String[] args) {
}

@Bean
public HandlerRegistry registry(UnicastProcessor<DomainEvent<Long>> listener) {
public HandlerRegistry registry(Sinks.Many<DomainEvent<Long>> listener) {
return HandlerRegistry.register()
.serveQuery("double", rqt -> just(rqt * 2), Long.class)
.listenEvent(EVENT_NAME, handle(listener), Long.class);
}

@Bean
public UnicastProcessor<DomainEvent<Long>> listener() {
return UnicastProcessor.create();
public Sinks.Many<DomainEvent<Long>> listener() {
return Sinks.many().unicast().onBackpressureBuffer();
}

private DomainEventHandler<Long> handle(UnicastProcessor<DomainEvent<Long>> listener) {
private DomainEventHandler<Long> handle(Sinks.Many<DomainEvent<Long>> listener) {
return command -> {
listener.onNext(command);
listener.emitNext(command, Sinks.EmitFailureHandler.FAIL_FAST);
return empty();
};
}
Expand Down
Loading