diff --git a/cqp-core/src/main/java/com/quantori/cqp/core/task/service/TaskPersistenceServiceImpl.java b/cqp-core/src/main/java/com/quantori/cqp/core/task/service/TaskPersistenceServiceImpl.java index 8bd9397..5f8eaad 100644 --- a/cqp-core/src/main/java/com/quantori/cqp/core/task/service/TaskPersistenceServiceImpl.java +++ b/cqp-core/src/main/java/com/quantori/cqp/core/task/service/TaskPersistenceServiceImpl.java @@ -53,7 +53,10 @@ public class TaskPersistenceServiceImpl implements TaskPersistenceService { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private final ObjectMapper objectMapper = new ObjectMapper(); + private static final Duration STALE_THRESHOLD = Duration.ofMinutes(15); + private static final Duration OUTDATED_THRESHOLD = Duration.ofHours(24); + private static final Duration RESTART_FLAG_THRESHOLD = Duration.ofMinutes(5); + private final ObjectMapper objectMapper; private final ActorSystem actorSystem; private final ActorRef rootActorRef; private final Supplier streamTaskServiceSupplier; @@ -73,9 +76,9 @@ public TaskPersistenceServiceImpl( this.streamTaskServiceSupplier = streamTaskServiceSupplier; this.taskStatusDao = taskStatusDao; this.entityHolder = entityHolder; - - objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); - objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); + this.objectMapper = new ObjectMapper() + .setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY) + .configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); this.schedulingIsEnabled = enableScheduling; if (enableScheduling) { @@ -91,70 +94,75 @@ public TaskPersistenceServiceImpl( @Override public void restartInProgressTasks() { - Set flowIdsForResume = new HashSet<>(); - Set subTasks = new HashSet<>(); + Set resumedFlows = new HashSet<>(); List all = taskStatusDao.findAll(); - all.forEach( - task -> { - try { - boolean taskActorDoesNotExists = taskActorDoesNotExists(task.getTaskId()); - boolean taskWasNotUpdatedForOneMinute = taskWasNotUpdatedForOneMinute(task.getTaskId()); - if (taskActorDoesNotExists && taskWasNotUpdatedForOneMinute) { - if (StreamTaskStatus.Status.IN_PROGRESS.equals(task.getStatus())) { - if (Objects.isNull(task.getFlowId())) { - resume(flowIdsForResume, task); - } else { - subTasks.add(task); - } - } else { - deleteStatusTask(task.getTaskId()); - } - } - } catch (StreamTaskAlreadyRestartedException e) { - logger.debug( - "The task {} was most likely restarted by another application instance", - task.getTaskId(), - e); - } catch (Exception e) { - logger.error("Cannot restart the task {}", task.getTaskId(), e); - } + + for (TaskStatus task : all) { + try { + if (!isStaleAndUnassigned(task)) { checkOutdatedTask(task); - }); - subTasks.forEach( - task -> { - if (!flowIdsForResume.contains(task.getFlowId())) { - deleteStatusTask(task.getTaskId()); + continue; + } + + if (StreamTaskStatus.Status.IN_PROGRESS == task.getStatus()) { + String flowId = task.getFlowId(); + if (flowId == null) { + resume(resumedFlows, task); + } else { + resumedFlows.add(flowId); } - }); + } else { + handleFinishedOrFailed(task); + } + } catch (StreamTaskAlreadyRestartedException e) { + logger.debug("Task {} was likely restarted elsewhere", task.getTaskId(), e); + } catch (Exception e) { + logger.error("Cannot restart task {}", task.getTaskId(), e); + } + checkOutdatedTask(task); + } + + all.stream() + .filter(t -> t.getFlowId() != null && !resumedFlows.contains(t.getFlowId())) + .forEach(this::handleFinishedOrFailed); } - private void checkOutdatedTask(TaskStatus taskStatus) { - if (taskStatus.getStatus().equals(StreamTaskStatus.Status.IN_PROGRESS) - || taskStatus.getStatus().equals(StreamTaskStatus.Status.INITIATED)) { - if ((Instant.now().getEpochSecond() - - taskStatus.getCreatedDate().toInstant().getEpochSecond()) - > 24 * 60 * 60) { - taskStatus.setStatus(StreamTaskStatus.Status.COMPLETED_WITH_ERROR); - taskStatusDao.save(taskStatus); - return; - } - if ((Instant.now().getEpochSecond() - - taskStatus.getUpdatedDate().toInstant().getEpochSecond()) - > 300 - && taskStatus.getRestartFlag() > 0) { - taskStatus.setRestartFlag(0); - taskStatusDao.save(taskStatus); - } + private boolean isStaleAndUnassigned(TaskStatus taskStatus) { + Instant now = Instant.now(); + return taskActorDoesNotExists(taskStatus.getTaskId()) + && Duration.between(taskStatus.getUpdatedDate().toInstant(), now).compareTo(STALE_THRESHOLD) > 0; + } + + private void handleFinishedOrFailed(TaskStatus taskStatus) { + String flowId = taskStatus.getFlowId(); + if (flowId == null || taskActorDoesNotExists(UUID.fromString(flowId))) { + deleteStatusTask(taskStatus.getTaskId()); + } else { + logger.debug("Skipping delete of child {} because parent {} still has an actor", + taskStatus.getTaskId(), flowId); } } - private boolean taskWasNotUpdatedForOneMinute(UUID taskId) { - var taskStatus = taskStatusDao.findById(taskId); + private void checkOutdatedTask(TaskStatus taskStatus) { + Instant now = Instant.now(); - return taskStatus.isPresent() - && (Instant.now().getEpochSecond() - - taskStatus.get().getUpdatedDate().toInstant().getEpochSecond() - > 60); + boolean inProgOrInit = StreamTaskStatus.Status.IN_PROGRESS == taskStatus.getStatus() + || StreamTaskStatus.Status.INITIATED == taskStatus.getStatus(); + if (!inProgOrInit) return; + + Instant created = taskStatus.getCreatedDate().toInstant(); + if (Duration.between(created, now).compareTo(OUTDATED_THRESHOLD) > 0) { + taskStatus.setStatus(StreamTaskStatus.Status.COMPLETED_WITH_ERROR); + taskStatusDao.save(taskStatus); + return; + } + + Instant updated = taskStatus.getUpdatedDate().toInstant(); + if (Duration.between(updated, now).compareTo(RESTART_FLAG_THRESHOLD) > 0 + && taskStatus.getRestartFlag() > 0) { + taskStatus.setRestartFlag(0); + taskStatusDao.save(taskStatus); + } } private void resume(Set flowIdsForResume, TaskStatus task) {