|
15 | 15 | import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; |
16 | 16 | import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; |
17 | 17 | import org.elasticsearch.cluster.ClusterChangedEvent; |
| 18 | +import org.elasticsearch.cluster.ClusterState; |
18 | 19 | import org.elasticsearch.cluster.ClusterStateListener; |
19 | 20 | import org.elasticsearch.cluster.metadata.ReservedStateErrorMetadata; |
20 | 21 | import org.elasticsearch.cluster.metadata.ReservedStateHandlerMetadata; |
|
39 | 40 | import java.util.concurrent.ExecutionException; |
40 | 41 | import java.util.concurrent.TimeUnit; |
41 | 42 | import java.util.concurrent.atomic.AtomicLong; |
| 43 | +import java.util.function.Function; |
42 | 44 | import java.util.stream.Stream; |
43 | 45 |
|
44 | 46 | import static org.elasticsearch.cluster.metadata.ReservedStateMetadata.EMPTY_VERSION; |
@@ -194,22 +196,29 @@ private Tuple<CountDownLatch, AtomicLong> setupClusterStateListener(String node, |
194 | 196 | ClusterService clusterService = internalCluster().clusterService(node); |
195 | 197 | CountDownLatch savedClusterState = new CountDownLatch(1); |
196 | 198 | AtomicLong metadataVersion = new AtomicLong(-1); |
| 199 | + Function<ClusterState, Boolean> clusterStateProcessor = clusterState -> { |
| 200 | + ReservedStateMetadata reservedState = clusterState.metadata().reservedStateMetadata().get(FileSettingsService.NAMESPACE); |
| 201 | + if (reservedState != null && reservedState.version() == fileSettingsVersion) { |
| 202 | + metadataVersion.set(clusterState.metadata().version()); |
| 203 | + savedClusterState.countDown(); |
| 204 | + logger.info( |
| 205 | + "done waiting for file settings [version: {}, metadata version: {}]", |
| 206 | + clusterState.version(), |
| 207 | + clusterState.metadata().version() |
| 208 | + ); |
| 209 | + return true; |
| 210 | + } |
| 211 | + return false; |
| 212 | + }; |
197 | 213 | clusterService.addListener(new ClusterStateListener() { |
198 | 214 | @Override |
199 | 215 | public void clusterChanged(ClusterChangedEvent event) { |
200 | | - ReservedStateMetadata reservedState = event.state().metadata().reservedStateMetadata().get(FileSettingsService.NAMESPACE); |
201 | | - if (reservedState != null && reservedState.version() == fileSettingsVersion) { |
| 216 | + if (clusterStateProcessor.apply(event.state())) { |
202 | 217 | clusterService.removeListener(this); |
203 | | - metadataVersion.set(event.state().metadata().version()); |
204 | | - savedClusterState.countDown(); |
205 | | - logger.info( |
206 | | - "done waiting for file settings [version: {}, metadata version: {}]", |
207 | | - event.state().version(), |
208 | | - event.state().metadata().version() |
209 | | - ); |
210 | 218 | } |
211 | 219 | } |
212 | 220 | }); |
| 221 | + clusterStateProcessor.apply(clusterService.state()); |
213 | 222 |
|
214 | 223 | return new Tuple<>(savedClusterState, metadataVersion); |
215 | 224 | } |
|
0 commit comments