Skip to content

Commit 0464372

Browse files
Optimize query storage by storing source field as a string in local index
Signed-off-by: Emily Guo <35637792+LilyCaroline17@users.noreply.github.com>
1 parent a8779a7 commit 0464372

21 files changed

+1150
-54
lines changed

src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ public List<Setting<?>> getSettings() {
154154
QueryInsightsSettings.TOP_N_EXPORTER_TYPE,
155155
QueryInsightsSettings.TOP_N_EXPORTER_TEMPLATE_PRIORITY,
156156
QueryInsightsSettings.TOP_N_QUERIES_EXCLUDED_INDICES,
157+
QueryInsightsSettings.MAX_SOURCE_LENGTH,
157158
QueryCategorizationSettings.SEARCH_QUERY_FIELD_TYPE_CACHE_SIZE_KEY
158159
);
159160
}

src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java

Lines changed: 58 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
import java.time.ZonedDateTime;
1919
import java.time.format.DateTimeFormatter;
2020
import java.util.Collections;
21+
import java.util.HashMap;
2122
import java.util.List;
23+
import java.util.Map;
2224
import java.util.Objects;
2325
import java.util.concurrent.CompletableFuture;
2426
import org.apache.logging.log4j.LogManager;
@@ -47,6 +49,7 @@
4749
import org.opensearch.plugin.insights.core.metrics.OperationalMetric;
4850
import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter;
4951
import org.opensearch.plugin.insights.core.utils.IndexDiscoveryHelper;
52+
import org.opensearch.plugin.insights.rules.model.Attribute;
5053
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
5154
import org.opensearch.transport.client.Client;
5255

@@ -217,10 +220,26 @@ public void onFailure(Exception e) {
217220
void bulk(final String indexName, final List<SearchQueryRecord> records) throws IOException {
218221
final BulkRequestBuilder bulkRequestBuilder = client.prepareBulk().setTimeout(TimeValue.timeValueMinutes(1));
219222
for (SearchQueryRecord record : records) {
220-
bulkRequestBuilder.add(
221-
new IndexRequest(indexName).id(record.getId())
222-
.source(record.toXContentForExport(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
223-
);
223+
Object sourceValue = record.getAttributes().get(Attribute.SOURCE);
224+
if (sourceValue != null && !(sourceValue instanceof String)) {
225+
Map<Attribute, Object> modifiedAttrs = new HashMap<>(record.getAttributes());
226+
modifiedAttrs.put(Attribute.SOURCE, sourceValue.toString());
227+
SearchQueryRecord recordForExport = new SearchQueryRecord(
228+
record.getTimestamp(),
229+
record.getMeasurements(),
230+
modifiedAttrs,
231+
record.getId()
232+
);
233+
bulkRequestBuilder.add(
234+
new IndexRequest(indexName).id(record.getId())
235+
.source(recordForExport.toXContentForExport(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
236+
);
237+
} else {
238+
bulkRequestBuilder.add(
239+
new IndexRequest(indexName).id(record.getId())
240+
.source(record.toXContentForExport(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
241+
);
242+
}
224243
}
225244
bulkRequestBuilder.execute(new ActionListener<BulkResponse>() {
226245
@Override
@@ -328,8 +347,8 @@ String readIndexMappings() throws IOException {
328347
}
329348

330349
/**
331-
* Ensures that the template exists. This method first checks if the template exists and
332-
* only creates it if it doesn't.
350+
* Ensures that the template exists and is up-to-date. This method checks if the template exists,
351+
* compares its mapping with the current mapping, and updates it if necessary.
333352
*
334353
* @return CompletableFuture that completes when the template check/creation is done
335354
*/
@@ -341,16 +360,39 @@ CompletableFuture<Boolean> ensureTemplateExists() {
341360
client.execute(GetComposableIndexTemplateAction.INSTANCE, getRequest, new ActionListener<>() {
342361
@Override
343362
public void onResponse(GetComposableIndexTemplateAction.Response response) {
344-
// If the template exists and priority has not been changed, we don't need to create/update it
345-
if (response.indexTemplates().containsKey(TEMPLATE_NAME)
346-
&& response.indexTemplates().get(TEMPLATE_NAME).priority() == templatePriority) {
347-
logger.debug("Template [{}] already exists, skipping creation", TEMPLATE_NAME);
348-
future.complete(true);
349-
return;
350-
}
363+
try {
364+
String currentMapping = readIndexMappings();
365+
366+
if (response.indexTemplates().containsKey(TEMPLATE_NAME)) {
367+
ComposableIndexTemplate existingTemplate = response.indexTemplates().get(TEMPLATE_NAME);
368+
369+
// Check if template needs updating
370+
boolean needsUpdate = existingTemplate.priority() != templatePriority;
371+
372+
// For now, always update template when mapping content changes
373+
// More sophisticated mapping comparison can be added later if needed
374+
if (!needsUpdate && existingTemplate.template() != null && existingTemplate.template().mappings() != null) {
375+
String existingMappingString = existingTemplate.template().mappings().toString();
376+
if (!Objects.equals(existingMappingString, currentMapping)) {
377+
needsUpdate = true;
378+
}
379+
}
380+
381+
if (!needsUpdate) {
382+
logger.debug("Template [{}] is up-to-date, skipping update", TEMPLATE_NAME);
383+
future.complete(true);
384+
return;
385+
}
386+
387+
logger.info("Template [{}] needs updating", TEMPLATE_NAME);
388+
}
351389

352-
// Template doesn't exist, create it
353-
createTemplate(future);
390+
// Template doesn't exist or needs updating
391+
createTemplate(future);
392+
} catch (IOException e) {
393+
logger.error("Failed to read current mapping for template comparison", e);
394+
createTemplate(future);
395+
}
354396
}
355397

356398
@Override
@@ -365,7 +407,7 @@ public void onFailure(Exception e) {
365407
}
366408

367409
/**
368-
* Helper method to create the template
410+
* Helper method to create or update the template
369411
*
370412
* @param future The CompletableFuture to complete when done
371413
*/

src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import static org.opensearch.plugin.insights.rules.model.SearchQueryRecord.DEFAULT_TOP_N_QUERY_MAP;
1212
import static org.opensearch.plugin.insights.settings.QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING;
13+
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.MAX_SOURCE_LENGTH;
1314
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_QUERIES_EXCLUDED_INDICES;
1415
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_NAME;
1516
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_TYPE;
@@ -67,6 +68,7 @@ public final class QueryInsightsListener extends SearchRequestOperationsListener
6768
private boolean groupingFieldTypeEnabled;
6869
private final QueryShapeGenerator queryShapeGenerator;
6970
private Set<Pattern> excludedIndicesPattern;
71+
private int maxSourceLength;
7072

7173
/**
7274
* Constructor for QueryInsightsListener
@@ -163,6 +165,10 @@ public QueryInsightsListener(
163165
clusterService.getClusterSettings()
164166
.addSettingsUpdateConsumer(SEARCH_QUERY_METRICS_ENABLED_SETTING, this::setSearchQueryMetricsEnabled);
165167
setSearchQueryMetricsEnabled(clusterService.getClusterSettings().get(SEARCH_QUERY_METRICS_ENABLED_SETTING));
168+
169+
// Setting for max source length
170+
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_SOURCE_LENGTH, this::setMaxSourceLength);
171+
setMaxSourceLength(clusterService.getClusterSettings().get(MAX_SOURCE_LENGTH));
166172
}
167173

168174
private void setExcludedIndices(List<String> excludedIndices) {
@@ -202,6 +208,14 @@ public void setGroupingFieldTypeEnabled(Boolean fieldTypeEnabled) {
202208
this.groupingFieldTypeEnabled = fieldTypeEnabled;
203209
}
204210

211+
/**
212+
* Set the maximum source length before truncation
213+
* @param maxSourceLength maximum length for source strings
214+
*/
215+
public void setMaxSourceLength(int maxSourceLength) {
216+
this.maxSourceLength = maxSourceLength;
217+
}
218+
205219
/**
206220
* Update the query insights service state based on the enabled features.
207221
* If any feature is enabled, it starts the service. If no features are enabled, it stops the service.
@@ -313,7 +327,22 @@ private void constructSearchQueryRecord(final SearchPhaseContext context, final
313327

314328
Map<Attribute, Object> attributes = new HashMap<>();
315329
attributes.put(Attribute.SEARCH_TYPE, request.searchType().toString().toLowerCase(Locale.ROOT));
316-
attributes.put(Attribute.SOURCE, request.source());
330+
331+
// Handle source with truncation for very long queries to prevent storage bloat
332+
if (request.source() != null) {
333+
String sourceString = request.source().toString();
334+
if (sourceString.length() > maxSourceLength) {
335+
// Simple truncation - categorization will be skipped via boolean flag
336+
attributes.put(Attribute.SOURCE, sourceString.substring(0, maxSourceLength));
337+
attributes.put(Attribute.SOURCE_TRUNCATED, true);
338+
} else {
339+
attributes.put(Attribute.SOURCE, sourceString);
340+
attributes.put(Attribute.SOURCE_TRUNCATED, false);
341+
}
342+
} else {
343+
attributes.put(Attribute.SOURCE, null);
344+
attributes.put(Attribute.SOURCE_TRUNCATED, false);
345+
}
317346
attributes.put(Attribute.TOTAL_SHARDS, context.getNumShards());
318347
attributes.put(Attribute.INDICES, request.indices());
319348
attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap());

src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ public QueryInsightsService(
197197
this.setExporterDeleteAfterAndDelete(clusterService.getClusterSettings().get(TOP_N_EXPORTER_DELETE_AFTER));
198198
this.setExporterAndReaderType(SinkType.parse(clusterService.getClusterSettings().get(TOP_N_EXPORTER_TYPE)));
199199
this.setTemplatePriority(clusterService.getClusterSettings().get(TOP_N_EXPORTER_TEMPLATE_PRIORITY));
200-
this.searchQueryCategorizer = SearchQueryCategorizer.getInstance(metricsRegistry);
200+
this.searchQueryCategorizer = SearchQueryCategorizer.getInstance(metricsRegistry, namedXContentRegistry);
201201
this.enableSearchQueryMetricsFeature(false);
202202
this.groupingType = DEFAULT_GROUPING_TYPE;
203203
}

src/main/java/org/opensearch/plugin/insights/core/service/categorizer/SearchQueryCategorizer.java

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,17 @@ public final class SearchQueryCategorizer {
3939

4040
final SearchQueryAggregationCategorizer searchQueryAggregationCategorizer;
4141
private static SearchQueryCategorizer instance;
42+
private static org.opensearch.core.xcontent.NamedXContentRegistry namedXContentRegistry;
4243

4344
/**
4445
* Constructor for SearchQueryCategorizor
4546
* @param metricsRegistry opentelemetry metrics registry
47+
* @param xContentRegistry NamedXContentRegistry for parsing
4648
*/
47-
private SearchQueryCategorizer(MetricsRegistry metricsRegistry) {
49+
private SearchQueryCategorizer(MetricsRegistry metricsRegistry, org.opensearch.core.xcontent.NamedXContentRegistry xContentRegistry) {
4850
searchQueryCounters = new SearchQueryCounters(metricsRegistry);
4951
searchQueryAggregationCategorizer = new SearchQueryAggregationCategorizer(searchQueryCounters);
52+
namedXContentRegistry = xContentRegistry;
5053
}
5154

5255
/**
@@ -55,10 +58,23 @@ private SearchQueryCategorizer(MetricsRegistry metricsRegistry) {
5558
* @return singleton instance
5659
*/
5760
public static SearchQueryCategorizer getInstance(MetricsRegistry metricsRegistry) {
61+
return getInstance(metricsRegistry, org.opensearch.core.xcontent.NamedXContentRegistry.EMPTY);
62+
}
63+
64+
/**
65+
* Get singleton instance of SearchQueryCategorizer
66+
* @param metricsRegistry metric registry
67+
* @param xContentRegistry NamedXContentRegistry for parsing
68+
* @return singleton instance
69+
*/
70+
public static SearchQueryCategorizer getInstance(
71+
MetricsRegistry metricsRegistry,
72+
org.opensearch.core.xcontent.NamedXContentRegistry xContentRegistry
73+
) {
5874
if (instance == null) {
5975
synchronized (SearchQueryCategorizer.class) {
6076
if (instance == null) {
61-
instance = new SearchQueryCategorizer(metricsRegistry);
77+
instance = new SearchQueryCategorizer(metricsRegistry, xContentRegistry);
6278
}
6379
}
6480
}
@@ -81,7 +97,39 @@ public void consumeRecords(List<SearchQueryRecord> records) {
8197
* @param record search query record
8298
*/
8399
public void categorize(SearchQueryRecord record) {
84-
SearchSourceBuilder source = (SearchSourceBuilder) record.getAttributes().get(Attribute.SOURCE);
100+
Object sourceObj = record.getAttributes().get(Attribute.SOURCE);
101+
if (sourceObj == null) {
102+
return;
103+
}
104+
105+
SearchSourceBuilder source;
106+
if (sourceObj instanceof String) {
107+
// Parse string back to SearchSourceBuilder for categorization
108+
String sourceString = (String) sourceObj;
109+
110+
// Check if source was previously truncated - skip parsing if so
111+
Boolean isTruncated = (Boolean) record.getAttributes().get(Attribute.SOURCE_TRUNCATED);
112+
if (Boolean.TRUE.equals(isTruncated)) {
113+
logger.debug("Skipping categorization for truncated source string");
114+
return;
115+
}
116+
117+
try {
118+
source = SearchSourceBuilder.fromXContent(
119+
org.opensearch.common.xcontent.json.JsonXContent.jsonXContent.createParser(
120+
namedXContentRegistry,
121+
org.opensearch.core.xcontent.DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
122+
sourceString
123+
)
124+
);
125+
} catch (Exception e) {
126+
logger.warn("Failed to parse SOURCE string for categorization: {} - Source: {}", e.getMessage(), sourceString);
127+
return;
128+
}
129+
} else {
130+
source = (SearchSourceBuilder) sourceObj;
131+
}
132+
85133
Map<MetricType, Measurement> measurements = record.getMeasurements();
86134

87135
incrementQueryTypeCounters(source.query(), measurements);
@@ -128,6 +176,7 @@ public SearchQueryCounters getSearchQueryCounters() {
128176
public void reset() {
129177
synchronized (SearchQueryCategorizer.class) {
130178
instance = null;
179+
namedXContentRegistry = null;
131180
}
132181
}
133182
}

src/main/java/org/opensearch/plugin/insights/core/utils/SearchResponseParser.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@ private static List<SearchQueryRecord> parseSearchHits(
6969
final SearchResponse searchResponse,
7070
final NamedXContentRegistry namedXContentRegistry
7171
) throws Exception {
72+
if (searchResponse == null) {
73+
throw new IllegalArgumentException("SearchResponse cannot be null");
74+
}
75+
7276
List<SearchQueryRecord> records = new ArrayList<>();
7377

7478
for (SearchHit hit : searchResponse.getHits()) {

src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,12 @@ public enum Attribute {
8282
/**
8383
* The cancelled of the search query, often used in live queries.
8484
*/
85-
IS_CANCELLED;
85+
IS_CANCELLED,
86+
87+
/**
88+
* Indicates if the source was truncated due to length limits.
89+
*/
90+
SOURCE_TRUNCATED;
8691

8792
/**
8893
* Read an Attribute from a StreamInput
@@ -138,8 +143,14 @@ public static Object readAttributeValue(StreamInput in, Attribute attribute) thr
138143
if (attribute == Attribute.TASK_RESOURCE_USAGES) {
139144
return in.readList(TaskResourceInfo::readFromStream);
140145
} else if (attribute == Attribute.SOURCE) {
141-
SearchSourceBuilder builder = new SearchSourceBuilder(in);
142-
return builder;
146+
Object value = in.readGenericValue();
147+
if (value instanceof SearchSourceBuilder) {
148+
// Convert old SearchSourceBuilder objects to string for consistency
149+
return ((SearchSourceBuilder) value).toString();
150+
} else {
151+
// Already a string (new format)
152+
return value;
153+
}
143154
} else if (attribute == Attribute.GROUP_BY) {
144155
return GroupingType.valueOf(in.readString().toUpperCase(Locale.ROOT));
145156
} else {

0 commit comments

Comments
 (0)