Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@
public class TaskPersistenceServiceImpl implements TaskPersistenceService {
private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final ObjectMapper objectMapper = new ObjectMapper();
private static final Duration STALE_THRESHOLD = Duration.ofMinutes(15);
private static final Duration OUTDATED_THRESHOLD = Duration.ofHours(24);
private static final Duration RESTART_FLAG_THRESHOLD = Duration.ofMinutes(5);
private final ObjectMapper objectMapper;
private final ActorSystem<?> actorSystem;
private final ActorRef<TaskServiceActor.Command> rootActorRef;
private final Supplier<StreamTaskService> streamTaskServiceSupplier;
Expand All @@ -73,9 +76,9 @@ public TaskPersistenceServiceImpl(
this.streamTaskServiceSupplier = streamTaskServiceSupplier;
this.taskStatusDao = taskStatusDao;
this.entityHolder = entityHolder;

objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
this.objectMapper = new ObjectMapper()
.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY)
.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);

this.schedulingIsEnabled = enableScheduling;
if (enableScheduling) {
Expand All @@ -91,70 +94,75 @@ public TaskPersistenceServiceImpl(

@Override
public void restartInProgressTasks() {
Set<String> flowIdsForResume = new HashSet<>();
Set<TaskStatus> subTasks = new HashSet<>();
Set<String> resumedFlows = new HashSet<>();
List<TaskStatus> all = taskStatusDao.findAll();
all.forEach(
task -> {
try {
boolean taskActorDoesNotExists = taskActorDoesNotExists(task.getTaskId());
boolean taskWasNotUpdatedForOneMinute = taskWasNotUpdatedForOneMinute(task.getTaskId());
if (taskActorDoesNotExists && taskWasNotUpdatedForOneMinute) {
if (StreamTaskStatus.Status.IN_PROGRESS.equals(task.getStatus())) {
if (Objects.isNull(task.getFlowId())) {
resume(flowIdsForResume, task);
} else {
subTasks.add(task);
}
} else {
deleteStatusTask(task.getTaskId());
}
}
} catch (StreamTaskAlreadyRestartedException e) {
logger.debug(
"The task {} was most likely restarted by another application instance",
task.getTaskId(),
e);
} catch (Exception e) {
logger.error("Cannot restart the task {}", task.getTaskId(), e);
}

for (TaskStatus task : all) {
try {
if (!isStaleAndUnassigned(task)) {
checkOutdatedTask(task);
});
subTasks.forEach(
task -> {
if (!flowIdsForResume.contains(task.getFlowId())) {
deleteStatusTask(task.getTaskId());
continue;
}

if (StreamTaskStatus.Status.IN_PROGRESS == task.getStatus()) {
String flowId = task.getFlowId();
if (flowId == null) {
resume(resumedFlows, task);
} else {
resumedFlows.add(flowId);
}
});
} else {
handleFinishedOrFailed(task);
}
} catch (StreamTaskAlreadyRestartedException e) {
logger.debug("Task {} was likely restarted elsewhere", task.getTaskId(), e);
} catch (Exception e) {
logger.error("Cannot restart task {}", task.getTaskId(), e);
}
checkOutdatedTask(task);
}

all.stream()
.filter(t -> t.getFlowId() != null && !resumedFlows.contains(t.getFlowId()))
.forEach(this::handleFinishedOrFailed);
}

private void checkOutdatedTask(TaskStatus taskStatus) {
if (taskStatus.getStatus().equals(StreamTaskStatus.Status.IN_PROGRESS)
|| taskStatus.getStatus().equals(StreamTaskStatus.Status.INITIATED)) {
if ((Instant.now().getEpochSecond()
- taskStatus.getCreatedDate().toInstant().getEpochSecond())
> 24 * 60 * 60) {
taskStatus.setStatus(StreamTaskStatus.Status.COMPLETED_WITH_ERROR);
taskStatusDao.save(taskStatus);
return;
}
if ((Instant.now().getEpochSecond()
- taskStatus.getUpdatedDate().toInstant().getEpochSecond())
> 300
&& taskStatus.getRestartFlag() > 0) {
taskStatus.setRestartFlag(0);
taskStatusDao.save(taskStatus);
}
/**
 * Tells whether a task both lacks an actor and has not been updated recently.
 *
 * <p>The current time is captured first. The task's updated date is read only when
 * {@code taskActorDoesNotExists} returns {@code true} for the task id.
 *
 * @param taskStatus the persisted task record to inspect
 * @return {@code true} when {@code taskActorDoesNotExists} holds for the task id and the time
 *     elapsed since the task's updated date is strictly greater than {@code STALE_THRESHOLD};
 *     {@code false} otherwise
 */
private boolean isStaleAndUnassigned(TaskStatus taskStatus) {
  final Instant checkedAt = Instant.now();

  // Short-circuit: when the actor check fails, the updated date is never touched.
  if (!taskActorDoesNotExists(taskStatus.getTaskId())) {
    return false;
  }

  final Instant lastUpdate = taskStatus.getUpdatedDate().toInstant();
  final Duration idleFor = Duration.between(lastUpdate, checkedAt);
  return idleFor.compareTo(STALE_THRESHOLD) > 0;
}

/**
 * Deletes the status record of a task unless the flow it belongs to still has an actor.
 *
 * <p>A task with no flow id is always deleted. A task with a flow id is deleted only when
 * {@code taskActorDoesNotExists} returns {@code true} for that flow id parsed as a UUID;
 * otherwise the deletion is skipped and a debug message is logged.
 *
 * @param taskStatus the persisted task record to clean up
 * @throws IllegalArgumentException if the flow id is non-null and {@link UUID#fromString}
 *     rejects it
 */
private void handleFinishedOrFailed(TaskStatus taskStatus) {
  final String parentFlowId = taskStatus.getFlowId();

  // The parent lookup runs only when a flow id is present, as in the original condition.
  final boolean parentStillHasActor =
      parentFlowId != null && !taskActorDoesNotExists(UUID.fromString(parentFlowId));

  if (parentStillHasActor) {
    logger.debug("Skipping delete of child {} because parent {} still has an actor",
        taskStatus.getTaskId(), parentFlowId);
    return;
  }

  deleteStatusTask(taskStatus.getTaskId());
}

private boolean taskWasNotUpdatedForOneMinute(UUID taskId) {
var taskStatus = taskStatusDao.findById(taskId);
private void checkOutdatedTask(TaskStatus taskStatus) {
Instant now = Instant.now();

return taskStatus.isPresent()
&& (Instant.now().getEpochSecond()
- taskStatus.get().getUpdatedDate().toInstant().getEpochSecond()
> 60);
boolean inProgOrInit = StreamTaskStatus.Status.IN_PROGRESS == taskStatus.getStatus()
|| StreamTaskStatus.Status.INITIATED == taskStatus.getStatus();
if (!inProgOrInit) return;

Instant created = taskStatus.getCreatedDate().toInstant();
if (Duration.between(created, now).compareTo(OUTDATED_THRESHOLD) > 0) {
taskStatus.setStatus(StreamTaskStatus.Status.COMPLETED_WITH_ERROR);
taskStatusDao.save(taskStatus);
return;
}

Instant updated = taskStatus.getUpdatedDate().toInstant();
if (Duration.between(updated, now).compareTo(RESTART_FLAG_THRESHOLD) > 0
&& taskStatus.getRestartFlag() > 0) {
taskStatus.setRestartFlag(0);
taskStatusDao.save(taskStatus);
}
}

private void resume(Set<String> flowIdsForResume, TaskStatus task) {
Expand Down
Loading