Skip to content

chore(next): Create shared starter #124

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,13 @@ jobs:
distribution: temurin
java-version: 17
- name: Execute build test jacocoTestReport and sonar analysis
if: endsWith(github.REF, '/master') == true
if: endsWith(github.REF, '/master') == true || github.event.pull_request.head.repo.fork == false
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
run: ./gradlew build test jacocoTestReport sonar --refresh-dependencies --no-daemon --continue -Denv.ci=true
run: ./gradlew clean build generateMergedReport sonar --refresh-dependencies --no-daemon --continue -Denv.ci=true
- name: Execute build test jacocoTestReport pull request
if: endsWith(github.REF, '/merge') == true
if: github.event.pull_request.head.repo.fork == true
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
run: ./gradlew build test jacocoTestReport --refresh-dependencies --no-daemon --continue -Denv.ci=true
run: ./gradlew clean build generateMergedReport --refresh-dependencies --no-daemon --continue -Denv.ci=true
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package org.reactivecommons.async.commons.config;

import lombok.Getter;

import java.time.Duration;
import java.util.UUID;

@Getter
public class BrokerConfig {
private final String routingKey = UUID.randomUUID().toString().replaceAll("-", "");
private final boolean persistentQueries;
Expand All @@ -24,24 +27,4 @@ public BrokerConfig(boolean persistentQueries, boolean persistentCommands, boole
this.replyTimeout = replyTimeout;
}

public boolean isPersistentQueries() {
return persistentQueries;
}

public boolean isPersistentCommands() {
return persistentCommands;
}

public boolean isPersistentEvents() {
return persistentEvents;
}

public Duration getReplyTimeout() {
return replyTimeout;
}

public String getRoutingKey() {
return routingKey;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package org.reactivecommons.async.kafka;

import io.cloudevents.CloudEvent;
import org.reactivecommons.api.domain.Command;
import org.reactivecommons.async.api.AsyncQuery;
import org.reactivecommons.async.api.DirectAsyncGateway;
import org.reactivecommons.async.api.From;
import reactor.core.publisher.Mono;

public class KafkaDirectAsyncGateway implements DirectAsyncGateway {

public static final String NOT_IMPLEMENTED_YET = "Not implemented yet";

@Override
public <T> Mono<Void> sendCommand(Command<T> command, String targetName) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public <T> Mono<Void> sendCommand(Command<T> command, String targetName, long delayMillis) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public <T> Mono<Void> sendCommand(Command<T> command, String targetName, String domain) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public <T> Mono<Void> sendCommand(Command<T> command, String targetName, long delayMillis, String domain) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public Mono<Void> sendCommand(CloudEvent command, String targetName) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public Mono<Void> sendCommand(CloudEvent command, String targetName, long delayMillis) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public Mono<Void> sendCommand(CloudEvent command, String targetName, String domain) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public Mono<Void> sendCommand(CloudEvent command, String targetName, long delayMillis, String domain) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public <T, R> Mono<R> requestReply(AsyncQuery<T> query, String targetName, Class<R> type) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public <T, R> Mono<R> requestReply(AsyncQuery<T> query, String targetName, Class<R> type, String domain) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public <R extends CloudEvent> Mono<R> requestReply(CloudEvent query, String targetName, Class<R> type) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public <R extends CloudEvent> Mono<R> requestReply(CloudEvent query, String targetName, Class<R> type, String domain) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public <T> Mono<Void> reply(T response, From from) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,26 @@

@AllArgsConstructor
public class KafkaDomainEventBus implements DomainEventBus {
public static final String NOT_IMPLEMENTED_YET = "Not implemented yet";
private final ReactiveMessageSender sender;

@Override
public <T> Publisher<Void> emit(DomainEvent<T> event) {
return sender.send(event);
}

@Override
public <T> Publisher<Void> emit(String domain, DomainEvent<T> event) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public Publisher<Void> emit(CloudEvent event) {
return sender.send(event);
}

@Override
public Publisher<Void> emit(String domain, CloudEvent event) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package org.reactivecommons.async.rabbit;

import io.cloudevents.CloudEvent;
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
import org.reactivecommons.api.domain.DomainEvent;
import org.reactivecommons.api.domain.DomainEventBus;
import org.reactivecommons.async.commons.config.BrokerConfig;
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import org.reactivecommons.api.domain.DomainEvent;
import org.reactivecommons.api.domain.DomainEventBus;

import java.util.Collections;

Expand All @@ -29,7 +29,12 @@ public RabbitDomainEventBus(ReactiveMessageSender sender, String exchange, Broke
@Override
public <T> Mono<Void> emit(DomainEvent<T> event) {
return sender.sendWithConfirm(event, exchange, event.getName(), Collections.emptyMap(), persistentEvents)
.onErrorMap(err -> new RuntimeException("Event send failure: " + event.getName(), err));
.onErrorMap(err -> new RuntimeException("Event send failure: " + event.getName(), err));
}

@Override
public <T> Publisher<Void> emit(String domain, DomainEvent<T> event) {
throw new UnsupportedOperationException("Not implemented yet");
}

@Override
Expand All @@ -39,4 +44,9 @@ public Publisher<Void> emit(CloudEvent cloudEvent) {
.onErrorMap(err -> new RuntimeException("Event send failure: " + cloudEvent.getType(), err));
}

@Override
public Publisher<Void> emit(String domain, CloudEvent event) {
throw new UnsupportedOperationException("Not implemented yet");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
import org.reactivecommons.async.commons.DiscardNotifier;
import org.reactivecommons.async.commons.EventExecutor;
import org.reactivecommons.async.commons.HandlerResolver;
import org.reactivecommons.async.commons.communications.Message;
import org.reactivecommons.async.commons.converters.MessageConverter;
import org.reactivecommons.async.commons.ext.CustomReporter;
import org.reactivecommons.async.commons.HandlerResolver;
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -51,9 +51,6 @@ public ApplicationNotificationListener(ReactiveMessageListener receiver,
}

protected Mono<Void> setUpBindings(TopologyCreator creator) {
final Mono<AMQP.Exchange.DeclareOk> declareExchange = creator.declare(exchange(exchangeName)
.type("topic")
.durable(true));

final Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declare(
queue(queueName)
Expand All @@ -65,6 +62,10 @@ protected Mono<Void> setUpBindings(TopologyCreator creator) {
.flatMap(listener -> creator.bind(binding(exchangeName, listener.getPath(), queueName)));

if (createTopology) {
final Mono<AMQP.Exchange.DeclareOk> declareExchange = creator.declare(exchange(exchangeName)
.type("topic")
.durable(true));

return declareExchange
.then(declareQueue)
.thenMany(bindings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ protected Mono<AcknowledgableDelivery> handle(AcknowledgableDelivery msj, Instan
}

private void onTerminate() {
messageFlux.doOnTerminate(this::onTerminate)
messageFlux
.doOnTerminate(this::onTerminate)
.subscribe(new LoggerSubscriber<>(getClass().getName()));
}

Expand Down
9 changes: 0 additions & 9 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,6 @@ plugins {
id 'co.com.bancolombia.cleanArchitecture' version '3.17.13'
}

sonar {
properties {
property 'sonar.projectKey', 'reactive-commons_reactive-commons-java'
property 'sonar.coverage.exclusions', 'samples/**/*'
property 'sonar.organization', 'reactive-commons'
property 'sonar.host.url', 'https://sonarcloud.io'
}
}

repositories {
mavenCentral()
}
Expand Down
2 changes: 1 addition & 1 deletion docs/docs/reactive-commons/1-getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ spring:

You can also set it in runtime for example from a secret, so you can create the `RabbitProperties` bean like:

```java title="org.reactivecommons.async.rabbit.config.RabbitProperties"
```java title="org.reactivecommons.async.rabbit.standalone.config.RabbitProperties"

@Configuration
public class MyRabbitMQConfig {
Expand Down
8 changes: 7 additions & 1 deletion docs/docs/reactive-commons/9-configuration-properties.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ app:
createTopology: true # if your organization have restrictions with automatic topology creation you can set it to false and create it manually or by your organization process.
delayedCommands: false # Enable to send a delayed command to an external target
prefetchCount: 250 # is the maximum number of in flight messages you can reduce it to process less concurrent messages, this settings acts per instance of your service
useDiscardNotifierPerDomain: false # if true it uses a discard notifier for each domain,when false it uses a single discard notifier for all domains with default 'app' domain
enabled: true # if you want to disable this domain you can set it to false
brokerType: "rabbitmq" # please don't change this value
flux:
maxConcurrency: 250 # max concurrency of listener flow
domain:
Expand Down Expand Up @@ -64,7 +67,7 @@ You can override this settings programmatically through a `AsyncPropsDomainPrope
```java
package sample;

import org.reactivecommons.async.rabbit.config.RabbitProperties;
import org.reactivecommons.async.rabbit.standalone.config.RabbitProperties;
import org.reactivecommons.async.rabbit.config.props.AsyncProps;
import org.reactivecommons.async.rabbit.config.props.AsyncRabbitPropsDomainProperties;
import org.springframework.context.annotation.Bean;
Expand Down Expand Up @@ -133,6 +136,9 @@ reactive:
retryDelay: 1000 # interval for message retries, with and without DLQRetry
checkExistingTopics: true # if you don't want to verify topic existence before send a record you can set it to false
createTopology: true # if your organization have restrictions with automatic topology creation you can set it to false and create it manually or by your organization process.
useDiscardNotifierPerDomain: false # if true it uses a discard notifier for each domain,when false it uses a single discard notifier for all domains with default 'app' domain
enabled: true # if you want to disable this domain you can set it to false
brokerType: "kafka" # please don't change this value
domain:
ignoreThisListener: false # Allows you to disable event listener for this specific domain
connectionProperties: # you can override the connection properties of each domain
Expand Down
Loading
Loading