Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public boolean isRunning() {
public void run() {
Thread.currentThread().setName("TaskDispatcher-" + workerQueueName);
running = true;
Task retryTask = null;
try {
while (running) {
state = State.WAIT_FOR_WORKER;
Expand All @@ -103,10 +104,10 @@ public void run() {

state = State.WAIT_FOR_TASK;
LOG.debug("Wait for task {}", workerQueueName);
final Task task = getNextTask();
final Task task = retryTask == null ? getNextTask() : retryTask;
LOG.debug("Send task to worker {}, ({})", workerQueueName, task.getId());
state = State.DISPATCH_TASK;
dispatch(task);
retryTask = dispatch(task);
}
} catch (final RuntimeException e) {
LOG.error("TaskDispatcher crashed with RuntimeException: {}", getState(), e);
Expand All @@ -125,20 +126,21 @@ private Task getNextTask() throws InterruptedException {
return task;
}

private void dispatch(final Task task) {
private Task dispatch(final Task task) {
try {
workerPool.sendTaskToWorker(task);
} catch (final NoFreeWorkersException e) {
LOG.info("[NoFreeWorkersException] Workers for queue {} decreased while waiting for task. Rescheduling task.", e.getWorkerQueueName());
LOG.trace("NoFreeWorkersException thrown", e);
scheduler.addTask(task);
return task;
} catch (final TaskAlreadySentException e) {
LOG.error("Duplicate task detected for worker queue: {}, from task queue: {}", e.getWorkerQueueName(), e.getTaskQueueName(), e);
taskAbortedOnDuplicateMessageId(task);
} catch (final IOException | ShutdownSignalException e) {
LOG.error("Sending task to worker failed", e);
taskDeliveryFailed(task);
}
return null;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public PriorityTaskQueue put(final String queueName, final PriorityTaskQueue que
public void decrementOnWorker(final TaskRecord taskRecord) {
final String trKey = key(taskRecord);

if (tasksOnWorkersPerQueue.get(trKey).decrementAndGet() == 0) {
if (Optional.ofNullable(tasksOnWorkersPerQueue.get(trKey)).map(v -> v.decrementAndGet() == 0).orElse(false)) {
tasksOnWorkersPerQueue.remove(trKey);
}
}
Expand Down