53
53
public class TaskPersistenceServiceImpl implements TaskPersistenceService {
54
54
private static final Logger logger =
55
55
LoggerFactory .getLogger (MethodHandles .lookup ().lookupClass ());
56
- private final ObjectMapper objectMapper = new ObjectMapper ();
56
+ private static final Duration STALE_THRESHOLD = Duration .ofMinutes (15 );
57
+ private static final Duration OUTDATED_THRESHOLD = Duration .ofHours (24 );
58
+ private static final Duration RESTART_FLAG_THRESHOLD = Duration .ofMinutes (5 );
59
+ private final ObjectMapper objectMapper ;
57
60
private final ActorSystem <?> actorSystem ;
58
61
private final ActorRef <TaskServiceActor .Command > rootActorRef ;
59
62
private final Supplier <StreamTaskService > streamTaskServiceSupplier ;
@@ -73,9 +76,9 @@ public TaskPersistenceServiceImpl(
73
76
this .streamTaskServiceSupplier = streamTaskServiceSupplier ;
74
77
this .taskStatusDao = taskStatusDao ;
75
78
this .entityHolder = entityHolder ;
76
-
77
- objectMapper .setVisibility (PropertyAccessor .FIELD , JsonAutoDetect .Visibility .ANY );
78
- objectMapper .configure (SerializationFeature .FAIL_ON_EMPTY_BEANS , false );
79
+ this . objectMapper = new ObjectMapper ()
80
+ .setVisibility (PropertyAccessor .FIELD , JsonAutoDetect .Visibility .ANY )
81
+ .configure (SerializationFeature .FAIL_ON_EMPTY_BEANS , false );
79
82
80
83
this .schedulingIsEnabled = enableScheduling ;
81
84
if (enableScheduling ) {
@@ -91,70 +94,75 @@ public TaskPersistenceServiceImpl(
91
94
92
95
@ Override
93
96
public void restartInProgressTasks () {
94
- Set <String > flowIdsForResume = new HashSet <>();
95
- Set <TaskStatus > subTasks = new HashSet <>();
97
+ Set <String > resumedFlows = new HashSet <>();
96
98
List <TaskStatus > all = taskStatusDao .findAll ();
97
- all .forEach (
98
- task -> {
99
- try {
100
- boolean taskActorDoesNotExists = taskActorDoesNotExists (task .getTaskId ());
101
- boolean taskWasNotUpdatedForOneMinute = taskWasNotUpdatedForOneMinute (task .getTaskId ());
102
- if (taskActorDoesNotExists && taskWasNotUpdatedForOneMinute ) {
103
- if (StreamTaskStatus .Status .IN_PROGRESS .equals (task .getStatus ())) {
104
- if (Objects .isNull (task .getFlowId ())) {
105
- resume (flowIdsForResume , task );
106
- } else {
107
- subTasks .add (task );
108
- }
109
- } else {
110
- deleteStatusTask (task .getTaskId ());
111
- }
112
- }
113
- } catch (StreamTaskAlreadyRestartedException e ) {
114
- logger .debug (
115
- "The task {} was most likely restarted by another application instance" ,
116
- task .getTaskId (),
117
- e );
118
- } catch (Exception e ) {
119
- logger .error ("Cannot restart the task {}" , task .getTaskId (), e );
120
- }
99
+
100
+ for (TaskStatus task : all ) {
101
+ try {
102
+ if (!isStaleAndUnassigned (task )) {
121
103
checkOutdatedTask (task );
122
- });
123
- subTasks .forEach (
124
- task -> {
125
- if (!flowIdsForResume .contains (task .getFlowId ())) {
126
- deleteStatusTask (task .getTaskId ());
104
+ continue ;
105
+ }
106
+
107
+ if (StreamTaskStatus .Status .IN_PROGRESS == task .getStatus ()) {
108
+ String flowId = task .getFlowId ();
109
+ if (flowId == null ) {
110
+ resume (resumedFlows , task );
111
+ } else {
112
+ resumedFlows .add (flowId );
127
113
}
128
- });
114
+ } else {
115
+ handleFinishedOrFailed (task );
116
+ }
117
+ } catch (StreamTaskAlreadyRestartedException e ) {
118
+ logger .debug ("Task {} was likely restarted elsewhere" , task .getTaskId (), e );
119
+ } catch (Exception e ) {
120
+ logger .error ("Cannot restart task {}" , task .getTaskId (), e );
121
+ }
122
+ checkOutdatedTask (task );
123
+ }
124
+
125
+ all .stream ()
126
+ .filter (t -> t .getFlowId () != null && !resumedFlows .contains (t .getFlowId ()))
127
+ .forEach (this ::handleFinishedOrFailed );
129
128
}
130
129
131
- private void checkOutdatedTask (TaskStatus taskStatus ) {
132
- if (taskStatus .getStatus ().equals (StreamTaskStatus .Status .IN_PROGRESS )
133
- || taskStatus .getStatus ().equals (StreamTaskStatus .Status .INITIATED )) {
134
- if ((Instant .now ().getEpochSecond ()
135
- - taskStatus .getCreatedDate ().toInstant ().getEpochSecond ())
136
- > 24 * 60 * 60 ) {
137
- taskStatus .setStatus (StreamTaskStatus .Status .COMPLETED_WITH_ERROR );
138
- taskStatusDao .save (taskStatus );
139
- return ;
140
- }
141
- if ((Instant .now ().getEpochSecond ()
142
- - taskStatus .getUpdatedDate ().toInstant ().getEpochSecond ())
143
- > 300
144
- && taskStatus .getRestartFlag () > 0 ) {
145
- taskStatus .setRestartFlag (0 );
146
- taskStatusDao .save (taskStatus );
147
- }
130
+ private boolean isStaleAndUnassigned (TaskStatus taskStatus ) {
131
+ Instant now = Instant .now ();
132
+ return taskActorDoesNotExists (taskStatus .getTaskId ())
133
+ && Duration .between (taskStatus .getUpdatedDate ().toInstant (), now ).compareTo (STALE_THRESHOLD ) > 0 ;
134
+ }
135
+
136
+ private void handleFinishedOrFailed (TaskStatus taskStatus ) {
137
+ String flowId = taskStatus .getFlowId ();
138
+ if (flowId == null || taskActorDoesNotExists (UUID .fromString (flowId ))) {
139
+ deleteStatusTask (taskStatus .getTaskId ());
140
+ } else {
141
+ logger .debug ("Skipping delete of child {} because parent {} still has an actor" ,
142
+ taskStatus .getTaskId (), flowId );
148
143
}
149
144
}
150
145
151
- private boolean taskWasNotUpdatedForOneMinute ( UUID taskId ) {
152
- var taskStatus = taskStatusDao . findById ( taskId );
146
+ private void checkOutdatedTask ( TaskStatus taskStatus ) {
147
+ Instant now = Instant . now ( );
153
148
154
- return taskStatus .isPresent ()
155
- && (Instant .now ().getEpochSecond ()
156
- - taskStatus .get ().getUpdatedDate ().toInstant ().getEpochSecond ()
157
- > 60 );
149
+ boolean inProgOrInit = StreamTaskStatus .Status .IN_PROGRESS == taskStatus .getStatus ()
150
+ || StreamTaskStatus .Status .INITIATED == taskStatus .getStatus ();
151
+ if (!inProgOrInit ) return ;
152
+
153
+ Instant created = taskStatus .getCreatedDate ().toInstant ();
154
+ if (Duration .between (created , now ).compareTo (OUTDATED_THRESHOLD ) > 0 ) {
155
+ taskStatus .setStatus (StreamTaskStatus .Status .COMPLETED_WITH_ERROR );
156
+ taskStatusDao .save (taskStatus );
157
+ return ;
158
+ }
159
+
160
+ Instant updated = taskStatus .getUpdatedDate ().toInstant ();
161
+ if (Duration .between (updated , now ).compareTo (RESTART_FLAG_THRESHOLD ) > 0
162
+ && taskStatus .getRestartFlag () > 0 ) {
163
+ taskStatus .setRestartFlag (0 );
164
+ taskStatusDao .save (taskStatus );
165
+ }
158
166
}
159
167
160
168
private void resume (Set <String > flowIdsForResume , TaskStatus task ) {
0 commit comments