From 0a318586ef856ee0c109320cf5a8768ad692c5ee Mon Sep 17 00:00:00 2001 From: Ben Chaplin Date: Wed, 5 Nov 2025 17:04:33 -0500 Subject: [PATCH 1/6] Cherry-pick first pass --- docs/changelog/136889.yaml | 6 + muted-tests.yml | 4 - .../SearchQueryThenFetchAsyncAction.java | 178 ++++++++++++++++-- .../elasticsearch/search/SearchService.java | 2 +- .../search/query/QuerySearchResult.java | 1 - ...sponse_might_include_reduction_failure.csv | 1 + 6 files changed, 170 insertions(+), 22 deletions(-) create mode 100644 docs/changelog/136889.yaml create mode 100644 server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv diff --git a/docs/changelog/136889.yaml b/docs/changelog/136889.yaml new file mode 100644 index 0000000000000..8888eeb316b6c --- /dev/null +++ b/docs/changelog/136889.yaml @@ -0,0 +1,6 @@ +pr: 136889 +summary: Remove early phase failure in batched +area: Search +type: bug +issues: + - 134151 diff --git a/muted-tests.yml b/muted-tests.yml index 44116115625eb..bd7603b3d9fcd 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -361,9 +361,6 @@ tests: - class: org.elasticsearch.repositories.blobstore.testkit.analyze.GCSRepositoryAnalysisRestIT method: testRepositoryAnalysis issue: https://github.com/elastic/elasticsearch/issues/125668 - - class: org.elasticsearch.search.basic.SearchWithRandomDisconnectsIT - method: testSearchWithRandomDisconnects - issue: https://github.com/elastic/elasticsearch/issues/122707 - class: org.elasticsearch.docker.test.DockerYmlTestSuiteIT method: test {p0=/11_nodes/Test cat nodes output} issue: https://github.com/elastic/elasticsearch/issues/125906 @@ -406,4 +403,3 @@ tests: - class: org.elasticsearch.xpack.esql.plugin.DataNodeRequestSenderIT method: testSearchWhileRelocating issue: https://github.com/elastic/elasticsearch/issues/128500 - diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 0437db6b9eb7c..2b97a32d114a3 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -34,6 +34,7 @@ import org.elasticsearch.core.SimpleRefCounted; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.node.Node; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.SearchShardTarget; @@ -74,12 +75,16 @@ import java.util.function.BiFunction; import static org.elasticsearch.action.search.SearchPhaseController.getTopDocsSize; +import static org.elasticsearch.action.search.SearchPhaseController.merge; public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction { private static final Logger logger = LogManager.getLogger(SearchQueryThenFetchAsyncAction.class); private static final TransportVersion BATCHED_QUERY_PHASE_VERSION = TransportVersion.fromName("batched_query_phase_version"); + private static final TransportVersion BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE = TransportVersion.fromName( + "batched_response_might_include_reduction_failure" + ); private final SearchProgressListener progressListener; @@ -218,18 +223,27 @@ public static final class NodeQueryResponse extends TransportResponse { private final RefCounted refCounted = LeakTracker.wrap(new SimpleRefCounted()); 
private final Object[] results; + private final Exception reductionFailure; private final SearchPhaseController.TopDocsStats topDocsStats; private final QueryPhaseResultConsumer.MergeResult mergeResult; - NodeQueryResponse(StreamInput in) throws IOException { + public NodeQueryResponse(StreamInput in) throws IOException { this.results = in.readArray(i -> i.readBoolean() ? new QuerySearchResult(i) : i.readException(), Object[]::new); - this.mergeResult = QueryPhaseResultConsumer.MergeResult.readFrom(in); - this.topDocsStats = SearchPhaseController.TopDocsStats.readFrom(in); + if (in.getTransportVersion().supports(BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE) && in.readBoolean()) { + this.reductionFailure = in.readException(); + this.mergeResult = null; + this.topDocsStats = null; + } else { + this.reductionFailure = null; + this.mergeResult = QueryPhaseResultConsumer.MergeResult.readFrom(in); + this.topDocsStats = SearchPhaseController.TopDocsStats.readFrom(in); + } } NodeQueryResponse( - QueryPhaseResultConsumer.MergeResult mergeResult, Object[] results, + Exception reductionFailure, + QueryPhaseResultConsumer.MergeResult mergeResult, SearchPhaseController.TopDocsStats topDocsStats ) { this.results = results; @@ -238,6 +252,7 @@ public static final class NodeQueryResponse extends TransportResponse { r.incRef(); } } + this.reductionFailure = reductionFailure; this.mergeResult = mergeResult; this.topDocsStats = topDocsStats; assert Arrays.stream(results).noneMatch(Objects::isNull) : Arrays.toString(results); @@ -248,6 +263,10 @@ public Object[] getResults() { return results; } + Exception getReductionFailure() { + return reductionFailure; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeArray((o, v) -> { @@ -260,8 +279,19 @@ public void writeTo(StreamOutput out) throws IOException { ((QuerySearchResult) v).writeTo(o); } }, results); - mergeResult.writeTo(out); - topDocsStats.writeTo(out); + if (out.getTransportVersion().supports(BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE)) { + boolean hasReductionFailure = reductionFailure != null; + out.writeBoolean(hasReductionFailure); + if (hasReductionFailure) { + out.writeException(reductionFailure); + } else { + mergeResult.writeTo(out); + topDocsStats.writeTo(out); + } + } else { + mergeResult.writeTo(out); + topDocsStats.writeTo(out); + } } @Override @@ -495,7 +525,12 @@ public Executor executor() { @Override public void handleResponse(NodeQueryResponse response) { if (results instanceof QueryPhaseResultConsumer queryPhaseResultConsumer) { - queryPhaseResultConsumer.addBatchedPartialResult(response.topDocsStats, response.mergeResult); + Exception reductionFailure = response.getReductionFailure(); + if (reductionFailure != null) { + queryPhaseResultConsumer.failure.compareAndSet(null, reductionFailure); + } else { + queryPhaseResultConsumer.addBatchedPartialResult(response.topDocsStats, response.mergeResult); + } } for (int i = 0; i < response.results.length; i++) { var s = request.shards.get(i); @@ -515,6 +550,21 @@ public void handleResponse(NodeQueryResponse response) { @Override public void handleException(TransportException e) { + if (connection.getTransportVersion().supports(BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE) == false) { + bwcHandleException(e); + return; + } + Exception cause = (Exception) ExceptionsHelper.unwrapCause(e); + logger.debug("handling node search exception coming from [" + nodeId + "]", cause); + onNodeQueryFailure(e, request, routing); + } + + /** + * This code is 
strictly for _snapshot_ backwards compatibility. The feature flag + * {@link SearchService#BATCHED_QUERY_PHASE_FEATURE_FLAG} was not turned on when the transport version + * {@link SearchQueryThenFetchAsyncAction#BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE} was introduced. + */ + private void bwcHandleException(TransportException e) { Exception cause = (Exception) ExceptionsHelper.unwrapCause(e); logger.debug("handling node search exception coming from [" + nodeId + "]", cause); if (e instanceof SendRequestTransportException || cause instanceof TaskCancelledException) { @@ -717,8 +767,8 @@ private void setFailure(QueryPerNodeState state, int dataNodeLocalIdx, Exception @Override public void onFailure(Exception e) { - // TODO: count down fully and just respond with an exception if partial results aren't allowed as an - // optimization + // Note: this shard won't be retried until it returns to the coordinating node where the shard iterator lives + // TODO: consider alternatives that don't wait for the entire batch to complete before retrying the shard setFailure(state, dataNodeLocalIdx, e); doneFuture.onResponse(null); } @@ -786,11 +836,93 @@ void onShardDone() { if (countDown.countDown() == false) { return; } + if (channel.getVersion().supports(BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE) == false) { + bwcRespond(); + return; + } + var channelListener = new ChannelActionListener<>(channel); + NodeQueryResponse nodeQueryResponse; + try (queryPhaseResultConsumer) { + Exception reductionFailure = queryPhaseResultConsumer.failure.get(); + if (reductionFailure == null) { + nodeQueryResponse = getSuccessfulResponse(); + } else { + nodeQueryResponse = getReductionFailureResponse(reductionFailure); + } + } catch (IOException e) { + releaseAllResultsContexts(); + channelListener.onFailure(e); + return; + } + ActionListener.respondAndRelease(channelListener, nodeQueryResponse)); + } + + private NodeQueryResponse getSuccessfulResponse() throws IOException { + final QueryPhaseResultConsumer.MergeResult mergeResult; + try { + mergeResult = Objects.requireNonNullElse( + queryPhaseResultConsumer.consumePartialMergeResultDataNode(), + EMPTY_PARTIAL_MERGE_RESULT + ); + } catch (Exception e) { + return getReductionFailureResponse(e); + } + // translate shard indices to those on the coordinator so that it can interpret the merge result without adjustments, + // also collect the set of indices that may be part of a subsequent fetch operation here so that we can release all other + // indices without a roundtrip to the coordinating node + final BitSet relevantShardIndices = new BitSet(searchRequest.shards.size()); + if (mergeResult.reducedTopDocs() != null) { + for (ScoreDoc scoreDoc : mergeResult.reducedTopDocs().scoreDocs) { + final int localIndex = scoreDoc.shardIndex; + scoreDoc.shardIndex = searchRequest.shards.get(localIndex).shardIndex; + relevantShardIndices.set(localIndex); + } + } + final Object[] results = new Object[queryPhaseResultConsumer.getNumShards()]; + for (int i = 0; i < results.length; i++) { + var result = queryPhaseResultConsumer.results.get(i); + if (result == null) { + results[i] = failures.get(i); + } else { + // free context id and remove it from the result right away in case we don't need it anymore + maybeFreeContext(result, relevantShardIndices, namedWriteableRegistry); + results[i] = result; + } + assert results[i] != null; + } + return new NodeQueryResponse(results, null, mergeResult, queryPhaseResultConsumer.topDocsStats); + } + + private NodeQueryResponse 
getReductionFailureResponse(Exception reductionFailure) throws IOException { + try { + final Object[] results = new Object[queryPhaseResultConsumer.getNumShards()]; + for (int i = 0; i < results.length; i++) { + var result = queryPhaseResultConsumer.results.get(i); + if (result == null) { + results[i] = failures.get(i); + } else { + results[i] = result; + } + assert results[i] != null; + } + return new NodeQueryResponse(results, reductionFailure, null, null); + } finally { + releaseAllResultsContexts(); + } + } + + /** + * This code is strictly for _snapshot_ backwards compatibility. The feature flag + * {@link SearchService#BATCHED_QUERY_PHASE_FEATURE_FLAG} was not turned on when the transport version + * {@link SearchQueryThenFetchAsyncAction#BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE} was introduced. + */ + void bwcRespond() { var channelListener = new ChannelActionListener<>(channel); try (queryPhaseResultConsumer) { var failure = queryPhaseResultConsumer.failure.get(); if (failure != null) { - handleMergeFailure(failure, channelListener, namedWriteableRegistry); + releaseAllResultsContexts(); + channelListener.onFailure(failure); return; } final QueryPhaseResultConsumer.MergeResult mergeResult; @@ -800,7 +932,8 @@ void onShardDone() { EMPTY_PARTIAL_MERGE_RESULT ); } catch (Exception e) { - handleMergeFailure(e, channelListener, namedWriteableRegistry); + releaseAllResultsContexts(); + channelListener.onFailure(e); return; } // translate shard indices to those on the coordinator so that it can interpret the merge result without adjustments, @@ -839,16 +972,30 @@ && isPartOfPIT(searchRequest.searchRequest, q.getContextId(), namedWriteableRegi ActionListener.respondAndRelease( channelListener, - new NodeQueryResponse(mergeResult, results, queryPhaseResultConsumer.topDocsStats) + new NodeQueryResponse(results, null, mergeResult, queryPhaseResultConsumer.topDocsStats) ); } } - private void handleMergeFailure( - Exception e, - ChannelActionListener channelListener, + private void maybeFreeContext( + SearchPhaseResult result, + BitSet relevantShardIndices, NamedWriteableRegistry namedWriteableRegistry ) { + if (result instanceof QuerySearchResult q + && q.getContextId() != null + && relevantShardIndices.get(q.getShardIndex()) == false + && q.hasSuggestHits() == false + && q.getRankShardResult() == null + && searchRequest.searchRequest.scroll() == null + && isPartOfPIT(searchRequest.searchRequest, q.getContextId(), namedWriteableRegistry) == false) { + if (dependencies.searchService.freeReaderContext(q.getContextId())) { + q.clearContextId(); + } + } + } + + private void releaseAllResultsContexts() { queryPhaseResultConsumer.getSuccessfulResults() .forEach( searchPhaseResult -> releaseLocalContext( @@ -858,7 +1005,6 @@ private void handleMergeFailure( namedWriteableRegistry ) ); - channelListener.onFailure(e); } void consumeResult(QuerySearchResult queryResult) { diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index f69ba14b2d685..5247e3bff2491 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -286,7 +286,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv Property.NodeScope ); - private static final boolean BATCHED_QUERY_PHASE_FEATURE_FLAG = new FeatureFlag("batched_query_phase").isEnabled(); + public static final boolean BATCHED_QUERY_PHASE_FEATURE_FLAG = 
new FeatureFlag("batched_query_phase").isEnabled(); public static final int DEFAULT_SIZE = 10; public static final int DEFAULT_FROM = 0; diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index 698006f324dc1..e820ad9030f88 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -491,7 +491,6 @@ public void writeToNoId(StreamOutput out) throws IOException { out.writeBoolean(true); writeTopDocs(out, topDocsAndMaxScore); } else { - assert isPartiallyReduced(); out.writeBoolean(false); } } else { diff --git a/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv b/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv new file mode 100644 index 0000000000000..eef83daf2840e --- /dev/null +++ b/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv @@ -0,0 +1 @@ +9213000,9185007,9112012 From 8c4488435f60d353360a4bbebe7693dec0fed775 Mon Sep 17 00:00:00 2001 From: Ben Chaplin Date: Thu, 13 Nov 2025 13:33:28 -0500 Subject: [PATCH 2/6] Fixes --- .../action/search/SearchQueryThenFetchAsyncAction.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 2b97a32d114a3..74c21b31683ef 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -34,7 +34,6 @@ import org.elasticsearch.core.SimpleRefCounted; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.node.Node; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.SearchShardTarget; @@ -75,7 +74,6 @@ import java.util.function.BiFunction; import static org.elasticsearch.action.search.SearchPhaseController.getTopDocsSize; -import static org.elasticsearch.action.search.SearchPhaseController.merge; public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction { @@ -767,8 +765,8 @@ private void setFailure(QueryPerNodeState state, int dataNodeLocalIdx, Exception @Override public void onFailure(Exception e) { - // Note: this shard won't be retried until it returns to the coordinating node where the shard iterator lives - // TODO: consider alternatives that don't wait for the entire batch to complete before retrying the shard + // TODO: count down fully and just respond with an exception if partial results aren't allowed as an + // optimization setFailure(state, dataNodeLocalIdx, e); doneFuture.onResponse(null); } From 899d78c515c44100294635a5c995affc2efc9cf9 Mon Sep 17 00:00:00 2001 From: Ben Chaplin Date: Mon, 17 Nov 2025 16:00:52 -0500 Subject: [PATCH 3/6] Add transport version --- .../batched_response_might_include_reduction_failure.csv | 2 +- server/src/main/resources/transport/upper_bounds/8.19.csv | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git 
a/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv b/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv index eef83daf2840e..1e5fcced5a988 100644 --- a/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv +++ b/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv @@ -1 +1 @@ -9213000,9185007,9112012 +9213000,9185007,9112012,8841074 diff --git a/server/src/main/resources/transport/upper_bounds/8.19.csv b/server/src/main/resources/transport/upper_bounds/8.19.csv index a2bfbb0094989..febec42efcb5b 100644 --- a/server/src/main/resources/transport/upper_bounds/8.19.csv +++ b/server/src/main/resources/transport/upper_bounds/8.19.csv @@ -1 +1 @@ -initial_8.19.8,8841073 +batched_response_might_include_reduction_failure,8841074 From b3d14e11d0cd6ccebc0ca93f6228c18dd970b19b Mon Sep 17 00:00:00 2001 From: Ben Chaplin Date: Mon, 17 Nov 2025 22:39:02 -0500 Subject: [PATCH 4/6] Fix typo --- .../action/search/SearchQueryThenFetchAsyncAction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 74c21b31683ef..9ee35275266fc 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -852,7 +852,7 @@ void onShardDone() { channelListener.onFailure(e); return; } - ActionListener.respondAndRelease(channelListener, nodeQueryResponse)); + ActionListener.respondAndRelease(channelListener, nodeQueryResponse); } private NodeQueryResponse getSuccessfulResponse() throws IOException { From d6edbed469ec54571a0bd7564685bdaf51472e16 Mon Sep 17 00:00:00 2001 From: Ben Chaplin Date: Mon, 17 Nov 2025 22:47:52 -0500 Subject: [PATCH 5/6] Make ctor package private --- .../action/search/SearchQueryThenFetchAsyncAction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 9ee35275266fc..59839150585a5 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -225,7 +225,7 @@ public static final class NodeQueryResponse extends TransportResponse { private final SearchPhaseController.TopDocsStats topDocsStats; private final QueryPhaseResultConsumer.MergeResult mergeResult; - public NodeQueryResponse(StreamInput in) throws IOException { + NodeQueryResponse(StreamInput in) throws IOException { this.results = in.readArray(i -> i.readBoolean() ? 
new QuerySearchResult(i) : i.readException(), Object[]::new); if (in.getTransportVersion().supports(BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE) && in.readBoolean()) { this.reductionFailure = in.readException(); From e788695ade3f512b6961a0e528b69132c7b231bc Mon Sep 17 00:00:00 2001 From: Ben Chaplin Date: Mon, 17 Nov 2025 22:49:17 -0500 Subject: [PATCH 6/6] s --- muted-tests.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/muted-tests.yml b/muted-tests.yml index 93513da005015..2ff0bac53fc11 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -406,5 +406,3 @@ tests: - class: org.elasticsearch.xpack.inference.InferenceRestIT method: test {p0=inference/70_text_similarity_rank_retriever/Text similarity reranker with min_score zero includes all docs} issue: https://github.com/elastic/elasticsearch/issues/137732 - ->>>>>>> 8.19
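
For orientation after reading the series: the core behavioral change in PATCH 1/6 is that NodeQueryResponse may now carry a per-node reduction failure instead of a merge result, gated on the batched_response_might_include_reduction_failure transport version. The sketch below is a minimal, self-contained illustration of that wire-format branching only; it is not Elasticsearch code. The class name, the String placeholders, and the supportsReductionFailure flag are stand-ins for StreamOutput#writeException, the MergeResult/TopDocsStats serializers, and the TransportVersion#supports check used in the real NodeQueryResponse.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

final class ReductionFailureWireFormatSketch {

    // Writer side: peers on the new transport version get a leading flag; when a
    // reduction failure is present, the merge result and top-docs stats are skipped.
    static void write(DataOutput out, boolean supportsReductionFailure,
                      String reductionFailure, String mergeResult, String topDocsStats) throws IOException {
        if (supportsReductionFailure) {
            boolean hasReductionFailure = reductionFailure != null;
            out.writeBoolean(hasReductionFailure);
            if (hasReductionFailure) {
                out.writeUTF(reductionFailure);   // stands in for out.writeException(reductionFailure)
            } else {
                out.writeUTF(mergeResult);        // stands in for mergeResult.writeTo(out)
                out.writeUTF(topDocsStats);       // stands in for topDocsStats.writeTo(out)
            }
        } else {
            // Old wire format: merge result and stats are always present, never a failure.
            out.writeUTF(mergeResult);
            out.writeUTF(topDocsStats);
        }
    }

    // Reader side mirrors NodeQueryResponse(StreamInput): the flag is only consumed when the
    // sender's version supports it, so older peers still parse the stream unchanged.
    // Returns {reductionFailure, mergeResult, topDocsStats}, with unused slots left null.
    static String[] read(DataInput in, boolean supportsReductionFailure) throws IOException {
        if (supportsReductionFailure && in.readBoolean()) {
            return new String[] { in.readUTF(), null, null };
        }
        return new String[] { null, in.readUTF(), in.readUTF() };
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // Round-trip a reduction failure between two peers that both support the new version.
        write(new DataOutputStream(bytes), true, "partial reduce failed on data node", null, null);
        String[] decoded = read(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())), true);
        System.out.println("reduction failure on the wire: " + decoded[0]);
    }
}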