diff --git a/qa/lucene-index-compatibility/src/javaRestTest/java/org/elasticsearch/lucene/FullClusterRestartSystemIndexCompatibilityIT.java b/qa/lucene-index-compatibility/src/javaRestTest/java/org/elasticsearch/lucene/FullClusterRestartSystemIndexCompatibilityIT.java index 1817e63eea953..9553dcae7b8c5 100644 --- a/qa/lucene-index-compatibility/src/javaRestTest/java/org/elasticsearch/lucene/FullClusterRestartSystemIndexCompatibilityIT.java +++ b/qa/lucene-index-compatibility/src/javaRestTest/java/org/elasticsearch/lucene/FullClusterRestartSystemIndexCompatibilityIT.java @@ -42,7 +42,7 @@ public FullClusterRestartSystemIndexCompatibilityIT(Version version) { /** * 1. creates an index on N-2 and performs async_search on it that is kept in system index * 2. After update to N-1 (latest) perform a system index migration step, also write block the index - * 3. on N, check that async search results are still retrievable and we can write to the system index + * 3. on N, check that N-1 search results are still retrievable and we can write to the system index */ public void testAsyncSearchIndexMigration() throws Exception { final String index = suffix("index"); @@ -112,7 +112,7 @@ public void testAsyncSearchIndexMigration() throws Exception { if (isFullyUpgradedTo(VERSION_CURRENT)) { assertThat(indexVersion(index, true), equalTo(VERSION_MINUS_2)); - assertAsyncSearchHitCount(async_search_ids.get("n-2_id"), numDocs); + // n-2 results should no longer be readable assertAsyncSearchHitCount(async_search_ids.get("n-1_id"), numDocs); // check system index is still writeable diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java index 197e0694cedff..288180d12bc4b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java @@ -576,6 +576,11 @@ public int read() { }); TransportVersion version = TransportVersion.readVersion(new InputStreamStreamInput(encodedIn)); assert version.onOrBefore(TransportVersion.current()) : version + " >= " + TransportVersion.current(); + if (TransportVersion.isCompatible(version) == false) { + throw new IllegalArgumentException( + "Unable to retrieve async search results. Stored results were created with an incompatible version of Elasticsearch." + ); + } final StreamInput input; input = CompressorFactory.COMPRESSOR.threadLocalStreamInput(encodedIn); try (StreamInput in = new NamedWriteableAwareStreamInput(input, registry)) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java index 5e304530b064f..0eb0c91c6999f 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java @@ -7,19 +7,23 @@ package org.elasticsearch.xpack.core.async; import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.update.UpdateResponse; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.test.TransportVersionUtils; import org.elasticsearch.transport.AbstractTransportRequest; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.async.AsyncSearchIndexServiceTests.TestAsyncResponse; @@ -37,6 +41,8 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; public class AsyncResultsServiceTests extends ESSingleNodeTestCase { private ClusterService clusterService; @@ -132,7 +138,7 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId, @Before public void setup() { - clusterService = getInstanceFromNode(ClusterService.class); + clusterService = spy(getInstanceFromNode(ClusterService.class)); TransportService transportService = getInstanceFromNode(TransportService.class); BigArrays bigArrays = getInstanceFromNode(BigArrays.class); taskManager = transportService.getTaskManager(); @@ -326,4 +332,52 @@ public void testRetrieveFromDisk() throws Exception { deleteService.deleteResponse(new DeleteAsyncResultRequest(task.getExecutionId().getEncoded()), deleteListener); assertFutureThrows(deleteListener, ResourceNotFoundException.class); } + + public void testFailWithIncompatibleResults() throws Exception { + // force the search results to be serialized with an incompatible transport version + when(clusterService.state()).thenAnswer(invocation -> { + ClusterState state = (ClusterState) invocation.callRealMethod(); + return ClusterState.builder(state) + .putCompatibilityVersions( + node().getNodeEnvironment().nodeId(), + TransportVersionUtils.getPreviousVersion(TransportVersion.minimumCompatible()), + Map.of() + ) + .build(); + }); + + AsyncResultsService service = createResultsService(true); + TestRequest request = new TestRequest("test request"); + TestTask task = (TestTask) taskManager.register("test", "test", request); + try { + long startTime = System.currentTimeMillis(); + task.setExpirationTime(startTime + TimeValue.timeValueMinutes(1).getMillis()); + + // we need to store initial result + PlainActionFuture futureCreate = new PlainActionFuture<>(); + indexService.createResponse( + task.getExecutionId().getDocId(), + task.getOriginHeaders(), + new TestAsyncResponse(null, task.getExpirationTime()), + futureCreate + ); + futureCreate.actionGet(TimeValue.timeValueSeconds(10)); + + PlainActionFuture futureUpdate = new PlainActionFuture<>(); + indexService.updateResponse( + task.getExecutionId().getDocId(), + emptyMap(), + new TestAsyncResponse("final_response", task.getExpirationTime()), + futureUpdate + ); + futureUpdate.actionGet(TimeValue.timeValueSeconds(10)); + } finally { + taskManager.unregister(task); + } + + PlainActionFuture listener = new PlainActionFuture<>(); + // not waiting for completion, so should return immediately with timeout + service.retrieveResult(new GetAsyncResultRequest(task.getExecutionId().getEncoded()), listener); + assertFutureThrows(listener, IllegalArgumentException.class, RestStatus.BAD_REQUEST); + } }