Skip to content

Commit f716931

Browse files
authored
Return a proper error when fetching incompatible async search results (#138327)
1 parent 943990e commit f716931

File tree

3 files changed

+62
-3
lines changed

3 files changed

+62
-3
lines changed

qa/lucene-index-compatibility/src/javaRestTest/java/org/elasticsearch/lucene/FullClusterRestartSystemIndexCompatibilityIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public FullClusterRestartSystemIndexCompatibilityIT(Version version) {
4242
/**
4343
* 1. creates an index on N-2 and performs async_search on it that is kept in system index
4444
* 2. After update to N-1 (latest) perform a system index migration step, also write block the index
45-
* 3. on N, check that async search results are still retrievable and we can write to the system index
45+
* 3. on N, check that N-1 search results are still retrievable and we can write to the system index
4646
*/
4747
public void testAsyncSearchIndexMigration() throws Exception {
4848
final String index = suffix("index");
@@ -112,7 +112,7 @@ public void testAsyncSearchIndexMigration() throws Exception {
112112

113113
if (isFullyUpgradedTo(VERSION_CURRENT)) {
114114
assertThat(indexVersion(index, true), equalTo(VERSION_MINUS_2));
115-
assertAsyncSearchHitCount(async_search_ids.get("n-2_id"), numDocs);
115+
// n-2 results should no longer be readable
116116
assertAsyncSearchHitCount(async_search_ids.get("n-1_id"), numDocs);
117117

118118
// check system index is still writeable

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -576,6 +576,11 @@ public int read() {
576576
});
577577
TransportVersion version = TransportVersion.readVersion(new InputStreamStreamInput(encodedIn));
578578
assert version.onOrBefore(TransportVersion.current()) : version + " >= " + TransportVersion.current();
579+
if (TransportVersion.isCompatible(version) == false) {
580+
throw new IllegalArgumentException(
581+
"Unable to retrieve async search results. Stored results were created with an incompatible version of Elasticsearch."
582+
);
583+
}
579584
final StreamInput input;
580585
input = CompressorFactory.COMPRESSOR.threadLocalStreamInput(encodedIn);
581586
try (StreamInput in = new NamedWriteableAwareStreamInput(input, registry)) {

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,23 @@
77
package org.elasticsearch.xpack.core.async;
88

99
import org.elasticsearch.ResourceNotFoundException;
10+
import org.elasticsearch.TransportVersion;
1011
import org.elasticsearch.action.ActionListener;
1112
import org.elasticsearch.action.DocWriteResponse;
1213
import org.elasticsearch.action.support.PlainActionFuture;
1314
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1415
import org.elasticsearch.action.update.UpdateResponse;
16+
import org.elasticsearch.cluster.ClusterState;
1517
import org.elasticsearch.cluster.service.ClusterService;
1618
import org.elasticsearch.common.util.BigArrays;
1719
import org.elasticsearch.core.TimeValue;
20+
import org.elasticsearch.rest.RestStatus;
1821
import org.elasticsearch.tasks.CancellableTask;
1922
import org.elasticsearch.tasks.Task;
2023
import org.elasticsearch.tasks.TaskId;
2124
import org.elasticsearch.tasks.TaskManager;
2225
import org.elasticsearch.test.ESSingleNodeTestCase;
26+
import org.elasticsearch.test.TransportVersionUtils;
2327
import org.elasticsearch.transport.AbstractTransportRequest;
2428
import org.elasticsearch.transport.TransportService;
2529
import org.elasticsearch.xpack.core.async.AsyncSearchIndexServiceTests.TestAsyncResponse;
@@ -37,6 +41,8 @@
3741
import static org.hamcrest.Matchers.lessThanOrEqualTo;
3842
import static org.hamcrest.Matchers.notNullValue;
3943
import static org.hamcrest.Matchers.nullValue;
44+
import static org.mockito.Mockito.spy;
45+
import static org.mockito.Mockito.when;
4046

4147
public class AsyncResultsServiceTests extends ESSingleNodeTestCase {
4248
private ClusterService clusterService;
@@ -132,7 +138,7 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId,
132138

133139
@Before
134140
public void setup() {
135-
clusterService = getInstanceFromNode(ClusterService.class);
141+
clusterService = spy(getInstanceFromNode(ClusterService.class));
136142
TransportService transportService = getInstanceFromNode(TransportService.class);
137143
BigArrays bigArrays = getInstanceFromNode(BigArrays.class);
138144
taskManager = transportService.getTaskManager();
@@ -326,4 +332,52 @@ public void testRetrieveFromDisk() throws Exception {
326332
deleteService.deleteResponse(new DeleteAsyncResultRequest(task.getExecutionId().getEncoded()), deleteListener);
327333
assertFutureThrows(deleteListener, ResourceNotFoundException.class);
328334
}
335+
336+
public void testFailWithIncompatibleResults() throws Exception {
337+
// force the search results to be serialized with an incompatible transport version
338+
when(clusterService.state()).thenAnswer(invocation -> {
339+
ClusterState state = (ClusterState) invocation.callRealMethod();
340+
return ClusterState.builder(state)
341+
.putCompatibilityVersions(
342+
node().getNodeEnvironment().nodeId(),
343+
TransportVersionUtils.getPreviousVersion(TransportVersion.minimumCompatible()),
344+
Map.of()
345+
)
346+
.build();
347+
});
348+
349+
AsyncResultsService<TestTask, TestAsyncResponse> service = createResultsService(true);
350+
TestRequest request = new TestRequest("test request");
351+
TestTask task = (TestTask) taskManager.register("test", "test", request);
352+
try {
353+
long startTime = System.currentTimeMillis();
354+
task.setExpirationTime(startTime + TimeValue.timeValueMinutes(1).getMillis());
355+
356+
// we need to store initial result
357+
PlainActionFuture<DocWriteResponse> futureCreate = new PlainActionFuture<>();
358+
indexService.createResponse(
359+
task.getExecutionId().getDocId(),
360+
task.getOriginHeaders(),
361+
new TestAsyncResponse(null, task.getExpirationTime()),
362+
futureCreate
363+
);
364+
futureCreate.actionGet(TimeValue.timeValueSeconds(10));
365+
366+
PlainActionFuture<UpdateResponse> futureUpdate = new PlainActionFuture<>();
367+
indexService.updateResponse(
368+
task.getExecutionId().getDocId(),
369+
emptyMap(),
370+
new TestAsyncResponse("final_response", task.getExpirationTime()),
371+
futureUpdate
372+
);
373+
futureUpdate.actionGet(TimeValue.timeValueSeconds(10));
374+
} finally {
375+
taskManager.unregister(task);
376+
}
377+
378+
PlainActionFuture<TestAsyncResponse> listener = new PlainActionFuture<>();
379+
// not waiting for completion, so should return immediately with timeout
380+
service.retrieveResult(new GetAsyncResultRequest(task.getExecutionId().getEncoded()), listener);
381+
assertFutureThrows(listener, IllegalArgumentException.class, RestStatus.BAD_REQUEST);
382+
}
329383
}

0 commit comments

Comments
 (0)