File tree Expand file tree Collapse file tree 3 files changed +38
-1
lines changed
main/java/org/apache/flink/runtime/scheduler/adaptive
test/java/org/apache/flink/runtime/scheduler/adaptive Expand file tree Collapse file tree 3 files changed +38
-1
lines changed Original file line number Diff line number Diff line change @@ -114,7 +114,8 @@ void onGloballyTerminalState(JobStatus globallyTerminalState) {
114114 }
115115
116116 private void goToSubsequentState () {
117- if (availableParallelismNotChanged (restartWithParallelism )) {
117+ if (availableParallelismNotChanged (restartWithParallelism )
118+ || context .hasDesiredResources ()) {
118119 context .goToCreatingExecutionGraph (getExecutionGraph ());
119120 } else {
120121 context .goToWaitingForResources (getExecutionGraph ());
@@ -163,6 +164,13 @@ interface Context
163164 * slots.
164165 */
165166 Optional <VertexParallelism > getAvailableVertexParallelism ();
167+
168+ /**
169+ * Checks whether we have the desired resources.
170+ *
171+ * @return {@code true} if we have enough resources; otherwise {@code false}
172+ */
173+ boolean hasDesiredResources ();
166174 }
167175
168176 static class Factory implements StateFactory <Restarting > {
Original file line number Diff line number Diff line change @@ -51,6 +51,8 @@ class MockRestartingContext extends MockStateWithExecutionGraphContext
5151
5252 @ Nullable private VertexParallelism availableVertexParallelism ;
5353
54+ private boolean hasDesiredResources = false ;
55+
5456 public void setExpectCancelling (Consumer <ExecutingTest .CancellingArguments > asserter ) {
5557 cancellingStateValidator .expectInput (asserter );
5658 }
@@ -68,6 +70,15 @@ public void setAvailableVertexParallelism(
6870 this .availableVertexParallelism = availableVertexParallelism ;
6971 }
7072
73+ public void setHasDesiredResources (boolean hasDesiredResources ) {
74+ this .hasDesiredResources = hasDesiredResources ;
75+ }
76+
77+ @ Override
78+ public boolean hasDesiredResources () {
79+ return hasDesiredResources ;
80+ }
81+
7182 @ Override
7283 public void goToCanceling (
7384 ExecutionGraph executionGraph ,
Original file line number Diff line number Diff line change @@ -92,6 +92,24 @@ public void testTransitionToSubsequentStateWhenResourceChanged() throws Exceptio
9292 }
9393 }
9494
95+ @ Test
96+ public void testTransitionToSubsequentStateWhenResourceChangedButHasDesiredResources ()
97+ throws Exception {
98+ try (MockRestartingContext ctx = new MockRestartingContext ()) {
99+ JobVertexID jobVertexId = new JobVertexID ();
100+ VertexParallelism availableParallelism =
101+ new VertexParallelism (singletonMap (jobVertexId , 1 ));
102+ VertexParallelism requiredParallelismForForcedRestart =
103+ new VertexParallelism (singletonMap (jobVertexId , 2 ));
104+
105+ ctx .setAvailableVertexParallelism (availableParallelism );
106+ ctx .setHasDesiredResources (true );
107+ Restarting restarting = createRestartingState (ctx , requiredParallelismForForcedRestart );
108+ ctx .setExpectCreatingExecutionGraph ();
109+ restarting .onGloballyTerminalState (JobStatus .CANCELED );
110+ }
111+ }
112+
95113 @ Test
96114 void testCancel () throws Exception {
97115 try (MockRestartingContext ctx = new MockRestartingContext ()) {
You can’t perform that action at this time.
0 commit comments