diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskDispatcher.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskDispatcher.java index 65e9db9..ce83b3b 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskDispatcher.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskDispatcher.java @@ -95,6 +95,7 @@ public boolean isRunning() { public void run() { Thread.currentThread().setName("TaskDispatcher-" + workerQueueName); running = true; + Task retryTask = null; try { while (running) { state = State.WAIT_FOR_WORKER; @@ -103,10 +104,10 @@ public void run() { state = State.WAIT_FOR_TASK; LOG.debug("Wait for task {}", workerQueueName); - final Task task = getNextTask(); + final Task task = retryTask == null ? getNextTask() : retryTask; LOG.debug("Send task to worker {}, ({})", workerQueueName, task.getId()); state = State.DISPATCH_TASK; - dispatch(task); + retryTask = dispatch(task); } } catch (final RuntimeException e) { LOG.error("TaskDispatcher crashed with RuntimeException: {}", getState(), e); @@ -125,13 +126,13 @@ private Task getNextTask() throws InterruptedException { return task; } - private void dispatch(final Task task) { + private Task dispatch(final Task task) { try { workerPool.sendTaskToWorker(task); } catch (final NoFreeWorkersException e) { LOG.info("[NoFreeWorkersException] Workers for queue {} decreased while waiting for task. Rescheduling task.", e.getWorkerQueueName()); LOG.trace("NoFreeWorkersException thrown", e); - scheduler.addTask(task); + return task; } catch (final TaskAlreadySentException e) { LOG.error("Duplicate task detected for worker queue: {}, from task queue: {}", e.getWorkerQueueName(), e.getTaskQueueName(), e); taskAbortedOnDuplicateMessageId(task); @@ -139,6 +140,7 @@ private void dispatch(final Task task) { LOG.error("Sending task to worker failed", e); taskDeliveryFailed(task); } + return null; } /** diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityQueueMap.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityQueueMap.java index a667e12..a2ab392 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityQueueMap.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityQueueMap.java @@ -61,7 +61,7 @@ public PriorityTaskQueue put(final String queueName, final PriorityTaskQueue que public void decrementOnWorker(final TaskRecord taskRecord) { final String trKey = key(taskRecord); - if (tasksOnWorkersPerQueue.get(trKey).decrementAndGet() == 0) { + if (Optional.ofNullable(tasksOnWorkersPerQueue.get(trKey)).map(v -> v.decrementAndGet() == 0).orElse(false)) { tasksOnWorkersPerQueue.remove(trKey); } }