Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ public List<Setting<?>> getSettings() {
QueryInsightsSettings.TOP_N_EXPORTER_TYPE,
QueryInsightsSettings.TOP_N_EXPORTER_TEMPLATE_PRIORITY,
QueryInsightsSettings.TOP_N_QUERIES_EXCLUDED_INDICES,
QueryInsightsSettings.MAX_SOURCE_LENGTH,
QueryCategorizationSettings.SEARCH_QUERY_FIELD_TYPE_CACHE_SIZE_KEY
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -47,6 +49,7 @@
import org.opensearch.plugin.insights.core.metrics.OperationalMetric;
import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter;
import org.opensearch.plugin.insights.core.utils.IndexDiscoveryHelper;
import org.opensearch.plugin.insights.rules.model.Attribute;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.transport.client.Client;

Expand Down Expand Up @@ -217,10 +220,26 @@ public void onFailure(Exception e) {
void bulk(final String indexName, final List<SearchQueryRecord> records) throws IOException {
final BulkRequestBuilder bulkRequestBuilder = client.prepareBulk().setTimeout(TimeValue.timeValueMinutes(1));
for (SearchQueryRecord record : records) {
bulkRequestBuilder.add(
new IndexRequest(indexName).id(record.getId())
.source(record.toXContentForExport(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
);
Object sourceValue = record.getAttributes().get(Attribute.SOURCE);
if (sourceValue != null && !(sourceValue instanceof String)) {
Map<Attribute, Object> modifiedAttrs = new HashMap<>(record.getAttributes());
modifiedAttrs.put(Attribute.SOURCE, sourceValue.toString());
SearchQueryRecord recordForExport = new SearchQueryRecord(
record.getTimestamp(),
record.getMeasurements(),
modifiedAttrs,
record.getId()
);
bulkRequestBuilder.add(
new IndexRequest(indexName).id(record.getId())
.source(recordForExport.toXContentForExport(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
);
} else {
bulkRequestBuilder.add(
new IndexRequest(indexName).id(record.getId())
.source(record.toXContentForExport(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
);
}
}
bulkRequestBuilder.execute(new ActionListener<BulkResponse>() {
@Override
Expand Down Expand Up @@ -328,8 +347,8 @@ String readIndexMappings() throws IOException {
}

/**
* Ensures that the template exists. This method first checks if the template exists and
* only creates it if it doesn't.
* Ensures that the template exists and is up-to-date. This method checks if the template exists,
* compares its mapping with the current mapping, and updates it if necessary.
*
* @return CompletableFuture that completes when the template check/creation is done
*/
Expand All @@ -341,16 +360,39 @@ CompletableFuture<Boolean> ensureTemplateExists() {
client.execute(GetComposableIndexTemplateAction.INSTANCE, getRequest, new ActionListener<>() {
@Override
public void onResponse(GetComposableIndexTemplateAction.Response response) {
// If the template exists and priority has not been changed, we don't need to create/update it
if (response.indexTemplates().containsKey(TEMPLATE_NAME)
&& response.indexTemplates().get(TEMPLATE_NAME).priority() == templatePriority) {
logger.debug("Template [{}] already exists, skipping creation", TEMPLATE_NAME);
future.complete(true);
return;
}
try {
String currentMapping = readIndexMappings();

if (response.indexTemplates().containsKey(TEMPLATE_NAME)) {
ComposableIndexTemplate existingTemplate = response.indexTemplates().get(TEMPLATE_NAME);

// Check if template needs updating
boolean needsUpdate = existingTemplate.priority() != templatePriority;

// For now, always update template when mapping content changes
// More sophisticated mapping comparison can be added later if needed
if (!needsUpdate && existingTemplate.template() != null && existingTemplate.template().mappings() != null) {
String existingMappingString = existingTemplate.template().mappings().toString();
if (!Objects.equals(existingMappingString, currentMapping)) {
needsUpdate = true;
}
}

if (!needsUpdate) {
logger.debug("Template [{}] is up-to-date, skipping update", TEMPLATE_NAME);
future.complete(true);
return;
}

logger.info("Template [{}] needs updating", TEMPLATE_NAME);
}

// Template doesn't exist, create it
createTemplate(future);
// Template doesn't exist or needs updating
createTemplate(future);
} catch (IOException e) {
logger.error("Failed to read current mapping for template comparison", e);
createTemplate(future);
}
}

@Override
Expand All @@ -365,7 +407,7 @@ public void onFailure(Exception e) {
}

/**
* Helper method to create the template
* Helper method to create or update the template
*
* @param future The CompletableFuture to complete when done
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import static org.opensearch.plugin.insights.rules.model.SearchQueryRecord.DEFAULT_TOP_N_QUERY_MAP;
import static org.opensearch.plugin.insights.settings.QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.MAX_SOURCE_LENGTH;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_QUERIES_EXCLUDED_INDICES;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_NAME;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_TYPE;
Expand Down Expand Up @@ -67,6 +68,7 @@ public final class QueryInsightsListener extends SearchRequestOperationsListener
private boolean groupingFieldTypeEnabled;
private final QueryShapeGenerator queryShapeGenerator;
private Set<Pattern> excludedIndicesPattern;
private int maxSourceLength;

/**
* Constructor for QueryInsightsListener
Expand Down Expand Up @@ -163,6 +165,10 @@ public QueryInsightsListener(
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(SEARCH_QUERY_METRICS_ENABLED_SETTING, this::setSearchQueryMetricsEnabled);
setSearchQueryMetricsEnabled(clusterService.getClusterSettings().get(SEARCH_QUERY_METRICS_ENABLED_SETTING));

// Setting for max source length
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_SOURCE_LENGTH, this::setMaxSourceLength);
setMaxSourceLength(clusterService.getClusterSettings().get(MAX_SOURCE_LENGTH));
}

private void setExcludedIndices(List<String> excludedIndices) {
Expand Down Expand Up @@ -202,6 +208,14 @@ public void setGroupingFieldTypeEnabled(Boolean fieldTypeEnabled) {
this.groupingFieldTypeEnabled = fieldTypeEnabled;
}

/**
* Set the maximum source length before truncation
* @param maxSourceLength maximum length for source strings
*/
public void setMaxSourceLength(int maxSourceLength) {
this.maxSourceLength = maxSourceLength;
}

/**
* Update the query insights service state based on the enabled features.
* If any feature is enabled, it starts the service. If no features are enabled, it stops the service.
Expand Down Expand Up @@ -313,7 +327,22 @@ private void constructSearchQueryRecord(final SearchPhaseContext context, final

Map<Attribute, Object> attributes = new HashMap<>();
attributes.put(Attribute.SEARCH_TYPE, request.searchType().toString().toLowerCase(Locale.ROOT));
attributes.put(Attribute.SOURCE, request.source());

// Handle source with truncation for very long queries to prevent storage bloat
if (request.source() != null) {
String sourceString = request.source().toString();
if (sourceString.length() > maxSourceLength) {
// Simple truncation - categorization will be skipped via boolean flag
attributes.put(Attribute.SOURCE, sourceString.substring(0, maxSourceLength));
attributes.put(Attribute.SOURCE_TRUNCATED, true);
} else {
attributes.put(Attribute.SOURCE, sourceString);
attributes.put(Attribute.SOURCE_TRUNCATED, false);
}
} else {
attributes.put(Attribute.SOURCE, null);
attributes.put(Attribute.SOURCE_TRUNCATED, false);
}
attributes.put(Attribute.TOTAL_SHARDS, context.getNumShards());
attributes.put(Attribute.INDICES, request.indices());
attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public QueryInsightsService(
this.setExporterDeleteAfterAndDelete(clusterService.getClusterSettings().get(TOP_N_EXPORTER_DELETE_AFTER));
this.setExporterAndReaderType(SinkType.parse(clusterService.getClusterSettings().get(TOP_N_EXPORTER_TYPE)));
this.setTemplatePriority(clusterService.getClusterSettings().get(TOP_N_EXPORTER_TEMPLATE_PRIORITY));
this.searchQueryCategorizer = SearchQueryCategorizer.getInstance(metricsRegistry);
this.searchQueryCategorizer = SearchQueryCategorizer.getInstance(metricsRegistry, namedXContentRegistry);
this.enableSearchQueryMetricsFeature(false);
this.groupingType = DEFAULT_GROUPING_TYPE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,17 @@ public final class SearchQueryCategorizer {

final SearchQueryAggregationCategorizer searchQueryAggregationCategorizer;
private static SearchQueryCategorizer instance;
private static org.opensearch.core.xcontent.NamedXContentRegistry namedXContentRegistry;

/**
* Constructor for SearchQueryCategorizor
* @param metricsRegistry opentelemetry metrics registry
* @param xContentRegistry NamedXContentRegistry for parsing
*/
private SearchQueryCategorizer(MetricsRegistry metricsRegistry) {
private SearchQueryCategorizer(MetricsRegistry metricsRegistry, org.opensearch.core.xcontent.NamedXContentRegistry xContentRegistry) {
searchQueryCounters = new SearchQueryCounters(metricsRegistry);
searchQueryAggregationCategorizer = new SearchQueryAggregationCategorizer(searchQueryCounters);
namedXContentRegistry = xContentRegistry;
}

/**
Expand All @@ -55,10 +58,23 @@ private SearchQueryCategorizer(MetricsRegistry metricsRegistry) {
* @return singleton instance
*/
public static SearchQueryCategorizer getInstance(MetricsRegistry metricsRegistry) {
return getInstance(metricsRegistry, org.opensearch.core.xcontent.NamedXContentRegistry.EMPTY);
}

/**
* Get singleton instance of SearchQueryCategorizer
* @param metricsRegistry metric registry
* @param xContentRegistry NamedXContentRegistry for parsing
* @return singleton instance
*/
public static SearchQueryCategorizer getInstance(
MetricsRegistry metricsRegistry,
org.opensearch.core.xcontent.NamedXContentRegistry xContentRegistry
) {
if (instance == null) {
synchronized (SearchQueryCategorizer.class) {
if (instance == null) {
instance = new SearchQueryCategorizer(metricsRegistry);
instance = new SearchQueryCategorizer(metricsRegistry, xContentRegistry);
}
}
}
Expand All @@ -81,7 +97,39 @@ public void consumeRecords(List<SearchQueryRecord> records) {
* @param record search query record
*/
public void categorize(SearchQueryRecord record) {
SearchSourceBuilder source = (SearchSourceBuilder) record.getAttributes().get(Attribute.SOURCE);
Object sourceObj = record.getAttributes().get(Attribute.SOURCE);
if (sourceObj == null) {
return;
}

SearchSourceBuilder source;
if (sourceObj instanceof String) {
// Parse string back to SearchSourceBuilder for categorization
String sourceString = (String) sourceObj;

// Check if source was previously truncated - skip parsing if so
Boolean isTruncated = (Boolean) record.getAttributes().get(Attribute.SOURCE_TRUNCATED);
if (Boolean.TRUE.equals(isTruncated)) {
logger.debug("Skipping categorization for truncated source string");
return;
}

try {
source = SearchSourceBuilder.fromXContent(
org.opensearch.common.xcontent.json.JsonXContent.jsonXContent.createParser(
namedXContentRegistry,
org.opensearch.core.xcontent.DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
sourceString
)
);
} catch (Exception e) {
logger.warn("Failed to parse SOURCE string for categorization: {} - Source: {}", e.getMessage(), sourceString);
return;
}
} else {
source = (SearchSourceBuilder) sourceObj;
}

Map<MetricType, Measurement> measurements = record.getMeasurements();

incrementQueryTypeCounters(source.query(), measurements);
Expand Down Expand Up @@ -128,6 +176,7 @@ public SearchQueryCounters getSearchQueryCounters() {
public void reset() {
synchronized (SearchQueryCategorizer.class) {
instance = null;
namedXContentRegistry = null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ private static List<SearchQueryRecord> parseSearchHits(
final SearchResponse searchResponse,
final NamedXContentRegistry namedXContentRegistry
) throws Exception {
if (searchResponse == null) {
throw new IllegalArgumentException("SearchResponse cannot be null");
}

List<SearchQueryRecord> records = new ArrayList<>();

for (SearchHit hit : searchResponse.getHits()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,12 @@ public enum Attribute {
/**
* The cancelled of the search query, often used in live queries.
*/
IS_CANCELLED;
IS_CANCELLED,

/**
* Indicates if the source was truncated due to length limits.
*/
SOURCE_TRUNCATED;

/**
* Read an Attribute from a StreamInput
Expand Down Expand Up @@ -138,8 +143,14 @@ public static Object readAttributeValue(StreamInput in, Attribute attribute) thr
if (attribute == Attribute.TASK_RESOURCE_USAGES) {
return in.readList(TaskResourceInfo::readFromStream);
} else if (attribute == Attribute.SOURCE) {
SearchSourceBuilder builder = new SearchSourceBuilder(in);
return builder;
Object value = in.readGenericValue();
if (value instanceof SearchSourceBuilder) {
// Convert old SearchSourceBuilder objects to string for consistency
return ((SearchSourceBuilder) value).toString();
} else {
// Already a string (new format)
return value;
}
} else if (attribute == Attribute.GROUP_BY) {
return GroupingType.valueOf(in.readString().toUpperCase(Locale.ROOT));
} else {
Expand Down
Loading
Loading