diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b3b3ed2 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +secrets.toml \ No newline at end of file diff --git a/README.md b/README.md index 1fbadb1..deba818 100644 --- a/README.md +++ b/README.md @@ -1,176 +1,332 @@ -## RAG Demo using Couchbase, Streamlit, Haystack, and OpenAI +# RAG Demo using Couchbase, Streamlit, Haystack, and OpenAI -This is a demo app built to chat with your custom PDFs using the vector search capabilities of Couchbase to augment the OpenAI results in a Retrieval-Augmented-Generation (RAG) model. +This is a demo app built to chat with your custom PDFs using **Couchbase Vector Search** to augment the OpenAI results in a Retrieval-Augmented-Generation (RAG) model. -### How does it work? +## Three Implementation Options + +### Option 1: Search Service (FTS) Vector Search (`chat_with_pdf_with_fts.py`) + +Uses **`CouchbaseSearchDocumentStore`** with Full Text Search (FTS) vector indexes, which offers: + +- **Flexible vector search** with FTS capabilities +- **Rich text search** combined with vector similarity +- **Complex filtering** using FTS queries +- **Compatible with Couchbase 7.6+** +- **Ideal for hybrid search** scenarios combining full-text and vector search + +### Option 2: Hyperscale Vector Index (Default - `chat_with_pdf.py`) + +Uses **`CouchbaseQueryDocumentStore`** with Hyperscale (BHIVe) vector index, which offers: + +- **High-performance vector search at massive scale** (billions of documents) +- **Pure vector search** optimized for RAG applications +- **SQL++ queries** for efficient vector retrieval +- **Recommended for Couchbase 8.0+** for pure vector similarity search + +### Option 3: Composite Vector Index (`chat_with_pdf.py`) + +Uses **`CouchbaseQueryDocumentStore`** with Composite vector index, which offers: + +- **Vector search with metadata filtering** +- **Combines vector fields with scalar fields** for pre-filtering +- **SQL++ queries** with efficient 
filtered vector retrieval +- **Best for filtered vector search** scenarios (e.g., filter by date, category, user_id) +- **Recommended for Couchbase 8.0+** when you need to filter before vector search + +## How does it work? You can upload your PDFs with custom data & ask questions about the data in the chat box. For each question, you will get two answers: - one using RAG (Couchbase logo) -- one using pure LLM - Gemini Pro (🤖). +- one using pure LLM - OpenAI (🤖). + +The RAG pipeline utilizes Haystack, Couchbase Vector Search, and OpenAI models. It fetches relevant parts of the PDF using vector search & adds them as context for the language model. + +## Quick Start + +1. **Clone this repository** + ```bash + git clone + cd haystack-demo + ``` + +2. **Create a Python virtual environment** + ```bash + python -m venv venv + source venv/bin/activate + ``` + +3. **Install dependencies** + ```bash + pip install -r requirements.txt + ``` + +4. **Create a Couchbase bucket** (via Couchbase UI/Capella) with the name "sample_bucket" + +5. **Configure environment variables** (see Setup section below) + +6. **Run the Streamlit app** + ```bash + # For Hyperscale/Composite Vector Index (default) + streamlit run chat_with_pdf.py + + # OR for Search Service/FTS Vector Search + streamlit run chat_with_pdf_with_fts.py + ``` + +7. **Upload a PDF** - everything else is automatic! + +The app automatically creates: +- Scopes and collections +- Vector indexes (after PDF upload for `chat_with_pdf.py`, or on startup for `chat_with_pdf_with_fts.py`) + +## Which Option Should You Choose? 
+Couchbase Capella supports three types of vector indexes: + +- **Hyperscale Vector Index** (`chat_with_pdf.py`) - Best for RAG/chatbot applications with pure semantic search and billions of documents +- **Composite Vector Index** (`chat_with_pdf.py`) - Best when you need to filter by metadata before vector search +- **Search Vector Index** (`chat_with_pdf_with_fts.py`) - Best for hybrid searches combining keywords, geospatial, and semantic search + +> **For this PDF chat demo, we recommend Hyperscale Vector Index** for optimal performance in RAG applications. + +Learn more about choosing the right vector index in the [official Couchbase vector index documentation](https://docs.couchbase.com/cloud/vector-index/use-vector-indexes.html). + + +## Setup and Installation + +### Install dependencies + +`pip install -r requirements.txt` + +### Set the environment secrets + +Copy the `secrets.example.toml` file in `.streamlit` folder and rename it to `secrets.toml` and replace the placeholders with the actual values for your environment + +**For Hyperscale or Composite Vector Index (`chat_with_pdf.py`):** +``` +DB_CONN_STR = "" +DB_USERNAME = "" +DB_PASSWORD = "" +DB_BUCKET = "" +DB_SCOPE = "" +DB_COLLECTION = "" +OPENAI_API_KEY = "" +``` + +**For Search Service / FTS (`chat_with_pdf_with_fts.py`):** + +Add one additional environment variable to the above configuration: +``` +INDEX_NAME = "" +``` + +### Automatic Resource Setup + +The application automatically handles resource creation in the following order: + +**On Application Startup:** + +1. Creates the scope if it doesn't exist +2. Creates the collection if it doesn't exist + +**After PDF Upload (`chat_with_pdf.py`):** + +3. Automatically creates the Hyperscale/Composite vector index after documents are loaded +4. Falls back to creating the index on first query if needed + +**On Application Startup (`chat_with_pdf_with_fts.py`):** + +3. 
Attempts to create the FTS index (can be created without documents) + +**What You Need:** +- Your Couchbase **bucket must exist** with the name "sample_bucket" +- All other resources (scope, collection, indexes) are created automatically +- **No manual index creation required** - just upload your PDF and the index will be created + +**Note**: For `chat_with_pdf.py`, the vector index is created automatically **after you upload your first PDF** because Hyperscale/Composite indexes require documents for training. -The RAG pipeline utilizes Haystack, Couchbase Vector Search, and a OpenAI model. It fetches relevant parts of the PDF using vector search and adds them as context for the language model. +### Manual Vector Index Creation (Optional) +**The application now creates indexes automatically!** This section is only needed if: +- You want to pre-create the index before uploading documents +- Automatic creation fails in your environment +- You prefer manual control over index configuration -### Setup and Installation +**For Hyperscale or Composite Vector Index (`chat_with_pdf.py`):** -- #### Install dependencies: +The app automatically creates the vector index after you upload your first PDF. However, you can manually create it if needed. - `pip install -r requirements.txt` +This demo uses Couchbase's Vector Indexes (introduced in version 8.0). Choose between: -- #### Set the environment secrets +- **Hyperscale Vector Index**: Optimized for pure vector search at scale. Perfect for RAG, chatbots, and scenarios needing fast vector similarity search on large datasets. - Copy the `secrets.example.toml` file in `.streamlit` folder and rename it to `secrets.toml` and replace the placeholders with the actual values for your environment +- **Composite Vector Index**: Combines vector fields with scalar fields, allowing you to apply metadata filters before vector search (e.g., date, category, user_id). 
+Learn more about these vector indexes [here](https://docs.couchbase.com/cloud/vector-index/use-vector-indexes.html). + +**For Search Service / FTS (`chat_with_pdf_with_fts.py`):** + +The app attempts to create the FTS index on startup. If automatic creation fails, you can create it manually. See the FTS index creation section below for detailed instructions. + +### Key Components + +- Streamlit: Provides the web interface +- Haystack: Orchestrates the RAG pipeline +- Couchbase: Serves as the high-performance vector store +- OpenAI: Supplies embeddings and the language model + +## Manual Vector Index Creation (Optional) + +**⚠️ Manual creation is NOT required** - the app creates indexes automatically when you upload a PDF. This section is only for advanced users who want manual control. + +### Hyperscale or Composite Vector Index (for `chat_with_pdf.py`) + +You need to create a Hyperscale or Composite vector index on your collection **after** loading some documents (required for index training). Choose between BHIVe or Composite Index based on your use case. Whichever vector index (Hyperscale or Composite) you choose won't affect the functionality of this demo, though performance differences may occur. + +**Option 1: Hyperscale Vector Index (Recommended)** + +Hyperscale is a dedicated vector index optimized for pure vector search at massive scale. Use this for the best performance in RAG applications. Refer to the [Hyperscale Vector Index Guide](https://docs.couchbase.com/cloud/vector-index/hyperscale-vector-index.html) for detailed instructions. 
+ +Creating a Hyperscale Index using SQL++ (use Couchbase Query Workbench or programmatically): + +```sql +CREATE VECTOR INDEX idx_pdf_hyperscale +ON `bucket_name`.`scope_name`.`collection_name`(embedding VECTOR) +WITH { + "dimension": 1536, + "similarity": "DOT" +}; ``` - DB_CONN_STR = "" - DB_USERNAME = "" - DB_PASSWORD = "" - DB_BUCKET = "" - DB_SCOPE = "" - DB_COLLECTION = "" - INDEX_NAME = "" - OPENAI_API_KEY = "" + +**Option 2: Composite Vector Index** + +Composite indexes combine vector fields with other scalar fields. This is useful when you need to filter documents by metadata before performing vector search. + +Creating a Composite Index using SQL++: + +```sql +CREATE INDEX idx_pdf_composite +ON `bucket_name`.`scope_name`.`collection_name`(embedding VECTOR) +WITH { + "dimension": 1536, + "similarity": "DOT" +}; ``` -- #### Create the Search Index on Full Text Service - - We need to create the Search Index on the Full Text Service in Couchbase. For this demo, you can import the following index using the instructions. - - - [Couchbase Capella](https://docs.couchbase.com/cloud/search/import-search-index.html) - - - Copy the index definition to a new file index.json - - Import the file in Capella using the instructions in the documentation. - - Click on Create Index to create the index. - - - [Couchbase Server](https://docs.couchbase.com/server/current/search/import-search-index.html) - - - Click on Search -> Add Index -> Import - - Copy the following Index definition in the Import screen - - Click on Create Index to create the index. - -- #### Key Components - - - Streamlit: Provides the web interface - - Haystack: Orchestrates the RAG pipeline - - Couchbase: Serves as the vector store - - OpenAI: Supplies the language model - - #### Index Definition - - Here, we are creating the index `pdf_search` on the documents in the `haystack_collection` collection within the `haystack_scope` scope in the bucket `haystack_bucket`. 
The Vector field is set to `embeddings` with 1536 dimensions and the text field set to `text`. We are also indexing and storing all the fields under `metadata` in the document as a dynamic mapping to account for varying document structures. The similarity metric is set to `dot_product`. If there is a change in these parameters, please adapt the index accordingly. - - ``` - { - "name": "pdf_search", - "type": "fulltext-index", - "sourceType": "gocbcore", - "sourceName": "haystack_bucket", - "planParams": { - "indexPartitions": 1, - "numReplicas": 0 - }, - "params": { - "doc_config": { - "docid_prefix_delim": "", - "docid_regexp": "", - "mode": "scope.collection.type_field", - "type_field": "type" - }, - "mapping": { - "default_analyzer": "standard", - "default_datetime_parser": "dateTimeOptional", - "index_dynamic": true, - "store_dynamic": true, - "default_mapping": { - "dynamic": true, - "enabled": false - }, - "types": { - "haystack_scope.haystack_collection": { - "dynamic": false, - "enabled": true, - "properties": { - "content": { - "enabled": true, - "fields": [ - { - "docvalues": true, - "include_in_all": false, - "include_term_vectors": false, - "index": true, - "name": "content", - "store": true, - "type": "text" - } - ] - }, - "embedding": { - "enabled": true, - "dynamic": false, - "fields": [ - { - "vector_index_optimized_for": "recall", - "docvalues": true, - "dims": 1536, - "include_in_all": false, - "include_term_vectors": false, - "index": true, - "name": "embedding", - "similarity": "dot_product", - "store": true, - "type": "vector" - } - ] - }, - "dataframe": { - "enabled": true, - "fields": [ - { - "docvalues": true, - "include_in_all": false, - "include_term_vectors": false, - "index": true, - "name": "dataframe", - "store": true, - "analyzer": "keyword", - "type": "text" - } - ] - }, - "meta": { - "dynamic": true, - "enabled": true, - "properties": { - "name": { - "enabled": true, - "fields": [ - { - "docvalues": true, - "include_in_all": false, 
- "include_term_vectors": false, - "index": true, - "name": "name", - "store": true, - "analyzer": "keyword", - "type": "text" - } - ] - } - } - } - } - } - } - } - } - } +**Index Parameters:** +- `dimension`: Must match your embedding model (1536 for OpenAI ada-002/ada-003, 768 for sentence-transformers) +- `similarity`: Must match the similarity metric in `CouchbaseQueryDocumentStore`. Use `DOT` for dot product (recommended for OpenAI embeddings) + +**Important Notes:** +1. **Index Creation Timing**: Hyperscale and Composite vector indexes require training data. Create the index **after** you've loaded the documents into your collection. +2. **Similarity Metric**: The `similarity` parameter in the index **must match** the `similarity` parameter in your `CouchbaseQueryDocumentStore` configuration. +3. **Dimension**: Must match your embedding model's output dimensions. + +**Verifying Your Index:** +After creating the index, verify it exists: + +```sql +SELECT * FROM system:indexes +WHERE name LIKE 'idx_%_vector'; ``` -- #### Run the application +### FTS Vector Index (for `chat_with_pdf_with_fts.py`) + +**Automatic Creation**: The app attempts to create the FTS index automatically on startup using the `INDEX_NAME` from your configuration. + +**Manual Creation** (if automatic creation fails): Create a Full Text Search index with vector capabilities. + +**Creating an FTS Index with Vector Support** + +If automatic creation fails, you can create the index using the Couchbase UI or by importing the provided index definition. + +Using Couchbase Capella: +1. Follow the import instructions [here](https://docs.couchbase.com/cloud/search/import-search-index.html) +2. Use the provided `sampleSearchIndex.json` file in this repository +3. Update the following values in the JSON before importing: + - `sourceName`: Replace `sample_bucket` with your bucket name + - `types`: Replace `scope.coll` with your actual `scope_name.collection_name` +4. Import the file in Capella +5. 
Click on Create Index + +Using Couchbase Server: +1. Navigate to Search -> Add Index -> Import +2. Use the provided `sampleSearchIndex.json` file in this repository +3. Update the following values in the JSON before importing: + - `sourceName`: Replace `sample_bucket` with your bucket name + - `types`: Replace `scope.coll` with your actual `scope_name.collection_name` +4. Paste the updated JSON in the Import screen +5. Click on Create Index + +**FTS Index Definition** + +The `sampleSearchIndex.json` file contains a pre-configured FTS index with vector capabilities. Key features: +- **Index Name**: `sample-index` (customizable) +- **Vector Field**: `embedding` with 1536 dimensions +- **Similarity**: `dot_product` (optimized for OpenAI embeddings) +- **Text Field**: `content` for document text +- **Metadata**: Dynamic mapping for `meta` fields + +## Run the Application + +**For Hyperscale or Composite Vector Index:** +``` +streamlit run chat_with_pdf.py +``` + +**For Search Service / FTS:** +``` +streamlit run chat_with_pdf_with_fts.py +``` + +## Implementation Details + +### Hyperscale and Composite Vector Index Implementation (`chat_with_pdf.py`) + +This demo uses the following key components: + +1. **CouchbaseQueryDocumentStore**: + - Configured with `QueryVectorSearchType.ANN` for fast approximate nearest neighbor search + - Uses `QueryVectorSearchSimilarity.DOT` for dot product similarity (recommended for OpenAI embeddings) + - Supports both **Hyperscale (BHIVe)** and **Composite** indexes + - Leverages SQL++ for efficient vector retrieval + - Same code works for both index types - just create the appropriate index + +2. **CouchbaseQueryEmbeddingRetriever**: + - Uses SQL++ queries with `APPROX_VECTOR_DISTANCE()` function for ANN search + - Retrieves top-k most similar documents based on embedding similarity + - Optimized for low-latency, high-throughput vector search + +3. 
**OpenAI Embeddings**: + - `text-embedding-ada-002` model with 1536 dimensions + - Generates embeddings for both documents and queries + +For more details on implementation, refer to the extensive code comments in `chat_with_pdf.py`. + +### Search Service / FTS Implementation (`chat_with_pdf_with_fts.py`) + +This alternative implementation uses: + +1. **CouchbaseSearchDocumentStore**: + - Uses Full Text Search service for vector indexing and retrieval + - Compatible with Couchbase 7.6+ and 8.0+ + - Supports rich text search combined with vector similarity + +2. **CouchbaseSearchEmbeddingRetriever**: + - Leverages FTS vector search capabilities + - Retrieves top-k most similar documents using FTS queries + - Supports complex filtering with FTS query syntax + +3. **OpenAI Embeddings**: + - Same `text-embedding-ada-002` model with 1536 dimensions + - Generates embeddings for both documents and queries + +For more details on FTS implementation, refer to the code comments in `chat_with_pdf_with_fts.py`. - `streamlit run chat_with_pdf.py` +## Additional Resources -For more details on implementation, refer to the code comments in chat_with_pdf.py. 
\ No newline at end of file +- [Couchbase Vector Index Documentation](https://docs.couchbase.com/cloud/vector-index/vectors-and-indexes-overview.html) +- [Haystack Documentation](https://docs.haystack.deepset.ai/docs/intro) +- [couchbase-haystack GitHub Repository](https://github.com/Couchbase-Ecosystem/couchbase-haystack) \ No newline at end of file diff --git a/chat_with_pdf.py b/chat_with_pdf.py index ee2bf0a..b99da04 100644 --- a/chat_with_pdf.py +++ b/chat_with_pdf.py @@ -1,6 +1,7 @@ import os import tempfile import streamlit as st +from datetime import timedelta from haystack import Pipeline from haystack.components.converters import PyPDFToDocument from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter @@ -9,7 +10,22 @@ from haystack.components.builders import PromptBuilder, AnswerBuilder from haystack.components.writers import DocumentWriter from haystack.utils import Secret -from couchbase_haystack import CouchbaseSearchDocumentStore, CouchbaseSearchEmbeddingRetriever, CouchbasePasswordAuthenticator, CouchbaseClusterOptions +from couchbase.n1ql import QueryScanConsistency +from couchbase.cluster import Cluster +from couchbase.auth import PasswordAuthenticator +from couchbase.options import ClusterOptions +from couchbase.exceptions import QueryIndexAlreadyExistsException, ScopeAlreadyExistsException, CollectionAlreadyExistsException + +# Import CouchbaseQueryDocumentStore for GSI-based vector search with BHIVe support +from couchbase_haystack import ( + CouchbaseQueryDocumentStore, + CouchbaseQueryEmbeddingRetriever, + CouchbasePasswordAuthenticator, + CouchbaseClusterOptions, + QueryVectorSearchType, + QueryVectorSearchSimilarity, + CouchbaseQueryOptions +) def check_environment_variable(variable_name): """Check if environment variable is set""" @@ -17,7 +33,141 @@ def check_environment_variable(variable_name): st.error(f"{variable_name} environment variable is not set. 
Please add it to the secrets.toml file") st.stop() -def save_to_vector_store(uploaded_file, indexing_pipeline): +def create_scope_if_not_exists(collection_manager, scope_name): + """Create scope if it doesn't exist""" + try: + scopes = collection_manager.get_all_scopes() + scope_names = [scope.name for scope in scopes] + + if scope_name not in scope_names: + collection_manager.create_scope(scope_name) + st.info(f"Scope '{scope_name}' created successfully") + return True + return False + except ScopeAlreadyExistsException: + return False + except Exception as e: + st.warning(f"Could not create scope '{scope_name}': {str(e)}") + return False + +def create_collection_if_not_exists(collection_manager, scope_name, collection_name): + """Create collection if it doesn't exist""" + try: + scopes = collection_manager.get_all_scopes() + + for scope in scopes: + if scope.name == scope_name: + collection_names = [collection.name for collection in scope.collections] + + if collection_name not in collection_names: + collection_manager.create_collection(scope_name=scope_name, collection_name=collection_name) + st.info(f"Collection '{collection_name}' created in scope '{scope_name}'") + return True + return False + + st.error(f"Scope '{scope_name}' does not exist, cannot create collection") + return False + except CollectionAlreadyExistsException: + return False + except Exception as e: + st.warning(f"Could not create collection '{collection_name}': {str(e)}") + return False + +def create_vector_index_if_not_exists(cluster, bucket_name, scope_name, collection_name, similarity="DOT", dimension=1536): + """Create Hyperscale vector index if it doesn't exist""" + index_name = f"idx_{collection_name}_vector" + + try: + # Check if index exists by trying to query system:indexes + check_query = f""" + SELECT COUNT(*) as count FROM system:indexes + WHERE name = '{index_name}' + AND keyspace_id = '{collection_name}' + AND bucket_id = '{bucket_name}' + AND scope_id = '{scope_name}' + """ + + 
result = cluster.query(check_query) + rows = list(result) + + if rows and rows[0].get('count', 0) > 0: + st.success(f"Vector index '{index_name}' already exists!") + return False # Index already exists + + # Count documents in collection first + count_query = f"SELECT COUNT(*) as doc_count FROM `{bucket_name}`.`{scope_name}`.`{collection_name}`" + count_result = cluster.query(count_query) + count_rows = list(count_result) + doc_count = count_rows[0].get('doc_count', 0) if count_rows else 0 + + if doc_count == 0: + st.error("No documents found in collection. Please upload a PDF first before creating the vector index.") + return False + + # Create the Hyperscale vector index + create_index_query = f""" + CREATE VECTOR INDEX {index_name} + ON `{collection_name}`(embedding VECTOR) + WITH {{ + "dimension": {dimension}, + "similarity": "{similarity}" + }} + """ + + # Set query context to the bucket.scope, then run the create index + cluster.bucket(bucket_name).scope(scope_name).query(create_index_query).execute() + st.success(f"Vector index '{index_name}' created successfully!") + st.info("Note: The vector index may take a few moments to become fully available") + return True + + except QueryIndexAlreadyExistsException: + st.info(f"Vector index '{index_name}' already exists") + return False + except Exception as e: + error_msg = str(e) + + # If the error is about index already existing, that's fine + if "already exists" in error_msg.lower() or "duplicate" in error_msg.lower(): + st.info(f"Vector index '{index_name}' already exists") + return False + # If it's about no documents, warn but continue + elif "no documents" in error_msg.lower() or "training" in error_msg.lower(): + st.warning(f"Vector index requires documents for training. Please upload a PDF first.") + return False + else: + st.error(f"Could not create vector index: {error_msg}") + st.info("You may need to create the vector index manually. 
See README for instructions.") + return False + +def setup_couchbase_resources(cluster_connection_string, username, password, bucket_name, scope_name, collection_name) -> Cluster: + """Setup Couchbase resources: scope, collection, and optionally vector index""" + try: + # Connect to cluster for management operations + auth = PasswordAuthenticator(username, password) + cluster = Cluster(cluster_connection_string, ClusterOptions(auth)) + cluster.wait_until_ready(timedelta(seconds=10)) + + bucket = cluster.bucket(bucket_name) + collection_manager = bucket.collections() + + # Create scope if needed + scope_created = create_scope_if_not_exists(collection_manager, scope_name) + + # Create collection if needed + collection_created = create_collection_if_not_exists(collection_manager, scope_name, collection_name) + + # If we just created scope or collection, wait a bit for them to be ready + if scope_created or collection_created: + import time + time.sleep(2) + + return cluster + + except Exception as e: + st.error(f"Error during Couchbase setup: {str(e)}") + st.info("Continuing with existing resources...") + +def save_to_vector_store(uploaded_file, indexing_pipeline) -> Cluster: """Process the PDF & store it in Couchbase Vector Store""" if uploaded_file is not None: temp_dir = tempfile.TemporaryDirectory() @@ -28,11 +178,31 @@ def save_to_vector_store(uploaded_file, indexing_pipeline): result = indexing_pipeline.run({"converter": {"sources": [temp_file_path]}}) st.info(f"PDF loaded into vector store: {result['writer']['documents_written']} documents indexed") + + # Create the scope and collection + cluster = setup_couchbase_resources( + cluster_connection_string=os.getenv("DB_CONN_STR"), + username=os.getenv("DB_USERNAME"), + password=os.getenv("DB_PASSWORD"), + bucket_name=os.getenv("DB_BUCKET"), + scope_name=os.getenv("DB_SCOPE"), + collection_name=os.getenv("DB_COLLECTION"), + ) + + # Create the vector index + create_vector_index_if_not_exists( + cluster=cluster, + 
bucket_name=os.getenv("DB_BUCKET"), + scope_name=os.getenv("DB_SCOPE"), + collection_name=os.getenv("DB_COLLECTION"), + ) + + return cluster @st.cache_resource(show_spinner="Connecting to Vector Store") def get_document_store(): - """Return the Couchbase document store""" - return CouchbaseSearchDocumentStore( + """Return the Couchbase document store using CouchbaseQueryDocumentStore.""" + return CouchbaseQueryDocumentStore( cluster_connection_string=Secret.from_env_var("DB_CONN_STR"), authenticator=CouchbasePasswordAuthenticator( username=Secret.from_env_var("DB_USERNAME"), @@ -42,7 +212,13 @@ def get_document_store(): bucket=os.getenv("DB_BUCKET"), scope=os.getenv("DB_SCOPE"), collection=os.getenv("DB_COLLECTION"), - vector_search_index=os.getenv("INDEX_NAME"), + search_type=QueryVectorSearchType.ANN, + similarity=QueryVectorSearchSimilarity.DOT, + nprobes=10, + query_options=CouchbaseQueryOptions( + timeout=timedelta(seconds=60), + scan_consistency=QueryScanConsistency.NOT_BOUNDED + ) ) @@ -57,10 +233,21 @@ def get_document_store(): ) # Load and check environment variables - env_vars = ["DB_CONN_STR", "DB_USERNAME", "DB_PASSWORD", "DB_BUCKET", "DB_SCOPE", "DB_COLLECTION", "INDEX_NAME", "OPENAI_API_KEY"] + env_vars = ["DB_CONN_STR", "DB_USERNAME", "DB_PASSWORD", "DB_BUCKET", "DB_SCOPE", "DB_COLLECTION", "OPENAI_API_KEY"] for var in env_vars: check_environment_variable(var) + # Setup Couchbase resources (scope and collection only, index created after document upload) + with st.spinner("Setting up Couchbase resources..."): + setup_couchbase_resources( + cluster_connection_string=os.getenv("DB_CONN_STR"), + username=os.getenv("DB_USERNAME"), + password=os.getenv("DB_PASSWORD"), + bucket_name=os.getenv("DB_BUCKET"), + scope_name=os.getenv("DB_SCOPE"), + collection_name=os.getenv("DB_COLLECTION"), + ) + # Initialize document store document_store = get_document_store() @@ -80,7 +267,7 @@ def get_document_store(): # Create RAG pipeline rag_pipeline = Pipeline() 
rag_pipeline.add_component("query_embedder", OpenAITextEmbedder()) - rag_pipeline.add_component("retriever", CouchbaseSearchEmbeddingRetriever(document_store=document_store)) + rag_pipeline.add_component("retriever", CouchbaseQueryEmbeddingRetriever(document_store=document_store)) rag_pipeline.add_component("prompt_builder", PromptBuilder(template=""" You are a helpful bot. If you cannot answer based on the context provided, respond with a generic answer. Answer the question as truthfully as possible using the context below: {% for doc in documents %} @@ -109,7 +296,7 @@ def get_document_store(): couchbase_logo = "https://emoji.slack-edge.com/T024FJS4M/couchbase/4a361e948b15ed91.png" st.title("Chat with PDF") - st.markdown("Answers with [Couchbase logo](https://emoji.slack-edge.com/T024FJS4M/couchbase/4a361e948b15ed91.png) are generated using *RAG* while 🤖 are generated by pure *LLM (Gemini)*") + st.markdown("Answers with [Couchbase logo](https://emoji.slack-edge.com/T024FJS4M/couchbase/4a361e948b15ed91.png) are generated using *RAG* while 🤖 are generated by pure *LLM (OpenAI)*") with st.sidebar: st.header("Upload your PDF") @@ -123,10 +310,10 @@ def get_document_store(): st.markdown(""" For each question, you will get two answers: * one using RAG ([Couchbase logo](https://emoji.slack-edge.com/T024FJS4M/couchbase/4a361e948b15ed91.png)) - * one using pure LLM - Gemini (🤖). + * one using pure LLM - OpenAI (🤖). """) - st.markdown("For RAG, we are using [Haystack](https://haystack.deepset.ai/), [Couchbase Vector Search](https://couchbase.com/) & [Gemini](https://gemini.google.com/). We fetch parts of the PDF relevant to the question using Vector search & add it as the context to the LLM. 
The LLM is instructed to answer based on the context from the Vector Store.") + st.markdown("For RAG, we are using [Haystack](https://haystack.deepset.ai/), [Couchbase Vector Search](https://docs.couchbase.com/cloud/vector-index/hyperscale-vector-index.html) & [OpenAI](https://openai.com/). We fetch parts of the PDF relevant to the question using high-performance GSI vector search & add it as the context to the LLM. The LLM is instructed to answer based on the context from the Vector Store.") if "messages" not in st.session_state: st.session_state.messages = [] @@ -140,6 +327,25 @@ def get_document_store(): st.chat_message("user").markdown(question) st.session_state.messages.append({"role": "user", "content": question, "avatar": "👤"}) + # Ensure vector index exists before first query (fallback safety check) + if "index_check_done" not in st.session_state: + with st.spinner("Ensuring vector index is ready..."): + cluster = setup_couchbase_resources( + cluster_connection_string=os.getenv("DB_CONN_STR"), + username=os.getenv("DB_USERNAME"), + password=os.getenv("DB_PASSWORD"), + bucket_name=os.getenv("DB_BUCKET"), + scope_name=os.getenv("DB_SCOPE"), + collection_name=os.getenv("DB_COLLECTION"), + ) + create_vector_index_if_not_exists( + cluster=cluster, + bucket_name=os.getenv("DB_BUCKET"), + scope_name=os.getenv("DB_SCOPE"), + collection_name=os.getenv("DB_COLLECTION"), + ) + st.session_state.index_check_done = True + # RAG response with st.chat_message("assistant", avatar=couchbase_logo): message_placeholder = st.empty() diff --git a/chat_with_pdf_with_search_vector_index.py b/chat_with_pdf_with_search_vector_index.py new file mode 100644 index 0000000..2477e47 --- /dev/null +++ b/chat_with_pdf_with_search_vector_index.py @@ -0,0 +1,317 @@ +import os +import json +import tempfile +import streamlit as st +from datetime import timedelta +from haystack import Pipeline +from haystack.components.converters import PyPDFToDocument +from haystack.components.preprocessors 
def check_environment_variable(variable_name):
    """Stop the Streamlit app with an error if a required env variable is missing."""
    if variable_name not in os.environ:
        st.error(f"{variable_name} environment variable is not set. Please add it to the secrets.toml file")
        st.stop()

def create_scope_if_not_exists(collection_manager, scope_name):
    """Create the scope if it doesn't exist.

    Returns True only when a new scope was actually created, so callers can
    wait for it to become ready.
    """
    try:
        existing = [scope.name for scope in collection_manager.get_all_scopes()]
        if scope_name not in existing:
            collection_manager.create_scope(scope_name)
            st.info(f"Scope '{scope_name}' created successfully")
            return True
        return False
    except ScopeAlreadyExistsException:
        # Lost a race with a concurrent creator — the scope exists, which is fine.
        return False
    except Exception as e:
        # Best-effort: surface the problem but let the app continue with
        # whatever resources already exist.
        st.warning(f"Could not create scope '{scope_name}': {str(e)}")
        return False

def create_collection_if_not_exists(collection_manager, scope_name, collection_name):
    """Create the collection inside *scope_name* if it doesn't exist.

    Returns True only when a new collection was actually created.
    """
    try:
        for scope in collection_manager.get_all_scopes():
            if scope.name == scope_name:
                existing = [collection.name for collection in scope.collections]
                if collection_name not in existing:
                    collection_manager.create_collection(scope_name=scope_name, collection_name=collection_name)
                    st.info(f"Collection '{collection_name}' created in scope '{scope_name}'")
                    return True
                return False

        # The scope was not found at all — a collection cannot be created in it.
        st.error(f"Scope '{scope_name}' does not exist, cannot create collection")
        return False
    except CollectionAlreadyExistsException:
        return False
    except Exception as e:
        st.warning(f"Could not create collection '{collection_name}': {str(e)}")
        return False

def create_fts_index_if_not_exists(cluster, bucket_name, scope_name, collection_name, index_name):
    """Create the FTS (Search) vector index if it doesn't exist.

    Loads the index definition from sampleSearchIndex.json next to this file,
    rewrites its name, source bucket, and type mapping for the requested
    scope/collection, then upserts it via the scope-level index manager.

    Returns True if a new index was created, False otherwise.
    """
    try:
        # Load the FTS index definition template shipped with the repo.
        json_file_path = os.path.join(os.path.dirname(__file__), "sampleSearchIndex.json")
        with open(json_file_path, "r") as f:
            index_definition = json.load(f)

        # Point the template at the requested index name and source bucket.
        index_definition["name"] = index_name
        index_definition["sourceName"] = bucket_name

        # Re-key the sample type mapping ("scope.coll") to the real
        # "<scope>.<collection>" so the index covers the right documents.
        types_key = f"{scope_name}.{collection_name}"
        sample_type_config = index_definition["params"]["mapping"]["types"].get("scope.coll")
        if sample_type_config:
            index_definition["params"]["mapping"]["types"] = {types_key: sample_type_config}

        # Scope-level search index manager (the index lives under the scope).
        scope_index_manager = cluster.bucket(bucket_name).scope(scope_name).search_indexes()

        existing_indexes = scope_index_manager.get_all_indexes()
        if index_name in [index.name for index in existing_indexes]:
            st.info(f"FTS index '{index_name}' already exists")
            return False

        st.info(f"Creating FTS index '{index_name}'...")

        search_index = SearchIndex.from_json(index_definition)
        # Upsert: create if missing, update if an index with this name exists.
        scope_index_manager.upsert_index(search_index)

        st.success(f"FTS index '{index_name}' successfully created")
        st.info("Note: The FTS index may take a few moments to build")
        return True

    except QueryIndexAlreadyExistsException:
        # BUG FIX: handlers previously read index_definition["name"], which is
        # unbound when the failure happens at/before json.load() (e.g. the JSON
        # file is missing) and raised NameError, masking the real error.
        # Always report via the index_name parameter instead.
        st.info(f"FTS index '{index_name}' already exists")
        return False
    except Exception as e:
        error_msg = str(e)
        if "already exists" in error_msg.lower():
            st.info(f"FTS index '{index_name}' already exists")
            return False
        elif "service" in error_msg.lower() and "unavailable" in error_msg.lower():
            st.error("Search service is not available. Please ensure the Search service is enabled in your Couchbase cluster.")
            return False
        else:
            st.warning(f"Could not create FTS index '{index_name}': {error_msg}")
            st.info("You may need to create the FTS index manually. See README for instructions.")
            return False

def setup_couchbase_resources(cluster_connection_string, username, password, bucket_name, scope_name, collection_name, index_name):
    """Setup Couchbase resources: scope, collection, and FTS vector index.

    Best-effort: on any failure the error is shown and the app continues with
    whatever resources already exist.
    """
    try:
        # Connect with a dedicated cluster handle for management operations.
        auth = PasswordAuthenticator(username, password)
        cluster = Cluster(cluster_connection_string, ClusterOptions(auth))
        cluster.wait_until_ready(timedelta(seconds=10))

        collection_manager = cluster.bucket(bucket_name).collections()

        scope_created = create_scope_if_not_exists(collection_manager, scope_name)
        collection_created = create_collection_if_not_exists(collection_manager, scope_name, collection_name)

        # Freshly created scopes/collections need a moment to become ready
        # before an index can be built on them.
        if scope_created or collection_created:
            import time
            time.sleep(2)

        create_fts_index_if_not_exists(cluster, bucket_name, scope_name, collection_name, index_name)

    except Exception as e:
        st.error(f"Error during Couchbase setup: {str(e)}")
        st.info("Continuing with existing resources...")

def save_to_vector_store(uploaded_file, indexing_pipeline):
    """Process the PDF & store it in Couchbase Vector Store"""
    if uploaded_file is not None:
        # Context manager guarantees the temp dir is removed even if the
        # indexing pipeline raises.
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_file_path = os.path.join(temp_dir, uploaded_file.name)

            with open(temp_file_path, "wb") as f:
                f.write(uploaded_file.getvalue())
            result = indexing_pipeline.run({"converter": {"sources": [temp_file_path]}})

        st.info(f"PDF loaded into vector store: {result['writer']['documents_written']} documents indexed")
@st.cache_resource(show_spinner="Connecting to Vector Store")
def get_document_store():
    """Return the Couchbase Search (FTS) document store, cached per session."""
    return CouchbaseSearchDocumentStore(
        cluster_connection_string=Secret.from_env_var("DB_CONN_STR"),
        authenticator=CouchbasePasswordAuthenticator(
            username=Secret.from_env_var("DB_USERNAME"),
            password=Secret.from_env_var("DB_PASSWORD")
        ),
        # wan_development relaxes timeouts for Capella / remote clusters.
        cluster_options=CouchbaseClusterOptions(profile='wan_development'),
        bucket=os.getenv("DB_BUCKET"),
        scope=os.getenv("DB_SCOPE"),
        collection=os.getenv("DB_COLLECTION"),
        vector_search_index=os.getenv("INDEX_NAME"),
    )


if __name__ == "__main__":
    OPENAI_API_KEY = Secret.from_env_var("OPENAI_API_KEY")
    st.set_page_config(
        # FIX: this app uses OpenAI (gpt-4o), not Gemini — keep branding
        # consistent with chat_with_pdf.py, updated in this same change.
        page_title="Chat with your PDF using Haystack, Couchbase & OpenAI",
        page_icon="🤖",
        layout="centered",
        initial_sidebar_state="auto",
        menu_items=None,
    )

    # Fail fast if any required configuration is missing.
    env_vars = ["DB_CONN_STR", "DB_USERNAME", "DB_PASSWORD", "DB_BUCKET", "DB_SCOPE", "DB_COLLECTION", "INDEX_NAME", "OPENAI_API_KEY"]
    for var in env_vars:
        check_environment_variable(var)

    # Setup Couchbase resources (scope, collection, and FTS index)
    with st.spinner("Setting up Couchbase resources..."):
        setup_couchbase_resources(
            cluster_connection_string=os.getenv("DB_CONN_STR"),
            username=os.getenv("DB_USERNAME"),
            password=os.getenv("DB_PASSWORD"),
            bucket_name=os.getenv("DB_BUCKET"),
            scope_name=os.getenv("DB_SCOPE"),
            collection_name=os.getenv("DB_COLLECTION"),
            index_name=os.getenv("INDEX_NAME")
        )

    document_store = get_document_store()

    # Indexing pipeline: PDF -> clean -> split -> embed -> write to Couchbase.
    indexing_pipeline = Pipeline()
    indexing_pipeline.add_component("converter", PyPDFToDocument())
    indexing_pipeline.add_component("cleaner", DocumentCleaner())
    indexing_pipeline.add_component("splitter", DocumentSplitter(split_by="word", split_length=250, split_overlap=50))
    indexing_pipeline.add_component("embedder", OpenAIDocumentEmbedder())
    indexing_pipeline.add_component("writer", DocumentWriter(document_store=document_store))

    indexing_pipeline.connect("converter.documents", "cleaner.documents")
    indexing_pipeline.connect("cleaner.documents", "splitter.documents")
    indexing_pipeline.connect("splitter.documents", "embedder.documents")
    indexing_pipeline.connect("embedder.documents", "writer.documents")

    # RAG pipeline: embed question -> retrieve context -> prompt -> LLM.
    rag_pipeline = Pipeline()
    rag_pipeline.add_component("query_embedder", OpenAITextEmbedder())
    rag_pipeline.add_component("retriever", CouchbaseSearchEmbeddingRetriever(document_store=document_store))
    rag_pipeline.add_component("prompt_builder", PromptBuilder(template="""
    You are a helpful bot. If you cannot answer based on the context provided, respond with a generic answer. Answer the question as truthfully as possible using the context below:
    {% for doc in documents %}
        {{ doc.content }}
    {% endfor %}

    Question: {{question}}
    """))
    rag_pipeline.add_component(
        "llm",
        OpenAIGenerator(
            api_key=OPENAI_API_KEY,
            model="gpt-4o",
        ),
    )
    rag_pipeline.add_component("answer_builder", AnswerBuilder())

    rag_pipeline.connect("query_embedder", "retriever.query_embedding")
    rag_pipeline.connect("retriever.documents", "prompt_builder.documents")
    rag_pipeline.connect("prompt_builder.prompt", "llm.prompt")
    rag_pipeline.connect("llm.replies", "answer_builder.replies")
    rag_pipeline.connect("llm.meta", "answer_builder.meta")
    rag_pipeline.connect("retriever", "answer_builder.documents")

    # BUG FIX: the "pure LLM" answer previously re-ran rag_pipeline, so the 🤖
    # response still received retrieved documents and was not pure LLM at all.
    # A dedicated pipeline with a context-free prompt gives a genuine baseline.
    pure_llm_pipeline = Pipeline()
    pure_llm_pipeline.add_component("prompt_builder", PromptBuilder(template="""
    You are a helpful bot. Answer the question as truthfully as possible.

    Question: {{question}}
    """))
    pure_llm_pipeline.add_component(
        "llm",
        OpenAIGenerator(
            api_key=OPENAI_API_KEY,
            model="gpt-4o",
        ),
    )
    pure_llm_pipeline.add_component("answer_builder", AnswerBuilder())
    pure_llm_pipeline.connect("prompt_builder.prompt", "llm.prompt")
    pure_llm_pipeline.connect("llm.replies", "answer_builder.replies")
    pure_llm_pipeline.connect("llm.meta", "answer_builder.meta")

    # Frontend
    couchbase_logo = "https://emoji.slack-edge.com/T024FJS4M/couchbase/4a361e948b15ed91.png"

    st.title("Chat with PDF")
    # FIX: say OpenAI, not Gemini — consistent with the generator above.
    st.markdown("Answers with [Couchbase logo](https://emoji.slack-edge.com/T024FJS4M/couchbase/4a361e948b15ed91.png) are generated using *RAG* while 🤖 are generated by pure *LLM (OpenAI)*")

    with st.sidebar:
        st.header("Upload your PDF")
        with st.form("upload pdf"):
            uploaded_file = st.file_uploader("Choose a PDF.", help="The document will be deleted after one hour of inactivity (TTL).", type="pdf")
            submitted = st.form_submit_button("Upload")
            if submitted:
                save_to_vector_store(uploaded_file, indexing_pipeline)

        st.subheader("How does it work?")
        st.markdown("""
        For each question, you will get two answers:
        * one using RAG ([Couchbase logo](https://emoji.slack-edge.com/T024FJS4M/couchbase/4a361e948b15ed91.png))
        * one using pure LLM - OpenAI (🤖).
        """)

        st.markdown("For RAG, we are using [Haystack](https://haystack.deepset.ai/), [Couchbase Vector Search](https://couchbase.com/) & [OpenAI](https://openai.com/). We fetch parts of the PDF relevant to the question using Vector search & add it as the context to the LLM. The LLM is instructed to answer based on the context from the Vector Store.")

    if "messages" not in st.session_state:
        st.session_state.messages = []
        st.session_state.messages.append({"role": "assistant", "content": "Hi, I'm a chatbot who can chat with the PDF. How can I help you?", "avatar": "🤖"})

    # Replay chat history on every rerun.
    for message in st.session_state.messages:
        with st.chat_message(message["role"], avatar=message["avatar"]):
            st.markdown(message["content"])

    if question := st.chat_input("Ask a question based on the PDF"):
        st.chat_message("user").markdown(question)
        st.session_state.messages.append({"role": "user", "content": question, "avatar": "👤"})

        # RAG response
        with st.chat_message("assistant", avatar=couchbase_logo):
            message_placeholder = st.empty()
            rag_result = rag_pipeline.run(
                {
                    "query_embedder": {"text": question},
                    "retriever": {"top_k": 3},
                    "prompt_builder": {"question": question},
                    "answer_builder": {"query": question},
                }
            )
            rag_response = rag_result["answer_builder"]["answers"][0].data
            message_placeholder.markdown(rag_response)
            st.session_state.messages.append({"role": "assistant", "content": rag_response, "avatar": couchbase_logo})

        # Pure LLM response (no retrieved context)
        with st.chat_message("ai", avatar="🤖"):
            message_placeholder_pure_llm = st.empty()
            pure_llm_result = pure_llm_pipeline.run(
                {
                    "prompt_builder": {"question": question},
                    "answer_builder": {"query": question},
                }
            )
            pure_llm_response = pure_llm_result["answer_builder"]["answers"][0].data
            message_placeholder_pure_llm.markdown(pure_llm_response)
            st.session_state.messages.append({"role": "assistant", "content": pure_llm_response, "avatar": "🤖"})
"name": "embedding", "similarity": "dot_product", - "store": true, - "type": "vector" + "type": "vector", + "vector_index_optimized_for": "recall" } ] }, - "dataframe": { + "meta": { + "dynamic": true, + "enabled": true + }, + "content": { "enabled": true, + "dynamic": false, "fields": [ { - "docvalues": true, - "include_in_all": false, - "include_term_vectors": false, "index": true, - "name": "dataframe", + "name": "text", "store": true, - "analyzer": "keyword", "type": "text" } ] - }, - "meta": { - "dynamic": true, - "enabled": true, - "properties": { - "name": { - "enabled": true, - "fields": [ - { - "docvalues": true, - "include_in_all": false, - "include_term_vectors": false, - "index": true, - "name": "name", - "store": true, - "analyzer": "keyword", - "type": "text" - } - ] - } - } } } } } + }, + "store": { + "indexType": "scorch", + "segmentVersion": 16 } + }, + "sourceType": "gocbcore", + "sourceName": "sample_bucket", + "sourceParams": {}, + "planParams": { + "maxPartitionsPerPIndex": 64, + "indexPartitions": 16, + "numReplicas": 0 } } \ No newline at end of file