Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ protected void prepareBeforeSend(final Builder builder) throws IOException {
*/
protected boolean ensureChannel(final AtomicReference<Channel> channelReference) throws IOException {
synchronized (this) {
final Channel channel = channelReference.get();
if (running && (channel == null || !channel.isOpen())) {
final Channel chn = channelReference.get();
if (running && (chn == null || !chn.isOpen())) {
channelReference.set(getConnection().createChannel());
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package nl.aerius.taskmanager.client;

import java.util.Date;
import java.util.Map;
import java.util.Optional;

Expand Down Expand Up @@ -53,7 +52,7 @@ public TaskMetrics(final Map<String, Object> properties) {
* Creates a new metrics
*/
public TaskMetrics() {
this.startTime = new Date().getTime();
this.startTime = System.currentTimeMillis();
this.queueName = "";
}

Expand All @@ -63,7 +62,7 @@ public static long longValue(final Map<String, Object> messageMetaData, final St

public static String stringValue(final Map<String, Object> messageMetaData, final String key) {
return Optional.ofNullable(messageMetaData.get(key))
.filter(t -> t instanceof LongString)
.filter(LongString.class::isInstance)
.map(t -> new String(((LongString) t).getBytes()))
.orElse("");
}
Expand All @@ -73,7 +72,7 @@ public long duration() {
}

public TaskMetrics determineDuration() {
this.duration = startTime > 0 ? new Date().getTime() - startTime : 0;
this.duration = startTime > 0 ? System.currentTimeMillis() - startTime : 0;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@
*/
package nl.aerius.taskmanager.client.configuration;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/**
* Configuration object for different (queue) properties.
*/
public class ConnectionConfiguration {
public final class ConnectionConfiguration {

/**
* RabbitMQ default port.
Expand Down Expand Up @@ -203,26 +207,8 @@ public boolean equals(final Object obj) {

@Override
public int hashCode() {
int h$ = 1;
h$ *= 1000003;
h$ ^= brokerHost.hashCode();
h$ *= 1000003;
h$ ^= brokerPort;
h$ *= 1000003;
h$ ^= brokerUsername.hashCode();
h$ *= 1000003;
h$ ^= brokerPassword.hashCode();
h$ *= 1000003;
h$ ^= brokerVirtualHost.hashCode();
h$ *= 1000003;
h$ ^= brokerManagementPort;
h$ *= 1000003;
h$ ^= brokerManagementRefreshRate;
h$ *= 1000003;
h$ ^= brokerRetryWaitTime;
h$ *= 1000003;
h$ ^= brokerMaxInboundMessageBodySize;
return h$;
return Objects.hash(brokerHost.hashCode(), brokerPort, brokerUsername.hashCode(), brokerPassword.hashCode(), brokerVirtualHost.hashCode(),
brokerManagementPort, brokerManagementRefreshRate, brokerRetryWaitTime, brokerMaxInboundMessageBodySize);
}

public static final class Builder {
Expand Down Expand Up @@ -255,9 +241,6 @@ private static void checkBlank(final String name, final String value) {
}

public ConnectionConfiguration.Builder brokerHost(final String brokerHost) {
if (brokerHost == null) {
throw new IllegalArgumentException("brokerHost null");
}
this.brokerHost = brokerHost;
return this;
}
Expand All @@ -268,25 +251,16 @@ public ConnectionConfiguration.Builder brokerPort(final int brokerPort) {
}

public ConnectionConfiguration.Builder brokerUsername(final String brokerUsername) {
if (brokerUsername == null) {
throw new IllegalArgumentException("brokerUsername null");
}
this.brokerUsername = brokerUsername;
return this;
}

public ConnectionConfiguration.Builder brokerPassword(final String brokerPassword) {
if (brokerPassword == null) {
throw new IllegalArgumentException("brokerPassword null");
}
this.brokerPassword = brokerPassword;
return this;
}

public ConnectionConfiguration.Builder brokerVirtualHost(final String brokerVirtualHost) {
if (brokerVirtualHost == null) {
throw new IllegalArgumentException("brokerVirtualHost null");
}
this.brokerVirtualHost = brokerVirtualHost;
return this;
}
Expand All @@ -312,36 +286,19 @@ public Builder brokerMaxInboundMessageBodySize(final int brokerMaxInboundMessage
}

ConnectionConfiguration autoBuild() {
String missing = "";
if (this.brokerHost == null) {
missing += " brokerHost";
}
if (this.brokerPort == null) {
missing += " brokerPort";
}
if (this.brokerUsername == null) {
missing += " brokerUsername";
}
if (this.brokerPassword == null) {
missing += " brokerPassword";
}
if (this.brokerVirtualHost == null) {
missing += " brokerVirtualHost";
}
if (this.brokerManagementPort == null) {
missing += " brokerManagementPort";
}
if (this.brokerManagementRefreshRate == null) {
missing += " brokerManagementRefreshRate";
}
if (this.brokerRetryWaitTime == null) {
missing += " brokerRetryWaitTime";
}
if (this.brokerMaxInboundMessageBodySize == null) {
missing += " brokerMaxInboundMessageBodySize";
}
if (!missing.isEmpty()) {
throw new IllegalStateException("Missing required properties:" + missing);
final List<String> missings = new ArrayList<>();

addMissing(missings, this.brokerHost, "brokerHost");
addMissing(missings, this.brokerPort, "brokerPort");
addMissing(missings, this.brokerUsername, "brokerUsername");
addMissing(missings, this.brokerPassword, "brokerPassword");
addMissing(missings, this.brokerVirtualHost, "brokerVirtualHost");
addMissing(missings, this.brokerManagementPort, "brokerManagementPort");
addMissing(missings, this.brokerManagementRefreshRate, "brokerManagementRefreshRate");
addMissing(missings, this.brokerRetryWaitTime, "brokerRetryWaitTime");
addMissing(missings, this.brokerMaxInboundMessageBodySize, "brokerMaxInboundMessageBodySize");
if (!missings.isEmpty()) {
throw new IllegalArgumentException("Missing required properties: " + String.join(", ", missings));
}
return new ConnectionConfiguration(
this.brokerHost,
Expand All @@ -354,5 +311,11 @@ ConnectionConfiguration autoBuild() {
this.brokerRetryWaitTime,
this.brokerMaxInboundMessageBodySize);
}

private final void addMissing(final List<String> missings, final Object value, final String text) {
if (value == null) {
missings.add(text);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,37 +139,7 @@ private void stopAndStartConsumer() throws IOException {
channel.exchangeDeclare(AERIUS_EVENT_EXCHANGE, EXCHANGE_TYPE);
queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, AERIUS_EVENT_EXCHANGE, "");
consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(final String consumerTag, final Envelope envelope, final AMQP.BasicProperties properties, final byte[] body)
throws IOException {
RabbitMQWorkerMonitor.this.handleDelivery(properties);
}

@Override
public void handleShutdownSignal(final String consumerTag, final ShutdownSignalException sig) {
if (sig.isInitiatedByApplication()) {
isShutdown = true;
LOG.info("Worker event monitor {} was shut down by the application.", consumerTag);
} else {
LOG.debug("Worker event monitor {} was shut down.", consumerTag);
// restart
try {
try {
channel.abort();
} catch (final IOException e) {
// Eat error when closing channel.
}
if (!tryStartingConsuming.get()) {
start();
LOG.info("Restarted worker event monitor {}", consumerTag);
}
} catch (final IOException e) {
LOG.debug("Worker event monitor restart failed", e);
}
}
}
};
consumer = new MonitorConsumer(channel);
channel.basicConsume(queueName, true, consumer);
}
}
Expand Down Expand Up @@ -228,4 +198,44 @@ public void shutdown() {
LOG.trace("Worker event monitor shutdown failed", e);
}
}

private class MonitorConsumer extends DefaultConsumer {
public MonitorConsumer(final Channel channel) {
super(channel);
}

@Override
public void handleDelivery(final String consumerTag, final Envelope envelope, final AMQP.BasicProperties properties, final byte[] body)
throws IOException {
RabbitMQWorkerMonitor.this.handleDelivery(properties);
}

@Override
public void handleShutdownSignal(final String consumerTag, final ShutdownSignalException sig) {
if (sig.isInitiatedByApplication()) {
isShutdown = true;
LOG.info("Worker event monitor {} was shut down by the application.", consumerTag);
} else {
LOG.debug("Worker event monitor {} was shut down.", consumerTag);
// restart
try {
abort();
if (!tryStartingConsuming.get()) {
start();
LOG.info("Restarted worker event monitor {}", consumerTag);
}
} catch (final IOException e) {
LOG.debug("Worker event monitor restart failed", e);
}
}
}

private void abort() {
try {
channel.abort();
} catch (final IOException e) {
// Eat error when closing channel.
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,74 +46,56 @@ static void afterClass() {

@Test
void testTaskManagerClientWithoutBrokerHost() {
assertThrows(IllegalArgumentException.class, () -> {
final ConnectionConfiguration.Builder builder = getFullConnectionConfigurationBuilder();
builder.brokerHost(null);
new BrokerConnectionFactory(executor, builder.build());
});
final ConnectionConfiguration.Builder builder = getFullConnectionConfigurationBuilder().brokerHost(null);

assertThrows(IllegalArgumentException.class, () -> builder.build(), "Expected IllegalArgumentException when no broker host is set.");
}

@Test
void testTaskManagerClientWithEmptyBrokerHost() {
assertThrows(IllegalArgumentException.class, () -> {
final ConnectionConfiguration.Builder builder = getFullConnectionConfigurationBuilder();
builder.brokerHost("");
new BrokerConnectionFactory(executor, builder.build());
});
final ConnectionConfiguration.Builder builder = getFullConnectionConfigurationBuilder().brokerHost("");

assertThrows(IllegalArgumentException.class, () -> builder.build(), "Expected IllegalArgumentException when broker host is empty.");
}

@Test
void testTaskManagerClientWithoutBrokerUsername() {
assertThrows(IllegalArgumentException.class, () -> {
final ConnectionConfiguration.Builder builder = getFullConnectionConfigurationBuilder();
builder.brokerUsername(null);
new BrokerConnectionFactory(executor, builder.build());
});
final ConnectionConfiguration.Builder builder = getFullConnectionConfigurationBuilder().brokerUsername(null);

assertThrows(IllegalArgumentException.class, () -> builder.build(), "Expected IllegalArgumentException when no broker username is set.");
}

@Test
void testTaskManagerClientWithEmptyBrokerUsername() {
assertThrows(IllegalArgumentException.class, () -> {
final ConnectionConfiguration.Builder builder = getFullConnectionConfigurationBuilder();
builder.brokerUsername("");
new BrokerConnectionFactory(executor, builder.build());
});
final ConnectionConfiguration.Builder builder = getFullConnectionConfigurationBuilder().brokerUsername("");

assertThrows(IllegalArgumentException.class, () -> builder.build(), "Expected IllegalArgumentException when broker username is empty.");
}

@Test
void testTaskManagerClientWithoutBrokerPassword() {
assertThrows(IllegalArgumentException.class, () -> {
final ConnectionConfiguration.Builder builder = getFullConnectionConfigurationBuilder();
builder.brokerPassword(null);
new BrokerConnectionFactory(executor, builder.build());
});
final ConnectionConfiguration.Builder builder = getFullConnectionConfigurationBuilder().brokerPassword(null);

assertThrows(IllegalArgumentException.class, () -> builder.build(), "Expected IllegalArgumentException when no broker password is set.");
}

@Test
void testTaskManagerClientWithEmptyBrokerPassword() {
assertThrows(IllegalArgumentException.class, () -> {
final ConnectionConfiguration.Builder builder = getFullConnectionConfigurationBuilder();
builder.brokerPassword("");
new BrokerConnectionFactory(executor, builder.build());
});
final ConnectionConfiguration.Builder builder = getFullConnectionConfigurationBuilder().brokerPassword("");
assertThrows(IllegalArgumentException.class, () -> builder.build(), "Expected IllegalArgumentException when broker password is empty.");
}

@Test
void testTaskManagerClientWithoutBrokerVirtualHost() {
assertThrows(IllegalArgumentException.class, () -> {
final ConnectionConfiguration.Builder builder = getFullConnectionConfigurationBuilder();
builder.brokerVirtualHost(null);
new BrokerConnectionFactory(executor, builder.build());
});
final ConnectionConfiguration.Builder builder = getFullConnectionConfigurationBuilder().brokerVirtualHost(null);
assertThrows(IllegalArgumentException.class, () -> builder.build(), "Expected IllegalArgumentException when no broker virtual host is set.");
}

@Test
void testTaskManagerClientWithEmptyBrokerVirtualHost() {
assertThrows(IllegalArgumentException.class, () -> {
final ConnectionConfiguration.Builder builder = getFullConnectionConfigurationBuilder();
builder.brokerVirtualHost("");
new BrokerConnectionFactory(executor, builder.build());
});
final ConnectionConfiguration.Builder builder = getFullConnectionConfigurationBuilder().brokerVirtualHost("");

assertThrows(IllegalArgumentException.class, () -> builder.build(), "Expected IllegalArgumentException when broker virtual host is empty.");
}

private ConnectionConfiguration.Builder getFullConnectionConfigurationBuilder() {
Expand Down
Loading