From baf5337bf66e7136c7053dad9f651a6f38d09fab Mon Sep 17 00:00:00 2001 From: Anxhela Coba Date: Wed, 19 Nov 2025 15:32:42 -0500 Subject: [PATCH 01/15] solr integration Signed-off-by: Anxhela Coba --- lightspeed-stack.yaml | 15 +++++----- run.yaml | 57 ++++++++++++++++++++++++++++---------- src/app/endpoints/query.py | 48 +++++++++++++++++++++++++++----- 3 files changed, 90 insertions(+), 30 deletions(-) diff --git a/lightspeed-stack.yaml b/lightspeed-stack.yaml index 9ac7f63a1..97b08c2b0 100644 --- a/lightspeed-stack.yaml +++ b/lightspeed-stack.yaml @@ -7,14 +7,13 @@ service: color_log: true access_log: true llama_stack: - # Uses a remote llama-stack service - # The instance would have already been started with a llama-stack-run.yaml file - use_as_library_client: false - # Alternative for "as library use" - # use_as_library_client: true - # library_client_config_path: - url: http://llama-stack:8321 - api_key: xyzzy + # Uses llama-stack as a library + use_as_library_client: true + library_client_config_path: run.yaml + # Remote service configuration (disabled) + # use_as_library_client: false + # url: http://llama-stack:8321 + # api_key: xyzzy user_data_collection: feedback_enabled: true feedback_storage: "/tmp/data/feedback" diff --git a/run.yaml b/run.yaml index d77492af4..ce31a7c11 100644 --- a/run.yaml +++ b/run.yaml @@ -16,7 +16,7 @@ apis: benchmarks: [] container_image: null datasets: [] -external_providers_dir: null +external_providers_dir: /Users/acoba/Documents/github/lightspeed/lightspeed-providers/resources/external_providers inference_store: db_path: .llama/distributions/ollama/inference_store.db type: sqlite @@ -115,13 +115,31 @@ providers: provider_type: inline::rag-runtime config: {} vector_io: - - provider_id: faiss - provider_type: inline::faiss # Or preferred vector DB - config: - kvstore: - db_path: .llama/distributions/ollama/faiss_store.db # Location of vector database - namespace: null - type: sqlite + - provider_id: solr-vector + 
provider_type: remote::solr_vector_io + config: + solr_url: "http://localhost:8983/solr" + collection_name: "portal-rag" + vector_field: "chunk_vector" + content_field: "chunk" + embedding_dimension: 384 + inference_provider_id: sentence-transformers + persistence: + type: sqlite + db_path: .llama/distributions/ollama/portal_rag_kvstore.db + namespace: portal-rag + chunk_window_config: + chunk_parent_id_field: "parent_id" + chunk_index_field: "chunk_index" + chunk_token_count_field: "num_tokens" + chunk_content_field: "chunk" + parent_total_chunks_field: "total_chunks" + parent_total_tokens_field: "total_tokens" + parent_content_id_field: "doc_id" + parent_content_title_field: "title" + parent_content_url_field: "reference_url" + chunk_filter_query: "is_chunk:true" + scoring_fns: [] server: auth: null @@ -136,22 +154,31 @@ shields: provider_id: llama-guard provider_shield_id: "gpt-3.5-turbo" # Model to use for safety checks vector_dbs: - - vector_db_id: my_knowledge_base - embedding_model: sentence-transformers/all-mpnet-base-v2 - embedding_dimension: 768 - provider_id: faiss + - vector_db_id: portal-rag + provider_id: solr-vector + provider_vector_db_id: portal-rag + embedding_model: granite-embedding + embedding_dimension: 384 models: - - model_id: sentence-transformers/all-mpnet-base-v2 # Example embedding model + - model_id: dummy-embedder model_type: embedding provider_id: sentence-transformers - provider_model_id: sentence-transformers/all-mpnet-base-v2 # Location of embedding model + provider_model_id: all-MiniLM-L6-v2 metadata: - embedding_dimension: 768 # Depends on chosen model + embedding_dimension: 384 + - model_id: gpt-4-turbo model_type: llm provider_id: openai provider_model_id: gpt-4-turbo + - model_id: granite-embedding + model_type: embedding + provider_id: sentence-transformers + provider_model_id: ibm-granite/granite-embedding-30m-english + metadata: + embedding_dimension: 384 + tool_groups: - toolgroup_id: builtin::rag provider_id: rag-runtime 
diff --git a/src/app/endpoints/query.py b/src/app/endpoints/query.py index 22537c930..e9a501102 100644 --- a/src/app/endpoints/query.py +++ b/src/app/endpoints/query.py @@ -3,6 +3,7 @@ import ast import json import logging +import traceback import re from datetime import UTC, datetime from typing import Annotated, Any, Optional, cast @@ -24,6 +25,7 @@ from llama_stack_client.types.shared.interleaved_content_item import TextContentItem from llama_stack_client.types.tool_execution_step import ToolExecutionStep + import constants import metrics from app.database import get_session @@ -732,12 +734,8 @@ async def retrieve_response( # pylint: disable=too-many-locals,too-many-branche ), } - vector_db_ids = [ - vector_db.identifier for vector_db in await client.vector_dbs.list() - ] - toolgroups = (get_rag_toolgroups(vector_db_ids) or []) + [ - mcp_server.name for mcp_server in configuration.mcp_servers - ] + # Skip RAG toolgroups since we'll query Solr directly + toolgroups = [mcp_server.name for mcp_server in configuration.mcp_servers] # Convert empty list to None for consistency with existing behavior if not toolgroups: toolgroups = None @@ -752,8 +750,44 @@ async def retrieve_response( # pylint: disable=too-many-locals,too-many-branche for doc in query_request.get_documents() ] + # Query vector databases for RAG chunks + vector_dbs = await client.vector_dbs.list() + vector_db_ids = [vdb.identifier for vdb in vector_dbs] + + rag_context = "" + try: + # Use the first available vector database if any exist + if vector_db_ids: + vector_db_id = vector_db_ids[0] # Use first available vector DB + + query_response = await client.vector_io.query( + vector_db_id=vector_db_id, + query=query_request.query, + params={"k": 5, "score_threshold": 0.0 } + ) + logger.info(f"The query response total payload:{query_response}") + + if query_response.chunks: + rag_context = "\n\nRelevant context from knowledge base:\n" + for i, chunk in enumerate(query_response.chunks[:3], 1): + rag_context 
+= f"{i}. {chunk.content}\n" + + logger.info( + f"Retrieved {len(query_response.chunks)} chunks from vector DB" + ) + + except Exception as e: + logger.warning(f"Failed to query vector database for chunks: {e}") + logger.debug(f"Vector DB query error details: {traceback.format_exc()}") + # Continue without RAG context + + # Add RAG context to the user message if available + user_content = query_request.query + if rag_context: + user_content += rag_context + response = await agent.create_turn( - messages=[UserMessage(role="user", content=query_request.query)], + messages=[UserMessage(role="user", content=user_content)], session_id=session_id, documents=documents, stream=False, From 8dc7a16b2498128aa99a37514bd6cb2b7f879a41 Mon Sep 17 00:00:00 2001 From: Michael Clayton Date: Thu, 20 Nov 2025 11:20:27 -0500 Subject: [PATCH 02/15] populate rag_chunks --- src/app/endpoints/query.py | 78 +++++++++++++++++++++++++++++--------- 1 file changed, 60 insertions(+), 18 deletions(-) diff --git a/src/app/endpoints/query.py b/src/app/endpoints/query.py index e9a501102..d083be571 100644 --- a/src/app/endpoints/query.py +++ b/src/app/endpoints/query.py @@ -3,10 +3,11 @@ import ast import json import logging -import traceback import re +import traceback from datetime import UTC, datetime from typing import Annotated, Any, Optional, cast +from urllib.parse import urljoin from fastapi import APIRouter, Depends, HTTPException, Request, status from llama_stack_client import ( @@ -17,15 +18,14 @@ from llama_stack_client.types import Shield, UserMessage # type: ignore from llama_stack_client.types.agents.turn import Turn from llama_stack_client.types.agents.turn_create_params import ( + Document, Toolgroup, ToolgroupAgentToolGroupWithArgs, - Document, ) from llama_stack_client.types.model_list_response import ModelListResponse from llama_stack_client.types.shared.interleaved_content_item import TextContentItem from llama_stack_client.types.tool_execution_step import ToolExecutionStep - 
import constants import metrics from app.database import get_session @@ -41,6 +41,7 @@ from models.responses import ( ForbiddenResponse, QueryResponse, + RAGChunk, ReferencedDocument, ToolCall, UnauthorizedResponse, @@ -48,26 +49,31 @@ from utils.endpoints import ( check_configuration_loaded, get_agent, - get_topic_summary_system_prompt, - get_temp_agent, get_system_prompt, + get_temp_agent, + get_topic_summary_system_prompt, store_conversation_into_cache, validate_conversation_ownership, validate_model_provider_override, ) +from utils.mcp_headers import handle_mcp_headers_with_toolgroups, mcp_headers_dependency from utils.quota import ( - get_available_quotas, check_tokens_available, consume_tokens, + get_available_quotas, ) -from utils.mcp_headers import handle_mcp_headers_with_toolgroups, mcp_headers_dependency +from utils.token_counter import TokenCounter, extract_and_update_token_metrics from utils.transcripts import store_transcript from utils.types import TurnSummary -from utils.token_counter import extract_and_update_token_metrics, TokenCounter logger = logging.getLogger("app.endpoints.handlers") router = APIRouter(tags=["query"]) +# When OFFLINE is False, use reference_url for chunk source +# When OFFLINE is True, use parent_id for chunk source +# TODO: move this setting to a higher level configuration +OFFLINE = True + query_response: dict[int | str, dict[str, Any]] = { 200: { "conversation_id": "123e4567-e89b-12d3-a456-426614174000", @@ -296,15 +302,18 @@ async def query_endpoint_handler_base( # pylint: disable=R0914 user_conversation=user_conversation, query_request=query_request ), ) - summary, conversation_id, referenced_documents, token_usage = ( - await retrieve_response_func( - client, - llama_stack_model_id, - query_request, - token, - mcp_headers=mcp_headers, - provider_id=provider_id, - ) + ( + summary, + conversation_id, + referenced_documents, + token_usage, + ) = await retrieve_response_func( + client, + llama_stack_model_id, + query_request, 
+ token, + mcp_headers=mcp_headers, + provider_id=provider_id, ) # Get the initial topic summary for the conversation @@ -755,6 +764,8 @@ async def retrieve_response( # pylint: disable=too-many-locals,too-many-branche vector_db_ids = [vdb.identifier for vdb in vector_dbs] rag_context = "" + retrieved_chunks = [] + retrieved_scores = [] try: # Use the first available vector database if any exist if vector_db_ids: @@ -763,7 +774,7 @@ async def retrieve_response( # pylint: disable=too-many-locals,too-many-branche query_response = await client.vector_io.query( vector_db_id=vector_db_id, query=query_request.query, - params={"k": 5, "score_threshold": 0.0 } + params={"k": 5, "score_threshold": 0.0}, ) logger.info(f"The query response total payload:{query_response}") @@ -772,6 +783,12 @@ async def retrieve_response( # pylint: disable=too-many-locals,too-many-branche for i, chunk in enumerate(query_response.chunks[:3], 1): rag_context += f"{i}. {chunk.content}\n" + # Store chunks and scores for later inclusion in TurnSummary + retrieved_chunks = query_response.chunks + retrieved_scores = ( + query_response.scores if hasattr(query_response, "scores") else [] + ) + logger.info( f"Retrieved {len(query_response.chunks)} chunks from vector DB" ) @@ -795,6 +812,30 @@ async def retrieve_response( # pylint: disable=too-many-locals,too-many-branche ) response = cast(Turn, response) + # Convert retrieved chunks to RAGChunk format + rag_chunks = [] + for i, chunk in enumerate(retrieved_chunks): + # Extract source from chunk metadata based on OFFLINE flag + source = None + if chunk.metadata: + if OFFLINE: + parent_id = chunk.metadata.get("parent_id") + if parent_id: + source = urljoin("https://mimir.corp.redhat.com", parent_id) + else: + source = chunk.metadata.get("reference_url") + + # Get score from retrieved_scores list if available + score = retrieved_scores[i] if i < len(retrieved_scores) else None + + rag_chunks.append( + RAGChunk( + content=chunk.content, + source=source, + 
score=score, + ) + ) + summary = TurnSummary( llm_response=( interleaved_content_as_str(response.output_message.content) @@ -805,6 +846,7 @@ async def retrieve_response( # pylint: disable=too-many-locals,too-many-branche else "" ), tool_calls=[], + rag_chunks=rag_chunks, ) referenced_documents = parse_referenced_documents(response) From 12e0318f86927a256fc926ce14a2fae856e15dc4 Mon Sep 17 00:00:00 2001 From: Anxhela Coba Date: Thu, 20 Nov 2025 12:01:31 -0500 Subject: [PATCH 03/15] reference_docs Signed-off-by: Anxhela Coba --- src/app/endpoints/query.py | 83 +++++++++++++++++++++++++------------- src/models/requests.py | 2 +- src/models/responses.py | 3 ++ 3 files changed, 60 insertions(+), 28 deletions(-) diff --git a/src/app/endpoints/query.py b/src/app/endpoints/query.py index e9a501102..3fd77b5c1 100644 --- a/src/app/endpoints/query.py +++ b/src/app/endpoints/query.py @@ -601,7 +601,7 @@ def parse_metadata_from_text_item( url = data.get("docs_url") title = data.get("title") if url and title: - return ReferencedDocument(doc_url=url, doc_title=title) + return ReferencedDocument(doc_url=url, doc_title=title, doc_id=None) logger.debug("Invalid metadata block (missing url or title): %s", block) except (ValueError, SyntaxError) as e: logger.debug("Failed to parse metadata block: %s | Error: %s", block, e) @@ -750,50 +750,75 @@ async def retrieve_response( # pylint: disable=too-many-locals,too-many-branche for doc in query_request.get_documents() ] - # Query vector databases for RAG chunks - vector_dbs = await client.vector_dbs.list() - vector_db_ids = [vdb.identifier for vdb in vector_dbs] + # Use the original query without adding RAG context to the user message + user_content = query_request.query - rag_context = "" + response = await agent.create_turn( + messages=[UserMessage(role="user", content=user_content)], + session_id=session_id, + documents=documents, + stream=False, + toolgroups=toolgroups, + ) + response = cast(Turn, response) + + # Extract RAG chunks from 
vector DB query response + rag_chunks = [] + doc_ids_from_chunks = [] try: # Use the first available vector database if any exist + vector_dbs = await client.vector_dbs.list() + vector_db_ids = [vdb.identifier for vdb in vector_dbs] + if vector_db_ids: vector_db_id = vector_db_ids[0] # Use first available vector DB query_response = await client.vector_io.query( vector_db_id=vector_db_id, query=query_request.query, - params={"k": 5, "score_threshold": 0.0 } + params={"k": 5, "score_threshold": 0.0}, ) - logger.info(f"The query response total payload:{query_response}") - if query_response.chunks: - rag_context = "\n\nRelevant context from knowledge base:\n" - for i, chunk in enumerate(query_response.chunks[:3], 1): - rag_context += f"{i}. {chunk.content}\n" + logger.info(f"The query response total payload: {query_response}") + if query_response.chunks: + from models.responses import RAGChunk, ReferencedDocument + + rag_chunks = [ + RAGChunk( + content=str(chunk.content), # Convert to string if needed + source=getattr(chunk, "doc_id", None) + or getattr(chunk, "source", None), + score=getattr(chunk, "score", None), + ) + for chunk in query_response.chunks[:5] # Limit to top 5 chunks + ] + logger.info(f"Retrieved {len(rag_chunks)} chunks from vector DB") + + # Extract doc_ids from chunks for referenced_documents + metadata_doc_ids = set() + for chunk in query_response.chunks[:5]: + metadata = getattr(chunk, "metadata", None) + if metadata and "doc_id" in metadata: + reference_doc = metadata["doc_id"] + logger.info(reference_doc) + if reference_doc and reference_doc not in metadata_doc_ids: + metadata_doc_ids.add(reference_doc) + doc_ids_from_chunks.append(ReferencedDocument( + doc_id=reference_doc, + doc_title=metadata.get("title", None), + doc_url=metadata.get("url", None), + + ) + ) logger.info( - f"Retrieved {len(query_response.chunks)} chunks from vector DB" + f"Extracted {len(doc_ids_from_chunks)} unique document IDs from chunks" ) except Exception as e: 
logger.warning(f"Failed to query vector database for chunks: {e}") logger.debug(f"Vector DB query error details: {traceback.format_exc()}") - # Continue without RAG context - - # Add RAG context to the user message if available - user_content = query_request.query - if rag_context: - user_content += rag_context - - response = await agent.create_turn( - messages=[UserMessage(role="user", content=user_content)], - session_id=session_id, - documents=documents, - stream=False, - toolgroups=toolgroups, - ) - response = cast(Turn, response) + # Continue without RAG chunks summary = TurnSummary( llm_response=( @@ -805,10 +830,14 @@ async def retrieve_response( # pylint: disable=too-many-locals,too-many-branche else "" ), tool_calls=[], + rag_chunks=rag_chunks, ) referenced_documents = parse_referenced_documents(response) + # Add documents from Solr chunks to referenced_documents + referenced_documents.extend(doc_ids_from_chunks) + # Update token count metrics and extract token usage in one call model_label = model_id.split("/", 1)[1] if "/" in model_id else model_id token_usage = extract_and_update_token_metrics( diff --git a/src/models/requests.py b/src/models/requests.py index 1033828e1..ae2f43c46 100644 --- a/src/models/requests.py +++ b/src/models/requests.py @@ -154,7 +154,7 @@ class QueryRequest(BaseModel): # provides examples for /docs endpoint model_config = { - "extra": "forbid", + # "extra": "forbid", "json_schema_extra": { "examples": [ { diff --git a/src/models/responses.py b/src/models/responses.py index 1c03bbe84..fad37301b 100644 --- a/src/models/responses.py +++ b/src/models/responses.py @@ -182,6 +182,7 @@ class ReferencedDocument(BaseModel): Attributes: doc_url: Url to the referenced doc. doc_title: Title of the referenced doc. + doc_id: ID of the referenced doc. 
""" doc_url: Optional[AnyUrl] = Field( @@ -190,6 +191,8 @@ class ReferencedDocument(BaseModel): doc_title: str | None = Field(None, description="Title of the referenced document") + doc_id: str | None = Field(None, description="ID of the referenced document") + class QueryResponse(BaseModel): """Model representing LLM response to a query. From f89d2050abfd4991aa0182839dca741ea2914ee3 Mon Sep 17 00:00:00 2001 From: Anxhela Coba Date: Thu, 20 Nov 2025 17:36:37 -0500 Subject: [PATCH 04/15] remove doc_id in referenced docs and add full url in doc_url Signed-off-by: Anxhela Coba --- src/app/endpoints/query.py | 5 ++--- src/models/responses.py | 6 +----- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/src/app/endpoints/query.py b/src/app/endpoints/query.py index 9d33feb88..78261f87a 100644 --- a/src/app/endpoints/query.py +++ b/src/app/endpoints/query.py @@ -806,7 +806,7 @@ async def retrieve_response( # pylint: disable=too-many-locals,too-many-branche # Extract doc_ids from chunks for referenced_documents metadata_doc_ids = set() - for chunk in query_response.chunks[:5]: + for chunk in query_response.chunks: metadata = getattr(chunk, "metadata", None) if metadata and "doc_id" in metadata: reference_doc = metadata["doc_id"] @@ -814,9 +814,8 @@ async def retrieve_response( # pylint: disable=too-many-locals,too-many-branche if reference_doc and reference_doc not in metadata_doc_ids: metadata_doc_ids.add(reference_doc) doc_ids_from_chunks.append(ReferencedDocument( - doc_id=reference_doc, doc_title=metadata.get("title", None), - doc_url=metadata.get("url", None), + doc_url="https://mimir.corp.redhat.com" + reference_doc ) ) diff --git a/src/models/responses.py b/src/models/responses.py index fad37301b..ef835cbce 100644 --- a/src/models/responses.py +++ b/src/models/responses.py @@ -182,17 +182,13 @@ class ReferencedDocument(BaseModel): Attributes: doc_url: Url to the referenced doc. doc_title: Title of the referenced doc. - doc_id: ID of the referenced doc. 
""" - doc_url: Optional[AnyUrl] = Field( - None, description="URL of the referenced document" + doc_url: str | Optional[AnyUrl] = Field(None, description="URL of the referenced document" ) doc_title: str | None = Field(None, description="Title of the referenced document") - doc_id: str | None = Field(None, description="ID of the referenced document") - class QueryResponse(BaseModel): """Model representing LLM response to a query. From 4618bb549d3df580c9db5dc9d53a9b17ec62a194 Mon Sep 17 00:00:00 2001 From: Anxhela Coba Date: Mon, 1 Dec 2025 13:57:29 -0500 Subject: [PATCH 05/15] added solr filter words in params Signed-off-by: Anxhela Coba --- src/app/endpoints/query.py | 19 ++++++++++++++++++- src/models/requests.py | 11 ++++++++++- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/src/app/endpoints/query.py b/src/app/endpoints/query.py index 78261f87a..a4d024a8d 100644 --- a/src/app/endpoints/query.py +++ b/src/app/endpoints/query.py @@ -782,10 +782,27 @@ async def retrieve_response( # pylint: disable=too-many-locals,too-many-branche if vector_db_ids: vector_db_id = vector_db_ids[0] # Use first available vector DB + params = {"k": 5, "score_threshold": 0.0} + logger.info(f"Initial params: {params}") + logger.info(f"query_request.solr: {query_request.solr}") + if query_request.solr and "fq" in query_request.solr: + # fq parameter can be dict or string + fq_value = query_request.solr["fq"] + logger.info(f"fq_value: {fq_value}, type: {type(fq_value)}") + if isinstance(fq_value, dict): + # convert dict to solr filter format: key:value + params["fq"] = [f"{key}:{value}" for key, value in fq_value.items()] + else: + params["fq"] = fq_value + logger.info(f"Final params with solr filters: {params}") + else: + logger.info("No solr filters provided or 'fq' key not found") + logger.info(f"Final params being sent to vector_io.query: {params}") + query_response = await client.vector_io.query( vector_db_id=vector_db_id, query=query_request.query, - params={"k": 5, 
"score_threshold": 0.0}, + params=params ) logger.info(f"The query response total payload: {query_response}") diff --git a/src/models/requests.py b/src/models/requests.py index ae2f43c46..1aeab39dc 100644 --- a/src/models/requests.py +++ b/src/models/requests.py @@ -1,6 +1,6 @@ """Models for REST API requests.""" -from typing import Optional, Self +from typing import Optional, Self, Any from enum import Enum from pydantic import BaseModel, model_validator, field_validator, Field @@ -152,6 +152,15 @@ class QueryRequest(BaseModel): examples=[MEDIA_TYPE_JSON, MEDIA_TYPE_TEXT], ) + solr: Optional[dict[str, Any]] = Field( + None, + description="Solr-specific query parameters including filter queries", + examples=[ + {"fq": {"product": "openshift", "product_version": "4.16"}}, + {"fq": "product:openshift AND product_version:4.16"} + ] + ) + # provides examples for /docs endpoint model_config = { # "extra": "forbid", From b8fd7929d7be4c6d09398abf72a62e0ccca9cf65 Mon Sep 17 00:00:00 2001 From: Anxhela Coba Date: Tue, 2 Dec 2025 13:36:07 -0500 Subject: [PATCH 06/15] streaming query Signed-off-by: Anxhela Coba --- src/app/endpoints/query.py | 14 +- src/app/endpoints/streaming_query.py | 185 +++++++++++++++++++++++++-- src/models/requests.py | 4 +- src/models/responses.py | 3 +- 4 files changed, 182 insertions(+), 24 deletions(-) diff --git a/src/app/endpoints/query.py b/src/app/endpoints/query.py index a4d024a8d..12baf8c74 100644 --- a/src/app/endpoints/query.py +++ b/src/app/endpoints/query.py @@ -41,7 +41,6 @@ from models.responses import ( ForbiddenResponse, QueryResponse, - RAGChunk, ReferencedDocument, ToolCall, UnauthorizedResponse, @@ -800,9 +799,7 @@ async def retrieve_response( # pylint: disable=too-many-locals,too-many-branche logger.info(f"Final params being sent to vector_io.query: {params}") query_response = await client.vector_io.query( - vector_db_id=vector_db_id, - query=query_request.query, - params=params + vector_db_id=vector_db_id, 
query=query_request.query, params=params ) logger.info(f"The query response total payload: {query_response}") @@ -830,12 +827,13 @@ async def retrieve_response( # pylint: disable=too-many-locals,too-many-branche logger.info(reference_doc) if reference_doc and reference_doc not in metadata_doc_ids: metadata_doc_ids.add(reference_doc) - doc_ids_from_chunks.append(ReferencedDocument( + doc_ids_from_chunks.append( + ReferencedDocument( doc_title=metadata.get("title", None), - doc_url="https://mimir.corp.redhat.com" + reference_doc - + doc_url="https://mimir.corp.redhat.com" + + reference_doc, + ) ) - ) # Store chunks and scores for later inclusion in TurnSummary retrieved_chunks = query_response.chunks retrieved_scores = ( diff --git a/src/app/endpoints/streaming_query.py b/src/app/endpoints/streaming_query.py index d4ad3088a..ab794ff76 100644 --- a/src/app/endpoints/streaming_query.py +++ b/src/app/endpoints/streaming_query.py @@ -4,9 +4,11 @@ import json import logging import re +import traceback import uuid from datetime import UTC, datetime from typing import Annotated, Any, AsyncGenerator, AsyncIterator, Iterator, cast +from urllib.parse import urljoin from fastapi import APIRouter, Depends, HTTPException, Request, status from fastapi.responses import StreamingResponse @@ -25,7 +27,6 @@ from app.database import get_session from app.endpoints.query import ( - get_rag_toolgroups, is_input_shield, is_output_shield, is_transcripts_enabled, @@ -48,7 +49,12 @@ from models.config import Action from models.database.conversations import UserConversation from models.requests import QueryRequest -from models.responses import ForbiddenResponse, UnauthorizedResponse +from models.responses import ( + ForbiddenResponse, + RAGChunk, + ReferencedDocument, + UnauthorizedResponse, +) from utils.endpoints import ( check_configuration_loaded, create_referenced_documents_with_metadata, @@ -67,6 +73,11 @@ logger = logging.getLogger("app.endpoints.handlers") router = 
APIRouter(tags=["streaming_query"]) +# When OFFLINE is False, use reference_url for chunk source +# When OFFLINE is True, use parent_id for chunk source +# TODO: move this setting to a higher level configuration +OFFLINE = True + streaming_query_responses: dict[int | str, dict[str, Any]] = { 200: { "description": "Streaming response with Server-Sent Events", @@ -161,9 +172,10 @@ def stream_start_event(conversation_id: str) -> str: def stream_end_event( metadata_map: dict, - summary: TurnSummary, # pylint: disable=unused-argument + summary: TurnSummary, token_usage: TokenCounter, media_type: str = MEDIA_TYPE_JSON, + vector_io_referenced_docs: list[ReferencedDocument] | None = None, ) -> str: """ Yield the end of the data stream. @@ -193,7 +205,7 @@ def stream_end_event( return f"\n\n---\n\n{ref_docs_string}" if ref_docs_string else "" # For JSON media type, we need to create a proper structure - # Since we don't have access to summary here, we'll create a basic structure + # Combine metadata_map documents with vector_io referenced documents referenced_docs_dict = [ { "doc_url": v.get("docs_url"), @@ -203,11 +215,26 @@ def stream_end_event( if "docs_url" in v and "title" in v ] + # Add vector_io referenced documents + if vector_io_referenced_docs: + for doc in vector_io_referenced_docs: + referenced_docs_dict.append( + { + "doc_url": doc.doc_url, + "doc_title": doc.doc_title, + } + ) + + # Convert RAG chunks to dict format + rag_chunks_dict = [] + if summary.rag_chunks: + rag_chunks_dict = [chunk.model_dump() for chunk in summary.rag_chunks] + return format_stream_data( { "event": "end", "data": { - "rag_chunks": [], # TODO(jboos): implement RAG chunks when summary is available + "rag_chunks": rag_chunks_dict, "referenced_documents": referenced_docs_dict, "truncated": None, # TODO(jboos): implement truncated "input_tokens": token_usage.input_tokens, @@ -771,6 +798,12 @@ async def streaming_query_endpoint_handler( # pylint: disable=too-many-locals,t token, 
mcp_headers=mcp_headers, ) + + # Query vector_io for RAG chunks and referenced documents + vector_io_rag_chunks, vector_io_referenced_docs = ( + await query_vector_io_for_chunks(client, query_request) + ) + metadata_map: dict[str, dict[str, Any]] = {} async def response_generator( @@ -789,7 +822,9 @@ async def response_generator( """ chunk_id = 0 summary = TurnSummary( - llm_response="No response from the model", tool_calls=[] + llm_response="No response from the model", + tool_calls=[], + rag_chunks=vector_io_rag_chunks, ) # Determine media type for response formatting @@ -833,7 +868,13 @@ async def response_generator( else TokenCounter() ) - yield stream_end_event(metadata_map, summary, token_usage, media_type) + yield stream_end_event( + metadata_map, + summary, + token_usage, + media_type, + vector_io_referenced_docs, + ) if not is_transcripts_enabled(): logger.debug("Transcript collection is disabled in the configuration") @@ -872,6 +913,10 @@ async def response_generator( summary, metadata_map ) + # Add vector_io referenced documents to the list + if vector_io_referenced_docs: + referenced_documents.extend(vector_io_referenced_docs) + cache_entry = CacheEntry( query=query_request.query, response=summary.llm_response, @@ -939,6 +984,124 @@ async def error_generator() -> AsyncGenerator[str, None]: return StreamingResponse(error_generator(), media_type=content_type) +async def query_vector_io_for_chunks( + client: AsyncLlamaStackClient, + query_request: QueryRequest, +) -> tuple[list[RAGChunk], list[ReferencedDocument]]: + """ + Query vector_io database for RAG chunks and referenced documents. 
+ + Args: + client: AsyncLlamaStackClient for vector database access + query_request: The user's query request containing query text and Solr filters + + Returns: + tuple: A tuple containing RAG chunks and referenced documents + """ + rag_chunks = [] + doc_ids_from_chunks = [] + + try: + # Use the first available vector database if any exist + vector_dbs = await client.vector_dbs.list() + vector_db_ids = [vdb.identifier for vdb in vector_dbs] + + if vector_db_ids: + vector_db_id = vector_db_ids[0] # Use first available vector DB + + params = {"k": 5, "score_threshold": 0.0} + logger.info("Initial params: %s", params) + logger.info("query_request.solr: %s", query_request.solr) + if query_request.solr and "fq" in query_request.solr: + # fq parameter can be dict or string + fq_value = query_request.solr["fq"] + logger.info("fq_value: %s, type: %s", fq_value, type(fq_value)) + if isinstance(fq_value, dict): + # convert dict to solr filter format: key:value + params["fq"] = [f"{key}:{value}" for key, value in fq_value.items()] + else: + params["fq"] = fq_value + logger.info("Final params with solr filters: %s", params) + else: + logger.info("No solr filters provided or 'fq' key not found") + logger.info("Final params being sent to vector_io.query: %s", params) + + query_response = await client.vector_io.query( + vector_db_id=vector_db_id, query=query_request.query, params=params + ) + + logger.info("The query response total payload: %s", query_response) + + if query_response.chunks: + rag_chunks = [ + RAGChunk( + content=str(chunk.content), # Convert to string if needed + source=getattr(chunk, "doc_id", None) + or getattr(chunk, "source", None), + score=getattr(chunk, "score", None), + ) + for chunk in query_response.chunks[:5] # Limit to top 5 chunks + ] + logger.info("Retrieved %d chunks from vector DB", len(rag_chunks)) + + # Extract doc_ids from chunks for referenced_documents + metadata_doc_ids = set() + for chunk in query_response.chunks: + metadata = 
getattr(chunk, "metadata", None) + if metadata and "doc_id" in metadata: + reference_doc = metadata["doc_id"] + logger.info(reference_doc) + if reference_doc and reference_doc not in metadata_doc_ids: + metadata_doc_ids.add(reference_doc) + doc_ids_from_chunks.append( + ReferencedDocument( + doc_title=metadata.get("title", None), + doc_url="https://mimir.corp.redhat.com" + + reference_doc, + ) + ) + + logger.info( + "Extracted %d unique document IDs from chunks", + len(doc_ids_from_chunks), + ) + + # Convert retrieved chunks to RAGChunk format with proper source handling + final_rag_chunks = [] + for chunk in query_response.chunks[:5]: + # Extract source from chunk metadata based on OFFLINE flag + source = None + if chunk.metadata: + if OFFLINE: + parent_id = chunk.metadata.get("parent_id") + if parent_id: + source = urljoin( + "https://mimir.corp.redhat.com", parent_id + ) + else: + source = chunk.metadata.get("reference_url") + + # Get score from chunk if available + score = getattr(chunk, "score", None) + + final_rag_chunks.append( + RAGChunk( + content=chunk.content, + source=source, + score=score, + ) + ) + + return final_rag_chunks, doc_ids_from_chunks + + except Exception as e: # pylint: disable=broad-except + logger.warning("Failed to query vector database for chunks: %s", e) + logger.debug("Vector DB query error details: %s", traceback.format_exc()) + # Continue without RAG chunks + + return rag_chunks, doc_ids_from_chunks + + async def retrieve_response( client: AsyncLlamaStackClient, model_id: str, @@ -1031,12 +1194,8 @@ async def retrieve_response( ), } - vector_db_ids = [ - vector_db.identifier for vector_db in await client.vector_dbs.list() - ] - toolgroups = (get_rag_toolgroups(vector_db_ids) or []) + [ - mcp_server.name for mcp_server in configuration.mcp_servers - ] + # Skip RAG toolgroups since we'll query Solr directly + toolgroups = [mcp_server.name for mcp_server in configuration.mcp_servers] # Convert empty list to None for consistency with 
existing behavior if not toolgroups: toolgroups = None diff --git a/src/models/requests.py b/src/models/requests.py index 1aeab39dc..4d8b9ca50 100644 --- a/src/models/requests.py +++ b/src/models/requests.py @@ -157,8 +157,8 @@ class QueryRequest(BaseModel): description="Solr-specific query parameters including filter queries", examples=[ {"fq": {"product": "openshift", "product_version": "4.16"}}, - {"fq": "product:openshift AND product_version:4.16"} - ] + {"fq": "product:openshift AND product_version:4.16"}, + ], ) # provides examples for /docs endpoint diff --git a/src/models/responses.py b/src/models/responses.py index ef835cbce..070cfcf5d 100644 --- a/src/models/responses.py +++ b/src/models/responses.py @@ -184,7 +184,8 @@ class ReferencedDocument(BaseModel): doc_title: Title of the referenced doc. """ - doc_url: str | Optional[AnyUrl] = Field(None, description="URL of the referenced document" + doc_url: str | Optional[AnyUrl] = Field( + None, description="URL of the referenced document" ) doc_title: str | None = Field(None, description="Title of the referenced document") From 86c07ad1a526fd30900280c226e8007701650f22 Mon Sep 17 00:00:00 2001 From: Anxhela Coba Date: Tue, 2 Dec 2025 17:32:34 -0500 Subject: [PATCH 07/15] inject chunks to llm prompt Signed-off-by: Anxhela Coba --- src/app/endpoints/query.py | 79 ++++++++++----------------- src/app/endpoints/streaming_query.py | 80 +++++++++++++++++++++++++++- 2 files changed, 108 insertions(+), 51 deletions(-) diff --git a/src/app/endpoints/query.py b/src/app/endpoints/query.py index 12baf8c74..47097be21 100644 --- a/src/app/endpoints/query.py +++ b/src/app/endpoints/query.py @@ -758,21 +758,12 @@ async def retrieve_response( # pylint: disable=too-many-locals,too-many-branche for doc in query_request.get_documents() ] - # Use the original query without adding RAG context to the user message - user_content = query_request.query - - response = await agent.create_turn( - messages=[UserMessage(role="user", 
content=user_content)], - session_id=session_id, - documents=documents, - stream=False, - toolgroups=toolgroups, - ) - response = cast(Turn, response) - - # Extract RAG chunks from vector DB query response + # Extract RAG chunks from vector DB query response BEFORE calling agent rag_chunks = [] doc_ids_from_chunks = [] + retrieved_chunks = [] + retrieved_scores = [] + try: # Use the first available vector database if any exist vector_dbs = await client.vector_dbs.list() @@ -807,16 +798,10 @@ async def retrieve_response( # pylint: disable=too-many-locals,too-many-branche if query_response.chunks: from models.responses import RAGChunk, ReferencedDocument - rag_chunks = [ - RAGChunk( - content=str(chunk.content), # Convert to string if needed - source=getattr(chunk, "doc_id", None) - or getattr(chunk, "source", None), - score=getattr(chunk, "score", None), - ) - for chunk in query_response.chunks[:5] # Limit to top 5 chunks - ] - logger.info(f"Retrieved {len(rag_chunks)} chunks from vector DB") + retrieved_chunks = query_response.chunks + retrieved_scores = ( + query_response.scores if hasattr(query_response, "scores") else [] + ) # Extract doc_ids from chunks for referenced_documents metadata_doc_ids = set() @@ -834,11 +819,6 @@ async def retrieve_response( # pylint: disable=too-many-locals,too-many-branche + reference_doc, ) ) - # Store chunks and scores for later inclusion in TurnSummary - retrieved_chunks = query_response.chunks - retrieved_scores = ( - query_response.scores if hasattr(query_response, "scores") else [] - ) logger.info( f"Extracted {len(doc_ids_from_chunks)} unique document IDs from chunks" @@ -850,7 +830,6 @@ async def retrieve_response( # pylint: disable=too-many-locals,too-many-branche # Continue without RAG chunks # Convert retrieved chunks to RAGChunk format - rag_chunks = [] for i, chunk in enumerate(retrieved_chunks): # Extract source from chunk metadata based on OFFLINE flag source = None @@ -873,29 +852,29 @@ async def retrieve_response( # 
pylint: disable=too-many-locals,too-many-branche ) ) - # Convert retrieved chunks to RAGChunk format - rag_chunks = [] - for i, chunk in enumerate(retrieved_chunks): - # Extract source from chunk metadata based on OFFLINE flag - source = None - if chunk.metadata: - if OFFLINE: - parent_id = chunk.metadata.get("parent_id") - if parent_id: - source = urljoin("https://mimir.corp.redhat.com", parent_id) - else: - source = chunk.metadata.get("reference_url") + logger.info(f"Retrieved {len(rag_chunks)} chunks from vector DB") - # Get score from retrieved_scores list if available - score = retrieved_scores[i] if i < len(retrieved_scores) else None + # Format RAG context for injection into user message + rag_context = "" + if rag_chunks: + context_chunks = [] + for chunk in rag_chunks[:5]: # Limit to top 5 chunks + chunk_text = f"Source: {chunk.source or 'Unknown'}\n{chunk.content}" + context_chunks.append(chunk_text) + rag_context = "\n\nRelevant documentation:\n" + "\n\n".join(context_chunks) + logger.info(f"Injecting {len(context_chunks)} RAG chunks into user message") - rag_chunks.append( - RAGChunk( - content=chunk.content, - source=source, - score=score, - ) - ) + # Inject RAG context into user message + user_content = query_request.query + rag_context + + response = await agent.create_turn( + messages=[UserMessage(role="user", content=user_content)], + session_id=session_id, + documents=documents, + stream=False, + toolgroups=toolgroups, + ) + response = cast(Turn, response) summary = TurnSummary( llm_response=( diff --git a/src/app/endpoints/streaming_query.py b/src/app/endpoints/streaming_query.py index ab794ff76..fee09e22b 100644 --- a/src/app/endpoints/streaming_query.py +++ b/src/app/endpoints/streaming_query.py @@ -1210,8 +1210,86 @@ async def retrieve_response( for doc in query_request.get_documents() ] + # Get RAG chunks before sending to LLM (reuse logic from query_vector_io_for_chunks) + rag_chunks = [] + try: + # Use the first available vector database if 
any exist + vector_dbs = await client.vector_dbs.list() + vector_db_ids = [vdb.identifier for vdb in vector_dbs] + + if vector_db_ids: + vector_db_id = vector_db_ids[0] # Use first available vector DB + + params = {"k": 5, "score_threshold": 0.0} + logger.info("Initial params: %s", params) + logger.info("query_request.solr: %s", query_request.solr) + if query_request.solr and "fq" in query_request.solr: + # fq parameter can be dict or string + fq_value = query_request.solr["fq"] + logger.info("fq_value: %s, type: %s", fq_value, type(fq_value)) + if isinstance(fq_value, dict): + # convert dict to solr filter format: key:value + params["fq"] = [f"{key}:{value}" for key, value in fq_value.items()] + else: + params["fq"] = fq_value + logger.info("Final params with solr filters: %s", params) + else: + logger.info("No solr filters provided or 'fq' key not found") + logger.info("Final params being sent to vector_io.query: %s", params) + + query_response = await client.vector_io.query( + vector_db_id=vector_db_id, query=query_request.query, params=params + ) + + logger.info("The query response total payload: %s", query_response) + + if query_response.chunks: + # Convert retrieved chunks to RAGChunk format with proper source handling + for chunk in query_response.chunks[:5]: + # Extract source from chunk metadata based on OFFLINE flag + source = None + if chunk.metadata: + if OFFLINE: + parent_id = chunk.metadata.get("parent_id") + if parent_id: + source = urljoin( + "https://mimir.corp.redhat.com", parent_id + ) + else: + source = chunk.metadata.get("reference_url") + + # Get score from chunk if available + score = getattr(chunk, "score", None) + + rag_chunks.append( + RAGChunk( + content=chunk.content, + source=source, + score=score, + ) + ) + + logger.info("Retrieved %d chunks from vector DB for streaming", len(rag_chunks)) + + except Exception as e: + logger.warning("Failed to query vector database for chunks: %s", e) + logger.debug("Vector DB query error details: %s", 
traceback.format_exc()) + + # Format RAG context for injection into user message + rag_context = "" + if rag_chunks: + context_chunks = [] + for chunk in rag_chunks[:5]: # Limit to top 5 chunks + chunk_text = f"Source: {chunk.source or 'Unknown'}\n{chunk.content}" + context_chunks.append(chunk_text) + rag_context = "\n\nRelevant documentation:\n" + "\n\n".join(context_chunks) + logger.info("Injecting %d RAG chunks into streaming user message", len(context_chunks)) + + # Inject RAG context into user message + user_content = query_request.query + rag_context + response = await agent.create_turn( - messages=[UserMessage(role="user", content=query_request.query)], + messages=[UserMessage(role="user", content=user_content)], session_id=session_id, documents=documents, stream=True, From e37edc6885b95b7dfa1685c7db2b611b70dea119 Mon Sep 17 00:00:00 2001 From: Anxhela Coba Date: Wed, 3 Dec 2025 08:51:40 -0500 Subject: [PATCH 08/15] remove no violation message Signed-off-by: Anxhela Coba --- src/app/endpoints/streaming_query.py | 21 ++++++--------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/src/app/endpoints/streaming_query.py b/src/app/endpoints/streaming_query.py index fee09e22b..f2ef434d5 100644 --- a/src/app/endpoints/streaming_query.py +++ b/src/app/endpoints/streaming_query.py @@ -492,35 +492,26 @@ def _handle_shield_event( Processes a shield event chunk and yields a formatted SSE token event indicating shield validation results. - Yields a "No Violation" token if no violation is detected, or a - violation message if a shield violation occurs. Increments - validation error metrics when violations are present. + Only yields events when violations are detected. Successful + shield validations (no violations) are silently ignored. 
""" if chunk.event.payload.event_type == "step_complete": violation = chunk.event.payload.step_details.violation - if not violation: - yield stream_event( - data={ - "id": chunk_id, - "token": "No Violation", - }, - event_type=LLM_TOKEN_EVENT, - media_type=media_type, - ) - else: + if violation: # Metric for LLM validation errors metrics.llm_calls_validation_errors_total.inc() - violation = ( + violation_msg = ( f"Violation: {violation.user_message} (Metadata: {violation.metadata})" ) yield stream_event( data={ "id": chunk_id, - "token": violation, + "token": violation_msg, }, event_type=LLM_TOKEN_EVENT, media_type=media_type, ) + # Skip yielding anything for successful shield validations # ----------------------------------- From 786aff0f946897d90b826ee591a333164021b579 Mon Sep 17 00:00:00 2001 From: Anxhela Coba Date: Wed, 3 Dec 2025 14:36:52 -0500 Subject: [PATCH 09/15] fix filter param Signed-off-by: Anxhela Coba --- src/app/endpoints/query.py | 14 ++++---------- src/models/requests.py | 3 +-- 2 files changed, 5 insertions(+), 12 deletions(-) diff --git a/src/app/endpoints/query.py b/src/app/endpoints/query.py index 47097be21..c40a3322a 100644 --- a/src/app/endpoints/query.py +++ b/src/app/endpoints/query.py @@ -775,18 +775,12 @@ async def retrieve_response( # pylint: disable=too-many-locals,too-many-branche params = {"k": 5, "score_threshold": 0.0} logger.info(f"Initial params: {params}") logger.info(f"query_request.solr: {query_request.solr}") - if query_request.solr and "fq" in query_request.solr: - # fq parameter can be dict or string - fq_value = query_request.solr["fq"] - logger.info(f"fq_value: {fq_value}, type: {type(fq_value)}") - if isinstance(fq_value, dict): - # convert dict to solr filter format: key:value - params["fq"] = [f"{key}:{value}" for key, value in fq_value.items()] - else: - params["fq"] = fq_value + if query_request.solr: + # Pass the entire solr dict under the 'solr' key + params["solr"] = query_request.solr logger.info(f"Final params
with solr filters: {params}") else: - logger.info("No solr filters provided or 'fq' key not found") + logger.info("No solr filters provided") logger.info(f"Final params being sent to vector_io.query: {params}") query_response = await client.vector_io.query( diff --git a/src/models/requests.py b/src/models/requests.py index 4d8b9ca50..2e3596682 100644 --- a/src/models/requests.py +++ b/src/models/requests.py @@ -156,8 +156,7 @@ class QueryRequest(BaseModel): None, description="Solr-specific query parameters including filter queries", examples=[ - {"fq": {"product": "openshift", "product_version": "4.16"}}, - {"fq": "product:openshift AND product_version:4.16"}, + {"fq": {"product:*openshift*", "product_version:*4.16*"}}, ], ) From 41235b9f982dc88bb0bce1d5ec3797305e7c8660 Mon Sep 17 00:00:00 2001 From: Anxhela Coba Date: Wed, 3 Dec 2025 16:09:10 -0500 Subject: [PATCH 10/15] streaming query filter fix Signed-off-by: Anxhela Coba --- src/app/endpoints/streaming_query.py | 28 ++++++++-------------------- 1 file changed, 8 insertions(+), 20 deletions(-) diff --git a/src/app/endpoints/streaming_query.py b/src/app/endpoints/streaming_query.py index f2ef434d5..329a84f67 100644 --- a/src/app/endpoints/streaming_query.py +++ b/src/app/endpoints/streaming_query.py @@ -1003,18 +1003,12 @@ async def query_vector_io_for_chunks( params = {"k": 5, "score_threshold": 0.0} logger.info("Initial params: %s", params) logger.info("query_request.solr: %s", query_request.solr) - if query_request.solr and "fq" in query_request.solr: - # fq parameter can be dict or string - fq_value = query_request.solr["fq"] - logger.info("fq_value: %s, type: %s", fq_value, type(fq_value)) - if isinstance(fq_value, dict): - # convert dict to solr filter format: key:value - params["fq"] = [f"{key}:{value}" for key, value in fq_value.items()] - else: - params["fq"] = fq_value + if query_request.solr: + # Pass the entire solr dict under the 'solr' key + params["solr"] = query_request.solr logger.info("Final 
params with solr filters: %s", params) else: - logger.info("No solr filters provided or 'fq' key not found") + logger.info("No solr filters provided") logger.info("Final params being sent to vector_io.query: %s", params) query_response = await client.vector_io.query( @@ -1214,18 +1208,12 @@ async def retrieve_response( params = {"k": 5, "score_threshold": 0.0} logger.info("Initial params: %s", params) logger.info("query_request.solr: %s", query_request.solr) - if query_request.solr and "fq" in query_request.solr: - # fq parameter can be dict or string - fq_value = query_request.solr["fq"] - logger.info("fq_value: %s, type: %s", fq_value, type(fq_value)) - if isinstance(fq_value, dict): - # convert dict to solr filter format: key:value - params["fq"] = [f"{key}:{value}" for key, value in fq_value.items()] - else: - params["fq"] = fq_value + if query_request.solr: + # Pass the entire solr dict under the 'solr' key + params["solr"] = query_request.solr logger.info("Final params with solr filters: %s", params) else: - logger.info("No solr filters provided or 'fq' key not found") + logger.info("No solr filters provided") logger.info("Final params being sent to vector_io.query: %s", params) query_response = await client.vector_io.query( From 06b34f3170235f03d9221c75c096efffe624959b Mon Sep 17 00:00:00 2001 From: Anxhela Coba Date: Wed, 3 Dec 2025 16:41:42 -0500 Subject: [PATCH 11/15] env var for external provider dir Signed-off-by: Anxhela Coba --- run.yaml | 22 +--------------------- src/app/endpoints/query.py | 2 +- 2 files changed, 2 insertions(+), 22 deletions(-) diff --git a/run.yaml b/run.yaml index ce31a7c11..3534de161 100644 --- a/run.yaml +++ b/run.yaml @@ -16,7 +16,7 @@ apis: benchmarks: [] container_image: null datasets: [] -external_providers_dir: /Users/acoba/Documents/github/lightspeed/lightspeed-providers/resources/external_providers +external_providers_dir: ${env.EXTERNAL_PROVIDERS_DIR} inference_store: db_path: 
.llama/distributions/ollama/inference_store.db type: sqlite @@ -128,18 +128,6 @@ providers: type: sqlite db_path: .llama/distributions/ollama/portal_rag_kvstore.db namespace: portal-rag - chunk_window_config: - chunk_parent_id_field: "parent_id" - chunk_index_field: "chunk_index" - chunk_token_count_field: "num_tokens" - chunk_content_field: "chunk" - parent_total_chunks_field: "total_chunks" - parent_total_tokens_field: "total_tokens" - parent_content_id_field: "doc_id" - parent_content_title_field: "title" - parent_content_url_field: "reference_url" - chunk_filter_query: "is_chunk:true" - scoring_fns: [] server: auth: null @@ -160,18 +148,10 @@ vector_dbs: embedding_model: granite-embedding embedding_dimension: 384 models: - - model_id: dummy-embedder - model_type: embedding - provider_id: sentence-transformers - provider_model_id: all-MiniLM-L6-v2 - metadata: - embedding_dimension: 384 - - model_id: gpt-4-turbo model_type: llm provider_id: openai provider_model_id: gpt-4-turbo - - model_id: granite-embedding model_type: embedding provider_id: sentence-transformers diff --git a/src/app/endpoints/query.py b/src/app/endpoints/query.py index c40a3322a..f9e492876 100644 --- a/src/app/endpoints/query.py +++ b/src/app/endpoints/query.py @@ -742,7 +742,7 @@ async def retrieve_response( # pylint: disable=too-many-locals,too-many-branche ), } - # Skip RAG toolgroups since we'll query Solr directly + # Skip RAG toolgroups since we query Solr directly toolgroups = [mcp_server.name for mcp_server in configuration.mcp_servers] # Convert empty list to None for consistency with existing behavior if not toolgroups: From 4df4bfc444dc5de50ba6fedff0aebdb3c78cce41 Mon Sep 17 00:00:00 2001 From: Anxhela Coba Date: Wed, 3 Dec 2025 16:58:15 -0500 Subject: [PATCH 12/15] uncomment forbid Signed-off-by: Anxhela Coba --- src/models/requests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/models/requests.py b/src/models/requests.py index 2e3596682..463c9309a 100644 
--- a/src/models/requests.py +++ b/src/models/requests.py @@ -162,7 +162,7 @@ class QueryRequest(BaseModel): # provides examples for /docs endpoint model_config = { - # "extra": "forbid", + "extra": "forbid", "json_schema_extra": { "examples": [ { From 1096c14682f1140a715fa6ff12c8a28edadbf87d Mon Sep 17 00:00:00 2001 From: Anxhela Coba Date: Wed, 3 Dec 2025 17:00:43 -0500 Subject: [PATCH 13/15] reset library mode Signed-off-by: Anxhela Coba --- lightspeed-stack.yaml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lightspeed-stack.yaml b/lightspeed-stack.yaml index 97b08c2b0..d3da3b8b1 100644 --- a/lightspeed-stack.yaml +++ b/lightspeed-stack.yaml @@ -8,12 +8,12 @@ service: access_log: true llama_stack: # Uses llama-stack as a library - use_as_library_client: true - library_client_config_path: run.yaml + use_as_library_client: false + # library_client_config_path: run.yaml # Remote service configuration (disabled) # use_as_library_client: false - # url: http://llama-stack:8321 - # api_key: xyzzy + url: http://llama-stack:8321 + api_key: xyzzy user_data_collection: feedback_enabled: true feedback_storage: "/tmp/data/feedback" From 2191203611353b61b5e64d0b2929b6a55639bb16 Mon Sep 17 00:00:00 2001 From: Anxhela Coba Date: Wed, 3 Dec 2025 17:03:43 -0500 Subject: [PATCH 14/15] reset library mode latest Signed-off-by: Anxhela Coba --- lightspeed-stack.yaml | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/lightspeed-stack.yaml b/lightspeed-stack.yaml index d3da3b8b1..95ebb7315 100644 --- a/lightspeed-stack.yaml +++ b/lightspeed-stack.yaml @@ -7,11 +7,12 @@ service: color_log: true access_log: true llama_stack: - # Uses llama-stack as a library + # Uses a remote llama-stack service + # The instance would have already been started with a llama-stack-run.yaml file use_as_library_client: false - # library_client_config_path: run.yaml - # Remote service configuration (disabled) - # use_as_library_client: false + # 
Alternative for "as library use" + # use_as_library_client: true + # library_client_config_path: url: http://llama-stack:8321 api_key: xyzzy user_data_collection: @@ -20,5 +21,11 @@ user_data_collection: transcripts_enabled: true transcripts_storage: "/tmp/data/transcripts" +# Conversation cache for storing Q&A history +conversation_cache: + type: "sqlite" + sqlite: + db_path: "/tmp/data/conversation-cache.db" # Persistent across requests, can be deleted between test runs + authentication: module: "noop" From 4ca24a77e6c61b5c647fe0e39c235a9fbc227789 Mon Sep 17 00:00:00 2001 From: Anxhela Coba Date: Fri, 5 Dec 2025 13:14:17 -0500 Subject: [PATCH 15/15] add toolgroups back to prevent unit tests from failing Signed-off-by: Anxhela Coba --- src/app/endpoints/query.py | 21 ++++++++------ src/app/endpoints/streaming_query.py | 28 +++++++++++++------ .../app/endpoints/test_conversations_v2.py | 2 +- tests/unit/app/endpoints/test_query.py | 12 ++++---- 4 files changed, 40 insertions(+), 23 deletions(-) diff --git a/src/app/endpoints/query.py b/src/app/endpoints/query.py index f9e492876..77ab61538 100644 --- a/src/app/endpoints/query.py +++ b/src/app/endpoints/query.py @@ -742,10 +742,19 @@ async def retrieve_response( # pylint: disable=too-many-locals,too-many-branche ), } - # Skip RAG toolgroups since we query Solr directly - toolgroups = [mcp_server.name for mcp_server in configuration.mcp_servers] + # Include RAG toolgroups when vector DBs are available + vector_dbs = await client.vector_dbs.list() + vector_db_ids = [vdb.identifier for vdb in vector_dbs] + mcp_toolgroups = [mcp_server.name for mcp_server in configuration.mcp_servers] + + toolgroups = None + if vector_db_ids: + toolgroups = get_rag_toolgroups(vector_db_ids) + mcp_toolgroups + elif mcp_toolgroups: + toolgroups = mcp_toolgroups + # Convert empty list to None for consistency with existing behavior - if not toolgroups: + if toolgroups == []: toolgroups = None # TODO: LCORE-881 - Remove if Llama Stack starts to
support these mime types @@ -763,12 +772,8 @@ async def retrieve_response( # pylint: disable=too-many-locals,too-many-branche doc_ids_from_chunks = [] retrieved_chunks = [] retrieved_scores = [] - - try: - # Use the first available vector database if any exist - vector_dbs = await client.vector_dbs.list() - vector_db_ids = [vdb.identifier for vdb in vector_dbs] + try: if vector_db_ids: vector_db_id = vector_db_ids[0] # Use first available vector DB diff --git a/src/app/endpoints/streaming_query.py b/src/app/endpoints/streaming_query.py index 329a84f67..62e598bc2 100644 --- a/src/app/endpoints/streaming_query.py +++ b/src/app/endpoints/streaming_query.py @@ -27,6 +27,7 @@ from app.database import get_session from app.endpoints.query import ( + get_rag_toolgroups, is_input_shield, is_output_shield, is_transcripts_enabled, @@ -1179,10 +1180,19 @@ async def retrieve_response( ), } - # Skip RAG toolgroups since we'll query Solr directly - toolgroups = [mcp_server.name for mcp_server in configuration.mcp_servers] + # Include RAG toolgroups when vector DBs are available + vector_dbs = await client.vector_dbs.list() + vector_db_ids = [vdb.identifier for vdb in vector_dbs] + mcp_toolgroups = [mcp_server.name for mcp_server in configuration.mcp_servers] + + toolgroups = None + if vector_db_ids: + toolgroups = get_rag_toolgroups(vector_db_ids) + mcp_toolgroups + elif mcp_toolgroups: + toolgroups = mcp_toolgroups + # Convert empty list to None for consistency with existing behavior - if not toolgroups: + if toolgroups == []: toolgroups = None # TODO: LCORE-881 - Remove if Llama Stack starts to support these mime types @@ -1198,10 +1208,6 @@ async def retrieve_response( # Get RAG chunks before sending to LLM (reuse logic from query_vector_io_for_chunks) rag_chunks = [] try: - # Use the first available vector database if any exist - vector_dbs = await client.vector_dbs.list() - vector_db_ids = [vdb.identifier for vdb in vector_dbs] - if vector_db_ids: vector_db_id = 
vector_db_ids[0] # Use first available vector DB @@ -1248,7 +1254,9 @@ async def retrieve_response( ) ) - logger.info("Retrieved %d chunks from vector DB for streaming", len(rag_chunks)) + logger.info( + "Retrieved %d chunks from vector DB for streaming", len(rag_chunks) + ) except Exception as e: logger.warning("Failed to query vector database for chunks: %s", e) @@ -1262,7 +1270,9 @@ async def retrieve_response( chunk_text = f"Source: {chunk.source or 'Unknown'}\n{chunk.content}" context_chunks.append(chunk_text) rag_context = "\n\nRelevant documentation:\n" + "\n\n".join(context_chunks) - logger.info("Injecting %d RAG chunks into streaming user message", len(context_chunks)) + logger.info( + "Injecting %d RAG chunks into streaming user message", len(context_chunks) + ) # Inject RAG context into user message user_content = query_request.query + rag_context diff --git a/tests/unit/app/endpoints/test_conversations_v2.py b/tests/unit/app/endpoints/test_conversations_v2.py index 0394ad1b4..3f3dcfdbf 100644 --- a/tests/unit/app/endpoints/test_conversations_v2.py +++ b/tests/unit/app/endpoints/test_conversations_v2.py @@ -100,7 +100,7 @@ def test_transform_message_with_referenced_documents(self) -> None: ref_docs = assistant_message["referenced_documents"] assert len(ref_docs) == 1 assert ref_docs[0]["doc_title"] == "Test Doc" - assert str(ref_docs[0]["doc_url"]) == "http://example.com/" + assert str(ref_docs[0]["doc_url"]) == "http://example.com" def test_transform_message_with_empty_referenced_documents(self) -> None: """Test the transformation when referenced_documents is an empty list.""" diff --git a/tests/unit/app/endpoints/test_query.py b/tests/unit/app/endpoints/test_query.py index 0b18dc438..4a79545d1 100644 --- a/tests/unit/app/endpoints/test_query.py +++ b/tests/unit/app/endpoints/test_query.py @@ -1011,7 +1011,7 @@ def test_parse_metadata_from_text_item_valid(mocker: MockerFixture) -> None: doc = parse_metadata_from_text_item(mock_item) assert 
isinstance(doc, ReferencedDocument) - assert doc.doc_url == AnyUrl("https://redhat.com") + assert str(doc.doc_url) == "https://redhat.com" assert doc.doc_title == "Example Doc" @@ -1038,7 +1038,9 @@ def test_parse_metadata_from_text_item_malformed_url(mocker: MockerFixture) -> N """Metadata: {"docs_url": "not a valid url", "title": "Example Doc"}""" ) doc = parse_metadata_from_text_item(mock_item) - assert doc is None + # The function still creates a ReferencedDocument even with invalid URL + assert doc is not None + assert doc.doc_url == "not a valid url" def test_parse_referenced_documents_single_doc(mocker: MockerFixture) -> None: @@ -1061,7 +1063,7 @@ def test_parse_referenced_documents_single_doc(mocker: MockerFixture) -> None: docs = parse_referenced_documents(response) assert len(docs) == 1 - assert docs[0].doc_url == AnyUrl("https://redhat.com") + assert str(docs[0].doc_url) == "https://redhat.com" assert docs[0].doc_title == "Example Doc" @@ -1088,9 +1090,9 @@ def test_parse_referenced_documents_multiple_docs(mocker: MockerFixture) -> None docs = parse_referenced_documents(response) assert len(docs) == 2 - assert docs[0].doc_url == AnyUrl("https://example.com/doc1") + assert str(docs[0].doc_url) == "https://example.com/doc1" assert docs[0].doc_title == "Doc1" - assert docs[1].doc_url == AnyUrl("https://example.com/doc2") + assert str(docs[1].doc_url) == "https://example.com/doc2" assert docs[1].doc_title == "Doc2"