From 53d65ae497013f2fc08ed0e8978150c6007d0394 Mon Sep 17 00:00:00 2001 From: Mark Vieira Date: Wed, 19 Nov 2025 14:58:50 -0800 Subject: [PATCH 1/3] Return a proper error when fetching incompatible async search results --- .../core/async/AsyncTaskIndexService.java | 5 ++ .../core/async/AsyncResultsServiceTests.java | 55 ++++++++++++++++++- 2 files changed, 59 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java index 197e0694cedff..e1e341db964f6 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java @@ -576,6 +576,11 @@ public int read() { }); TransportVersion version = TransportVersion.readVersion(new InputStreamStreamInput(encodedIn)); assert version.onOrBefore(TransportVersion.current()) : version + " >= " + TransportVersion.current(); + if (TransportVersion.isCompatible(version) == false) { + throw new IllegalStateException( + "Unable to retrieve async search results. Stored results were created with an incompatible version of Elasticsearch." + ); + } final StreamInput input; input = CompressorFactory.COMPRESSOR.threadLocalStreamInput(encodedIn); try (StreamInput in = new NamedWriteableAwareStreamInput(input, registry)) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java index 5e304530b064f..75085fbe2f1a3 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java @@ -7,11 +7,13 @@ package org.elasticsearch.xpack.core.async; import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.update.UpdateResponse; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.core.TimeValue; @@ -20,6 +22,7 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.test.TransportVersionUtils; import org.elasticsearch.transport.AbstractTransportRequest; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.async.AsyncSearchIndexServiceTests.TestAsyncResponse; @@ -37,6 +40,8 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; public class AsyncResultsServiceTests extends ESSingleNodeTestCase { private ClusterService clusterService; @@ -132,7 +137,7 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId, @Before public void setup() { - clusterService = getInstanceFromNode(ClusterService.class); + clusterService = spy(getInstanceFromNode(ClusterService.class)); TransportService transportService = getInstanceFromNode(TransportService.class); BigArrays bigArrays = getInstanceFromNode(BigArrays.class); taskManager = transportService.getTaskManager(); @@ -326,4 +331,52 @@ public void testRetrieveFromDisk() throws Exception { deleteService.deleteResponse(new DeleteAsyncResultRequest(task.getExecutionId().getEncoded()), deleteListener); assertFutureThrows(deleteListener, ResourceNotFoundException.class); } + + public void testFailWithIncompatibleResults() throws Exception { + // force the search results to be serialized with an incompatible transport version + when(clusterService.state()).thenAnswer(invocation -> { + ClusterState state = (ClusterState) invocation.callRealMethod(); + return ClusterState.builder(state) + .putCompatibilityVersions( + node().getNodeEnvironment().nodeId(), + TransportVersionUtils.getPreviousVersion(TransportVersion.minimumCompatible()), + Map.of() + ) + .build(); + }); + + AsyncResultsService service = createResultsService(true); + TestRequest request = new TestRequest("test request"); + TestTask task = (TestTask) taskManager.register("test", "test", request); + try { + long startTime = System.currentTimeMillis(); + task.setExpirationTime(startTime + TimeValue.timeValueMinutes(1).getMillis()); + + // we need to store initial result + PlainActionFuture futureCreate = new PlainActionFuture<>(); + indexService.createResponse( + task.getExecutionId().getDocId(), + task.getOriginHeaders(), + new TestAsyncResponse(null, task.getExpirationTime()), + futureCreate + ); + futureCreate.actionGet(TimeValue.timeValueSeconds(10)); + + PlainActionFuture futureUpdate = new PlainActionFuture<>(); + indexService.updateResponse( + task.getExecutionId().getDocId(), + emptyMap(), + new TestAsyncResponse("final_response", task.getExpirationTime()), + futureUpdate + ); + futureUpdate.actionGet(TimeValue.timeValueSeconds(10)); + } finally { + taskManager.unregister(task); + } + + PlainActionFuture listener = new PlainActionFuture<>(); + // not waiting for completion, so should return immediately with timeout + service.retrieveResult(new GetAsyncResultRequest(task.getExecutionId().getEncoded()), listener); + assertFutureThrows(listener, IllegalStateException.class); + } } From 04e6eba4f31d7343dffb0f8b10c1f2bb7c4e1242 Mon Sep 17 00:00:00 2001 From: Mark Vieira Date: Wed, 19 Nov 2025 15:37:26 -0800 Subject: [PATCH 2/3] Fix failing test --- .../lucene/FullClusterRestartSystemIndexCompatibilityIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/qa/lucene-index-compatibility/src/javaRestTest/java/org/elasticsearch/lucene/FullClusterRestartSystemIndexCompatibilityIT.java b/qa/lucene-index-compatibility/src/javaRestTest/java/org/elasticsearch/lucene/FullClusterRestartSystemIndexCompatibilityIT.java index 1817e63eea953..9553dcae7b8c5 100644 --- a/qa/lucene-index-compatibility/src/javaRestTest/java/org/elasticsearch/lucene/FullClusterRestartSystemIndexCompatibilityIT.java +++ b/qa/lucene-index-compatibility/src/javaRestTest/java/org/elasticsearch/lucene/FullClusterRestartSystemIndexCompatibilityIT.java @@ -42,7 +42,7 @@ public FullClusterRestartSystemIndexCompatibilityIT(Version version) { /** * 1. creates an index on N-2 and performs async_search on it that is kept in system index * 2. After update to N-1 (latest) perform a system index migration step, also write block the index - * 3. on N, check that async search results are still retrievable and we can write to the system index + * 3. on N, check that N-1 search results are still retrievable and we can write to the system index */ public void testAsyncSearchIndexMigration() throws Exception { final String index = suffix("index"); @@ -112,7 +112,7 @@ public void testAsyncSearchIndexMigration() throws Exception { if (isFullyUpgradedTo(VERSION_CURRENT)) { assertThat(indexVersion(index, true), equalTo(VERSION_MINUS_2)); - assertAsyncSearchHitCount(async_search_ids.get("n-2_id"), numDocs); + // n-2 results should no longer be readable assertAsyncSearchHitCount(async_search_ids.get("n-1_id"), numDocs); // check system index is still writeable From 61394af916a09d074630978707e73ddb746b5c73 Mon Sep 17 00:00:00 2001 From: Mark Vieira Date: Thu, 20 Nov 2025 09:06:35 -0800 Subject: [PATCH 3/3] Return an HTTP 400 error code --- .../elasticsearch/xpack/core/async/AsyncTaskIndexService.java | 2 +- .../xpack/core/async/AsyncResultsServiceTests.java | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java index e1e341db964f6..288180d12bc4b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java @@ -577,7 +577,7 @@ public int read() { TransportVersion version = TransportVersion.readVersion(new InputStreamStreamInput(encodedIn)); assert version.onOrBefore(TransportVersion.current()) : version + " >= " + TransportVersion.current(); if (TransportVersion.isCompatible(version) == false) { - throw new IllegalStateException( + throw new IllegalArgumentException( "Unable to retrieve async search results. Stored results were created with an incompatible version of Elasticsearch." ); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java index 75085fbe2f1a3..0eb0c91c6999f 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; @@ -377,6 +378,6 @@ public void testFailWithIncompatibleResults() throws Exception { PlainActionFuture listener = new PlainActionFuture<>(); // not waiting for completion, so should return immediately with timeout service.retrieveResult(new GetAsyncResultRequest(task.getExecutionId().getEncoded()), listener); - assertFutureThrows(listener, IllegalStateException.class); + assertFutureThrows(listener, IllegalArgumentException.class, RestStatus.BAD_REQUEST); } }