From 7d365491bfb336e81417245a548ae1663d8d6d86 Mon Sep 17 00:00:00 2001 From: Juan C Galvis <8420868+juancgalvis@users.noreply.github.com> Date: Thu, 10 Apr 2025 11:18:18 -0500 Subject: [PATCH 1/3] feat(open-multi-domain-api): Allow use any domain as source or destination --- .../async/api/HandlerRegistry.java | 122 ++++++++++++------ .../async/api/handlers/RawCommandHandler.java | 6 + .../registered/RegisteredDomainHandlers.java | 18 +++ .../async/api/HandlerRegistryTest.java | 24 ++-- .../async/commons/HandlerResolver.java | 15 ++- .../async/commons/HandlerResolverBuilder.java | 121 ----------------- .../resolver/HandlerResolverBuilder.java | 106 +++++++++++++++ .../utils/resolver/HandlerResolverUtil.java | 61 --------- .../ApplicationCommandListenerPerfTest.java | 2 + .../ListenerReporterTestSuperClass.java | 7 +- .../java/sample/SampleRestController.java | 12 +- .../src/main/resources/application.yaml | 2 +- .../ReactiveCommonsListenersConfig.java | 2 +- .../async/rabbit/RabbitMQBrokerProvider.java | 70 +++++----- .../rabbit/RabbitMQBrokerProviderTest.java | 4 +- 15 files changed, 297 insertions(+), 275 deletions(-) create mode 100644 async/async-commons-api/src/main/java/org/reactivecommons/async/api/handlers/RawCommandHandler.java create mode 100644 async/async-commons-api/src/main/java/org/reactivecommons/async/api/handlers/registered/RegisteredDomainHandlers.java delete mode 100644 async/async-commons/src/main/java/org/reactivecommons/async/commons/HandlerResolverBuilder.java create mode 100644 async/async-commons/src/main/java/org/reactivecommons/async/commons/utils/resolver/HandlerResolverBuilder.java delete mode 100644 async/async-commons/src/main/java/org/reactivecommons/async/commons/utils/resolver/HandlerResolverUtil.java diff --git a/async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java b/async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java index 71d2e8e5..cc08ddca 100644 --- a/async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java +++ b/async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java @@ -11,26 +11,29 @@ import org.reactivecommons.async.api.handlers.DomainEventHandler; import org.reactivecommons.async.api.handlers.QueryHandler; import org.reactivecommons.async.api.handlers.QueryHandlerDelegate; +import org.reactivecommons.async.api.handlers.RawCommandHandler; import org.reactivecommons.async.api.handlers.RawEventHandler; import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler; +import org.reactivecommons.async.api.handlers.registered.RegisteredDomainHandlers; import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener; import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler; import java.lang.reflect.ParameterizedType; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; @Getter @NoArgsConstructor(access = AccessLevel.PACKAGE) public final class HandlerRegistry { public static final String DEFAULT_DOMAIN = "app"; - private final Map>> domainEventListeners = new ConcurrentHashMap<>(); - private final List> dynamicEventHandlers = new CopyOnWriteArrayList<>(); - private final List> eventNotificationListener = new CopyOnWriteArrayList<>(); - private final List> handlers = new CopyOnWriteArrayList<>(); - private final List> commandHandlers = new CopyOnWriteArrayList<>(); + private final RegisteredDomainHandlers> domainEventListeners = + new RegisteredDomainHandlers<>(); + private final RegisteredDomainHandlers> dynamicEventHandlers = + new RegisteredDomainHandlers<>(); + private final RegisteredDomainHandlers> eventNotificationListener = + new RegisteredDomainHandlers<>(); + private final RegisteredDomainHandlers> handlers = new RegisteredDomainHandlers<>(); + private final RegisteredDomainHandlers> commandHandlers = + new RegisteredDomainHandlers<>(); public static HandlerRegistry register() { @@ -39,90 +42,136 @@ public static HandlerRegistry register() { return instance; } + //events: DomainEvent + public HandlerRegistry listenEvent(String eventName, DomainEventHandler handler, Class eventClass) { + return listenDomainEvent(DEFAULT_DOMAIN, eventName, handler, eventClass); + } + public HandlerRegistry listenDomainEvent(String domain, String eventName, DomainEventHandler handler, Class eventClass) { - domainEventListeners.computeIfAbsent(domain, ignored -> new CopyOnWriteArrayList<>()) - .add(new RegisteredEventListener<>(eventName, handler, eventClass)); + domainEventListeners.add(domain, new RegisteredEventListener<>(eventName, handler, eventClass)); return this; } - public HandlerRegistry listenDomainCloudEvent(String domain, String eventName, CloudEventHandler handler) { - domainEventListeners.computeIfAbsent(domain, ignored -> new CopyOnWriteArrayList<>()) - .add(new RegisteredEventListener<>(eventName, handler, CloudEvent.class)); - return this; + // events: CloudEvent + public HandlerRegistry listenCloudEvent(String eventName, CloudEventHandler handler) { + return listenDomainCloudEvent(DEFAULT_DOMAIN, eventName, handler); } - public HandlerRegistry listenDomainRawEvent(String domain, String eventName, RawEventHandler handler) { - domainEventListeners.computeIfAbsent(domain, ignored -> new CopyOnWriteArrayList<>()) - .add(new RegisteredEventListener<>(eventName, handler, RawMessage.class)); + public HandlerRegistry listenDomainCloudEvent(String domain, String eventName, CloudEventHandler handler) { + domainEventListeners.add(domain, new RegisteredEventListener<>(eventName, handler, CloudEvent.class)); return this; } - public HandlerRegistry listenEvent(String eventName, DomainEventHandler handler, Class eventClass) { - domainEventListeners.computeIfAbsent(DEFAULT_DOMAIN, ignored -> new CopyOnWriteArrayList<>()) - .add(new RegisteredEventListener<>(eventName, handler, eventClass)); - return this; + // events: RawMessage + public HandlerRegistry listenRawEvent(String eventName, RawEventHandler handler) { + return listenDomainRawEvent(DEFAULT_DOMAIN, eventName, handler); } - public HandlerRegistry listenCloudEvent(String eventName, CloudEventHandler handler) { - domainEventListeners.computeIfAbsent(DEFAULT_DOMAIN, ignored -> new CopyOnWriteArrayList<>()) - .add(new RegisteredEventListener<>(eventName, handler, CloudEvent.class)); + public HandlerRegistry listenDomainRawEvent(String domain, String eventName, RawEventHandler handler) { + domainEventListeners.add(domain, new RegisteredEventListener<>(eventName, handler, RawMessage.class)); return this; } + // notifications: DomainEvent public HandlerRegistry listenNotificationEvent(String eventName, DomainEventHandler handler, Class eventClass) { - eventNotificationListener.add(new RegisteredEventListener<>(eventName, handler, eventClass)); + return listenNotificationEvent(DEFAULT_DOMAIN, eventName, handler, eventClass); + } + + public HandlerRegistry listenNotificationEvent(String domain, String eventName, DomainEventHandler handler, + Class eventClass) { + eventNotificationListener.add(domain, new RegisteredEventListener<>(eventName, handler, eventClass)); return this; } + // notifications: CloudEvent public HandlerRegistry listenNotificationCloudEvent(String eventName, CloudEventHandler handler) { - eventNotificationListener.add(new RegisteredEventListener<>(eventName, handler, CloudEvent.class)); + return listenNotificationCloudEvent(DEFAULT_DOMAIN, eventName, handler); + } + + public HandlerRegistry listenNotificationCloudEvent(String domain, String eventName, CloudEventHandler handler) { + eventNotificationListener.add(domain, new RegisteredEventListener<>(eventName, handler, CloudEvent.class)); + return this; + } + + // notifications: RawMessage + public HandlerRegistry listenNotificationRawEvent(String eventName, RawEventHandler handler) { + return listenDomainRawEvent(DEFAULT_DOMAIN, eventName, handler); + } + + public HandlerRegistry listenNotificationRawEvent(String domain, String eventName, RawEventHandler handler) { + eventNotificationListener.add(domain, new RegisteredEventListener<>(eventName, handler, RawMessage.class)); return this; } + // dynamic: DomainEvent supported only for default domain public HandlerRegistry handleDynamicEvents(String eventNamePattern, DomainEventHandler handler, Class eventClass) { - dynamicEventHandlers.add(new RegisteredEventListener<>(eventNamePattern, handler, eventClass)); + dynamicEventHandlers.add(DEFAULT_DOMAIN, new RegisteredEventListener<>(eventNamePattern, handler, eventClass)); return this; } + // dynamic: CloudEvent supported only for default domain public HandlerRegistry handleDynamicCloudEvents(String eventNamePattern, CloudEventHandler handler) { - dynamicEventHandlers.add(new RegisteredEventListener<>(eventNamePattern, handler, CloudEvent.class)); + dynamicEventHandlers.add(DEFAULT_DOMAIN, new RegisteredEventListener<>(eventNamePattern, handler, + CloudEvent.class)); return this; } + // commands: Command public HandlerRegistry handleCommand(String commandName, DomainCommandHandler fn, Class commandClass) { - commandHandlers.add(new RegisteredCommandHandler<>(commandName, fn, commandClass)); + return handleCommand(DEFAULT_DOMAIN, commandName, fn, commandClass); + } + + public HandlerRegistry handleCommand(String domain, String commandName, DomainCommandHandler fn, + Class commandClass) { + commandHandlers.add(domain, new RegisteredCommandHandler<>(commandName, fn, commandClass)); return this; } + // commands: CloudEvent public HandlerRegistry handleCloudEventCommand(String commandName, CloudCommandHandler handler) { - commandHandlers.add(new RegisteredCommandHandler<>(commandName, handler, CloudEvent.class)); + return handleCloudEventCommand(DEFAULT_DOMAIN, commandName, handler); + } + + public HandlerRegistry handleCloudEventCommand(String domain, String commandName, CloudCommandHandler handler) { + commandHandlers.add(domain, new RegisteredCommandHandler<>(commandName, handler, CloudEvent.class)); + return this; + } + + // commands: RawMessage + public HandlerRegistry handleRawCommand(String commandName, RawCommandHandler handler) { + return handleRawCommand(DEFAULT_DOMAIN, commandName, handler); + } + + public HandlerRegistry handleRawCommand(String domain, String commandName, RawCommandHandler handler) { + commandHandlers.add(domain, new RegisteredCommandHandler<>(commandName, handler, RawMessage.class)); return this; } + // queries: Query public HandlerRegistry serveQuery(String resource, QueryHandler handler, Class queryClass) { - handlers.add(new RegisteredQueryHandler<>(resource, (ignored, message) -> + handlers.add(DEFAULT_DOMAIN, new RegisteredQueryHandler<>(resource, (ignored, message) -> handler.handle(message), queryClass )); return this; } public HandlerRegistry serveQuery(String resource, QueryHandlerDelegate handler, Class queryClass) { - handlers.add(new RegisteredQueryHandler<>(resource, handler, queryClass)); + handlers.add(DEFAULT_DOMAIN, new RegisteredQueryHandler<>(resource, handler, queryClass)); return this; } public HandlerRegistry serveCloudEventQuery(String resource, QueryHandler handler) { - handlers.add(new RegisteredQueryHandler<>(resource, (ignored, message) -> + handlers.add(DEFAULT_DOMAIN, new RegisteredQueryHandler<>(resource, (ignored, message) -> handler.handle(message), CloudEvent.class )); return this; } - public HandlerRegistry serveCloudEventQuery(String resource, QueryHandlerDelegate handler) { - handlers.add(new RegisteredQueryHandler<>(resource, handler, CloudEvent.class)); + public HandlerRegistry serveCloudEventQuery(String resource, QueryHandlerDelegate handler) { + handlers.add(DEFAULT_DOMAIN, new RegisteredQueryHandler<>(resource, handler, CloudEvent.class)); return this; } @@ -139,7 +188,8 @@ public HandlerRegistry handleDynamicEvents(String eventNamePattern, DomainEv @Deprecated(forRemoval = true) public HandlerRegistry handleCommand(String commandName, DomainCommandHandler handler) { - commandHandlers.add(new RegisteredCommandHandler<>(commandName, handler, inferGenericParameterType(handler))); + commandHandlers.add(DEFAULT_DOMAIN, new RegisteredCommandHandler<>(commandName, handler, + inferGenericParameterType(handler))); return this; } diff --git a/async/async-commons-api/src/main/java/org/reactivecommons/async/api/handlers/RawCommandHandler.java b/async/async-commons-api/src/main/java/org/reactivecommons/async/api/handlers/RawCommandHandler.java new file mode 100644 index 00000000..c2c63bad --- /dev/null +++ b/async/async-commons-api/src/main/java/org/reactivecommons/async/api/handlers/RawCommandHandler.java @@ -0,0 +1,6 @@ +package org.reactivecommons.async.api.handlers; + +import org.reactivecommons.api.domain.RawMessage; + +public interface RawCommandHandler extends CommandHandler { +} diff --git a/async/async-commons-api/src/main/java/org/reactivecommons/async/api/handlers/registered/RegisteredDomainHandlers.java b/async/async-commons-api/src/main/java/org/reactivecommons/async/api/handlers/registered/RegisteredDomainHandlers.java new file mode 100644 index 00000000..1410e005 --- /dev/null +++ b/async/async-commons-api/src/main/java/org/reactivecommons/async/api/handlers/registered/RegisteredDomainHandlers.java @@ -0,0 +1,18 @@ +package org.reactivecommons.async.api.handlers.registered; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; + +public class RegisteredDomainHandlers extends ConcurrentHashMap> { + private static final String DEFAULT_DOMAIN = "app"; + + public RegisteredDomainHandlers() { + super(); + put(DEFAULT_DOMAIN, new CopyOnWriteArrayList<>()); + } + + public void add(String domain, T handler) { + computeIfAbsent(domain, ignored -> new CopyOnWriteArrayList<>()).add(handler); + } +} diff --git a/async/async-commons-api/src/test/java/org/reactivecommons/async/api/HandlerRegistryTest.java b/async/async-commons-api/src/test/java/org/reactivecommons/async/api/HandlerRegistryTest.java index b0e4ef8d..4823c85c 100644 --- a/async/async-commons-api/src/test/java/org/reactivecommons/async/api/HandlerRegistryTest.java +++ b/async/async-commons-api/src/test/java/org/reactivecommons/async/api/HandlerRegistryTest.java @@ -120,14 +120,14 @@ void shouldRegisterPatternEventHandler() { @Test void shouldRegisterNotificationEventListener() { registry.listenNotificationEvent(name, message -> Mono.empty(), SomeDataClass.class); - assertThat(registry.getEventNotificationListener()) + assertThat(registry.getEventNotificationListener().get(DEFAULT_DOMAIN)) .anySatisfy(listener -> assertThat(listener.getPath()).isEqualTo(name)); } @Test void shouldRegisterNotificationCloudEventListener() { registry.listenNotificationCloudEvent(name, message -> Mono.empty()); - assertThat(registry.getEventNotificationListener()) + assertThat(registry.getEventNotificationListener().get(DEFAULT_DOMAIN)) .anySatisfy(listener -> assertThat(listener.getPath()).isEqualTo(name)); } @@ -151,7 +151,7 @@ void shouldListenDynamicEvent() { registry.handleDynamicEvents(name, eventHandler, SomeDataClass.class); - assertThat(registry.getDynamicEventHandlers()) + assertThat(registry.getDynamicEventHandlers().get(DEFAULT_DOMAIN)) .anySatisfy(registered -> assertThat(registered) .extracting(RegisteredEventListener::getPath, RegisteredEventListener::getInputClass, RegisteredEventListener::getHandler @@ -165,7 +165,7 @@ void shouldListenDynamicCloudEvent() { registry.handleDynamicCloudEvents(name, eventHandler); - assertThat(registry.getDynamicEventHandlers()) + assertThat(registry.getDynamicEventHandlers().get(DEFAULT_DOMAIN)) .anySatisfy(registered -> assertThat(registered) .extracting(RegisteredEventListener::getPath, RegisteredEventListener::getInputClass, RegisteredEventListener::getHandler @@ -179,7 +179,7 @@ void handleDomainCommand() { registry.handleCommand(name, handler, SomeDataClass.class); - assertThat(registry.getCommandHandlers()) + assertThat(registry.getCommandHandlers().get(DEFAULT_DOMAIN)) .anySatisfy(registered -> assertThat(registered) .extracting(RegisteredCommandHandler::getPath, RegisteredCommandHandler::getInputClass, RegisteredCommandHandler::getHandler @@ -193,7 +193,7 @@ void handleCloudEventCommand() { registry.handleCloudEventCommand(name, cloudCommandHandler); - assertThat(registry.getCommandHandlers()) + assertThat(registry.getCommandHandlers().get(DEFAULT_DOMAIN)) .anySatisfy(registered -> assertThat(registered) .extracting(RegisteredCommandHandler::getPath, RegisteredCommandHandler::getInputClass, RegisteredCommandHandler::getHandler @@ -207,7 +207,7 @@ void shouldServerCloudEventQuery() { registry.serveCloudEventQuery(name, queryHandler); - assertThat(registry.getHandlers()) + assertThat(registry.getHandlers().get(DEFAULT_DOMAIN)) .anySatisfy(registered -> assertThat(registered) .extracting(RegisteredQueryHandler::getPath, RegisteredQueryHandler::getQueryClass) .containsExactly(name, CloudEvent.class)).hasSize(1); @@ -217,7 +217,7 @@ void shouldServerCloudEventQuery() { void handleCommandWithLambda() { registry.handleCommand(name, (Command message) -> Mono.empty(), SomeDataClass.class); - assertThat(registry.getCommandHandlers()) + assertThat(registry.getCommandHandlers().get(DEFAULT_DOMAIN)) .anySatisfy(registered -> assertThat(registered) .extracting(RegisteredCommandHandler::getPath, RegisteredCommandHandler::getInputClass) .containsExactly(name, SomeDataClass.class)).hasSize(1); @@ -227,7 +227,7 @@ void handleCommandWithLambda() { @Test void serveQueryWithLambda() { registry.serveQuery(name, message -> Mono.empty(), SomeDataClass.class); - assertThat(registry.getHandlers()) + assertThat(registry.getHandlers().get(DEFAULT_DOMAIN)) .anySatisfy(registered -> assertThat(registered) .extracting(RegisteredQueryHandler::getPath, RegisteredQueryHandler::getQueryClass) .containsExactly(name, SomeDataClass.class)).hasSize(1); @@ -237,7 +237,7 @@ void serveQueryWithLambda() { void serveQueryWithTypeInference() { QueryHandler handler = new SomeQueryHandler(); registry.serveQuery(name, handler, SomeDataClass.class); - assertThat(registry.getHandlers()).anySatisfy(registered -> { + assertThat(registry.getHandlers().get(DEFAULT_DOMAIN)).anySatisfy(registered -> { assertThat(registered).extracting(RegisteredQueryHandler::getPath, RegisteredQueryHandler::getQueryClass) .containsExactly(name, SomeDataClass.class); assertThat(registered).extracting(RegisteredQueryHandler::getHandler) @@ -249,7 +249,7 @@ void serveQueryWithTypeInference() { void serveQueryDelegate() { QueryHandlerDelegate handler = new SomeQueryHandlerDelegate(); registry.serveQuery(name, handler, SomeDataClass.class); - assertThat(registry.getHandlers()).anySatisfy(registered -> { + assertThat(registry.getHandlers().get(DEFAULT_DOMAIN)).anySatisfy(registered -> { assertThat(registered).extracting(RegisteredQueryHandler::getPath, RegisteredQueryHandler::getQueryClass) .containsExactly(name, SomeDataClass.class); }).hasSize(1); @@ -258,7 +258,7 @@ void serveQueryDelegate() { @Test void serveQueryDelegateWithLambda() { registry.serveQuery(name, (from, message) -> Mono.empty(), SomeDataClass.class); - assertThat(registry.getHandlers()).anySatisfy(registered -> { + assertThat(registry.getHandlers().get(DEFAULT_DOMAIN)).anySatisfy(registered -> { assertThat(registered).extracting(RegisteredQueryHandler::getPath, RegisteredQueryHandler::getQueryClass) .containsExactly(name, SomeDataClass.class); }).hasSize(1); diff --git a/async/async-commons/src/main/java/org/reactivecommons/async/commons/HandlerResolver.java b/async/async-commons/src/main/java/org/reactivecommons/async/commons/HandlerResolver.java index 562461ac..ece82925 100644 --- a/async/async-commons/src/main/java/org/reactivecommons/async/commons/HandlerResolver.java +++ b/async/async-commons/src/main/java/org/reactivecommons/async/commons/HandlerResolver.java @@ -25,6 +25,18 @@ public class HandlerResolver { private final Matcher matcher = new KeyMatcher(); + public boolean hasNotificationListeners() { + return !eventNotificationListeners.isEmpty(); + } + + public boolean hasCommandHandlers() { + return !commandHandlers.isEmpty(); + } + + public boolean hasQueryHandlers() { + return !queryHandlers.isEmpty(); + } + @SuppressWarnings("unchecked") public RegisteredQueryHandler getQueryHandler(String path) { return (RegisteredQueryHandler) queryHandlers @@ -74,7 +86,8 @@ public void addEventListener(RegisteredEventListener listener) { public void addQueryHandler(RegisteredQueryHandler handler) { if (handler.getPath().contains("*") || handler.getPath().contains("#")) { - throw new RuntimeException("avoid * or # in dynamic handlers, make sure you have no conflicts with cached patterns"); + throw new RuntimeException("avoid * or # in dynamic handlers, make sure you have no conflicts with cached" + + " patterns"); } queryHandlers.put(handler.getPath(), handler); } diff --git a/async/async-commons/src/main/java/org/reactivecommons/async/commons/HandlerResolverBuilder.java b/async/async-commons/src/main/java/org/reactivecommons/async/commons/HandlerResolverBuilder.java deleted file mode 100644 index cceb6625..00000000 --- a/async/async-commons/src/main/java/org/reactivecommons/async/commons/HandlerResolverBuilder.java +++ /dev/null @@ -1,121 +0,0 @@ -package org.reactivecommons.async.commons; - -import lombok.AccessLevel; -import lombok.NoArgsConstructor; -import lombok.extern.java.Log; -import org.reactivecommons.async.api.DefaultCommandHandler; -import org.reactivecommons.async.api.HandlerRegistry; -import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler; -import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener; -import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.stream.Stream; - -import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN; - -@Log -@NoArgsConstructor(access = AccessLevel.PRIVATE) -public final class HandlerResolverBuilder { - - public static HandlerResolver buildResolver(String domain, Map registries, - final DefaultCommandHandler defaultCommandHandler) { - - if (DEFAULT_DOMAIN.equals(domain)) { - final ConcurrentMap> queryHandlers = registries - .values().stream() - .flatMap(r -> r.getHandlers().stream()) - .collect(ConcurrentHashMap::new, (map, handler) - -> map.put(handler.getPath(), handler), ConcurrentHashMap::putAll - ); - - final ConcurrentMap> commandHandlers = registries - .values().stream() - .flatMap(r -> r.getCommandHandlers().stream()) - .collect(ConcurrentHashMap::new, (map, handler) - -> map.put(handler.getPath(), handler), ConcurrentHashMap::putAll - ); - - final ConcurrentMap> eventNotificationListener = registries - .values() - .stream() - .flatMap(r -> r.getEventNotificationListener().stream()) - .collect(ConcurrentHashMap::new, (map, handler) - -> map.put(handler.getPath(), handler), ConcurrentHashMap::putAll - ); - - final ConcurrentMap> eventsToBind = getEventsToBind(domain, - registries); - - final ConcurrentMap> eventHandlers = - getEventHandlersWithDynamics(domain, registries); - - return new HandlerResolver(queryHandlers, eventHandlers, eventsToBind, eventNotificationListener, - commandHandlers) { - @Override - @SuppressWarnings("unchecked") - public RegisteredCommandHandler getCommandHandler(String path) { - final RegisteredCommandHandler handler = super.getCommandHandler(path); - return handler != null ? handler : new RegisteredCommandHandler<>("", defaultCommandHandler, - Object.class); - } - }; - } - - - final ConcurrentMap> eventsToBind = getEventsToBind(domain, registries); - final ConcurrentMap> eventHandlers = - getEventHandlersWithDynamics(domain, registries); - - return new HandlerResolver(new ConcurrentHashMap<>(), eventHandlers, eventsToBind, new ConcurrentHashMap<>(), - new ConcurrentHashMap<>()) { - @Override - @SuppressWarnings("unchecked") - public RegisteredCommandHandler getCommandHandler(String path) { - final RegisteredCommandHandler handler = super.getCommandHandler(path); - return handler != null ? handler : new RegisteredCommandHandler<>("", defaultCommandHandler, - Object.class); - } - }; - } - - private static ConcurrentMap> getEventHandlersWithDynamics( - String domain, Map registries) { - // event handlers and dynamic handlers - return registries - .values().stream() - .flatMap(r -> { - if (r.getDomainEventListeners().containsKey(domain)) { - return Stream.concat(r.getDomainEventListeners().get(domain).stream(), getDynamics(domain, r)); - } - return Stream.empty(); - }) - .collect(ConcurrentHashMap::new, (map, handler) - -> map.put(handler.getPath(), handler), ConcurrentHashMap::putAll - ); - } - - private static Stream> getDynamics(String domain, HandlerRegistry r) { - if (DEFAULT_DOMAIN.equals(domain)) { - return r.getDynamicEventHandlers().stream(); - } - return Stream.of(); - } - - private static ConcurrentMap> getEventsToBind( - String domain, Map registries) { - return registries - .values().stream() - .flatMap(r -> { - if (r.getDomainEventListeners().containsKey(domain)) { - return r.getDomainEventListeners().get(domain).stream(); - } - return Stream.empty(); - }) - .collect(ConcurrentHashMap::new, (map, handler) - -> map.put(handler.getPath(), handler), ConcurrentHashMap::putAll - ); - } -} diff --git a/async/async-commons/src/main/java/org/reactivecommons/async/commons/utils/resolver/HandlerResolverBuilder.java b/async/async-commons/src/main/java/org/reactivecommons/async/commons/utils/resolver/HandlerResolverBuilder.java new file mode 100644 index 00000000..5054362d --- /dev/null +++ b/async/async-commons/src/main/java/org/reactivecommons/async/commons/utils/resolver/HandlerResolverBuilder.java @@ -0,0 +1,106 @@ +package org.reactivecommons.async.commons.utils.resolver; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.extern.java.Log; +import org.reactivecommons.async.api.DefaultCommandHandler; +import org.reactivecommons.async.api.HandlerRegistry; +import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler; +import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener; +import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler; +import org.reactivecommons.async.commons.HandlerResolver; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.stream.Stream; + +@Log +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public final class HandlerResolverBuilder { + + public static HandlerResolver buildResolver(String domain, Map registries, + final DefaultCommandHandler defaultCommandHandler) { + final ConcurrentMap> queryHandlers = registries + .values() + .stream() + .flatMap(r -> r.getHandlers() + .getOrDefault(domain, List.of()) + .stream()) + .collect(ConcurrentHashMap::new, (map, handler) + -> map.put(handler.getPath(), handler), ConcurrentHashMap::putAll + ); + + final ConcurrentMap> commandHandlers = registries + .values() + .stream() + .flatMap(r -> r.getCommandHandlers() + .getOrDefault(domain, List.of()) + .stream()) + .collect(ConcurrentHashMap::new, (map, handler) + -> map.put(handler.getPath(), handler), ConcurrentHashMap::putAll + ); + + final ConcurrentMap> eventNotificationListener = registries + .values() + .stream() + .flatMap(r -> r.getEventNotificationListener() + .getOrDefault(domain, List.of()) + .stream()) + .collect(ConcurrentHashMap::new, (map, handler) + -> map.put(handler.getPath(), handler), ConcurrentHashMap::putAll + ); + + final ConcurrentMap> eventsToBind = getEventsToBind(domain, + registries); + + final ConcurrentMap> eventHandlers = + getEventHandlersWithDynamics(domain, registries); + + return new HandlerResolver(queryHandlers, eventHandlers, eventsToBind, eventNotificationListener, + commandHandlers) { + @Override + @SuppressWarnings("unchecked") + public RegisteredCommandHandler getCommandHandler(String path) { + final RegisteredCommandHandler handler = super.getCommandHandler(path); + return handler != null ? + handler : new RegisteredCommandHandler<>("", defaultCommandHandler, Object.class); + } + }; + } + + private static ConcurrentMap> getEventHandlersWithDynamics( + String domain, Map registries) { + // event handlers and dynamic handlers + return registries + .values() + .stream() + .flatMap(r -> Stream.concat(r.getDomainEventListeners() + .getOrDefault(domain, List.of()) + .stream(), + getDynamics(domain, r))) + .collect(ConcurrentHashMap::new, (map, handler) + -> map.put(handler.getPath(), handler), ConcurrentHashMap::putAll + ); + } + + private static Stream> getDynamics(String domain, HandlerRegistry r) { + return r.getDynamicEventHandlers().getOrDefault(domain, List.of()).stream(); + } + + private static ConcurrentMap> getEventsToBind( + String domain, Map registries) { + return registries + .values().stream() + .flatMap(r -> { + if (r.getDomainEventListeners().containsKey(domain)) { + return r.getDomainEventListeners().get(domain).stream(); + } + return Stream.empty(); + }) + .collect(ConcurrentHashMap::new, (map, handler) + -> map.put(handler.getPath(), handler), ConcurrentHashMap::putAll + ); + } +} diff --git a/async/async-commons/src/main/java/org/reactivecommons/async/commons/utils/resolver/HandlerResolverUtil.java b/async/async-commons/src/main/java/org/reactivecommons/async/commons/utils/resolver/HandlerResolverUtil.java deleted file mode 100644 index 39a4e876..00000000 --- a/async/async-commons/src/main/java/org/reactivecommons/async/commons/utils/resolver/HandlerResolverUtil.java +++ /dev/null @@ -1,61 +0,0 @@ -package org.reactivecommons.async.commons.utils.resolver; - -import lombok.AccessLevel; -import lombok.AllArgsConstructor; -import org.reactivecommons.async.api.HandlerRegistry; -import org.reactivecommons.async.api.handlers.CommandHandler; -import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler; -import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener; -import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler; -import org.reactivecommons.async.commons.HandlerResolver; - -import java.util.Collection; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.stream.Stream; - -import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN; - -@AllArgsConstructor(access = AccessLevel.PRIVATE) -public final class HandlerResolverUtil { - - public static HandlerResolver fromHandlerRegistries(Collection registries, - CommandHandler defaultHandler) { - final ConcurrentMap> queryHandlers = registries.stream() - .flatMap(r -> r.getHandlers().stream()) - .collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler), - ConcurrentHashMap::putAll); - - final ConcurrentMap> eventsToBind = registries.stream() - .flatMap(r -> r.getDomainEventListeners().get(DEFAULT_DOMAIN).stream()) - .collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler), - ConcurrentHashMap::putAll); - - // event handlers and dynamic handlers - final ConcurrentMap> eventHandlers = registries.stream() - .flatMap(r -> Stream.concat(r.getDomainEventListeners().get(DEFAULT_DOMAIN).stream(), - r.getDynamicEventHandlers().stream())) - .collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler), - ConcurrentHashMap::putAll); - - final ConcurrentMap> commandHandlers = registries.stream() - .flatMap(r -> r.getCommandHandlers().stream()) - .collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler), - ConcurrentHashMap::putAll); - - final ConcurrentMap> eventNotificationListener = registries.stream() - .flatMap(r -> r.getEventNotificationListener().stream()) - .collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler), - ConcurrentHashMap::putAll); - - return new HandlerResolver(queryHandlers, eventHandlers, eventsToBind, eventNotificationListener, - commandHandlers) { - @Override - @SuppressWarnings("unchecked") - public RegisteredCommandHandler getCommandHandler(String path) { - final RegisteredCommandHandler handler = super.getCommandHandler(path); - return handler != null ? handler : new RegisteredCommandHandler<>("", defaultHandler, Object.class); - } - }; - } -} diff --git a/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ApplicationCommandListenerPerfTest.java b/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ApplicationCommandListenerPerfTest.java index 0b12f4fc..3b63f13b 100644 --- a/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ApplicationCommandListenerPerfTest.java +++ b/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ApplicationCommandListenerPerfTest.java @@ -44,6 +44,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN; import static reactor.core.publisher.Flux.range; @ExtendWith(MockitoExtension.class) @@ -277,6 +278,7 @@ private HandlerResolver createHandlerResolver(final HandlerRegistry initialRegis ) .block(); final ConcurrentMap> commandHandlers = registry.getCommandHandlers() + .get(DEFAULT_DOMAIN) .stream() .collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler), ConcurrentHashMap::putAll diff --git a/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ListenerReporterTestSuperClass.java b/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ListenerReporterTestSuperClass.java index 2e101c38..da384558 100644 --- a/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ListenerReporterTestSuperClass.java +++ b/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ListenerReporterTestSuperClass.java @@ -148,7 +148,7 @@ protected Flux createSource(Function rout private HandlerResolver createHandlerResolver(final HandlerRegistry registry) { Stream> listenerStream = Stream.concat( - registry.getDynamicEventHandlers().stream(), + registry.getDynamicEventHandlers().get(DEFAULT_DOMAIN).stream(), registry.getDomainEventListeners().get(DEFAULT_DOMAIN).stream()); if (registry.getDomainEventListeners().containsKey("domain")) { listenerStream = Stream.concat( @@ -160,10 +160,13 @@ private HandlerResolver createHandlerResolver(final HandlerRegistry registry) { final Map> eventsToBind = registry.getDomainEventListeners() .get(DEFAULT_DOMAIN).stream().collect(toMap(RegisteredEventListener::getPath, identity())); final Map> notificationHandlers = registry.getEventNotificationListener() + .get(DEFAULT_DOMAIN) .stream().collect(toMap(RegisteredEventListener::getPath, identity())); - final Map> queryHandlers = registry.getHandlers().stream() + final Map> queryHandlers = registry.getHandlers().get(DEFAULT_DOMAIN) + .stream() .collect(toMap(RegisteredQueryHandler::getPath, identity())); final Map> commandHandlers = registry.getCommandHandlers() + .get(DEFAULT_DOMAIN) .stream().collect(toMap(RegisteredCommandHandler::getPath, identity())); return new HandlerResolver( new ConcurrentHashMap<>(queryHandlers), diff --git a/samples/async/eda-async-sender-client-domain-a/src/main/java/sample/SampleRestController.java b/samples/async/eda-async-sender-client-domain-a/src/main/java/sample/SampleRestController.java index e42ff4eb..1e26740f 100644 --- a/samples/async/eda-async-sender-client-domain-a/src/main/java/sample/SampleRestController.java +++ b/samples/async/eda-async-sender-client-domain-a/src/main/java/sample/SampleRestController.java @@ -37,16 +37,16 @@ public class SampleRestController { // Query @GetMapping(path = "/api/teams", produces = APPLICATION_JSON_VALUE) - public Mono getTeams() { - CloudEvent query = CloudEventBuilder.v1() + public Mono getTeams() { + RemovedMemberEvent eventData = RemovedMemberEvent.builder().teamName("team").username("member").build(); + CloudEvent event = CloudEventBuilder.v1() .withId(UUID.randomUUID().toString()) .withSource(URI.create("https://reactive-commons.org/foos")) - .withType(Constants.GET_TEAMS) + .withType(Constants.MEMBER_REMOVED_EXTERNAL_DOMAIN) .withTime(OffsetDateTime.now()) - .withData("application/json", CloudEventBuilderExt.asBytes("")) + .withData("application/json", CloudEventBuilderExt.asBytes(eventData)) .build(); - return directAsyncGateway.requestReply(query, target, CloudEvent.class, externalDomain) - .map(event -> CloudEventBuilderExt.fromCloudEventData(event, Teams.class)); + return Mono.from(domainEventBus.emit(event)); } // Query diff --git a/samples/async/eda-async-sender-client-domain-a/src/main/resources/application.yaml b/samples/async/eda-async-sender-client-domain-a/src/main/resources/application.yaml index 878800fa..5b0a5e0a 100644 --- a/samples/async/eda-async-sender-client-domain-a/src/main/resources/application.yaml +++ b/samples/async/eda-async-sender-client-domain-a/src/main/resources/application.yaml @@ -17,7 +17,7 @@ app: async: app: # domain-a connectionProperties: - virtualHost: domain-a + virtualHost: / teams: connectionProperties: virtualHost: / diff --git a/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/config/ReactiveCommonsListenersConfig.java b/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/config/ReactiveCommonsListenersConfig.java index ba08c712..0f7a7a11 100644 --- a/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/config/ReactiveCommonsListenersConfig.java +++ b/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/config/ReactiveCommonsListenersConfig.java @@ -6,7 +6,7 @@ import org.reactivecommons.async.api.DefaultQueryHandler; import org.reactivecommons.async.api.HandlerRegistry; import org.reactivecommons.async.commons.HandlerResolver; -import org.reactivecommons.async.commons.HandlerResolverBuilder; +import org.reactivecommons.async.commons.utils.resolver.HandlerResolverBuilder; import org.reactivecommons.async.starter.props.GenericAsyncPropsDomain; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.ApplicationContext; diff --git a/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProvider.java b/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProvider.java index 0ec7bc89..a5c626fe 100644 --- a/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProvider.java +++ b/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProvider.java @@ -84,7 +84,7 @@ public void listenDomainEvents(HandlerResolver resolver) { @Override public void listenNotificationEvents(HandlerResolver resolver) { - if (!resolver.getNotificationListeners().isEmpty()) { + if (resolver.hasNotificationListeners()) { final ApplicationNotificationListener listener = new ApplicationNotificationListener( receiver, props.getBrokerConfigProps().getDomainEventsExchangeName(), @@ -100,44 +100,48 @@ public void listenNotificationEvents(HandlerResolver resolver) { @Override public void listenCommands(HandlerResolver resolver) { - ApplicationCommandListener commandListener = new ApplicationCommandListener( - receiver, - props.getBrokerConfigProps().getCommandsQueue(), - resolver, - props.getDirect().getExchange(), - converter, - props.getWithDLQRetry(), - props.getCreateTopology(), - props.getDelayedCommands(), - props.getMaxRetries(), - props.getRetryDelay(), - props.getDirect().getMaxLengthBytes(), - discardNotifier, - errorReporter); + if (resolver.hasCommandHandlers()) { + ApplicationCommandListener commandListener = new ApplicationCommandListener( + receiver, + props.getBrokerConfigProps().getCommandsQueue(), + resolver, + props.getDirect().getExchange(), + converter, + props.getWithDLQRetry(), + props.getCreateTopology(), + props.getDelayedCommands(), + props.getMaxRetries(), + props.getRetryDelay(), + props.getDirect().getMaxLengthBytes(), + discardNotifier, + errorReporter); - commandListener.startListener(); + commandListener.startListener(); + } } @Override public void listenQueries(HandlerResolver resolver) { - final ApplicationQueryListener listener = new ApplicationQueryListener( - receiver, - props.getBrokerConfigProps().getQueriesQueue(), - resolver, - sender, - props.getBrokerConfigProps().getDirectMessagesExchangeName(), - converter, - props.getBrokerConfigProps().getGlobalReplyExchangeName(), - props.getWithDLQRetry(), - props.getCreateTopology(), - props.getMaxRetries(), - props.getRetryDelay(), - props.getGlobal().getMaxLengthBytes(), - props.getDirect().isDiscardTimeoutQueries(), - discardNotifier, - errorReporter); + if (resolver.hasQueryHandlers()) { + final ApplicationQueryListener listener = new ApplicationQueryListener( + receiver, + props.getBrokerConfigProps().getQueriesQueue(), + resolver, + sender, + props.getBrokerConfigProps().getDirectMessagesExchangeName(), + converter, + props.getBrokerConfigProps().getGlobalReplyExchangeName(), + props.getWithDLQRetry(), + props.getCreateTopology(), + props.getMaxRetries(), + props.getRetryDelay(), + props.getGlobal().getMaxLengthBytes(), + props.getDirect().isDiscardTimeoutQueries(), + discardNotifier, + errorReporter); - listener.startListener(); + listener.startListener(); + } } @Override diff --git a/starters/async-rabbit-starter/src/test/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProviderTest.java b/starters/async-rabbit-starter/src/test/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProviderTest.java index 3d12dc97..eecccfd0 100644 --- a/starters/async-rabbit-starter/src/test/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProviderTest.java +++ b/starters/async-rabbit-starter/src/test/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProviderTest.java @@ -139,6 +139,7 @@ void shouldListenDomainEvents() { @Test @SuppressWarnings({"rawtypes", "unchecked"}) void shouldListenNotificationEvents() { + when(handlerResolver.hasNotificationListeners()).thenReturn(true); when(listener.getTopologyCreator()).thenReturn(creator); when(creator.declare(any(ExchangeSpecification.class))) .thenReturn(Mono.just(mock(AMQP.Exchange.DeclareOk.class))); @@ -148,7 +149,6 @@ void shouldListenNotificationEvents() { when(listener.getMaxConcurrency()).thenReturn(1); when(receiver.consumeManualAck(any(String.class), any())).thenReturn(Flux.never()); List mockedListeners = spy(List.of()); - when(mockedListeners.isEmpty()).thenReturn(false); when(handlerResolver.getNotificationListeners()).thenReturn(mockedListeners); // Act brokerProvider.listenNotificationEvents(handlerResolver); @@ -158,6 +158,7 @@ void shouldListenNotificationEvents() { @Test void shouldListenCommands() { + when(handlerResolver.hasCommandHandlers()).thenReturn(true); when(listener.getTopologyCreator()).thenReturn(creator); when(creator.declare(any(ExchangeSpecification.class))) .thenReturn(Mono.just(mock(AMQP.Exchange.DeclareOk.class))); @@ -176,6 +177,7 @@ void shouldListenCommands() { @Test void shouldListenQueries() { + when(handlerResolver.hasQueryHandlers()).thenReturn(true); when(listener.getTopologyCreator()).thenReturn(creator); when(creator.declare(any(ExchangeSpecification.class))) .thenReturn(Mono.just(mock(AMQP.Exchange.DeclareOk.class))); From df21cf96d307b6cdbef8690cd47513dc7c14602b Mon Sep 17 00:00:00 2001 From: Juan C Galvis <8420868+juancgalvis@users.noreply.github.com> Date: Thu, 10 Apr 2025 15:11:58 -0500 Subject: [PATCH 2/3] feat(open-multi-domain-api): add unit tests --- .../async/api/HandlerRegistryTest.java | 41 ++++++++++++++++++- .../async/rabbit/HandlerResolverTest.java | 20 +++++++++ 2 files changed, 59 insertions(+), 2 deletions(-) diff --git a/async/async-commons-api/src/test/java/org/reactivecommons/async/api/HandlerRegistryTest.java b/async/async-commons-api/src/test/java/org/reactivecommons/async/api/HandlerRegistryTest.java index 4823c85c..1f25dde2 100644 --- a/async/async-commons-api/src/test/java/org/reactivecommons/async/api/HandlerRegistryTest.java +++ b/async/async-commons-api/src/test/java/org/reactivecommons/async/api/HandlerRegistryTest.java @@ -12,6 +12,7 @@ import org.reactivecommons.async.api.handlers.DomainEventHandler; import org.reactivecommons.async.api.handlers.QueryHandler; import org.reactivecommons.async.api.handlers.QueryHandlerDelegate; +import org.reactivecommons.async.api.handlers.RawCommandHandler; import org.reactivecommons.async.api.handlers.RawEventHandler; import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler; import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener; @@ -25,6 +26,7 @@ class HandlerRegistryTest { private final HandlerRegistry registry = HandlerRegistry.register(); private final String name = "some.event"; + private final String nameRaw = "some.raw.event"; private final String domain = "some-domain"; @@ -60,9 +62,9 @@ void shouldListenDomainCloudEvent() { void shouldListenDomainRawEvent() { SomeRawEventHandler eventHandler = new SomeRawEventHandler(); - registry.listenDomainRawEvent(domain, name, eventHandler); + registry.listenRawEvent(name, eventHandler); - assertThat(registry.getDomainEventListeners().get(domain)) + assertThat(registry.getDomainEventListeners().get(DEFAULT_DOMAIN)) .anySatisfy(registered -> assertThat(registered) .extracting(RegisteredEventListener::getPath, RegisteredEventListener::getInputClass, RegisteredEventListener::getHandler @@ -131,6 +133,20 @@ void shouldRegisterNotificationCloudEventListener() { .anySatisfy(listener -> assertThat(listener.getPath()).isEqualTo(name)); } + @Test + void shouldRegisterNotificationRawEventListener() { + SomeRawEventHandler eventHandler = new SomeRawEventHandler(); + + registry.listenRawEvent(nameRaw, eventHandler); + + assertThat(registry.getEventNotificationListener().get(DEFAULT_DOMAIN)) + .anySatisfy(registered -> assertThat(registered) + .extracting(RegisteredEventListener::getPath, RegisteredEventListener::getInputClass, + RegisteredEventListener::getHandler + ) + .containsExactly(nameRaw, RawMessage.class, eventHandler)).hasSize(1); + } + @Test @SuppressWarnings("unchecked") void listenEvent() { @@ -201,6 +217,20 @@ void handleCloudEventCommand() { .containsExactly(name, CloudEvent.class, cloudCommandHandler)).hasSize(1); } + @Test + void handleRawCommand() { + SomeRawCommandEventHandler eventHandler = new SomeRawCommandEventHandler(); + + registry.handleRawCommand(nameRaw, eventHandler); + + assertThat(registry.getCommandHandlers().get(DEFAULT_DOMAIN)) + .anySatisfy(registered -> assertThat(registered) + .extracting(RegisteredCommandHandler::getPath, RegisteredCommandHandler::getInputClass, + RegisteredCommandHandler::getHandler + ) + .containsExactly(nameRaw, RawMessage.class, eventHandler)).hasSize(1); + } + @Test void shouldServerCloudEventQuery() { SomeCloudEventQueryHandler queryHandler = new SomeCloudEventQueryHandler(); @@ -306,6 +336,13 @@ public Mono handle(CloudEvent message) { } } + private static class SomeRawCommandEventHandler implements RawCommandHandler { + @Override + public Mono handle(RawMessage message) { + return Mono.empty(); + } + } + private static class SomeQueryHandler implements QueryHandler { @Override public Mono handle(SomeDataClass message) { diff --git a/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/HandlerResolverTest.java b/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/HandlerResolverTest.java index 1f6137ae..f7e0d099 100644 --- a/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/HandlerResolverTest.java +++ b/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/HandlerResolverTest.java @@ -59,6 +59,17 @@ void shouldMatchForAWildcardEvent() { Assertions.assertThat(eventListener.getPath()).isEqualTo("some.*"); } + @Test + void shouldThrowError() { + // Arrange + RegisteredQueryHandler handler = new RegisteredQueryHandler<>("*", + (from, message) -> Mono.empty(), + String.class); + // Act + // Assert + Assertions.assertThatThrownBy(() -> resolver.addQueryHandler(handler)).isInstanceOf(RuntimeException.class); + } + @Test void shouldMatchForAnExactEvent() { // Act @@ -67,4 +78,13 @@ void shouldMatchForAnExactEvent() { Assertions.assertThat(eventListener.getPath()).isEqualTo("event.name"); } + @Test + void shouldCheckIfHasListenerTypes() { + // Act + // Assert + Assertions.assertThat(resolver.hasCommandHandlers()).isFalse(); + Assertions.assertThat(resolver.hasQueryHandlers()).isFalse(); + Assertions.assertThat(resolver.hasNotificationListeners()).isFalse(); + } + } From d782a11a43f5e4998bb53674d611c4fb62089f98 Mon Sep 17 00:00:00 2001 From: Juan C Galvis <8420868+juancgalvis@users.noreply.github.com> Date: Thu, 10 Apr 2025 15:40:27 -0500 Subject: [PATCH 3/3] feat(open-multi-domain-api): add unit tests --- .../java/org/reactivecommons/async/api/HandlerRegistry.java | 2 +- .../org/reactivecommons/async/api/HandlerRegistryTest.java | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java b/async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java index cc08ddca..1df771da 100644 --- a/async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java +++ b/async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java @@ -97,7 +97,7 @@ public HandlerRegistry listenNotificationCloudEvent(String domain, String eventN // notifications: RawMessage public HandlerRegistry listenNotificationRawEvent(String eventName, RawEventHandler handler) { - return listenDomainRawEvent(DEFAULT_DOMAIN, eventName, handler); + return listenNotificationRawEvent(DEFAULT_DOMAIN, eventName, handler); } public HandlerRegistry listenNotificationRawEvent(String domain, String eventName, RawEventHandler handler) { diff --git a/async/async-commons-api/src/test/java/org/reactivecommons/async/api/HandlerRegistryTest.java b/async/async-commons-api/src/test/java/org/reactivecommons/async/api/HandlerRegistryTest.java index 1f25dde2..cb46b2ef 100644 --- a/async/async-commons-api/src/test/java/org/reactivecommons/async/api/HandlerRegistryTest.java +++ b/async/async-commons-api/src/test/java/org/reactivecommons/async/api/HandlerRegistryTest.java @@ -27,6 +27,7 @@ class HandlerRegistryTest { private final HandlerRegistry registry = HandlerRegistry.register(); private final String name = "some.event"; private final String nameRaw = "some.raw.event"; + private final String nameRawNotification = "some.raw.notification.event"; private final String domain = "some-domain"; @@ -137,14 +138,14 @@ void shouldRegisterNotificationCloudEventListener() { void shouldRegisterNotificationRawEventListener() { SomeRawEventHandler eventHandler = new SomeRawEventHandler(); - registry.listenRawEvent(nameRaw, eventHandler); + registry.listenNotificationRawEvent(nameRawNotification, eventHandler); assertThat(registry.getEventNotificationListener().get(DEFAULT_DOMAIN)) .anySatisfy(registered -> assertThat(registered) .extracting(RegisteredEventListener::getPath, RegisteredEventListener::getInputClass, RegisteredEventListener::getHandler ) - .containsExactly(nameRaw, RawMessage.class, eventHandler)).hasSize(1); + .containsExactly(nameRawNotification, RawMessage.class, eventHandler)).hasSize(1); } @Test