Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/138228.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 138228
summary: "Fix: Downsample returns appropriate error when target index gets deleted\
\ unexpectedly."
area: Downsampling
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -133,20 +133,19 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc

/**
* This is the cluster state task executor for cluster state update actions.
* Visible for testing
*/
private static final SimpleBatchedExecutor<DownsampleClusterStateUpdateTask, Void> STATE_UPDATE_TASK_EXECUTOR =
new SimpleBatchedExecutor<>() {
@Override
public Tuple<ClusterState, Void> executeTask(DownsampleClusterStateUpdateTask task, ClusterState clusterState)
throws Exception {
return Tuple.tuple(task.execute(clusterState), null);
}
static final SimpleBatchedExecutor<DownsampleClusterStateUpdateTask, Void> STATE_UPDATE_TASK_EXECUTOR = new SimpleBatchedExecutor<>() {
@Override
public Tuple<ClusterState, Void> executeTask(DownsampleClusterStateUpdateTask task, ClusterState clusterState) throws Exception {
return Tuple.tuple(task.execute(clusterState), null);
}

@Override
public void taskSucceeded(DownsampleClusterStateUpdateTask task, Void unused) {
task.listener.onResponse(AcknowledgedResponse.TRUE);
}
};
@Override
public void taskSucceeded(DownsampleClusterStateUpdateTask task, Void unused) {
task.listener.onResponse(AcknowledgedResponse.TRUE);
}
};

@Inject
public TransportDownsampleAction(
Expand Down Expand Up @@ -1114,7 +1113,6 @@ public void onResponse(final AcknowledgedResponse response) {

@Override
public void onFailure(Exception e) {
recordSuccessMetrics(startTime); // Downsampling has already completed in all shards.
listener.onFailure(e);
}

Expand Down Expand Up @@ -1169,6 +1167,11 @@ public ClusterState execute(ClusterState currentState) {
logger.debug("Updating downsample index status for [{}]", downsampleIndexName);
final ProjectMetadata project = currentState.metadata().getProject(projectId);
final IndexMetadata downsampleIndex = project.index(downsampleIndexName);
if (downsampleIndex == null) {
throw new IllegalStateException(
"Failed to update downsample status because [" + downsampleIndexName + "] does not exist"
);
}
if (IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(downsampleIndex.getSettings()) == DownsampleTaskStatus.SUCCESS) {
return currentState;
}
Expand All @@ -1190,7 +1193,6 @@ public ClusterState execute(ClusterState currentState) {

@Override
public void onFailure(Exception e) {
recordSuccessMetrics(startTime); // Downsampling has already completed in all shards.
actionListener.onFailure(e);
}

Expand Down Expand Up @@ -1264,8 +1266,8 @@ public void onResponse(final AcknowledgedResponse response) {

@Override
public void onFailure(Exception e) {
recordSuccessMetrics(startTime);
logger.debug("Downsampling measured successfully", e);
recordFailureMetrics(startTime);
logger.debug("Downsampling failure measured successfully", e);
this.actionListener.onFailure(e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.routing.allocation.DataTier;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.ClusterStateTaskExecutorUtils;
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.IndexScopedSettings;
Expand Down Expand Up @@ -56,6 +57,7 @@
import org.elasticsearch.xpack.core.downsample.DownsampleShardIndexerStatus;
import org.elasticsearch.xpack.core.downsample.DownsampleShardPersistentTaskState;
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
import org.mockito.Answers;
Expand Down Expand Up @@ -175,11 +177,13 @@ public void setUp() throws Exception {
projectId = randomProjectIdOrDefault();
task = new Task(1, "type", "action", "description", null, null);

// Initialise mocks for thread pool and cluster service
var threadContext = new ThreadContext(Settings.EMPTY);
when(threadPool.getThreadContext()).thenReturn(threadContext);
when(clusterService.localNode()).thenReturn(DiscoveryNode.createLocal(Settings.EMPTY, buildNewFakeTransportAddress(), "node_name"));
when(clusterService.getSettings()).thenReturn(Settings.EMPTY);

// Mock refresh & force merge requests
Answer<Void> mockBroadcastResponse = invocation -> {
@SuppressWarnings("unchecked")
var listener = (ActionListener<BroadcastResponse>) invocation.getArgument(1, ActionListener.class);
Expand All @@ -190,9 +194,9 @@ public void setUp() throws Exception {
doAnswer(mockBroadcastResponse).when(indicesAdminClient).forceMerge(any(), any());
doAnswer(invocation -> {
var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class);
updateTask.listener.onResponse(randomBoolean() ? AcknowledgedResponse.TRUE : AcknowledgedResponse.FALSE);
updateTask.listener.onResponse(AcknowledgedResponse.TRUE);
return null;
}).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any());
}).when(taskQueue).submitTask(startsWith("create-downsample-index"), any(), any());
when(indicesService.createIndexMapperServiceForValidation(any())).thenReturn(mapperService);
MappedFieldType timestampFieldMock = mock(MappedFieldType.class);
when(timestampFieldMock.meta()).thenReturn(Map.of());
Expand Down Expand Up @@ -236,11 +240,6 @@ private void downsample(String mapping) throws IOException {

when(projectResolver.getProjectMetadata(any(ClusterState.class))).thenReturn(projectMetadata);

doAnswer(invocation -> {
var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class);
updateTask.listener.onResponse(AcknowledgedResponse.TRUE);
return null;
}).when(taskQueue).submitTask(startsWith("create-downsample-index"), any(), any());
Answer<Void> mockPersistentTask = invocation -> {
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = invocation.getArgument(4);
PersistentTasksCustomMetadata.PersistentTask<?> task = mock(PersistentTasksCustomMetadata.PersistentTask.class);
Expand All @@ -260,6 +259,7 @@ private void downsample(String mapping) throws IOException {
listener.onResponse(AcknowledgedResponse.TRUE);
return null;
}).when(indicesAdminClient).updateSettings(any(), any());
assertSuccessfulUpdateDownsampleStatus(clusterState);

PlainActionFuture<AcknowledgedResponse> listener = new PlainActionFuture<>();
action.masterOperation(
Expand Down Expand Up @@ -298,6 +298,7 @@ public void testDownsamplingForceMergeWithShortCircuitAfterCreation() {
.build();

when(projectResolver.getProjectMetadata(any(ClusterState.class))).thenReturn(projectMetadata);
assertSuccessfulUpdateDownsampleStatus(clusterState);

PlainActionFuture<AcknowledgedResponse> listener = new PlainActionFuture<>();
action.masterOperation(
Expand Down Expand Up @@ -359,6 +360,7 @@ public void downsampleWithShortCircuitDuringCreation(String mapping) throws IOEx
)
.build()
);
assertSuccessfulUpdateDownsampleStatus(clusterService.state());

PlainActionFuture<AcknowledgedResponse> listener = new PlainActionFuture<>();
action.masterOperation(
Expand All @@ -377,6 +379,84 @@ public void downsampleWithShortCircuitDuringCreation(String mapping) throws IOEx
verify(downsampleMetrics).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.SUCCESS));
}

public void testDownsamplingWhenTargetIndexGetsDeleted() throws IOException {
String mapping = switch (randomIntBetween(0, 2)) {
case 0 -> NO_METADATA_MAPPING;
case 1 -> OTHER_METADATA_MAPPING;
default -> FORCE_MERGE_ENABLED_MAPPING;
};
mockGetMapping(mapping);
mockMergedMapping(mapping);

var projectMetadata = ProjectMetadata.builder(projectId)
.put(createSourceIndexMetadata(sourceIndex, primaryShards, replicaShards))
.build();

var clusterState = ClusterState.builder(ClusterState.EMPTY_STATE)
.putProjectMetadata(projectMetadata)
.blocks(ClusterBlocks.builder().addIndexBlock(projectId, sourceIndex, IndexMetadata.INDEX_WRITE_BLOCK))
.build();

when(projectResolver.getProjectMetadata(any(ClusterState.class))).thenReturn(projectMetadata);

Answer<Void> mockPersistentTask = invocation -> {
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = invocation.getArgument(4);
PersistentTasksCustomMetadata.PersistentTask<?> task1 = mock(PersistentTasksCustomMetadata.PersistentTask.class);
when(task1.getId()).thenReturn(randomAlphaOfLength(10));
DownsampleShardPersistentTaskState runningTaskState = new DownsampleShardPersistentTaskState(
DownsampleShardIndexerStatus.COMPLETED,
null
);
when(task1.getState()).thenReturn(runningTaskState);
listener.onResponse(task1);
return null;
};
doAnswer(mockPersistentTask).when(persistentTaskService).sendStartRequest(anyString(), anyString(), any(), any(), any());
doAnswer(mockPersistentTask).when(persistentTaskService).waitForPersistentTaskCondition(any(), anyString(), any(), any(), any());
doAnswer(invocation -> {
var listener = invocation.getArgument(1, TransportDownsampleAction.UpdateDownsampleIndexSettingsActionListener.class);
listener.onResponse(AcknowledgedResponse.TRUE);
return null;
}).when(indicesAdminClient).updateSettings(any(), any());

doAnswer(invocation -> {
var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class);
ClusterStateTaskExecutorUtils.executeHandlingResults(
clusterState,
TransportDownsampleAction.STATE_UPDATE_TASK_EXECUTOR,
List.of(updateTask),
task1 -> {},
TransportDownsampleAction.DownsampleClusterStateUpdateTask::onFailure
);
return null;
}).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any());
IllegalStateException error = safeAwaitFailure(
IllegalStateException.class,
AcknowledgedResponse.class,
listener -> action.masterOperation(
task,
new DownsampleAction.Request(
ESTestCase.TEST_REQUEST_TIMEOUT,
sourceIndex,
targetIndex,
TimeValue.ONE_HOUR,
new DownsampleConfig(new DateHistogramInterval("5m"))
),
clusterState,
listener
)
);
assertThat(
error.getMessage(),
Matchers.startsWith("Failed to update downsample status because [" + targetIndex + "] does not exist")
);
verify(downsampleMetrics, never()).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.SUCCESS));
verify(downsampleMetrics).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.FAILED));
verify(indicesAdminClient).refresh(any(), any());
verify(indicesAdminClient, never()).flush(any(), any());
verify(indicesAdminClient, never()).forceMerge(any(), any());
}

private void mockGetMapping(String mapping) {
doAnswer(invocation -> {
@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -532,4 +612,21 @@ public void testGetSupportedMetrics() {
assertThat(supported.defaultMetric(), is("max"));
assertThat(supported.supportedMetrics(), is(List.of(metricType.supportedAggs())));
}

private void assertSuccessfulUpdateDownsampleStatus(ClusterState clusterState) {
var projectMetadata = ProjectMetadata.builder(clusterState.metadata().getProject(projectId))
.put(createSourceIndexMetadata(targetIndex, primaryShards, replicaShards))
.build();

var updatedClusterState = ClusterState.builder(clusterState).putProjectMetadata(projectMetadata).build();
doAnswer(invocation -> {
var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class);
ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(
updatedClusterState,
TransportDownsampleAction.STATE_UPDATE_TASK_EXECUTOR,
List.of(updateTask)
);
return null;
}).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any());
}
}