From 370b0a0519989bd5b259b86cafcedd5a64bdab94 Mon Sep 17 00:00:00 2001 From: Mary Gouseti Date: Wed, 19 Nov 2025 12:53:52 +0200 Subject: [PATCH 1/4] Fix: Downsample returns appropriate error when target index gets deleted unexpectedly. (#138228) (cherry picked from commit 14f5892ad658df85029d2853e5fd033476101d3f) # Conflicts: # x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java --- docs/changelog/138228.yaml | 6 + .../downsample/TransportDownsampleAction.java | 34 +++--- .../TransportDownsampleActionTests.java | 113 ++++++++++++++++-- 3 files changed, 129 insertions(+), 24 deletions(-) create mode 100644 docs/changelog/138228.yaml diff --git a/docs/changelog/138228.yaml b/docs/changelog/138228.yaml new file mode 100644 index 0000000000000..0136025ddaec8 --- /dev/null +++ b/docs/changelog/138228.yaml @@ -0,0 +1,6 @@ +pr: 138228 +summary: "Fix: Downsample returns appropriate error when target index gets deleted\ + \ unexpectedly." +area: Downsampling +type: bug +issues: [] diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java index 4a3b28a412fcd..551b21af80c08 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java @@ -133,20 +133,19 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc /** * This is the cluster state task executor for cluster state update actions. + * Visible for testing */ - private static final SimpleBatchedExecutor STATE_UPDATE_TASK_EXECUTOR = - new SimpleBatchedExecutor<>() { - @Override - public Tuple executeTask(DownsampleClusterStateUpdateTask task, ClusterState clusterState) - throws Exception { - return Tuple.tuple(task.execute(clusterState), null); - } + static final SimpleBatchedExecutor STATE_UPDATE_TASK_EXECUTOR = new SimpleBatchedExecutor<>() { + @Override + public Tuple executeTask(DownsampleClusterStateUpdateTask task, ClusterState clusterState) throws Exception { + return Tuple.tuple(task.execute(clusterState), null); + } - @Override - public void taskSucceeded(DownsampleClusterStateUpdateTask task, Void unused) { - task.listener.onResponse(AcknowledgedResponse.TRUE); - } - }; + @Override + public void taskSucceeded(DownsampleClusterStateUpdateTask task, Void unused) { + task.listener.onResponse(AcknowledgedResponse.TRUE); + } + }; @Inject public TransportDownsampleAction( @@ -1114,7 +1113,6 @@ public void onResponse(final AcknowledgedResponse response) { @Override public void onFailure(Exception e) { - recordSuccessMetrics(startTime); // Downsampling has already completed in all shards. listener.onFailure(e); } @@ -1169,6 +1167,11 @@ public ClusterState execute(ClusterState currentState) { logger.debug("Updating downsample index status for [{}]", downsampleIndexName); final ProjectMetadata project = currentState.metadata().getProject(projectId); final IndexMetadata downsampleIndex = project.index(downsampleIndexName); + if (downsampleIndex == null) { + throw new IllegalStateException( + "Failed to update downsample status because [" + downsampleIndexName + "] does not exist" + ); + } if (IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(downsampleIndex.getSettings()) == DownsampleTaskStatus.SUCCESS) { return currentState; } @@ -1190,7 +1193,6 @@ public ClusterState execute(ClusterState currentState) { @Override public void onFailure(Exception e) { - recordSuccessMetrics(startTime); // Downsampling has already completed in all shards. actionListener.onFailure(e); } @@ -1264,8 +1266,8 @@ public void onResponse(final AcknowledgedResponse response) { @Override public void onFailure(Exception e) { - recordSuccessMetrics(startTime); - logger.debug("Downsampling measured successfully", e); + recordFailureMetrics(startTime); + logger.debug("Downsampling failure measured successfully", e); this.actionListener.onFailure(e); } diff --git a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java index a968198098b39..163b119e9790f 100644 --- a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java +++ b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.routing.allocation.DataTier; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.service.ClusterStateTaskExecutorUtils; import org.elasticsearch.cluster.service.MasterServiceTaskQueue; import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.settings.IndexScopedSettings; @@ -56,6 +57,7 @@ import org.elasticsearch.xpack.core.downsample.DownsampleShardIndexerStatus; import org.elasticsearch.xpack.core.downsample.DownsampleShardPersistentTaskState; import org.elasticsearch.xpack.core.ilm.LifecycleSettings; +import org.hamcrest.Matchers; import org.junit.After; import org.junit.Before; import org.mockito.Answers; @@ -175,11 +177,13 @@ public void setUp() throws Exception { projectId = randomProjectIdOrDefault(); task = new Task(1, "type", "action", "description", null, null); + // Initialise mocks for thread pool and cluster service var threadContext = new ThreadContext(Settings.EMPTY); when(threadPool.getThreadContext()).thenReturn(threadContext); when(clusterService.localNode()).thenReturn(DiscoveryNode.createLocal(Settings.EMPTY, buildNewFakeTransportAddress(), "node_name")); when(clusterService.getSettings()).thenReturn(Settings.EMPTY); + // Mock refresh & force merge requests Answer mockBroadcastResponse = invocation -> { @SuppressWarnings("unchecked") var listener = (ActionListener) invocation.getArgument(1, ActionListener.class); @@ -190,9 +194,9 @@ public void setUp() throws Exception { doAnswer(mockBroadcastResponse).when(indicesAdminClient).forceMerge(any(), any()); doAnswer(invocation -> { var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class); - updateTask.listener.onResponse(randomBoolean() ? AcknowledgedResponse.TRUE : AcknowledgedResponse.FALSE); + updateTask.listener.onResponse(AcknowledgedResponse.TRUE); return null; - }).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any()); + }).when(taskQueue).submitTask(startsWith("create-downsample-index"), any(), any()); when(indicesService.createIndexMapperServiceForValidation(any())).thenReturn(mapperService); MappedFieldType timestampFieldMock = mock(MappedFieldType.class); when(timestampFieldMock.meta()).thenReturn(Map.of()); @@ -236,11 +240,6 @@ private void downsample(String mapping) throws IOException { when(projectResolver.getProjectMetadata(any(ClusterState.class))).thenReturn(projectMetadata); - doAnswer(invocation -> { - var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class); - updateTask.listener.onResponse(AcknowledgedResponse.TRUE); - return null; - }).when(taskQueue).submitTask(startsWith("create-downsample-index"), any(), any()); Answer mockPersistentTask = invocation -> { ActionListener> listener = invocation.getArgument(4); PersistentTasksCustomMetadata.PersistentTask task = mock(PersistentTasksCustomMetadata.PersistentTask.class); @@ -260,6 +259,7 @@ private void downsample(String mapping) throws IOException { listener.onResponse(AcknowledgedResponse.TRUE); return null; }).when(indicesAdminClient).updateSettings(any(), any()); + assertSuccessfulUpdateDownsampleStatus(clusterState); PlainActionFuture listener = new PlainActionFuture<>(); action.masterOperation( @@ -298,6 +298,7 @@ public void testDownsamplingForceMergeWithShortCircuitAfterCreation() { .build(); when(projectResolver.getProjectMetadata(any(ClusterState.class))).thenReturn(projectMetadata); + assertSuccessfulUpdateDownsampleStatus(clusterState); PlainActionFuture listener = new PlainActionFuture<>(); action.masterOperation( @@ -359,6 +360,7 @@ public void downsampleWithShortCircuitDuringCreation(String mapping) throws IOEx ) .build() ); + assertSuccessfulUpdateDownsampleStatus(clusterService.state()); PlainActionFuture listener = new PlainActionFuture<>(); action.masterOperation( @@ -368,7 +370,7 @@ public void downsampleWithShortCircuitDuringCreation(String mapping) throws IOEx sourceIndex, targetIndex, TimeValue.ONE_HOUR, - new DownsampleConfig(new DateHistogramInterval("5m")) + new DownsampleConfig(new DateHistogramInterval("5m"), randomSamplingMethod()) ), clusterState, listener @@ -377,6 +379,84 @@ public void downsampleWithShortCircuitDuringCreation(String mapping) throws IOEx verify(downsampleMetrics).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.SUCCESS)); } + public void testDownsamplingWhenTargetIndexGetsDeleted() { + String mapping = switch (randomIntBetween(0, 2)) { + case 0 -> NO_METADATA_MAPPING; + case 1 -> OTHER_METADATA_MAPPING; + default -> FORCE_MERGE_ENABLED_MAPPING; + }; + mockGetMapping(mapping); + mockMergedMapping(mapping); + + var projectMetadata = ProjectMetadata.builder(projectId) + .put(createSourceIndexMetadata(sourceIndex, primaryShards, replicaShards)) + .build(); + + var clusterState = ClusterState.builder(ClusterState.EMPTY_STATE) + .putProjectMetadata(projectMetadata) + .blocks(ClusterBlocks.builder().addIndexBlock(projectId, sourceIndex, IndexMetadata.INDEX_WRITE_BLOCK)) + .build(); + + when(projectResolver.getProjectMetadata(any(ClusterState.class))).thenReturn(projectMetadata); + + Answer mockPersistentTask = invocation -> { + ActionListener> listener = invocation.getArgument(4); + PersistentTasksCustomMetadata.PersistentTask task1 = mock(PersistentTasksCustomMetadata.PersistentTask.class); + when(task1.getId()).thenReturn(randomAlphaOfLength(10)); + DownsampleShardPersistentTaskState runningTaskState = new DownsampleShardPersistentTaskState( + DownsampleShardIndexerStatus.COMPLETED, + null + ); + when(task1.getState()).thenReturn(runningTaskState); + listener.onResponse(task1); + return null; + }; + doAnswer(mockPersistentTask).when(persistentTaskService).sendStartRequest(anyString(), anyString(), any(), any(), any()); + doAnswer(mockPersistentTask).when(persistentTaskService).waitForPersistentTaskCondition(any(), anyString(), any(), any(), any()); + doAnswer(invocation -> { + var listener = invocation.getArgument(1, TransportDownsampleAction.UpdateDownsampleIndexSettingsActionListener.class); + listener.onResponse(AcknowledgedResponse.TRUE); + return null; + }).when(indicesAdminClient).updateSettings(any(), any()); + + doAnswer(invocation -> { + var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class); + ClusterStateTaskExecutorUtils.executeHandlingResults( + clusterState, + TransportDownsampleAction.STATE_UPDATE_TASK_EXECUTOR, + List.of(updateTask), + task1 -> {}, + TransportDownsampleAction.DownsampleClusterStateUpdateTask::onFailure + ); + return null; + }).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any()); + IllegalStateException error = safeAwaitFailure( + IllegalStateException.class, + AcknowledgedResponse.class, + listener -> action.masterOperation( + task, + new DownsampleAction.Request( + ESTestCase.TEST_REQUEST_TIMEOUT, + sourceIndex, + targetIndex, + TimeValue.ONE_HOUR, + new DownsampleConfig(new DateHistogramInterval("5m"), randomSamplingMethod()) + ), + clusterState, + listener + ) + ); + assertThat( + error.getMessage(), + Matchers.startsWith("Failed to update downsample status because [" + targetIndex + "] does not exist") + ); + verify(downsampleMetrics, never()).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.SUCCESS)); + verify(downsampleMetrics).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.FAILED)); + verify(indicesAdminClient).refresh(any(), any()); + verify(indicesAdminClient, never()).flush(any(), any()); + verify(indicesAdminClient, never()).forceMerge(any(), any()); + } + private void mockGetMapping(String mapping) { doAnswer(invocation -> { @SuppressWarnings("unchecked") @@ -532,4 +612,21 @@ public void testGetSupportedMetrics() { assertThat(supported.defaultMetric(), is("max")); assertThat(supported.supportedMetrics(), is(List.of(metricType.supportedAggs()))); } + + private void assertSuccessfulUpdateDownsampleStatus(ClusterState clusterState) { + var projectMetadata = ProjectMetadata.builder(clusterState.metadata().getProject(projectId)) + .put(createSourceIndexMetadata(targetIndex, primaryShards, replicaShards)) + .build(); + + var updatedClusterState = ClusterState.builder(clusterState).putProjectMetadata(projectMetadata).build(); + doAnswer(invocation -> { + var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class); + ClusterStateTaskExecutorUtils.executeAndAssertSuccessful( + updatedClusterState, + TransportDownsampleAction.STATE_UPDATE_TASK_EXECUTOR, + List.of(updateTask) + ); + return null; + }).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any()); + } } From 4f8aa059c41ef00b66422896545785f921ab200d Mon Sep 17 00:00:00 2001 From: Mary Gouseti Date: Thu, 20 Nov 2025 08:55:27 +0200 Subject: [PATCH 2/4] Update TransportDownsampleActionTests.java --- .../xpack/downsample/TransportDownsampleActionTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java index 163b119e9790f..f6affcb682a26 100644 --- a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java +++ b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java @@ -370,7 +370,7 @@ public void downsampleWithShortCircuitDuringCreation(String mapping) throws IOEx sourceIndex, targetIndex, TimeValue.ONE_HOUR, - new DownsampleConfig(new DateHistogramInterval("5m"), randomSamplingMethod()) + new DownsampleConfig(new DateHistogramInterval("5m")) ), clusterState, listener From b61fb6b1050b4433a63784d56fefbf4f9d7ec5cc Mon Sep 17 00:00:00 2001 From: Mary Gouseti Date: Thu, 20 Nov 2025 10:53:13 +0200 Subject: [PATCH 3/4] Update TransportDownsampleActionTests.java --- .../xpack/downsample/TransportDownsampleActionTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java index f6affcb682a26..b45ee9a8eeb03 100644 --- a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java +++ b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java @@ -440,7 +440,7 @@ public void testDownsamplingWhenTargetIndexGetsDeleted() { sourceIndex, targetIndex, TimeValue.ONE_HOUR, - new DownsampleConfig(new DateHistogramInterval("5m"), randomSamplingMethod()) + new DownsampleConfig(new DateHistogramInterval("5m")) ), clusterState, listener From 8fc1a7ff39322bee087ef5f9b9f5590a6913e376 Mon Sep 17 00:00:00 2001 From: gmarouli Date: Thu, 20 Nov 2025 12:05:14 +0200 Subject: [PATCH 4/4] Handle exception --- .../xpack/downsample/TransportDownsampleActionTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java index b45ee9a8eeb03..20a6b67c555a3 100644 --- a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java +++ b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java @@ -379,7 +379,7 @@ public void downsampleWithShortCircuitDuringCreation(String mapping) throws IOEx verify(downsampleMetrics).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.SUCCESS)); } - public void testDownsamplingWhenTargetIndexGetsDeleted() { + public void testDownsamplingWhenTargetIndexGetsDeleted() throws IOException { String mapping = switch (randomIntBetween(0, 2)) { case 0 -> NO_METADATA_MAPPING; case 1 -> OTHER_METADATA_MAPPING;