diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java index 7ce04dbcc3773..93ef12ac88aaf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java @@ -114,7 +114,8 @@ void onGloballyTerminalState(JobStatus globallyTerminalState) { } private void goToSubsequentState() { - if (availableParallelismNotChanged(restartWithParallelism)) { + if (availableParallelismNotChanged(restartWithParallelism) + || context.hasDesiredResources()) { context.goToCreatingExecutionGraph(getExecutionGraph()); } else { context.goToWaitingForResources(getExecutionGraph()); @@ -163,6 +164,13 @@ interface Context * slots. */ Optional getAvailableVertexParallelism(); + + /** + * Checks whether we have the desired resources. + * + * @return {@code true} if we have enough resources; otherwise {@code false} + */ + boolean hasDesiredResources(); } static class Factory implements StateFactory { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/MockRestartingContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/MockRestartingContext.java index c4e2023b4c3ef..30cc9433600ee 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/MockRestartingContext.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/MockRestartingContext.java @@ -51,6 +51,8 @@ class MockRestartingContext extends MockStateWithExecutionGraphContext @Nullable private VertexParallelism availableVertexParallelism; + private boolean hasDesiredResources = false; + public void setExpectCancelling(Consumer asserter) { cancellingStateValidator.expectInput(asserter); } @@ -68,6 +70,15 @@ public void setAvailableVertexParallelism( this.availableVertexParallelism = availableVertexParallelism; } + public void setHasDesiredResources(boolean hasDesiredResources) { + this.hasDesiredResources = hasDesiredResources; + } + + @Override + public boolean hasDesiredResources() { + return hasDesiredResources; + } + @Override public void goToCanceling( ExecutionGraph executionGraph, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java index b987d43e15451..c8b50df7db2c5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java @@ -30,6 +30,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,8 +77,10 @@ public void testTransitionToSubsequentStateWhenCancellationComplete( } } - @Test - public void testTransitionToSubsequentStateWhenResourceChanged() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testTransitionToSubsequentStateWhenResourceChanged(boolean hasDesiredResources) + throws Exception { try (MockRestartingContext ctx = new MockRestartingContext()) { JobVertexID jobVertexId = new JobVertexID(); VertexParallelism availableParallelism = @@ -86,8 +89,13 @@ public void testTransitionToSubsequentStateWhenResourceChanged() throws Exceptio new VertexParallelism(singletonMap(jobVertexId, 2)); ctx.setAvailableVertexParallelism(availableParallelism); + ctx.setHasDesiredResources(hasDesiredResources); Restarting restarting = createRestartingState(ctx, requiredParallelismForForcedRestart); - ctx.setExpectWaitingForResources(); + if (hasDesiredResources) { + ctx.setExpectCreatingExecutionGraph(); + } else { + ctx.setExpectWaitingForResources(); + } restarting.onGloballyTerminalState(JobStatus.CANCELED); } }