diff --git a/source/taskmanager-client/src/main/java/nl/aerius/taskmanager/client/TaskManagerClientSender.java b/source/taskmanager-client/src/main/java/nl/aerius/taskmanager/client/TaskManagerClientSender.java index b46ab28..64a523d 100644 --- a/source/taskmanager-client/src/main/java/nl/aerius/taskmanager/client/TaskManagerClientSender.java +++ b/source/taskmanager-client/src/main/java/nl/aerius/taskmanager/client/TaskManagerClientSender.java @@ -151,8 +151,8 @@ protected void prepareBeforeSend(final Builder builder) throws IOException { */ protected boolean ensureChannel(final AtomicReference channelReference) throws IOException { synchronized (this) { - final Channel channel = channelReference.get(); - if (running && (channel == null || !channel.isOpen())) { + final Channel chn = channelReference.get(); + if (running && (chn == null || !chn.isOpen())) { channelReference.set(getConnection().createChannel()); return true; } diff --git a/source/taskmanager-client/src/main/java/nl/aerius/taskmanager/client/TaskMetrics.java b/source/taskmanager-client/src/main/java/nl/aerius/taskmanager/client/TaskMetrics.java index 0ff618c..e327ed2 100644 --- a/source/taskmanager-client/src/main/java/nl/aerius/taskmanager/client/TaskMetrics.java +++ b/source/taskmanager-client/src/main/java/nl/aerius/taskmanager/client/TaskMetrics.java @@ -16,7 +16,6 @@ */ package nl.aerius.taskmanager.client; -import java.util.Date; import java.util.Map; import java.util.Optional; @@ -53,7 +52,7 @@ public TaskMetrics(final Map properties) { * Creates a new metrics */ public TaskMetrics() { - this.startTime = new Date().getTime(); + this.startTime = System.currentTimeMillis(); this.queueName = ""; } @@ -63,7 +62,7 @@ public static long longValue(final Map messageMetaData, final St public static String stringValue(final Map messageMetaData, final String key) { return Optional.ofNullable(messageMetaData.get(key)) - .filter(t -> t instanceof LongString) + .filter(LongString.class::isInstance) .map(t -> new String(((LongString) t).getBytes())) .orElse(""); } @@ -73,7 +72,7 @@ public long duration() { } public TaskMetrics determineDuration() { - this.duration = startTime > 0 ? new Date().getTime() - startTime : 0; + this.duration = startTime > 0 ? System.currentTimeMillis() - startTime : 0; return this; } diff --git a/source/taskmanager-client/src/main/java/nl/aerius/taskmanager/client/configuration/ConnectionConfiguration.java b/source/taskmanager-client/src/main/java/nl/aerius/taskmanager/client/configuration/ConnectionConfiguration.java index 39b8bea..1058233 100644 --- a/source/taskmanager-client/src/main/java/nl/aerius/taskmanager/client/configuration/ConnectionConfiguration.java +++ b/source/taskmanager-client/src/main/java/nl/aerius/taskmanager/client/configuration/ConnectionConfiguration.java @@ -16,10 +16,14 @@ */ package nl.aerius.taskmanager.client.configuration; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + /** * Configuration object for different (queue) properties. */ -public class ConnectionConfiguration { +public final class ConnectionConfiguration { /** * RabbitMQ default port. @@ -203,26 +207,8 @@ public boolean equals(final Object obj) { @Override public int hashCode() { - int h$ = 1; - h$ *= 1000003; - h$ ^= brokerHost.hashCode(); - h$ *= 1000003; - h$ ^= brokerPort; - h$ *= 1000003; - h$ ^= brokerUsername.hashCode(); - h$ *= 1000003; - h$ ^= brokerPassword.hashCode(); - h$ *= 1000003; - h$ ^= brokerVirtualHost.hashCode(); - h$ *= 1000003; - h$ ^= brokerManagementPort; - h$ *= 1000003; - h$ ^= brokerManagementRefreshRate; - h$ *= 1000003; - h$ ^= brokerRetryWaitTime; - h$ *= 1000003; - h$ ^= brokerMaxInboundMessageBodySize; - return h$; + return Objects.hash(brokerHost.hashCode(), brokerPort, brokerUsername.hashCode(), brokerPassword.hashCode(), brokerVirtualHost.hashCode(), + brokerManagementPort, brokerManagementRefreshRate, brokerRetryWaitTime, brokerMaxInboundMessageBodySize); } public static final class Builder { @@ -255,9 +241,6 @@ private static void checkBlank(final String name, final String value) { } public ConnectionConfiguration.Builder brokerHost(final String brokerHost) { - if (brokerHost == null) { - throw new IllegalArgumentException("brokerHost null"); - } this.brokerHost = brokerHost; return this; } @@ -268,25 +251,16 @@ public ConnectionConfiguration.Builder brokerPort(final int brokerPort) { } public ConnectionConfiguration.Builder brokerUsername(final String brokerUsername) { - if (brokerUsername == null) { - throw new IllegalArgumentException("brokerUsername null"); - } this.brokerUsername = brokerUsername; return this; } public ConnectionConfiguration.Builder brokerPassword(final String brokerPassword) { - if (brokerPassword == null) { - throw new IllegalArgumentException("brokerPassword null"); - } this.brokerPassword = brokerPassword; return this; } public ConnectionConfiguration.Builder brokerVirtualHost(final String brokerVirtualHost) { - if (brokerVirtualHost == null) { - throw new IllegalArgumentException("brokerVirtualHost null"); - } this.brokerVirtualHost = brokerVirtualHost; return this; } @@ -312,36 +286,19 @@ public Builder brokerMaxInboundMessageBodySize(final int brokerMaxInboundMessage } ConnectionConfiguration autoBuild() { - String missing = ""; - if (this.brokerHost == null) { - missing += " brokerHost"; - } - if (this.brokerPort == null) { - missing += " brokerPort"; - } - if (this.brokerUsername == null) { - missing += " brokerUsername"; - } - if (this.brokerPassword == null) { - missing += " brokerPassword"; - } - if (this.brokerVirtualHost == null) { - missing += " brokerVirtualHost"; - } - if (this.brokerManagementPort == null) { - missing += " brokerManagementPort"; - } - if (this.brokerManagementRefreshRate == null) { - missing += " brokerManagementRefreshRate"; - } - if (this.brokerRetryWaitTime == null) { - missing += " brokerRetryWaitTime"; - } - if (this.brokerMaxInboundMessageBodySize == null) { - missing += " brokerMaxInboundMessageBodySize"; - } - if (!missing.isEmpty()) { - throw new IllegalStateException("Missing required properties:" + missing); + final List missings = new ArrayList<>(); + + addMissing(missings, this.brokerHost, "brokerHost"); + addMissing(missings, this.brokerPort, "brokerPort"); + addMissing(missings, this.brokerUsername, "brokerUsername"); + addMissing(missings, this.brokerPassword, "brokerPassword"); + addMissing(missings, this.brokerVirtualHost, "brokerVirtualHost"); + addMissing(missings, this.brokerManagementPort, "brokerManagementPort"); + addMissing(missings, this.brokerManagementRefreshRate, "brokerManagementRefreshRate"); + addMissing(missings, this.brokerRetryWaitTime, "brokerRetryWaitTime"); + addMissing(missings, this.brokerMaxInboundMessageBodySize, "brokerMaxInboundMessageBodySize"); + if (!missings.isEmpty()) { + throw new IllegalArgumentException("Missing required properties: " + String.join(", ", missings)); } return new ConnectionConfiguration( this.brokerHost, @@ -354,5 +311,11 @@ ConnectionConfiguration autoBuild() { this.brokerRetryWaitTime, this.brokerMaxInboundMessageBodySize); } + + private final void addMissing(final List missings, final Object value, final String text) { + if (value == null) { + missings.add(text); + } + } } } diff --git a/source/taskmanager-client/src/main/java/nl/aerius/taskmanager/client/mq/RabbitMQWorkerMonitor.java b/source/taskmanager-client/src/main/java/nl/aerius/taskmanager/client/mq/RabbitMQWorkerMonitor.java index 03ee343..928533e 100644 --- a/source/taskmanager-client/src/main/java/nl/aerius/taskmanager/client/mq/RabbitMQWorkerMonitor.java +++ b/source/taskmanager-client/src/main/java/nl/aerius/taskmanager/client/mq/RabbitMQWorkerMonitor.java @@ -139,37 +139,7 @@ private void stopAndStartConsumer() throws IOException { channel.exchangeDeclare(AERIUS_EVENT_EXCHANGE, EXCHANGE_TYPE); queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, AERIUS_EVENT_EXCHANGE, ""); - consumer = new DefaultConsumer(channel) { - @Override - public void handleDelivery(final String consumerTag, final Envelope envelope, final AMQP.BasicProperties properties, final byte[] body) - throws IOException { - RabbitMQWorkerMonitor.this.handleDelivery(properties); - } - - @Override - public void handleShutdownSignal(final String consumerTag, final ShutdownSignalException sig) { - if (sig.isInitiatedByApplication()) { - isShutdown = true; - LOG.info("Worker event monitor {} was shut down by the application.", consumerTag); - } else { - LOG.debug("Worker event monitor {} was shut down.", consumerTag); - // restart - try { - try { - channel.abort(); - } catch (final IOException e) { - // Eat error when closing channel. - } - if (!tryStartingConsuming.get()) { - start(); - LOG.info("Restarted worker event monitor {}", consumerTag); - } - } catch (final IOException e) { - LOG.debug("Worker event monitor restart failed", e); - } - } - } - }; + consumer = new MonitorConsumer(channel); channel.basicConsume(queueName, true, consumer); } } @@ -228,4 +198,44 @@ public void shutdown() { LOG.trace("Worker event monitor shutdown failed", e); } } + + private class MonitorConsumer extends DefaultConsumer { + public MonitorConsumer(final Channel channel) { + super(channel); + } + + @Override + public void handleDelivery(final String consumerTag, final Envelope envelope, final AMQP.BasicProperties properties, final byte[] body) + throws IOException { + RabbitMQWorkerMonitor.this.handleDelivery(properties); + } + + @Override + public void handleShutdownSignal(final String consumerTag, final ShutdownSignalException sig) { + if (sig.isInitiatedByApplication()) { + isShutdown = true; + LOG.info("Worker event monitor {} was shut down by the application.", consumerTag); + } else { + LOG.debug("Worker event monitor {} was shut down.", consumerTag); + // restart + try { + abort(); + if (!tryStartingConsuming.get()) { + start(); + LOG.info("Restarted worker event monitor {}", consumerTag); + } + } catch (final IOException e) { + LOG.debug("Worker event monitor restart failed", e); + } + } + } + + private void abort() { + try { + channel.abort(); + } catch (final IOException e) { + // Eat error when closing channel. + } + } + } } diff --git a/source/taskmanager-client/src/test/java/nl/aerius/taskmanager/client/BrokerConnectionFactoryTest.java b/source/taskmanager-client/src/test/java/nl/aerius/taskmanager/client/BrokerConnectionFactoryTest.java index 48b2f52..904eb05 100644 --- a/source/taskmanager-client/src/test/java/nl/aerius/taskmanager/client/BrokerConnectionFactoryTest.java +++ b/source/taskmanager-client/src/test/java/nl/aerius/taskmanager/client/BrokerConnectionFactoryTest.java @@ -46,74 +46,56 @@ static void afterClass() { @Test void testTaskManagerClientWithoutBrokerHost() { - assertThrows(IllegalArgumentException.class, () -> { - final ConnectionConfiguration.Builder builder = getFullConnectionConfigurationBuilder(); - builder.brokerHost(null); - new BrokerConnectionFactory(executor, builder.build()); - }); + final ConnectionConfiguration.Builder builder = getFullConnectionConfigurationBuilder().brokerHost(null); + + assertThrows(IllegalArgumentException.class, () -> builder.build(), "Expected IllegalArgumentException when no broker host is set."); } @Test void testTaskManagerClientWithEmptyBrokerHost() { - assertThrows(IllegalArgumentException.class, () -> { - final ConnectionConfiguration.Builder builder = getFullConnectionConfigurationBuilder(); - builder.brokerHost(""); - new BrokerConnectionFactory(executor, builder.build()); - }); + final ConnectionConfiguration.Builder builder = getFullConnectionConfigurationBuilder().brokerHost(""); + + assertThrows(IllegalArgumentException.class, () -> builder.build(), "Expected IllegalArgumentException when broker host is empty."); } @Test void testTaskManagerClientWithoutBrokerUsername() { - assertThrows(IllegalArgumentException.class, () -> { - final ConnectionConfiguration.Builder builder = getFullConnectionConfigurationBuilder(); - builder.brokerUsername(null); - new BrokerConnectionFactory(executor, builder.build()); - }); + final ConnectionConfiguration.Builder builder = getFullConnectionConfigurationBuilder().brokerUsername(null); + + assertThrows(IllegalArgumentException.class, () -> builder.build(), "Expected IllegalArgumentException when no broker username is set."); } @Test void testTaskManagerClientWithEmptyBrokerUsername() { - assertThrows(IllegalArgumentException.class, () -> { - final ConnectionConfiguration.Builder builder = getFullConnectionConfigurationBuilder(); - builder.brokerUsername(""); - new BrokerConnectionFactory(executor, builder.build()); - }); + final ConnectionConfiguration.Builder builder = getFullConnectionConfigurationBuilder().brokerUsername(""); + + assertThrows(IllegalArgumentException.class, () -> builder.build(), "Expected IllegalArgumentException when broker username is empty."); } @Test void testTaskManagerClientWithoutBrokerPassword() { - assertThrows(IllegalArgumentException.class, () -> { - final ConnectionConfiguration.Builder builder = getFullConnectionConfigurationBuilder(); - builder.brokerPassword(null); - new BrokerConnectionFactory(executor, builder.build()); - }); + final ConnectionConfiguration.Builder builder = getFullConnectionConfigurationBuilder().brokerPassword(null); + + assertThrows(IllegalArgumentException.class, () -> builder.build(), "Expected IllegalArgumentException when no broker password is set."); } @Test void testTaskManagerClientWithEmptyBrokerPassword() { - assertThrows(IllegalArgumentException.class, () -> { - final ConnectionConfiguration.Builder builder = getFullConnectionConfigurationBuilder(); - builder.brokerPassword(""); - new BrokerConnectionFactory(executor, builder.build()); - }); + final ConnectionConfiguration.Builder builder = getFullConnectionConfigurationBuilder().brokerPassword(""); + assertThrows(IllegalArgumentException.class, () -> builder.build(), "Expected IllegalArgumentException when broker password is empty."); } @Test void testTaskManagerClientWithoutBrokerVirtualHost() { - assertThrows(IllegalArgumentException.class, () -> { - final ConnectionConfiguration.Builder builder = getFullConnectionConfigurationBuilder(); - builder.brokerVirtualHost(null); - new BrokerConnectionFactory(executor, builder.build()); - }); + final ConnectionConfiguration.Builder builder = getFullConnectionConfigurationBuilder().brokerVirtualHost(null); + assertThrows(IllegalArgumentException.class, () -> builder.build(), "Expected IllegalArgumentException when no broker virtual host is set."); } @Test void testTaskManagerClientWithEmptyBrokerVirtualHost() { - assertThrows(IllegalArgumentException.class, () -> { - final ConnectionConfiguration.Builder builder = getFullConnectionConfigurationBuilder(); - builder.brokerVirtualHost(""); - new BrokerConnectionFactory(executor, builder.build()); - }); + final ConnectionConfiguration.Builder builder = getFullConnectionConfigurationBuilder().brokerVirtualHost(""); + + assertThrows(IllegalArgumentException.class, () -> builder.build(), "Expected IllegalArgumentException when broker virtual host is empty."); } private ConnectionConfiguration.Builder getFullConnectionConfigurationBuilder() { diff --git a/source/taskmanager-client/src/test/java/nl/aerius/taskmanager/client/TaskManagerClientSenderTest.java b/source/taskmanager-client/src/test/java/nl/aerius/taskmanager/client/TaskManagerClientSenderTest.java index 15590c4..4fecfb8 100644 --- a/source/taskmanager-client/src/test/java/nl/aerius/taskmanager/client/TaskManagerClientSenderTest.java +++ b/source/taskmanager-client/src/test/java/nl/aerius/taskmanager/client/TaskManagerClientSenderTest.java @@ -30,7 +30,6 @@ import java.io.NotSerializableException; import java.io.Serializable; import java.util.UUID; -import java.util.concurrent.TimeoutException; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -59,7 +58,7 @@ class TaskManagerClientSenderTest { private Connection connection; @BeforeEach - void setUp() throws IOException, TimeoutException { + void setUp() { connection = MockConnectionUtil.mockConnection(); lenient().doReturn(connection).when(factory).getConnection(); lenient().doReturn(true).when(factory).isOpen(); @@ -72,7 +71,7 @@ void tearDown() { } @Test - void testSendTask() throws IOException, InterruptedException, ClassNotFoundException { + void testSendTask() throws IOException, ClassNotFoundException { final ArgumentCaptor propertiesCaptor = ArgumentCaptor.forClass(BasicProperties.class); final ArgumentCaptor dataCaptor = ArgumentCaptor.forClass(byte[].class); final Serializable input = new MockTaskInput(1); @@ -111,8 +110,9 @@ void testSendTasksWithNullId() throws IOException { } @Test - void testTaskManagerClientWithoutConnectionConfiguration() throws IOException { - assertThrows(IllegalArgumentException.class, () -> taskManagerClient = new TaskManagerClientSender(null)); + void testTaskManagerClientWithoutConnectionConfiguration() { + assertThrows(IllegalArgumentException.class, () -> taskManagerClient = new TaskManagerClientSender(null), + "Expected IllegalArgumentException when configuration is null."); } @Test @@ -131,7 +131,7 @@ void testSendUnserializableTask() { private static final long serialVersionUID = 7681080846084936169L; }; taskManagerClient.sendTask(input, NORMAL_TASK_ID, NORMAL_TASK_ID, TASK_QUEUE_NAME); - }); + }, "Expected NotSerializableException for unserialisable object."); } /** @@ -152,7 +152,7 @@ void testSendTaskAfterExit() { assertThrows(IllegalStateException.class, () -> { taskManagerClient.close(); testSendTask(); - }); + }, "Expected IllegalStateException for sending task after sender was shutdown."); } /** @@ -161,7 +161,8 @@ void testSendTaskAfterExit() { @Test void testSendTaskToNullQueue() { assertThrows(IllegalArgumentException.class, - () -> taskManagerClient.sendTask(new MockTaskInput(1), NORMAL_TASK_ID, NORMAL_TASK_ID, null)); + () -> taskManagerClient.sendTask(new MockTaskInput(1), NORMAL_TASK_ID, NORMAL_TASK_ID, null), + "Expected IllegalArgumentException when the queue passed is null."); } /** @@ -169,7 +170,8 @@ void testSendTaskToNullQueue() { */ @Test void testSendNullObjectAsTask() { - assertThrows(IllegalArgumentException.class, () -> taskManagerClient.sendTask(null, NORMAL_TASK_ID, NORMAL_TASK_ID, TASK_QUEUE_NAME)); + assertThrows(IllegalArgumentException.class, () -> taskManagerClient.sendTask(null, NORMAL_TASK_ID, NORMAL_TASK_ID, TASK_QUEUE_NAME), + "Expected IllegalArgumentException when a null object is given to be send."); } static record MockTaskInput(int aNumber) implements Serializable { diff --git a/source/taskmanager-client/src/test/java/nl/aerius/taskmanager/client/TaskManagerClientSenderWithCallbackTest.java b/source/taskmanager-client/src/test/java/nl/aerius/taskmanager/client/TaskManagerClientSenderWithCallbackTest.java index 162e44a..97b9e9e 100644 --- a/source/taskmanager-client/src/test/java/nl/aerius/taskmanager/client/TaskManagerClientSenderWithCallbackTest.java +++ b/source/taskmanager-client/src/test/java/nl/aerius/taskmanager/client/TaskManagerClientSenderWithCallbackTest.java @@ -27,7 +27,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.TimeoutException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -51,7 +50,7 @@ class TaskManagerClientSenderWithCallbackTest { private @Mock MockTaskResultHandler mockTaskResultHandler; @BeforeEach - void setUp() throws IOException, TimeoutException { + void setUp() throws IOException { final Connection connection = mock(Connection.class); doAnswer(a -> { final Channel channel = mock(Channel.class); diff --git a/source/taskmanager-client/src/test/java/nl/aerius/taskmanager/client/WorkerResultSenderTest.java b/source/taskmanager-client/src/test/java/nl/aerius/taskmanager/client/WorkerResultSenderTest.java index 20ab28e..b8c8be1 100644 --- a/source/taskmanager-client/src/test/java/nl/aerius/taskmanager/client/WorkerResultSenderTest.java +++ b/source/taskmanager-client/src/test/java/nl/aerius/taskmanager/client/WorkerResultSenderTest.java @@ -107,7 +107,6 @@ private Map assertProperties() { final BasicProperties replyProperties = propertiesCaptor.getValue(); assertEquals("1", replyProperties.getCorrelationId(), "Expected same correlationid"); assertEquals("2", replyProperties.getMessageId(), "Expected same messageid"); - final Map replyHeaders = replyProperties.getHeaders(); - return replyHeaders; + return replyProperties.getHeaders(); } } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/Main.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/Main.java index e745aa1..2efe69c 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/Main.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/Main.java @@ -54,12 +54,11 @@ private Main() { * When this main method is used, the task manager will be started. * * @param args no arguments needed, but if supplied, they should fit the description given by using -help. - * @throws IOException When an error occurred reading a file during configuration. - * @throws ParseException When command line option parsing failed */ - public static void main(final String[] args) throws IOException, ParseException { - final CmdOptions cmdOptions = new CmdOptions(args); - if (cmdOptions.printIfInfoOption()) { + public static void main(final String[] args) { + final CmdOptions cmdOptions = cmdOptions(args); + + if (cmdOptions == null || cmdOptions.printIfInfoOption()) { return; } LOG.info("--------------------------------TASKMANAGER STARTED------------------------------------"); @@ -74,6 +73,17 @@ public void run() { try (final ExecutorService executorService = Executors.newCachedThreadPool(); final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(THREAD_POOL_SIZE)) { startupFromConfiguration(executorService, scheduledExecutorService, cmdOptions.getConfigFile()); + } catch (final IOException e) { + LOG.error("IOException during startup.", e); + } + } + + private static CmdOptions cmdOptions(final String[] args) { + try { + return new CmdOptions(args); + } catch (final ParseException e) { + LOG.error("Command line options could not be parsed.", e); + return null; } } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskDispatcher.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskDispatcher.java index e54ae14..65e9db9 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskDispatcher.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskDispatcher.java @@ -157,7 +157,7 @@ private void taskDeliveryFailed(final Task task) { } } - private void taskAbortedOnDuplicateMessageId(final Task task) { + private static void taskAbortedOnDuplicateMessageId(final Task task) { try { task.getTaskConsumer().messageDeliveryAborted(task.getMessage(), new RuntimeException("Duplicate messageId found for task" + task.getMessage().getMessageId())); diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java index c977852..739d19d 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java @@ -194,8 +194,8 @@ private void updateNumberOfWorkers(final int numberOfWorkers) { if (deltaWorkers > 0) { freeWorkers.release(deltaWorkers); LOG.info("# Workers of {} increased to {}(+{})", workerQueueName, totalConfiguredWorkers, deltaWorkers); - } else if ((deltaWorkers < 0) && (freeWorkers.availablePermits() > 0)) { - freeWorkers.tryAcquire(Math.min(freeWorkers.availablePermits(), -deltaWorkers)); + } else if ((deltaWorkers < 0) && (freeWorkers.availablePermits() > 0) + && freeWorkers.tryAcquire(Math.min(freeWorkers.availablePermits(), -deltaWorkers))) { LOG.info("# Workers of {} decreased to {}({})", workerQueueName, totalConfiguredWorkers, deltaWorkers); } if (previousTotalConfiguredWorkers != totalConfiguredWorkers) { diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java index 60fb1e5..cb2325a 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java @@ -56,6 +56,8 @@ public class PerformanceMetricsReporter implements WorkerFinishedHandler { private static final String DISPATCH = "Avg dispatch wait time "; private static final String WORK = "Avg work duration "; + private static final int UPDATE_TIME_SECONDS = 60; + private final DoubleGauge dispatchedWorkerCountGauge; private final DoubleGauge dispatchedWorkerWaitGauge; private final DoubleGauge dispatchedQueueCountGauge; @@ -66,8 +68,6 @@ public class PerformanceMetricsReporter implements WorkerFinishedHandler { private final DoubleGauge workQueueCountGauge; private final DoubleGauge workQueueDurationGauge; - private static final int UPDATE_TIME_SECONDS = 60; - private final Map dispatchedQueueMetrics = new HashMap<>(); private final DurationMetric dispatchedWorkerMetrics; private final Map workQueueMetrics = new HashMap<>(); @@ -157,14 +157,14 @@ private synchronized void update() { } } - private void metrics(final String prefixText, final Map metrics, final DoubleGauge gauge, + private static void metrics(final String prefixText, final Map metrics, final DoubleGauge gauge, final DoubleGauge waitGauge) { for (final Entry entry : metrics.entrySet()) { metrics(prefixText, gauge, waitGauge, entry.getKey(), entry.getValue()); } } - private void metrics(final String prefixText, final DoubleGauge gauge, final DoubleGauge waitGauge, final String name, + private static void metrics(final String prefixText, final DoubleGauge gauge, final DoubleGauge waitGauge, final String name, final DurationMetric metrics) { final DurationMetricValue metric = metrics.process(); final int count = metric.count(); diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQQueueMonitor.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQQueueMonitor.java index 629e809..2611a03 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQQueueMonitor.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQQueueMonitor.java @@ -131,6 +131,7 @@ private static int getJsonIntPrimitive(final JsonNode jsonObject, final String k } protected JsonNode getJsonResultFromApi(final String apiPath) throws URISyntaxException, IOException { + // Not using new URI constructor because it will escape the %2f in apiPath to %252f. final URI uri = new URL("http", configuration.getBrokerHost(), configuration.getBrokerManagementPort(), apiPath).toURI(); try (final CloseableHttpResponse response = httpClient.execute(targetHost, new HttpGet(uri), context)) { diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerProducer.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerProducer.java index b090ce1..33a21ce 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerProducer.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerProducer.java @@ -188,7 +188,7 @@ public void handleDelivery(final String consumerTag, final Envelope envelope, fi return true; } - private void handleWorkFinished(final WorkerFinishedHandler handler, final BasicProperties properties) { + private static void handleWorkFinished(final WorkerFinishedHandler handler, final BasicProperties properties) { try { handler.onWorkerFinished(properties.getMessageId(), properties.getHeaders()); } catch (final RuntimeException e) { diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProvider.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProvider.java index aa19d7b..520317a 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProvider.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProvider.java @@ -65,6 +65,8 @@ public class RabbitMQWorkerSizeProvider implements WorkerSizeProviderProxy { */ private final long refreshDelayBeforeUpdateSeconds; + private final Object sync = new Object(); + private final Map> lastRuns = new HashMap<>(); private final Map observers = new HashMap<>(); private final Map monitors = new HashMap<>(); @@ -153,7 +155,7 @@ public void triggerWorkerQueueState(final String queueName) { // If a new update is received before the schedule has run it will cancel the current schedule and reschedule. // This is mainly for when multiple events are triggered to not trigger a call for every event, // and also to manage the events trigger in combination with the scheduled process. - synchronized (queueName) { + synchronized (sync) { Optional.ofNullable(lastRuns.get(queueName)).ifPresent(f -> f.cancel(false)); final Runnable updateTask = () -> updateWorkerQueueState(queueName); @@ -162,8 +164,8 @@ public void triggerWorkerQueueState(final String queueName) { } private void updateWorkerQueueState(final String queueName) { - synchronized (queueName) { - monitors.get(queueName).updateWorkerQueueState(queueName, observers.get(queueName)); + synchronized (sync) { + Optional.ofNullable(monitors.get(queueName)).ifPresent(m -> m.updateWorkerQueueState(queueName, observers.get(queueName))); lastRuns.remove(queueName); } } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/SchedulerFileConfigurationHandler.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/SchedulerFileConfigurationHandler.java index 27f12d5..650f702 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/SchedulerFileConfigurationHandler.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/SchedulerFileConfigurationHandler.java @@ -24,8 +24,8 @@ /** * Interface to read and write specific scheduler configuration files. - * @param - * @param + * @param queue configuration + * @param scheduler configuration */ public interface SchedulerFileConfigurationHandler> { diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskDispatcherTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskDispatcherTest.java index e7a130e..79ea691 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskDispatcherTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskDispatcherTest.java @@ -34,6 +34,8 @@ import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import nl.aerius.taskmanager.TaskDispatcher.State; import nl.aerius.taskmanager.domain.QueueConfig; @@ -45,6 +47,8 @@ */ class TaskDispatcherTest { + private static final Logger LOG = LoggerFactory.getLogger(TaskDispatcherTest.class); + private static final String WORKER_QUEUE_NAME_TEST = "TEST"; private static ExecutorService executor; @@ -162,6 +166,7 @@ private Future forwardTaskAsync(final Task task, final Future previous) { } dispatcher.forwardTask(task); } catch (InterruptedException | ExecutionException e) { + LOG.error("Exception in dispatcher forwardTask", e); } }); } diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java index 03d921a..3cdf948 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java @@ -58,7 +58,6 @@ class WorkerPoolTest { private @Mock WorkerUpdateHandler workerUpdateHandler; private int numberOfWorkers; - @BeforeEach void setUp() throws IOException { numberOfWorkers = 0; @@ -93,7 +92,8 @@ void testWorkerPoolSizing() throws IOException { @Test void testNoFreeWorkers() { - assertThrows(NoFreeWorkersException.class, () -> workerPool.sendTaskToWorker(createTask())); + assertThrows(NoFreeWorkersException.class, () -> workerPool.sendTaskToWorker(createTask()), + "Expected NoFreeWorkersException when trying to send a task while there are no free workers."); } @Test @@ -107,7 +107,8 @@ void testWorkerPoolScaleDown() throws IOException { workerPool.sendTaskToWorker(task3); assertEquals(5, workerPool.getReportedWorkerSize(), "Check if workerPool size is same after 2 workers running"); workerPool.onNumberOfWorkersUpdate(1, 0); - assertEquals(3, workerPool.getWorkerSize(), "Workpool size should match number of running tasks, since new total is lower than currently running"); + assertEquals(3, workerPool.getWorkerSize(), + "Workpool size should match number of running tasks, since new total is lower than currently running"); assertEquals(1, workerPool.getReportedWorkerSize(), "Check if current workerPool size is same after decreasing # workers"); workerPool.releaseWorker(task1.getId()); assertEquals(2, workerPool.getWorkerSize(), "Check if workerPool size is lower, but not yet same as total because still process running"); @@ -133,13 +134,13 @@ void testReleaseTaskTwice() throws IOException { @Disabled("Exception is not thrown anymore, so test ignored for now") @Test - void testSendSameTaskTwice() { - assertThrows(TaskAlreadySentException.class, () -> { - workerPool.onNumberOfWorkersUpdate(3, 0); - final Task task1 = createTask(); - workerPool.sendTaskToWorker(task1); - workerPool.sendTaskToWorker(task1); - }); + void testSendSameTaskTwice() throws IOException { + workerPool.onNumberOfWorkersUpdate(3, 0); + final Task task1 = createTask(); + workerPool.sendTaskToWorker(task1); + + assertThrows(TaskAlreadySentException.class, () -> workerPool.sendTaskToWorker(task1), + "Expected TaskAlreadySentException when a message is send a second time."); } @Test diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java index 0d9fde2..d32b18e 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java @@ -28,7 +28,6 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ScheduledExecutorService; @@ -96,7 +95,7 @@ void testOnWorkDispatched() { verify(mockedGauges.get("aer.taskmanager.dispatched.queue")).set(eq(2.0), any()); verify(mockedGauges.get("aer.taskmanager.dispatched.queue.wait")).set(durationCaptor.capture(), any()); durationCaptor.getAllValues() - .forEach(v -> assertTrue(v > 99.0, "Duration should report at least 100.0 as it is the offset of the start time, but was " + v)); + .forEach(v -> assertTrue(v > 99.0, "Duration should report at least 100.0 as it is the offset of the start time, but was " + v)); } @Test @@ -110,7 +109,7 @@ void testOnWorkerFinished() { verify(mockedGauges.get("aer.taskmanager.work.queue")).set(eq(2.0), any()); verify(mockedGauges.get("aer.taskmanager.work.queue.duration")).set(durationCaptor.capture(), any()); durationCaptor.getAllValues() - .forEach(v -> assertTrue(v > 99.0, "Duration should report at least 100.0 as it is the offset of the start time, but was " + v)); + .forEach(v -> assertTrue(v > 99.0, "Duration should report at least 100.0 as it is the offset of the start time, but was " + v)); } @Test @@ -121,11 +120,11 @@ void testWorkLoad() throws InterruptedException { methodCaptor.getValue().run(); Thread.sleep(10); // Add a bit of delay to get some time frame between these 2 run calls. methodCaptor.getValue().run(); - verify(mockedGauges.get("aer.taskmanager.work.load") , times(2)).set(durationCaptor.capture(), any()); - assertEquals(50.0, durationCaptor.getAllValues().get(1)); + verify(mockedGauges.get("aer.taskmanager.work.load"), times(2)).set(durationCaptor.capture(), any()); + assertEquals(50.0, durationCaptor.getAllValues().get(1), "Expected workload of 50%"); } private Map createMap(final String queueName, final long duration) { - return new TaskMetrics().duration(duration).queueName(queueName).start(new Date().getTime() - 100).build(); + return new TaskMetrics().duration(duration).queueName(queueName).start(System.currentTimeMillis() - 100).build(); } } diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQMessageHandlerTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQMessageHandlerTest.java index 6188dca..0ba27a1 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQMessageHandlerTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQMessageHandlerTest.java @@ -62,7 +62,7 @@ class RabbitMQMessageHandlerTest extends AbstractRabbitMQTest { @Timeout(value = 10, unit = TimeUnit.SECONDS) void testMessageReceivedHandler() throws IOException, InterruptedException { final byte[] receivedBody = "4321".getBytes(); - final TaskMessageHandler tmh = adapterFactory.createTaskMessageHandler(new QueueConfig(taskQueueName, false, false, null)); + final TaskMessageHandler tmh = adapterFactory.createTaskMessageHandler(new QueueConfig(taskQueueName, false, false, null)); final Semaphore lock = new Semaphore(0); final DataDock data = new DataDock(); tmh.start(); @@ -108,7 +108,7 @@ void testReStart() throws IOException, InterruptedException { final AtomicInteger shutdownCallsCounter = new AtomicInteger(); final MessageReceivedHandler mockMessageReceivedHandler = mock(MessageReceivedHandler.class); - final TaskMessageHandler tmh = adapterFactory.createTaskMessageHandler(new QueueConfig(taskQueueName, false, false, null)); + final TaskMessageHandler tmh = adapterFactory.createTaskMessageHandler(new QueueConfig(taskQueueName, false, false, null)); ((RabbitMQMessageHandler) tmh).setRetryTimeMilliseconds(1L); doAnswer(invoke -> null).when(mockChannel).addShutdownListener(shutdownListenerCaptor.capture()); @@ -144,9 +144,9 @@ void testReStart() throws IOException, InterruptedException { // Wait till TaskMessageHandler has called basicConsume in the consumer. verifyTryStartConsumingLock.acquire(); // Release the consumer start lock, it should throw an IOException and not call the shutdown handler. - triggerRestartConsumer(tryStartConsumingLock, verifyTryStartConsumingLock, mockMessageReceivedHandler, 0); + triggerRestartConsumer(tryStartConsumingLock, verifyTryStartConsumingLock, mockMessageReceivedHandler); // Release the second time, it should not throw an IOException this time, but just finish start without issue. - triggerRestartConsumer(tryStartConsumingLock, verifyTryStartConsumingLock, mockMessageReceivedHandler, 0); + triggerRestartConsumer(tryStartConsumingLock, verifyTryStartConsumingLock, mockMessageReceivedHandler); // Release the second start call. It should just finished normally. tryStartConsumingLock.release(); // Wait for thread to finish. @@ -162,8 +162,7 @@ void testReStart() throws IOException, InterruptedException { } private static void triggerRestartConsumer(final Semaphore tryStartConsumingLock, final Semaphore verifyTryStartConsumingLock, - final MessageReceivedHandler mockMessageReceivedHandler, final int expectedNumberMessageReceivedHandlerShutdownCalled) - throws InterruptedException { + final MessageReceivedHandler mockMessageReceivedHandler) throws InterruptedException { // Let the consumer basicConsume continue. tryStartConsumingLock.release(); // Consumer should have restarted. diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProviderTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProviderTest.java index 21bfd74..b1e9bc0 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProviderTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProviderTest.java @@ -36,7 +36,7 @@ /** * Test class for {@link RabbitMQWorkerSizeProvider} */ -public class RabbitMQWorkerSizeProviderTest extends AbstractRabbitMQTest { +class RabbitMQWorkerSizeProviderTest extends AbstractRabbitMQTest { private static final String TEST_QUEUE = "test"; @@ -52,7 +52,7 @@ void setUp() throws Exception { @Test @Timeout(value = 10, unit = TimeUnit.SECONDS) - void testTriggerWorkerQueueState() throws IOException, InterruptedException { + void testTriggerWorkerQueueState() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); final RabbitMQQueueMonitor mockMonitor = mock(RabbitMQQueueMonitor.class); diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerTest.java index ea8c25b..a89ffea 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerTest.java @@ -38,6 +38,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import nl.aerius.taskmanager.MockAdaptorFactory; import nl.aerius.taskmanager.MockTask; @@ -55,6 +57,8 @@ */ class PriorityTaskSchedulerTest { + private static final Logger LOG = LoggerFactory.getLogger(PriorityTaskSchedulerTest.class); + private static final String QUEUE1 = "queue1"; private static final String QUEUE2 = "queue2"; private static final String QUEUE3 = "queue3"; @@ -84,10 +88,10 @@ void setUp() throws IOException { configuration.getQueues().add(tc3); scheduler = (PriorityTaskScheduler) factory.createScheduler(new QueueConfig(QUEUE1, false, true, null)); configuration.getQueues().forEach(scheduler::updateQueue); - task1 = createTask(taskConsumer1, "1", QUEUE1); - task2a = createTask(taskConsumer2, "2a", QUEUE2); - task2b = createTask(taskConsumer2, "2b", QUEUE2); - task3 = createTask(taskConsumer3, "3", QUEUE3); + task1 = createTask(taskConsumer1, "1"); + task2a = createTask(taskConsumer2, "2a"); + task2b = createTask(taskConsumer2, "2b"); + task3 = createTask(taskConsumer3, "3"); } @Test @@ -152,10 +156,10 @@ private boolean compareSame(final Task taskA, final Task taskB, final int return @Timeout(7000) void testGetTaskWith1WorkerAvailable() throws InterruptedException, ExecutionException { scheduler.onWorkerPoolSizeChange(1); - final Task task1 = createTask(taskConsumer1, "1", QUEUE1); //add task with priority 0. - scheduler.addTask(task1); + final Task task = createTask(taskConsumer1, "1"); //add task with priority 0. + scheduler.addTask(task); final AtomicInteger chkCounter = new AtomicInteger(); - final Future receivedTask = waitForTask(task1, chkCounter); + final Future receivedTask = waitForTask(task, chkCounter); await().atMost(1, TimeUnit.SECONDS).until(receivedTask::isDone); assertNotNull(receivedTask.get(), "Received task"); assertEquals(1, chkCounter.intValue(), "Counter should be 1 when only one slot available"); @@ -170,9 +174,9 @@ void testGetTaskWith1WorkerAvailable() throws InterruptedException, ExecutionExc @Timeout(value = 7, unit = TimeUnit.SECONDS) void testGetTask() throws InterruptedException, ExecutionException { scheduler.onWorkerPoolSizeChange(2); - final Task task1a = createTask(taskConsumer1, "1a", QUEUE1); + final Task task1a = createTask(taskConsumer1, "1a"); scheduler.addTask(task1a); - final Task task1b = createTask(taskConsumer1, "1b", QUEUE1); + final Task task1b = createTask(taskConsumer1, "1b"); scheduler.addTask(task1b); scheduler.addTask(task2a); assertSame(task2a, scheduler.getNextTask(), "Should get task2a back."); @@ -206,7 +210,7 @@ void testGetTaskBigPool() throws InterruptedException, ExecutionException { final List tasks = new ArrayList<>(); final List sendTasks = new ArrayList<>(); for (int i = 0; i < 10; i++) { - final Task task = createTask(taskConsumer2, "1", QUEUE2); + final Task task = createTask(taskConsumer2, "1"); scheduler.addTask(task); tasks.add(task); } @@ -216,7 +220,7 @@ void testGetTaskBigPool() throws InterruptedException, ExecutionException { } scheduler.addTask(task1); assertSame(task1, scheduler.getNextTask(), "Should still get task 1"); - final Task task1b = createTask(taskConsumer1, "1b", QUEUE1); + final Task task1b = createTask(taskConsumer1, "1b"); scheduler.addTask(task1b); assertSame(task1b, scheduler.getNextTask(), "Should still get task 1b"); final AtomicInteger chkCounter = new AtomicInteger(); @@ -273,14 +277,15 @@ public Task call() throws Exception { Task result = null; try { result = scheduler.getNextTask(); - if (task == null) { - assertNotNull(result, "Should get any task back"); - } else { - assertSame(task, result, "Should get task back."); - } - chkCounter.incrementAndGet(); } catch (final InterruptedException e) { + LOG.error("InterruptedException when waiting for next task", e); + } + if (task == null) { + assertNotNull(result, "Should get any task back"); + } else { + assertSame(task, result, "Should get task back."); } + chkCounter.incrementAndGet(); return result; } }); @@ -298,7 +303,7 @@ public void messageDelivered(final Message message) { }; } - private Task createTask(final TaskConsumer tc, final String messageId, final String queue) { + private static Task createTask(final TaskConsumer tc, final String messageId) { return new MockTask(tc, messageId); } }