Skip to content

Commit e3370c3

Browse files
committed
Fix: Downsample returns appropriate error when target index gets deleted unexpectedly. (#138228)
(cherry picked from commit 14f5892) # Conflicts: # x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java
1 parent a69c448 commit e3370c3

File tree

3 files changed

+127
-23
lines changed

3 files changed

+127
-23
lines changed

docs/changelog/138228.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 138228
2+
summary: "Fix: Downsample returns appropriate error when target index gets deleted\
3+
\ unexpectedly."
4+
area: Downsampling
5+
type: bug
6+
issues: []

x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -133,20 +133,19 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
133133

134134
/**
135135
* This is the cluster state task executor for cluster state update actions.
136+
* Visible for testing
136137
*/
137-
private static final SimpleBatchedExecutor<DownsampleClusterStateUpdateTask, Void> STATE_UPDATE_TASK_EXECUTOR =
138-
new SimpleBatchedExecutor<>() {
139-
@Override
140-
public Tuple<ClusterState, Void> executeTask(DownsampleClusterStateUpdateTask task, ClusterState clusterState)
141-
throws Exception {
142-
return Tuple.tuple(task.execute(clusterState), null);
143-
}
138+
static final SimpleBatchedExecutor<DownsampleClusterStateUpdateTask, Void> STATE_UPDATE_TASK_EXECUTOR = new SimpleBatchedExecutor<>() {
139+
@Override
140+
public Tuple<ClusterState, Void> executeTask(DownsampleClusterStateUpdateTask task, ClusterState clusterState) throws Exception {
141+
return Tuple.tuple(task.execute(clusterState), null);
142+
}
144143

145-
@Override
146-
public void taskSucceeded(DownsampleClusterStateUpdateTask task, Void unused) {
147-
task.listener.onResponse(AcknowledgedResponse.TRUE);
148-
}
149-
};
144+
@Override
145+
public void taskSucceeded(DownsampleClusterStateUpdateTask task, Void unused) {
146+
task.listener.onResponse(AcknowledgedResponse.TRUE);
147+
}
148+
};
150149

151150
@Inject
152151
public TransportDownsampleAction(
@@ -1114,7 +1113,6 @@ public void onResponse(final AcknowledgedResponse response) {
11141113

11151114
@Override
11161115
public void onFailure(Exception e) {
1117-
recordSuccessMetrics(startTime); // Downsampling has already completed in all shards.
11181116
listener.onFailure(e);
11191117
}
11201118

@@ -1169,6 +1167,11 @@ public ClusterState execute(ClusterState currentState) {
11691167
logger.debug("Updating downsample index status for [{}]", downsampleIndexName);
11701168
final ProjectMetadata project = currentState.metadata().getProject(projectId);
11711169
final IndexMetadata downsampleIndex = project.index(downsampleIndexName);
1170+
if (downsampleIndex == null) {
1171+
throw new IllegalStateException(
1172+
"Failed to update downsample status because [" + downsampleIndexName + "] does not exist"
1173+
);
1174+
}
11721175
if (IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(downsampleIndex.getSettings()) == DownsampleTaskStatus.SUCCESS) {
11731176
return currentState;
11741177
}
@@ -1190,7 +1193,6 @@ public ClusterState execute(ClusterState currentState) {
11901193

11911194
@Override
11921195
public void onFailure(Exception e) {
1193-
recordSuccessMetrics(startTime); // Downsampling has already completed in all shards.
11941196
actionListener.onFailure(e);
11951197
}
11961198

@@ -1264,8 +1266,8 @@ public void onResponse(final AcknowledgedResponse response) {
12641266

12651267
@Override
12661268
public void onFailure(Exception e) {
1267-
recordSuccessMetrics(startTime);
1268-
logger.debug("Downsampling measured successfully", e);
1269+
recordFailureMetrics(startTime);
1270+
logger.debug("Downsampling failure measured successfully", e);
12691271
this.actionListener.onFailure(e);
12701272
}
12711273

x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java

Lines changed: 103 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.cluster.project.ProjectResolver;
3030
import org.elasticsearch.cluster.routing.allocation.DataTier;
3131
import org.elasticsearch.cluster.service.ClusterService;
32+
import org.elasticsearch.cluster.service.ClusterStateTaskExecutorUtils;
3233
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
3334
import org.elasticsearch.common.compress.CompressedXContent;
3435
import org.elasticsearch.common.settings.IndexScopedSettings;
@@ -56,6 +57,7 @@
5657
import org.elasticsearch.xpack.core.downsample.DownsampleShardIndexerStatus;
5758
import org.elasticsearch.xpack.core.downsample.DownsampleShardPersistentTaskState;
5859
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
60+
import org.hamcrest.Matchers;
5961
import org.junit.After;
6062
import org.junit.Before;
6163
import org.mockito.Answers;
@@ -190,9 +192,11 @@ public void setUp() throws Exception {
190192
doAnswer(mockBroadcastResponse).when(indicesAdminClient).forceMerge(any(), any());
191193
doAnswer(invocation -> {
192194
var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class);
193-
updateTask.listener.onResponse(randomBoolean() ? AcknowledgedResponse.TRUE : AcknowledgedResponse.FALSE);
195+
updateTask.listener.onResponse(AcknowledgedResponse.TRUE);
194196
return null;
195-
}).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any());
197+
}).when(taskQueue).submitTask(startsWith("create-downsample-index"), any(), any());
198+
199+
// Mocks for mapping retrieval & merging
196200
when(indicesService.createIndexMapperServiceForValidation(any())).thenReturn(mapperService);
197201
MappedFieldType timestampFieldMock = mock(MappedFieldType.class);
198202
when(timestampFieldMock.meta()).thenReturn(Map.of());
@@ -236,11 +240,6 @@ private void downsample(String mapping) throws IOException {
236240

237241
when(projectResolver.getProjectMetadata(any(ClusterState.class))).thenReturn(projectMetadata);
238242

239-
doAnswer(invocation -> {
240-
var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class);
241-
updateTask.listener.onResponse(AcknowledgedResponse.TRUE);
242-
return null;
243-
}).when(taskQueue).submitTask(startsWith("create-downsample-index"), any(), any());
244243
Answer<Void> mockPersistentTask = invocation -> {
245244
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = invocation.getArgument(4);
246245
PersistentTasksCustomMetadata.PersistentTask<?> task = mock(PersistentTasksCustomMetadata.PersistentTask.class);
@@ -260,6 +259,7 @@ private void downsample(String mapping) throws IOException {
260259
listener.onResponse(AcknowledgedResponse.TRUE);
261260
return null;
262261
}).when(indicesAdminClient).updateSettings(any(), any());
262+
assertSuccessfulUpdateDownsampleStatus(clusterState);
263263

264264
PlainActionFuture<AcknowledgedResponse> listener = new PlainActionFuture<>();
265265
action.masterOperation(
@@ -298,6 +298,7 @@ public void testDownsamplingForceMergeWithShortCircuitAfterCreation() {
298298
.build();
299299

300300
when(projectResolver.getProjectMetadata(any(ClusterState.class))).thenReturn(projectMetadata);
301+
assertSuccessfulUpdateDownsampleStatus(clusterState);
301302

302303
PlainActionFuture<AcknowledgedResponse> listener = new PlainActionFuture<>();
303304
action.masterOperation(
@@ -359,6 +360,7 @@ public void downsampleWithShortCircuitDuringCreation(String mapping) throws IOEx
359360
)
360361
.build()
361362
);
363+
assertSuccessfulUpdateDownsampleStatus(clusterService.state());
362364

363365
PlainActionFuture<AcknowledgedResponse> listener = new PlainActionFuture<>();
364366
action.masterOperation(
@@ -377,6 +379,83 @@ public void downsampleWithShortCircuitDuringCreation(String mapping) throws IOEx
377379
verify(downsampleMetrics).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.SUCCESS));
378380
}
379381

382+
public void testDownsamplingWhenTargetIndexGetsDeleted() throws IOException {
383+
String mapping = switch (randomIntBetween(0, 2)) {
384+
case 0 -> NO_METADATA_MAPPING;
385+
case 1 -> OTHER_METADATA_MAPPING;
386+
default -> FORCE_MERGE_ENABLED_MAPPING;
387+
};
388+
mockGetMapping(mapping);
389+
mockMergedMapping(mapping);
390+
391+
var projectMetadata = ProjectMetadata.builder(projectId)
392+
.put(createSourceIndexMetadata(sourceIndex, primaryShards, replicaShards))
393+
.build();
394+
395+
var clusterState = ClusterState.builder(ClusterState.EMPTY_STATE)
396+
.putProjectMetadata(projectMetadata)
397+
.blocks(ClusterBlocks.builder().addIndexBlock(projectId, sourceIndex, IndexMetadata.INDEX_WRITE_BLOCK))
398+
.build();
399+
400+
when(projectResolver.getProjectMetadata(any(ClusterState.class))).thenReturn(projectMetadata);
401+
402+
Answer<Void> mockPersistentTask = invocation -> {
403+
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = invocation.getArgument(4);
404+
PersistentTasksCustomMetadata.PersistentTask<?> task1 = mock(PersistentTasksCustomMetadata.PersistentTask.class);
405+
when(task1.getId()).thenReturn(randomAlphaOfLength(10));
406+
DownsampleShardPersistentTaskState runningTaskState = new DownsampleShardPersistentTaskState(
407+
DownsampleShardIndexerStatus.COMPLETED,
408+
null
409+
);
410+
when(task1.getState()).thenReturn(runningTaskState);
411+
listener.onResponse(task1);
412+
return null;
413+
};
414+
doAnswer(mockPersistentTask).when(persistentTaskService).sendStartRequest(anyString(), anyString(), any(), any(), any());
415+
doAnswer(mockPersistentTask).when(persistentTaskService).waitForPersistentTaskCondition(any(), anyString(), any(), any(), any());
416+
doAnswer(invocation -> {
417+
var listener = invocation.getArgument(1, TransportDownsampleAction.UpdateDownsampleIndexSettingsActionListener.class);
418+
listener.onResponse(AcknowledgedResponse.TRUE);
419+
return null;
420+
}).when(indicesAdminClient).updateSettings(any(), any());
421+
422+
doAnswer(invocation -> {
423+
var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class);
424+
ClusterStateTaskExecutorUtils.executeHandlingResults(
425+
clusterState,
426+
TransportDownsampleAction.STATE_UPDATE_TASK_EXECUTOR,
427+
List.of(updateTask),
428+
task1 -> {},
429+
TransportDownsampleAction.DownsampleClusterStateUpdateTask::onFailure
430+
);
431+
return null;
432+
}).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any());
433+
IllegalStateException error = safeAwaitFailure(
434+
IllegalStateException.class,
435+
AcknowledgedResponse.class,
436+
listener -> action.masterOperation(
437+
task,
438+
new DownsampleAction.Request(
439+
ESTestCase.TEST_REQUEST_TIMEOUT,
440+
sourceIndex,
441+
targetIndex,
442+
TimeValue.ONE_HOUR,
443+
new DownsampleConfig(new DateHistogramInterval("5m"))
444+
),
445+
clusterState,
446+
listener
447+
)
448+
);
449+
assertThat(
450+
error.getMessage(),
451+
Matchers.startsWith("Failed to update downsample status because [" + targetIndex + "] does not exist")
452+
);
453+
verify(downsampleMetrics, never()).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.SUCCESS));
454+
verify(downsampleMetrics).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.FAILED));
455+
verify(indicesAdminClient).refresh(any(), any());
456+
verify(indicesAdminClient, never()).forceMerge(any(), any());
457+
}
458+
380459
private void mockGetMapping(String mapping) {
381460
doAnswer(invocation -> {
382461
@SuppressWarnings("unchecked")
@@ -532,4 +611,21 @@ public void testGetSupportedMetrics() {
532611
assertThat(supported.defaultMetric(), is("max"));
533612
assertThat(supported.supportedMetrics(), is(List.of(metricType.supportedAggs())));
534613
}
614+
615+
private void assertSuccessfulUpdateDownsampleStatus(ClusterState clusterState) {
616+
var projectMetadata = ProjectMetadata.builder(clusterState.metadata().getProject(projectId))
617+
.put(createSourceIndexMetadata(targetIndex, primaryShards, replicaShards))
618+
.build();
619+
620+
var updatedClusterState = ClusterState.builder(clusterState).putProjectMetadata(projectMetadata).build();
621+
doAnswer(invocation -> {
622+
var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class);
623+
ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(
624+
updatedClusterState,
625+
TransportDownsampleAction.STATE_UPDATE_TASK_EXECUTOR,
626+
List.of(updateTask)
627+
);
628+
return null;
629+
}).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any());
630+
}
535631
}

0 commit comments

Comments
 (0)