Skip to content

Commit b0adda4

Browse files
authored
fix(health): Use bloking IO for health indicator connection (#114)
1 parent 54e19b7 commit b0adda4

File tree

2 files changed

+18
-17
lines changed

2 files changed

+18
-17
lines changed

async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/health/DomainRabbitReactiveHealthIndicator.java

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,30 +2,35 @@
22

33
import com.rabbitmq.client.Connection;
44
import com.rabbitmq.client.ConnectionFactory;
5-
import lombok.RequiredArgsConstructor;
65
import lombok.SneakyThrows;
76
import lombok.extern.log4j.Log4j2;
8-
import org.reactivecommons.async.rabbit.config.ConnectionFactoryProvider;
97
import org.reactivecommons.async.rabbit.config.ConnectionManager;
108
import org.springframework.boot.actuate.health.AbstractReactiveHealthIndicator;
119
import org.springframework.boot.actuate.health.Health;
1210
import reactor.core.publisher.Mono;
1311

1412
import java.net.SocketException;
13+
import java.util.Map;
1514
import java.util.stream.Collectors;
1615

1716
@Log4j2
18-
@RequiredArgsConstructor
1917
public class DomainRabbitReactiveHealthIndicator extends AbstractReactiveHealthIndicator {
2018
private static final String VERSION = "version";
21-
private final ConnectionManager manager;
19+
private final Map<String, ConnectionFactory> domainProviders;
20+
21+
public DomainRabbitReactiveHealthIndicator(ConnectionManager manager) {
22+
this.domainProviders = manager.getProviders().entrySet().stream()
23+
.collect(Collectors.toMap(Map.Entry::getKey, entry -> {
24+
ConnectionFactory connection = entry.getValue().getProvider().getConnectionFactory().clone();
25+
connection.useBlockingIo();
26+
return connection;
27+
}));
28+
}
2229

2330
@Override
2431
protected Mono<Health> doHealthCheck(Health.Builder builder) {
25-
return Mono.zip(manager.getProviders()
26-
.entrySet()
27-
.stream()
28-
.map(entry -> checkSingle(entry.getKey(), entry.getValue().getProvider()))
32+
return Mono.zip(domainProviders.entrySet().stream()
33+
.map(entry -> checkSingle(entry.getKey(), entry.getValue()))
2934
.collect(Collectors.toList()), this::merge);
3035
}
3136

@@ -38,17 +43,11 @@ private Health merge(Object[] results) {
3843
return builder.build();
3944
}
4045

41-
private Mono<Status> checkSingle(String domain, ConnectionFactoryProvider provider) {
42-
return Mono.defer(() -> getVersion(provider))
46+
private Mono<Status> checkSingle(String domain, ConnectionFactory connectionFactory) {
47+
return Mono.defer(() -> Mono.just(getRawVersion(connectionFactory)))
4348
.map(version -> Status.builder().version(version).domain(domain).build());
4449
}
4550

46-
private Mono<String> getVersion(ConnectionFactoryProvider provider) {
47-
return Mono.just(provider)
48-
.map(ConnectionFactoryProvider::getConnectionFactory)
49-
.map(this::getRawVersion);
50-
}
51-
5251
@SneakyThrows
5352
private String getRawVersion(ConnectionFactory factory) {
5453
Connection connection = null;

async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/health/DomainRabbitReactiveHealthIndicatorTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,14 @@ public class DomainRabbitReactiveHealthIndicatorTest {
3737

3838
@BeforeEach
3939
void setup() {
40+
when(provider.getConnectionFactory()).thenReturn(factory);
41+
when(factory.clone()).thenReturn(factory);
42+
4043
ConnectionManager connectionManager = new ConnectionManager();
4144
connectionManager.addDomain(DEFAULT_DOMAIN, null, null, provider);
4245
connectionManager.addDomain("domain2", null, null, provider);
4346
connectionManager.addDomain("domain3", null, null, provider);
4447
indicator = new DomainRabbitReactiveHealthIndicator(connectionManager);
45-
when(provider.getConnectionFactory()).thenReturn(factory);
4648
}
4749

4850
@Test

0 commit comments

Comments
 (0)