Skip to content

Commit eafab52

Browse files
committed
[FLINK-38703][runtime] Update slot manager metrics in thread-safety manner
1 parent 9773cc8 commit eafab52

File tree

3 files changed

+135
-5
lines changed

3 files changed

+135
-5
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import java.util.HashSet;
5757
import java.util.List;
5858
import java.util.Map;
59+
import java.util.Objects;
5960
import java.util.Optional;
6061
import java.util.Set;
6162
import java.util.StringJoiner;
@@ -114,6 +115,8 @@ public class FineGrainedSlotManager implements SlotManager {
114115

115116
@Nullable private ScheduledFuture<?> clusterReconciliationCheck;
116117

118+
@Nullable private ScheduledFuture<?> metricsUpdateFuture;
119+
117120
@Nullable private CompletableFuture<Void> requirementsCheckFuture;
118121

119122
@Nullable private CompletableFuture<Void> declareNeededResourceFuture;
@@ -124,6 +127,11 @@ public class FineGrainedSlotManager implements SlotManager {
124127
/** True iff the component has been started. */
125128
private boolean started;
126129

130+
/** Metrics. */
131+
private long lastNumberFreeSlots;
132+
133+
private long lastNumberRegisteredSlots;
134+
127135
public FineGrainedSlotManager(
128136
ScheduledExecutor scheduledExecutor,
129137
SlotManagerConfiguration slotManagerConfiguration,
@@ -159,6 +167,7 @@ public FineGrainedSlotManager(
159167
mainThreadExecutor = null;
160168
clusterReconciliationCheck = null;
161169
requirementsCheckFuture = null;
170+
metricsUpdateFuture = null;
162171

163172
started = false;
164173
}
@@ -227,10 +236,23 @@ public void start(
227236
}
228237

229238
private void registerSlotManagerMetrics() {
230-
slotManagerMetricGroup.gauge(
231-
MetricNames.TASK_SLOTS_AVAILABLE, () -> (long) getNumberFreeSlots());
232-
slotManagerMetricGroup.gauge(
233-
MetricNames.TASK_SLOTS_TOTAL, () -> (long) getNumberRegisteredSlots());
239+
// Because taskManagerTracker is not thread-safe, metrics must be updated periodically on
240+
// the main thread to prevent concurrent modification issues.
241+
metricsUpdateFuture =
242+
scheduledExecutor.scheduleAtFixedRate(
243+
this::updateMetrics, 0L, 1000, TimeUnit.MILLISECONDS);
244+
245+
slotManagerMetricGroup.gauge(MetricNames.TASK_SLOTS_AVAILABLE, () -> lastNumberFreeSlots);
246+
slotManagerMetricGroup.gauge(MetricNames.TASK_SLOTS_TOTAL, () -> lastNumberRegisteredSlots);
247+
}
248+
249+
private void updateMetrics() {
250+
Objects.requireNonNull(mainThreadExecutor)
251+
.execute(
252+
() -> {
253+
lastNumberFreeSlots = getNumberFreeSlots();
254+
lastNumberRegisteredSlots = getNumberRegisteredSlots();
255+
});
234256
}
235257

236258
/** Suspends the component. This clears the internal state of the slot manager. */
@@ -250,6 +272,12 @@ public void suspend() {
250272
clusterReconciliationCheck = null;
251273
}
252274

275+
// stop the metrics updates
276+
if (metricsUpdateFuture != null) {
277+
metricsUpdateFuture.cancel(false);
278+
metricsUpdateFuture = null;
279+
}
280+
253281
slotStatusSyncer.close();
254282
taskManagerTracker.clear();
255283
resourceTracker.clear();

flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@
2020
import org.apache.flink.api.common.JobID;
2121
import org.apache.flink.api.java.tuple.Tuple2;
2222
import org.apache.flink.api.java.tuple.Tuple6;
23+
import org.apache.flink.metrics.Gauge;
2324
import org.apache.flink.runtime.clusterframework.types.AllocationID;
2425
import org.apache.flink.runtime.clusterframework.types.ResourceID;
2526
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
2627
import org.apache.flink.runtime.clusterframework.types.SlotID;
2728
import org.apache.flink.runtime.instance.InstanceID;
2829
import org.apache.flink.runtime.messages.Acknowledge;
30+
import org.apache.flink.runtime.metrics.MetricNames;
2931
import org.apache.flink.runtime.metrics.MetricRegistry;
3032
import org.apache.flink.runtime.metrics.groups.SlotManagerMetricGroup;
3133
import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
@@ -38,6 +40,7 @@
3840
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
3941
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
4042
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
43+
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
4144
import org.apache.flink.util.function.ThrowingConsumer;
4245

4346
import org.junit.jupiter.api.Test;
@@ -50,6 +53,7 @@
5053
import java.util.Optional;
5154
import java.util.concurrent.CompletableFuture;
5255
import java.util.concurrent.atomic.AtomicInteger;
56+
import java.util.concurrent.atomic.AtomicReference;
5357
import java.util.function.Consumer;
5458

5559
import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
@@ -1058,4 +1062,98 @@ void testClearResourceRequirementsWithPendingTaskManager() throws Exception {
10581062
}
10591063
};
10601064
}
1065+
1066+
@Test
1067+
void testMetricsUpdate() throws Exception {
1068+
final AtomicReference<Gauge<Long>> slotsAvailableGauge = new AtomicReference<>();
1069+
final AtomicReference<Gauge<Long>> slotsTotalGauge = new AtomicReference<>();
1070+
1071+
final MetricRegistry metricRegistry =
1072+
TestingMetricRegistry.builder()
1073+
.setRegisterConsumer(
1074+
(metric, name, group) -> {
1075+
if (name.equals(MetricNames.TASK_SLOTS_AVAILABLE)) {
1076+
slotsAvailableGauge.set((Gauge<Long>) metric);
1077+
} else if (name.equals(MetricNames.TASK_SLOTS_TOTAL)) {
1078+
slotsTotalGauge.set((Gauge<Long>) metric);
1079+
}
1080+
})
1081+
.build();
1082+
1083+
final Context context = new Context();
1084+
context.setSlotManagerMetricGroup(
1085+
SlotManagerMetricGroup.create(metricRegistry, "localhost"));
1086+
final ManuallyTriggeredScheduledExecutor scheduledExecutor =
1087+
new ManuallyTriggeredScheduledExecutor();
1088+
context.setScheduledExecutor(scheduledExecutor);
1089+
final TaskExecutorConnection executorConnection1 = createTaskExecutorConnection();
1090+
final TaskExecutorConnection executorConnection2 = createTaskExecutorConnection();
1091+
1092+
context.runTest(
1093+
() -> {
1094+
assertThat(slotsAvailableGauge.get().getValue()).isEqualTo(0);
1095+
assertThat(slotsTotalGauge.get().getValue()).isEqualTo(0);
1096+
1097+
final CompletableFuture<SlotManager.RegistrationResult>
1098+
registerTaskManagerFuture1 = new CompletableFuture<>();
1099+
context.runInMainThreadAndWait(
1100+
() ->
1101+
registerTaskManagerFuture1.complete(
1102+
context.getSlotManager()
1103+
.registerTaskManager(
1104+
executorConnection1,
1105+
new SlotReport(),
1106+
DEFAULT_TOTAL_RESOURCE_PROFILE,
1107+
DEFAULT_SLOT_RESOURCE_PROFILE)));
1108+
assertThat(assertFutureCompleteAndReturn(registerTaskManagerFuture1))
1109+
.isEqualTo(SlotManager.RegistrationResult.SUCCESS);
1110+
1111+
final CompletableFuture<SlotManager.RegistrationResult>
1112+
registerTaskManagerFuture2 = new CompletableFuture<>();
1113+
context.runInMainThreadAndWait(
1114+
() ->
1115+
registerTaskManagerFuture2.complete(
1116+
context.getSlotManager()
1117+
.registerTaskManager(
1118+
executorConnection2,
1119+
new SlotReport(
1120+
createAllocatedSlotStatus(
1121+
new JobID(),
1122+
new AllocationID(),
1123+
DEFAULT_SLOT_RESOURCE_PROFILE)),
1124+
DEFAULT_TOTAL_RESOURCE_PROFILE,
1125+
DEFAULT_SLOT_RESOURCE_PROFILE)));
1126+
assertThat(assertFutureCompleteAndReturn(registerTaskManagerFuture2))
1127+
.isEqualTo(SlotManager.RegistrationResult.SUCCESS);
1128+
1129+
// triggers the metric update task on the main thread
1130+
scheduledExecutor.triggerPeriodicScheduledTasks();
1131+
context.runInMainThreadAndWait(() -> {});
1132+
1133+
assertThat(slotsTotalGauge.get().getValue())
1134+
.isEqualTo(2 * DEFAULT_NUM_SLOTS_PER_WORKER);
1135+
assertThat(slotsAvailableGauge.get().getValue())
1136+
.isEqualTo(2 * DEFAULT_NUM_SLOTS_PER_WORKER - 1);
1137+
1138+
final CompletableFuture<Boolean> unRegisterTaskManagerFuture =
1139+
new CompletableFuture<>();
1140+
context.runInMainThreadAndWait(
1141+
() ->
1142+
unRegisterTaskManagerFuture.complete(
1143+
context.getSlotManager()
1144+
.unregisterTaskManager(
1145+
executorConnection2.getInstanceID(),
1146+
TEST_EXCEPTION)));
1147+
assertThat(assertFutureCompleteAndReturn(unRegisterTaskManagerFuture)).isTrue();
1148+
1149+
// triggers the metric update task on the main thread
1150+
scheduledExecutor.triggerPeriodicScheduledTasks();
1151+
context.runInMainThreadAndWait(() -> {});
1152+
1153+
assertThat(slotsTotalGauge.get().getValue())
1154+
.isEqualTo(DEFAULT_NUM_SLOTS_PER_WORKER);
1155+
assertThat(slotsAvailableGauge.get().getValue())
1156+
.isEqualTo(DEFAULT_NUM_SLOTS_PER_WORKER);
1157+
});
1158+
}
10611159
}

flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ protected class Context {
152152
private SlotManagerMetricGroup slotManagerMetricGroup =
153153
UnregisteredMetricGroups.createUnregisteredSlotManagerMetricGroup();
154154
private BlockedTaskManagerChecker blockedTaskManagerChecker = resourceID -> false;
155-
private final ScheduledExecutor scheduledExecutor =
155+
private ScheduledExecutor scheduledExecutor =
156156
new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor());
157157
private final Executor mainThreadExecutor = EXECUTOR_RESOURCE.getExecutor();
158158
private FineGrainedSlotManager slotManager;
@@ -193,6 +193,10 @@ public void setBlockedTaskManagerChecker(
193193
this.blockedTaskManagerChecker = blockedTaskManagerChecker;
194194
}
195195

196+
public void setScheduledExecutor(ScheduledExecutor scheduledExecutor) {
197+
this.scheduledExecutor = scheduledExecutor;
198+
}
199+
196200
void runInMainThread(Runnable runnable) {
197201
mainThreadExecutor.execute(runnable);
198202
}

0 commit comments

Comments
 (0)