From b06585c440aa8928683f86c13af71e5b8432f9a4 Mon Sep 17 00:00:00 2001 From: Hilbrand Bouwkamp Date: Thu, 4 Sep 2025 14:01:13 +0200 Subject: [PATCH] Fixes: retry task directly, instead of readd, and guard agains tnull - Instead of adding a task back to the scheduler retry it in the loop. This because some other metrics are tracked on tasks, and these are messed up by readding it (unless we would implement some more logic to do an actual re-add to the scheduler. But in that case just do it in the loop makes more sense). - Added extra null guard for safety. --- .../java/nl/aerius/taskmanager/TaskDispatcher.java | 10 ++++++---- .../scheduler/priorityqueue/PriorityQueueMap.java | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskDispatcher.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskDispatcher.java index 65e9db9..ce83b3b 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskDispatcher.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskDispatcher.java @@ -95,6 +95,7 @@ public boolean isRunning() { public void run() { Thread.currentThread().setName("TaskDispatcher-" + workerQueueName); running = true; + Task retryTask = null; try { while (running) { state = State.WAIT_FOR_WORKER; @@ -103,10 +104,10 @@ public void run() { state = State.WAIT_FOR_TASK; LOG.debug("Wait for task {}", workerQueueName); - final Task task = getNextTask(); + final Task task = retryTask == null ? getNextTask() : retryTask; LOG.debug("Send task to worker {}, ({})", workerQueueName, task.getId()); state = State.DISPATCH_TASK; - dispatch(task); + retryTask = dispatch(task); } } catch (final RuntimeException e) { LOG.error("TaskDispatcher crashed with RuntimeException: {}", getState(), e); @@ -125,13 +126,13 @@ private Task getNextTask() throws InterruptedException { return task; } - private void dispatch(final Task task) { + private Task dispatch(final Task task) { try { workerPool.sendTaskToWorker(task); } catch (final NoFreeWorkersException e) { LOG.info("[NoFreeWorkersException] Workers for queue {} decreased while waiting for task. Rescheduling task.", e.getWorkerQueueName()); LOG.trace("NoFreeWorkersException thrown", e); - scheduler.addTask(task); + return task; } catch (final TaskAlreadySentException e) { LOG.error("Duplicate task detected for worker queue: {}, from task queue: {}", e.getWorkerQueueName(), e.getTaskQueueName(), e); taskAbortedOnDuplicateMessageId(task); @@ -139,6 +140,7 @@ private void dispatch(final Task task) { LOG.error("Sending task to worker failed", e); taskDeliveryFailed(task); } + return null; } /** diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityQueueMap.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityQueueMap.java index a667e12..a2ab392 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityQueueMap.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityQueueMap.java @@ -61,7 +61,7 @@ public PriorityTaskQueue put(final String queueName, final PriorityTaskQueue que public void decrementOnWorker(final TaskRecord taskRecord) { final String trKey = key(taskRecord); - if (tasksOnWorkersPerQueue.get(trKey).decrementAndGet() == 0) { + if (Optional.ofNullable(tasksOnWorkersPerQueue.get(trKey)).map(v -> v.decrementAndGet() == 0).orElse(false)) { tasksOnWorkersPerQueue.remove(trKey); } }