From d0349abea18328d777b88a5002f80457a2f4a66f Mon Sep 17 00:00:00 2001 From: Minhajuddin Mohammed Date: Tue, 15 Jul 2025 14:03:57 -0400 Subject: [PATCH 1/4] added vector storage changes --- ckanext/datapusher_plus/config.py | 35 ++ ckanext/datapusher_plus/helpers.py | 20 +- ckanext/datapusher_plus/jobs.py | 88 ++++ ckanext/datapusher_plus/vector_store.py | 591 ++++++++++++++++++++++++ 4 files changed, 733 insertions(+), 1 deletion(-) create mode 100644 ckanext/datapusher_plus/vector_store.py diff --git a/ckanext/datapusher_plus/config.py b/ckanext/datapusher_plus/config.py index 5e5a546..2d2167d 100644 --- a/ckanext/datapusher_plus/config.py +++ b/ckanext/datapusher_plus/config.py @@ -171,3 +171,38 @@ AUTO_UNZIP_ONE_FILE = tk.asbool( tk.config.get("ckanext.datapusher_plus.auto_unzip_one_file", True) ) + +ENABLE_VECTOR_STORE = tk.asbool( + tk.config.get("ckanext.datapusher_plus.enable_vector_store", False) +) +VECTOR_STORE_PATH = tk.config.get( + "ckanext.datapusher_plus.vector_store_path", "/var/lib/ckan/vector_store" +) +VECTOR_STORE_ENABLED = tk.asbool( + tk.config.get("ckanext.datapusher_plus.vector_store_enabled", False) +) +VECTOR_STORE_COLLECTION = tk.config.get( + "ckanext.datapusher_plus.vector_store_collection", "default" +) +EMBEDDING_MODEL = tk.config.get( + "ckanext.datapusher_plus.embedding_model", "dunzhang/stella_en_400M_v5" +) +EMBEDDING_DEVICE = tk.config.get( + "ckanext.datapusher_plus.embedding_device", "cpu" +) +# OpenRouter API Key +OPENROUTER_API_KEY = tk.config.get( + "ckanext.datapusher_plus.openrouter_api_key", "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" +) +# OpenRouter Model +OPENROUTER_MODEL = tk.config.get( + "ckanext.datapusher_plus.openrouter_model", "google/gemini-2.0-flash-001" +) +# Text chunking overlap +CHUNK_OVERLAP = tk.asint( + tk.config.get("ckanext.datapusher_plus.chunk_overlap", "400") +) +# Temporal coverage detection +DETECT_TEMPORAL_COVERAGE = tk.asbool( + tk.config.get("ckanext.datapusher_plus.detect_temporal_coverage", True) +) diff --git a/ckanext/datapusher_plus/helpers.py b/ckanext/datapusher_plus/helpers.py index 62b1b05..226d226 100644 --- a/ckanext/datapusher_plus/helpers.py +++ b/ckanext/datapusher_plus/helpers.py @@ -590,4 +590,22 @@ def is_preformulated_field(field): Check if a field is preformulated (has formula attribute) This helper returns True only if the field has a 'formula' key with a non-empty value """ - return bool(field.get('formula', False)) \ No newline at end of file + return bool(field.get('formula', False)) + + +def get_resource_embedding_status(resource_id): + """Check if a resource has been embedded in the vector store""" + try: + from ckanext.datapusher_plus.vector_store import DataPusherVectorStore + vector_store = DataPusherVectorStore() + if vector_store.enabled: + # Check if resource exists in vector store + results = vector_store.collection.query( + query_embeddings=[[0.0] * 384], # Dummy embedding + n_results=1, + where={"resource_id": resource_id} + ) + return bool(results and results.get('ids') and results['ids'][0]) + except Exception: + pass + return False \ No newline at end of file diff --git a/ckanext/datapusher_plus/jobs.py b/ckanext/datapusher_plus/jobs.py index 1f22db4..3f2a6f6 100644 --- a/ckanext/datapusher_plus/jobs.py +++ b/ckanext/datapusher_plus/jobs.py @@ -39,6 +39,12 @@ from ckanext.datapusher_plus.qsv_utils import QSVCommand from ckanext.datapusher_plus.pii_screening import screen_for_pii +try: + from ckanext.datapusher_plus.vector_store import 
DataPusherVectorStore + VECTOR_STORE_AVAILABLE = True +except ImportError: + VECTOR_STORE_AVAILABLE = False + if locale.getdefaultlocale()[0]: lang, encoding = locale.getdefaultlocale() locale.setlocale(locale.LC_ALL, locale=(lang, encoding)) @@ -1585,3 +1591,85 @@ def _push_to_datastore( TOTAL ELAPSED TIME: {total_elapsed:,.2f} """ logger.info(end_msg) + + + # ============================================================ + # VECTOR STORE EMBEDDING + # ============================================================ + if conf.ENABLE_VECTOR_STORE and VECTOR_STORE_AVAILABLE: + vector_start = time.perf_counter() + logger.info("STARTING VECTOR STORE EMBEDDING...") + + try: + # Initialize vector store + vector_store = DataPusherVectorStore() + + if vector_store.enabled: + # Prepare temporal information if available + temporal_info = None + if dataset_stats.get('IS_SORTED') and datetimecols_list: + # Extract temporal coverage from the data + temporal_years = set() + + # If we have year columns, extract years + for col in datetimecols_list: + if 'year' in col.lower(): + # Get year range from min/max + col_idx = headers.index(col) + if col_idx < len(headers_min) and col_idx < len(headers_max): + try: + min_year = int(float(headers_min[col_idx])) + max_year = int(float(headers_max[col_idx])) + temporal_years.update(range(min_year, max_year + 1)) + except (ValueError, TypeError): + pass + + # Also check for date columns + if not temporal_years and datetimecols_list: + # Sample approach - get min/max dates + for idx, col in enumerate(headers): + if col in datetimecols_list: + try: + # Parse dates from min/max values + min_date = headers_min[idx] + max_date = headers_max[idx] + if min_date and max_date: + # Extract years from dates + min_year = parsedate(str(min_date)).year + max_year = parsedate(str(max_date)).year + temporal_years.update(range(min_year, max_year + 1)) + except Exception as e: + logger.warning(f"Could not parse dates from column {col}: {e}") + + if temporal_years: + temporal_info = { + 'min': min(temporal_years), + 'max': max(temporal_years), + 'all_years': sorted(list(temporal_years)), + 'year_count': len(temporal_years), + 'temporal_columns': {col: True for col in datetimecols_list} + } + logger.info(f"Detected temporal coverage: {temporal_info['min']} to {temporal_info['max']}") + + # Embed the resource + success = vector_store.embed_resource( + resource_id=resource_id, + resource_metadata=resource, + dataset_metadata=package, + stats_data=resource_fields_stats, + freq_data=resource_fields_freqs, + temporal_info=temporal_info, + logger=logger + ) + + if success: + logger.info("Vector store embedding completed successfully") + else: + logger.warning("Vector store embedding failed") + + except Exception as e: + logger.error(f"Error during vector store embedding: {e}") + # Don't fail the job if embedding fails + + vector_elapsed = time.perf_counter() - vector_start + logger.info(f"VECTOR STORE EMBEDDING completed in {vector_elapsed:,.2f} seconds") diff --git a/ckanext/datapusher_plus/vector_store.py b/ckanext/datapusher_plus/vector_store.py new file mode 100644 index 0000000..ebdac1f --- /dev/null +++ b/ckanext/datapusher_plus/vector_store.py @@ -0,0 +1,591 @@ +# -*- coding: utf-8 -*- +""" +Vector Store integration for DataPusher Plus +Embeds resource data and metadata during the upload process +Using local embeddings and OpenRouter for LLM +""" + +import os +import logging +import json +import requests +from typing import Dict, Any, Optional, List, Union +from datetime import datetime 
+import chromadb +from sentence_transformers import SentenceTransformer +import numpy as np +import ckanext.datapusher_plus.config as conf + +log = logging.getLogger(__name__) + + +class DataPusherVectorStore: + """Vector store for embedding resources during datapusher upload""" + + def __init__(self): + self.enabled = conf.ENABLE_VECTOR_STORE + if not self.enabled: + log.info("Vector store embedding is disabled") + return + + self.persist_directory = conf.VECTOR_STORE_PATH + self.collection_name = conf.VECTOR_STORE_COLLECTION + + # Initialize components + self._initialize_components() + + def _initialize_components(self): + """Initialize vector store components""" + try: + # Ensure directory exists + os.makedirs(self.persist_directory, exist_ok=True) + + # Initialize ChromaDB + settings = chromadb.config.Settings( + persist_directory=self.persist_directory, + anonymized_telemetry=False, + allow_reset=True + ) + + self.client = chromadb.PersistentClient( + path=self.persist_directory, + settings=settings + ) + + # Get or create collection + try: + self.collection = self.client.get_collection(name=self.collection_name) + log.info(f"Retrieved collection '{self.collection_name}'") + except: + self.collection = self.client.create_collection( + name=self.collection_name, + metadata={"hnsw:space": "cosine"} + ) + log.info(f"Created collection '{self.collection_name}'") + + # Initialize local embedding model + device = conf.EMBEDDING_DEVICE # 'cuda', 'cpu', or 'mps' + self.embedding_model = SentenceTransformer( + conf.EMBEDDING_MODEL, + trust_remote_code=True, + device=device, + config_kwargs={ + "use_memory_efficient_attention": False, + "unpad_inputs": False + } + ) + log.info(f"Loaded embedding model {conf.EMBEDDING_MODEL} on {device}") + + # Initialize OpenRouter configuration + self.openrouter_api_key = conf.OPENROUTER_API_KEY + self.openrouter_model = conf.OPENROUTER_MODEL + self.openrouter_base_url = "https://openrouter.ai/api/v1" + + # Text splitter settings + self.chunk_size = conf.CHUNK_SIZE + self.chunk_overlap = conf.CHUNK_OVERLAP + + log.info("Vector store components initialized successfully") + + except Exception as e: + log.error(f"Failed to initialize vector store: {e}") + self.enabled = False + + def _split_text(self, text: str, chunk_size: int = None, chunk_overlap: int = None) -> List[str]: + """Simple text splitter""" + if chunk_size is None: + chunk_size = self.chunk_size + if chunk_overlap is None: + chunk_overlap = self.chunk_overlap + + if len(text) <= chunk_size: + return [text] + + chunks = [] + start = 0 + + while start < len(text): + end = start + chunk_size + + # Try to find a good break point + if end < len(text): + # Look for paragraph break + break_point = text.rfind('\n\n', start, end) + if break_point == -1: + # Look for sentence break + break_point = text.rfind('. 
', start, end) + if break_point == -1: + # Look for any newline + break_point = text.rfind('\n', start, end) + if break_point > start: + end = break_point + 1 + + chunks.append(text[start:end].strip()) + start = end - chunk_overlap + + return chunks + + def _call_openrouter(self, prompt: str, system_prompt: str = None, + temperature: float = 0.1, max_tokens: int = 500) -> str: + """Call OpenRouter API for text generation""" + headers = { + "Authorization": f"Bearer {self.openrouter_api_key}", + "Content-Type": "application/json", + "HTTP-Referer": "https://github.com/dathere/datapusher-plus", + "X-Title": "DataPusher Plus Vector Store" + } + + messages = [] + if system_prompt: + messages.append({"role": "system", "content": system_prompt}) + messages.append({"role": "user", "content": prompt}) + + data = { + "model": self.openrouter_model, + "messages": messages, + "temperature": temperature, + "max_tokens": max_tokens + } + + try: + response = requests.post( + f"{self.openrouter_base_url}/chat/completions", + headers=headers, + json=data, + timeout=30 + ) + + if response.status_code == 200: + result = response.json() + return result["choices"][0]["message"]["content"].strip() + else: + log.error(f"OpenRouter API error {response.status_code}: {response.text}") + return "" + + except Exception as e: + log.error(f"Error calling OpenRouter: {e}") + return "" + + def embed_resource(self, + resource_id: str, + resource_metadata: Dict[str, Any], + dataset_metadata: Dict[str, Any], + stats_data: Dict[str, Any], + freq_data: Dict[str, Any], + temporal_info: Optional[Dict[str, Any]] = None, + logger=None) -> bool: + """Embed a resource after successful datastore upload""" + + if not self.enabled: + return False + + if logger is None: + logger = log + + try: + logger.info(f"Starting vector embedding for resource {resource_id}") + + # Create resource profile from available data + profile = self._create_resource_profile( + resource_id, resource_metadata, dataset_metadata, + stats_data, freq_data, temporal_info + ) + + # Generate AI description using OpenRouter + ai_description = self._generate_ai_description(profile) + profile['ai_description'] = ai_description + + # Create document content + doc_content = self._create_document_content(profile) + + # Create metadata for ChromaDB + metadata = self._create_metadata(profile) + + # Split document if too long + chunks = self._split_text(doc_content) + + # Generate embeddings using local model + embeddings = [] + for chunk in chunks: + embedding = self.embedding_model.encode(chunk, convert_to_numpy=True) + embeddings.append(embedding.tolist()) + + # Remove existing entries for this resource + try: + self.collection.delete(where={"resource_id": resource_id}) + except Exception as e: + logger.warning(f"Could not delete existing entries: {e}") + + # Add new entries + ids = [f"{resource_id}_{i}" for i in range(len(chunks))] + metadatas = [metadata.copy() for _ in chunks] + + # Add chunk info to metadata + for i, meta in enumerate(metadatas): + meta['chunk_index'] = i + meta['total_chunks'] = len(chunks) + + self.collection.add( + embeddings=embeddings, + documents=chunks, + metadatas=metadatas, + ids=ids + ) + + logger.info(f"Successfully embedded {len(chunks)} chunks for resource {resource_id}") + return True + + except Exception as e: + logger.error(f"Error embedding resource {resource_id}: {e}") + return False + + def search_resources(self, query: str, n_results: int = 10, + filters: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]: + """Search for 
resources using the local embedding model""" + try: + # Generate query embedding using the query prompt + query_embedding = self.embedding_model.encode( + query, + prompt_name="s2p_query" if hasattr(self.embedding_model, 'prompts') else None, + convert_to_numpy=True + ) + + # Build where clause + where_clause = filters if filters else None + + # Search + search_kwargs = { + "query_embeddings": [query_embedding.tolist()], + "n_results": n_results, + } + + if where_clause: + search_kwargs["where"] = where_clause + + results = self.collection.query(**search_kwargs) + + # Format results + formatted_results = [] + if results and 'documents' in results and results['documents']: + documents = results['documents'][0] + metadatas = results['metadatas'][0] if results.get('metadatas') else [] + distances = results['distances'][0] if results.get('distances') else [] + ids = results['ids'][0] if results.get('ids') else [] + + for i, doc in enumerate(documents): + formatted_results.append({ + 'id': ids[i] if i < len(ids) else None, + 'content': doc, + 'metadata': metadatas[i] if i < len(metadatas) else {}, + 'distance': distances[i] if i < len(distances) else 1.0, + 'score': 1.0 - (distances[i] if i < len(distances) else 1.0) + }) + + return formatted_results + + except Exception as e: + log.error(f"Error searching resources: {e}") + return [] + + def search_resources_by_year(self, query: str, year_filter: tuple, + n_results: int = 20) -> List[Dict[str, Any]]: + """Search for resources containing specific years""" + try: + # Build where clause for temporal filtering + where_clause = { + "$and": [ + {"has_temporal": True}, + {"temporal_min": {"$lte": year_filter[1]}}, + {"temporal_max": {"$gte": year_filter[0]}} + ] + } + + # Search with year filter + return self.search_resources( + query=f"{query} {year_filter[0]} {year_filter[1]} year temporal data", + n_results=n_results, + filters=where_clause + ) + + except Exception as e: + log.error(f"Error in year-based search: {e}") + return [] + + def _create_resource_profile(self, + resource_id: str, + resource_metadata: Dict[str, Any], + dataset_metadata: Dict[str, Any], + stats_data: Dict[str, Any], + freq_data: Dict[str, Any], + temporal_info: Optional[Dict[str, Any]]) -> Dict[str, Any]: + """Create resource profile from datapusher analysis""" + + profile = { + 'resource_id': resource_id, + 'resource_name': resource_metadata.get('name', resource_id), + 'format': resource_metadata.get('format', 'unknown'), + 'dataset_id': dataset_metadata.get('id'), + 'dataset_title': dataset_metadata.get('title'), + 'dataset_tags': [tag['name'] if isinstance(tag, dict) else str(tag) + for tag in dataset_metadata.get('tags', [])], + 'dataset_notes': dataset_metadata.get('notes', ''), + 'resource_description': resource_metadata.get('description', ''), + 'temporal_coverage': temporal_info, + 'stats_summary': stats_data, + 'frequency_summary': self._summarize_frequencies(freq_data), + 'profiling_timestamp': datetime.now().isoformat() + } + + # Extract column information from stats + if stats_data: + profile['columns_info'] = self._extract_column_info(stats_data) + profile['data_patterns'] = self._detect_patterns_from_stats(stats_data) + + return profile + + def _extract_column_info(self, stats_data: Dict[str, Any]) -> Dict[str, Any]: + """Extract column information from qsv stats""" + columns_info = {} + + for field_name, field_stats in stats_data.items(): + if isinstance(field_stats, dict) and field_stats.get('stats'): + stats = field_stats['stats'] + + col_info = { + 'dtype': 
stats.get('type', 'String'), + 'non_null_count': int(stats.get('count', 0)), + 'null_count': int(stats.get('nullcount', 0)), + 'unique_count': int(stats.get('cardinality', 0)), + 'is_temporal': stats.get('type') == 'DateTime', + 'is_numeric': stats.get('type') in ['Integer', 'Float'], + } + + # Add numeric stats if available + if col_info['is_numeric']: + col_info['numeric_stats'] = { + 'min': float(stats.get('min', 0)), + 'max': float(stats.get('max', 0)), + 'mean': float(stats.get('mean', 0)) if stats.get('mean') else None, + } + + # Check if geographic + col_info['is_geographic'] = self._is_geographic_column(field_name) + + columns_info[field_name] = col_info + + return columns_info + + def _is_geographic_column(self, col_name: str) -> bool: + """Check if column name suggests geographic data""" + col_name_lower = str(col_name).lower() + geo_keywords = ['county', 'state', 'city', 'country', 'region', 'area', + 'district', 'location', 'address', 'zip', 'postal', 'fips', + 'place', 'metro', 'msa', 'municipality', 'province', 'territory'] + + return any(keyword in col_name_lower for keyword in geo_keywords) + + def _detect_patterns_from_stats(self, stats_data: Dict[str, Any]) -> Dict[str, Any]: + """Detect data patterns from stats""" + patterns = { + 'has_temporal': False, + 'has_geographic': False, + 'has_financial': False, + 'data_categories': [] + } + + financial_keywords = ['income', 'revenue', 'cost', 'price', 'amount', 'salary', + 'wage', 'payment', 'balance', 'budget', 'earning', 'median', + 'expenditure', 'expense', 'profit', 'tax', 'gdp'] + + for field_name, field_info in stats_data.items(): + if isinstance(field_info, dict): + field_lower = field_name.lower() + + # Check temporal + if field_info.get('stats', {}).get('type') == 'DateTime': + patterns['has_temporal'] = True + + # Check geographic + if self._is_geographic_column(field_name): + patterns['has_geographic'] = True + + # Check financial + if any(keyword in field_lower for keyword in financial_keywords): + patterns['has_financial'] = True + if 'financial' not in patterns['data_categories']: + patterns['data_categories'].append('financial') + + # Check unemployment + if 'unemploy' in field_lower: + if 'unemployment' not in patterns['data_categories']: + patterns['data_categories'].append('unemployment') + + return patterns + + def _summarize_frequencies(self, freq_data: Dict[str, List[Dict]]) -> Dict[str, Any]: + """Summarize frequency data for embedding""" + summary = {} + + for field_name, frequencies in freq_data.items(): + if frequencies and len(frequencies) > 0: + # Get top 5 values + top_values = [f"{item['value']} ({item['count']})" + for item in frequencies[:5]] + summary[field_name] = { + 'top_values': top_values, + 'unique_count': len(frequencies) + } + + return summary + + def _generate_ai_description(self, profile: Dict[str, Any]) -> str: + """Generate AI description using OpenRouter""" + try: + # Build context for LLM + context = f""" +Resource: {profile['resource_name']} +Format: {profile['format']} +Dataset: {profile['dataset_title']} +Tags: {', '.join(profile['dataset_tags'])} + +Dataset Description: {profile['dataset_notes'][:500] if profile['dataset_notes'] else 'Not provided'} +Resource Description: {profile['resource_description'] or 'Not provided'} +""" + + # Add temporal coverage + if profile.get('temporal_coverage'): + temp = profile['temporal_coverage'] + context += f"\n\nTemporal Coverage: {temp.get('min')} to {temp.get('max')}" + if temp.get('year_count'): + context += f" ({temp['year_count']} 
years)" + + # Add column information + if profile.get('columns_info'): + context += "\n\nKey Columns:" + for col_name, col_info in list(profile['columns_info'].items())[:10]: + context += f"\n- {col_name}: {col_info['dtype']}" + if col_info.get('is_geographic'): + context += " (geographic)" + elif col_info.get('numeric_stats'): + stats = col_info['numeric_stats'] + context += f" (range: {stats['min']:.2f} - {stats['max']:.2f})" + + # Add data patterns + if profile.get('data_patterns', {}).get('data_categories'): + context += f"\n\nData Categories: {', '.join(profile['data_patterns']['data_categories'])}" + + prompt = f"""Based on the following resource information, generate a comprehensive description that will help users understand exactly what data this resource contains and how it can be used. + +{context} + +Generate a clear, informative description (2-3 sentences) that: +1. Clearly states what specific data this resource contains +2. Mentions the temporal coverage if available +3. Highlights key variables or metrics +4. Suggests potential use cases + +Be specific and factual.""" + + description = self._call_openrouter(prompt) + + if description: + return description + else: + # Fallback description + fallback = f"Resource containing {profile['resource_name']} data" + if profile.get('temporal_coverage'): + temp = profile['temporal_coverage'] + fallback += f" from {temp.get('min')} to {temp.get('max')}" + return fallback + + except Exception as e: + log.error(f"Error generating AI description: {e}") + # Fallback description + fallback = f"Resource containing {profile['resource_name']} data" + if profile.get('temporal_coverage'): + temp = profile['temporal_coverage'] + fallback += f" from {temp.get('min')} to {temp.get('max')}" + return fallback + + def _create_document_content(self, profile: Dict[str, Any]) -> str: + """Create searchable document content""" + doc = f"""Resource: {profile['resource_name']} +Resource ID: {profile['resource_id']} +Dataset: {profile['dataset_title']} +Format: {profile['format']} + +{profile['ai_description']} +""" + + # Add temporal coverage + if profile.get('temporal_coverage'): + temp = profile['temporal_coverage'] + doc += f"\n\n**TEMPORAL COVERAGE: {temp.get('min')} to {temp.get('max')}**" + if temp.get('year_count'): + doc += f"\nTotal Years: {temp['year_count']}" + + # Add column information + if profile.get('columns_info'): + doc += "\n\nColumns:" + for col_name, col_info in list(profile['columns_info'].items())[:15]: + doc += f"\n- {col_name}: {col_info['dtype']}" + if col_info.get('unique_count'): + doc += f" ({col_info['unique_count']} unique values)" + + # Add frequency summaries for key columns + if profile.get('frequency_summary'): + doc += "\n\nTop Values in Key Columns:" + for field, freq_info in list(profile['frequency_summary'].items())[:5]: + if freq_info.get('top_values'): + doc += f"\n- {field}: {', '.join(freq_info['top_values'][:3])}" + + # Add tags + if profile.get('dataset_tags'): + doc += f"\n\nTags: {', '.join(profile['dataset_tags'])}" + + return doc + + def _create_metadata(self, profile: Dict[str, Any]) -> Dict[str, Any]: + """Create metadata for ChromaDB""" + metadata = { + 'resource_id': profile['resource_id'], + 'resource_name': profile.get('resource_name', ''), + 'dataset_id': profile.get('dataset_id', ''), + 'dataset_title': profile.get('dataset_title', ''), + 'format': profile.get('format', 'unknown'), + 'has_temporal': bool(profile.get('temporal_coverage')), + 'indexed_at': datetime.now().isoformat() + } + + # Add 
temporal metadata + if profile.get('temporal_coverage'): + temp = profile['temporal_coverage'] + metadata['temporal_min'] = int(temp.get('min', 0)) + metadata['temporal_max'] = int(temp.get('max', 0)) + metadata['year_count'] = int(temp.get('year_count', 0)) + else: + metadata['temporal_min'] = 0 + metadata['temporal_max'] = 0 + metadata['year_count'] = 0 + + # Add pattern metadata + if profile.get('data_patterns'): + patterns = profile['data_patterns'] + metadata['has_geographic'] = patterns.get('has_geographic', False) + metadata['has_financial'] = patterns.get('has_financial', False) + metadata['data_categories'] = ','.join(patterns.get('data_categories', [])) + + return metadata + + def delete_resource_embeddings(self, resource_id: str) -> bool: + """Delete embeddings for a resource""" + if not self.enabled: + return False + + try: + self.collection.delete(where={"resource_id": resource_id}) + log.info(f"Deleted embeddings for resource {resource_id}") + return True + except Exception as e: + log.error(f"Error deleting embeddings for resource {resource_id}: {e}") + return False \ No newline at end of file From 416b84af2c11666897ea09fc40e7dfe27bf838e1 Mon Sep 17 00:00:00 2001 From: Minhajuddin Mohammed Date: Tue, 15 Jul 2025 14:04:53 -0400 Subject: [PATCH 2/4] added requiements --- requirements.txt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/requirements.txt b/requirements.txt index 276baa0..e1a8f93 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,3 +5,5 @@ fiona==1.10.1 pandas==2.2.3 shapely==2.1.0 pyproj>=3.7.1 +sentence-transformers==4.1.0 +chromadb==1.0.12 \ No newline at end of file From 62962b5c6de1d509288926543b42e3714b4e0ef3 Mon Sep 17 00:00:00 2001 From: Minhajuddin Mohammed Date: Thu, 24 Jul 2025 03:19:32 -0400 Subject: [PATCH 3/4] changed chroma db to pincone db --- ckanext/datapusher_plus/config.py | 29 +++-- ckanext/datapusher_plus/vector_store.py | 164 +++++++++--------------- requirements.txt | 3 +- 3 files changed, 76 insertions(+), 120 deletions(-) diff --git a/ckanext/datapusher_plus/config.py b/ckanext/datapusher_plus/config.py index 2d2167d..2aee99c 100644 --- a/ckanext/datapusher_plus/config.py +++ b/ckanext/datapusher_plus/config.py @@ -175,21 +175,18 @@ ENABLE_VECTOR_STORE = tk.asbool( tk.config.get("ckanext.datapusher_plus.enable_vector_store", False) ) -VECTOR_STORE_PATH = tk.config.get( - "ckanext.datapusher_plus.vector_store_path", "/var/lib/ckan/vector_store" + +# Pinecone configuration +PINECONE_API_KEY = tk.config.get( + "ckanext.datapusher_plus.pinecone_api_key" ) -VECTOR_STORE_ENABLED = tk.asbool( - tk.config.get("ckanext.datapusher_plus.vector_store_enabled", False) +VECTOR_STORE_INDEX_NAME = tk.config.get( + "ckanext.datapusher_plus.vector_store_index_name", "datapusher-resources" ) -VECTOR_STORE_COLLECTION = tk.config.get( - "ckanext.datapusher_plus.vector_store_collection", "default" -) -EMBEDDING_MODEL = tk.config.get( - "ckanext.datapusher_plus.embedding_model", "dunzhang/stella_en_400M_v5" +VECTOR_STORE_NAMESPACE = tk.config.get( + "ckanext.datapusher_plus.vector_store_namespace", "default" ) -EMBEDDING_DEVICE = tk.config.get( - "ckanext.datapusher_plus.embedding_device", "cpu" -) + # OpenRouter API Key OPENROUTER_API_KEY = tk.config.get( "ckanext.datapusher_plus.openrouter_api_key", "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" @@ -197,8 +194,12 @@ # OpenRouter Model OPENROUTER_MODEL = tk.config.get( "ckanext.datapusher_plus.openrouter_model", "google/gemini-2.0-flash-001" -) -# Text chunking overlap 
+) + +# Text chunking settings +CHUNK_SIZE = tk.asint( + tk.config.get("ckanext.datapusher_plus.chunk_size", "1000") +) CHUNK_OVERLAP = tk.asint( tk.config.get("ckanext.datapusher_plus.chunk_overlap", "400") ) diff --git a/ckanext/datapusher_plus/vector_store.py b/ckanext/datapusher_plus/vector_store.py index ebdac1f..4eb97f9 100644 --- a/ckanext/datapusher_plus/vector_store.py +++ b/ckanext/datapusher_plus/vector_store.py @@ -2,7 +2,7 @@ """ Vector Store integration for DataPusher Plus Embeds resource data and metadata during the upload process -Using local embeddings and OpenRouter for LLM +Using Pinecone vector database and OpenRouter for LLM """ import os @@ -11,9 +11,7 @@ import requests from typing import Dict, Any, Optional, List, Union from datetime import datetime -import chromadb -from sentence_transformers import SentenceTransformer -import numpy as np +from pinecone import Pinecone import ckanext.datapusher_plus.config as conf log = logging.getLogger(__name__) @@ -28,8 +26,8 @@ def __init__(self): log.info("Vector store embedding is disabled") return - self.persist_directory = conf.VECTOR_STORE_PATH - self.collection_name = conf.VECTOR_STORE_COLLECTION + self.index_name = conf.VECTOR_STORE_INDEX_NAME + self.namespace = conf.VECTOR_STORE_NAMESPACE # Initialize components self._initialize_components() @@ -37,44 +35,11 @@ def __init__(self): def _initialize_components(self): """Initialize vector store components""" try: - # Ensure directory exists - os.makedirs(self.persist_directory, exist_ok=True) - - # Initialize ChromaDB - settings = chromadb.config.Settings( - persist_directory=self.persist_directory, - anonymized_telemetry=False, - allow_reset=True - ) - - self.client = chromadb.PersistentClient( - path=self.persist_directory, - settings=settings - ) + # Initialize Pinecone + self.pc = Pinecone(api_key=conf.PINECONE_API_KEY) + self.index = self.pc.Index(self.index_name) - # Get or create collection - try: - self.collection = self.client.get_collection(name=self.collection_name) - log.info(f"Retrieved collection '{self.collection_name}'") - except: - self.collection = self.client.create_collection( - name=self.collection_name, - metadata={"hnsw:space": "cosine"} - ) - log.info(f"Created collection '{self.collection_name}'") - - # Initialize local embedding model - device = conf.EMBEDDING_DEVICE # 'cuda', 'cpu', or 'mps' - self.embedding_model = SentenceTransformer( - conf.EMBEDDING_MODEL, - trust_remote_code=True, - device=device, - config_kwargs={ - "use_memory_efficient_attention": False, - "unpad_inputs": False - } - ) - log.info(f"Loaded embedding model {conf.EMBEDDING_MODEL} on {device}") + log.info(f"Connected to Pinecone index '{self.index_name}'") # Initialize OpenRouter configuration self.openrouter_api_key = conf.OPENROUTER_API_KEY @@ -198,38 +163,37 @@ def embed_resource(self, # Create document content doc_content = self._create_document_content(profile) - # Create metadata for ChromaDB + # Create metadata for Pinecone metadata = self._create_metadata(profile) # Split document if too long chunks = self._split_text(doc_content) - # Generate embeddings using local model - embeddings = [] - for chunk in chunks: - embedding = self.embedding_model.encode(chunk, convert_to_numpy=True) - embeddings.append(embedding.tolist()) - # Remove existing entries for this resource try: - self.collection.delete(where={"resource_id": resource_id}) + # Delete existing vectors for this resource + existing_ids = [f"{resource_id}_{i}" for i in range(100)] # Assume max 100 chunks + 
self.index.delete(ids=existing_ids, namespace=self.namespace) except Exception as e: logger.warning(f"Could not delete existing entries: {e}") - # Add new entries - ids = [f"{resource_id}_{i}" for i in range(len(chunks))] - metadatas = [metadata.copy() for _ in chunks] - - # Add chunk info to metadata - for i, meta in enumerate(metadatas): - meta['chunk_index'] = i - meta['total_chunks'] = len(chunks) - - self.collection.add( - embeddings=embeddings, - documents=chunks, - metadatas=metadatas, - ids=ids + # Prepare data for Pinecone upsert + records = [] + for i, chunk in enumerate(chunks): + chunk_metadata = metadata.copy() + chunk_metadata['chunk_index'] = i + chunk_metadata['total_chunks'] = len(chunks) + + records.append({ + "id": f"{resource_id}_{i}", + "text": chunk, + "metadata": chunk_metadata + }) + + # Upsert to Pinecone (embeddings are generated automatically) + self.index.upsert_records( + namespace=self.namespace, + records=records ) logger.info(f"Successfully embedded {len(chunks)} chunks for resource {resource_id}") @@ -241,44 +205,34 @@ def embed_resource(self, def search_resources(self, query: str, n_results: int = 10, filters: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]: - """Search for resources using the local embedding model""" + """Search for resources using Pinecone hosted embeddings""" try: - # Generate query embedding using the query prompt - query_embedding = self.embedding_model.encode( - query, - prompt_name="s2p_query" if hasattr(self.embedding_model, 'prompts') else None, - convert_to_numpy=True - ) - - # Build where clause - where_clause = filters if filters else None - - # Search - search_kwargs = { - "query_embeddings": [query_embedding.tolist()], - "n_results": n_results, + # Build search query for Pinecone + search_query = { + "inputs": {"text": query}, + "top_k": n_results } - if where_clause: - search_kwargs["where"] = where_clause + # Add metadata filters if provided + if filters: + search_query["filter"] = filters - results = self.collection.query(**search_kwargs) + # Search using Pinecone + results = self.index.search( + namespace=self.namespace, + query=search_query + ) # Format results formatted_results = [] - if results and 'documents' in results and results['documents']: - documents = results['documents'][0] - metadatas = results['metadatas'][0] if results.get('metadatas') else [] - distances = results['distances'][0] if results.get('distances') else [] - ids = results['ids'][0] if results.get('ids') else [] - - for i, doc in enumerate(documents): + if results and 'matches' in results: + for match in results['matches']: formatted_results.append({ - 'id': ids[i] if i < len(ids) else None, - 'content': doc, - 'metadata': metadatas[i] if i < len(metadatas) else {}, - 'distance': distances[i] if i < len(distances) else 1.0, - 'score': 1.0 - (distances[i] if i < len(distances) else 1.0) + 'id': match.get('id'), + 'content': match.get('text', ''), + 'metadata': match.get('metadata', {}), + 'score': match.get('score', 0.0), + 'distance': 1.0 - match.get('score', 0.0) }) return formatted_results @@ -291,20 +245,18 @@ def search_resources_by_year(self, query: str, year_filter: tuple, n_results: int = 20) -> List[Dict[str, Any]]: """Search for resources containing specific years""" try: - # Build where clause for temporal filtering - where_clause = { - "$and": [ - {"has_temporal": True}, - {"temporal_min": {"$lte": year_filter[1]}}, - {"temporal_max": {"$gte": year_filter[0]}} - ] + # Build filter for temporal filtering using Pinecone metadata 
filters + filters = { + "has_temporal": True, + "temporal_min": {"$lte": year_filter[1]}, + "temporal_max": {"$gte": year_filter[0]} } # Search with year filter return self.search_resources( query=f"{query} {year_filter[0]} {year_filter[1]} year temporal data", n_results=n_results, - filters=where_clause + filters=filters ) except Exception as e: @@ -583,7 +535,11 @@ def delete_resource_embeddings(self, resource_id: str) -> bool: return False try: - self.collection.delete(where={"resource_id": resource_id}) + # Delete vectors by resource_id filter + self.index.delete( + filter={"resource_id": resource_id}, + namespace=self.namespace + ) log.info(f"Deleted embeddings for resource {resource_id}") return True except Exception as e: diff --git a/requirements.txt b/requirements.txt index e1a8f93..58aadcc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,5 +5,4 @@ fiona==1.10.1 pandas==2.2.3 shapely==2.1.0 pyproj>=3.7.1 -sentence-transformers==4.1.0 -chromadb==1.0.12 \ No newline at end of file +pinecone \ No newline at end of file From ded32d4bbee63b7754a5f87e3104445ba3d852a9 Mon Sep 17 00:00:00 2001 From: Minhajuddin Mohammed Date: Mon, 28 Jul 2025 06:06:01 -0400 Subject: [PATCH 4/4] bug fixes --- ckanext/datapusher_plus/config.py | 8 +- ckanext/datapusher_plus/jobs.py | 5 +- ckanext/datapusher_plus/plugin.py | 2 +- ckanext/datapusher_plus/vector_store.py | 269 ++++++++++++++---------- 4 files changed, 162 insertions(+), 122 deletions(-) diff --git a/ckanext/datapusher_plus/config.py b/ckanext/datapusher_plus/config.py index 2aee99c..abdd58d 100644 --- a/ckanext/datapusher_plus/config.py +++ b/ckanext/datapusher_plus/config.py @@ -173,7 +173,7 @@ ) ENABLE_VECTOR_STORE = tk.asbool( - tk.config.get("ckanext.datapusher_plus.enable_vector_store", False) + tk.config.get("ckanext.datapusher_plus.enable_vector_store", True) ) # Pinecone configuration @@ -196,9 +196,9 @@ "ckanext.datapusher_plus.openrouter_model", "google/gemini-2.0-flash-001" ) -# Text chunking settings -CHUNK_SIZE = tk.asint( - tk.config.get("ckanext.datapusher_plus.chunk_size", "1000") +# Text chunking settings for vector store +VECTOR_CHUNK_SIZE = tk.asint( + tk.config.get("ckanext.datapusher_plus.vector_chunk_size", "1000") ) CHUNK_OVERLAP = tk.asint( tk.config.get("ckanext.datapusher_plus.chunk_overlap", "400") diff --git a/ckanext/datapusher_plus/jobs.py b/ckanext/datapusher_plus/jobs.py index 3f2a6f6..3ea711f 100644 --- a/ckanext/datapusher_plus/jobs.py +++ b/ckanext/datapusher_plus/jobs.py @@ -1590,12 +1590,11 @@ def _push_to_datastore(   Resource metadata updates: {metadata_elapsed:,.2f} TOTAL ELAPSED TIME: {total_elapsed:,.2f} """ - logger.info(end_msg) - # ============================================================ # VECTOR STORE EMBEDDING # ============================================================ + if conf.ENABLE_VECTOR_STORE and VECTOR_STORE_AVAILABLE: vector_start = time.perf_counter() logger.info("STARTING VECTOR STORE EMBEDDING...") @@ -1673,3 +1672,5 @@ def _push_to_datastore( vector_elapsed = time.perf_counter() - vector_start logger.info(f"VECTOR STORE EMBEDDING completed in {vector_elapsed:,.2f} seconds") + + logger.info(end_msg) diff --git a/ckanext/datapusher_plus/plugin.py b/ckanext/datapusher_plus/plugin.py index ac3a8ea..3c3d093 100644 --- a/ckanext/datapusher_plus/plugin.py +++ b/ckanext/datapusher_plus/plugin.py @@ -215,4 +215,4 @@ def resource_save_redirect( '{}_resource.new'.format(package_type), id=package_name, ) # edit dataset page after resource - return 
h.url_for(u'{}.edit'.format(package_type), id=package_name) + return h.url_for(u'{}.edit'.format(package_type), id=package_name) \ No newline at end of file diff --git a/ckanext/datapusher_plus/vector_store.py b/ckanext/datapusher_plus/vector_store.py index 4eb97f9..6dd439a 100644 --- a/ckanext/datapusher_plus/vector_store.py +++ b/ckanext/datapusher_plus/vector_store.py @@ -47,29 +47,49 @@ def _initialize_components(self): self.openrouter_base_url = "https://openrouter.ai/api/v1" # Text splitter settings - self.chunk_size = conf.CHUNK_SIZE + self.chunk_size = conf.VECTOR_CHUNK_SIZE self.chunk_overlap = conf.CHUNK_OVERLAP - log.info("Vector store components initialized successfully") + log.info("Vector store components initialized successfully") except Exception as e: log.error(f"Failed to initialize vector store: {e}") self.enabled = False def _split_text(self, text: str, chunk_size: int = None, chunk_overlap: int = None) -> List[str]: - """Simple text splitter""" + """Simple text splitter with safeguards against infinite loops""" if chunk_size is None: chunk_size = self.chunk_size if chunk_overlap is None: chunk_overlap = self.chunk_overlap + + log.debug(f"Text splitting - Input length: {len(text)}, chunk_size: {chunk_size}, chunk_overlap: {chunk_overlap}") + + # Validate inputs to prevent infinite loops + if chunk_size <= 0: + chunk_size = 1000 # fallback + if chunk_overlap < 0: + chunk_overlap = 0 + if chunk_overlap >= chunk_size: + chunk_overlap = chunk_size // 2 # ensure overlap is less than chunk size + log.debug(f"Adjusted chunk_overlap to {chunk_overlap} to prevent infinite loop") if len(text) <= chunk_size: + log.debug("Text fits in single chunk") return [text] chunks = [] start = 0 + max_iterations = len(text) // max(1, chunk_size - chunk_overlap) + 10 # safety limit + iteration_count = 0 + + log.debug(f"Starting text splitting with max_iterations: {max_iterations}") - while start < len(text): + while start < len(text) and iteration_count < max_iterations: + iteration_count += 1 + if iteration_count % 100 == 0: # Log every 100 iterations + log.debug(f"Text splitting iteration {iteration_count}, start position: {start}") + end = start + chunk_size # Try to find a good break point @@ -85,10 +105,22 @@ def _split_text(self, text: str, chunk_size: int = None, chunk_overlap: int = No if break_point > start: end = break_point + 1 - chunks.append(text[start:end].strip()) - start = end - chunk_overlap + chunk = text[start:end].strip() + if chunk: # only add non-empty chunks + chunks.append(chunk) + + # Ensure we always advance by at least 1 character to prevent infinite loops + next_start = end - chunk_overlap + if next_start <= start: + next_start = start + max(1, chunk_size - chunk_overlap) + start = next_start - return chunks + # Safety check - if we hit max iterations, log a warning + if iteration_count >= max_iterations: + log.warning(f"Text splitting hit maximum iterations ({max_iterations}). 
Text length: {len(text)}") + + log.debug(f"Text splitting completed in {iteration_count} iterations, created {len(chunks)} chunks") + return chunks if chunks else [text] # ensure we always return at least the original text def _call_openrouter(self, prompt: str, system_prompt: str = None, temperature: float = 0.1, max_tokens: int = 500) -> str: @@ -113,20 +145,28 @@ def _call_openrouter(self, prompt: str, system_prompt: str = None, } try: + log.debug("Calling OpenRouter API...") response = requests.post( f"{self.openrouter_base_url}/chat/completions", headers=headers, json=data, - timeout=30 + timeout=60 # Increased timeout to 60 seconds ) if response.status_code == 200: result = response.json() + log.debug("OpenRouter API call successful") return result["choices"][0]["message"]["content"].strip() else: log.error(f"OpenRouter API error {response.status_code}: {response.text}") return "" + except requests.exceptions.Timeout: + log.error("OpenRouter API timeout after 60 seconds") + return "" + except requests.exceptions.RequestException as e: + log.error(f"OpenRouter API request error: {e}") + return "" except Exception as e: log.error(f"Error calling OpenRouter: {e}") return "" @@ -151,86 +191,135 @@ def embed_resource(self, logger.info(f"Starting vector embedding for resource {resource_id}") # Create resource profile from available data + logger.debug("Creating resource profile...") profile = self._create_resource_profile( resource_id, resource_metadata, dataset_metadata, stats_data, freq_data, temporal_info ) + logger.debug("Resource profile created successfully") # Generate AI description using OpenRouter - ai_description = self._generate_ai_description(profile) - profile['ai_description'] = ai_description + logger.debug("Generating AI description...") + try: + ai_description = self._generate_ai_description(profile) + profile['ai_description'] = ai_description + logger.debug("AI description generated successfully") + except Exception as e: + logger.warning(f"Failed to generate AI description: {e}") + profile['ai_description'] = f"Resource containing {profile['resource_name']} data" # Create document content + logger.debug("Creating document content...") doc_content = self._create_document_content(profile) + logger.debug(f"Document content created, length: {len(doc_content)} characters") # Create metadata for Pinecone + logger.debug("Creating metadata...") metadata = self._create_metadata(profile) + logger.debug(f"Metadata created with {len(metadata)} fields") # Split document if too long - chunks = self._split_text(doc_content) + logger.debug("Splitting text into chunks...") + try: + chunks = self._split_text(doc_content) + logger.info(f"Split document into {len(chunks)} chunks") + except Exception as e: + logger.error(f"Text splitting failed: {e}") + # Fallback to simple chunking + logger.debug("Using fallback chunking strategy") + chunk_size = getattr(self, 'chunk_size', 1000) + chunks = [doc_content[i:i+chunk_size] for i in range(0, len(doc_content), chunk_size)] + logger.info(f"Fallback chunking created {len(chunks)} chunks") # Remove existing entries for this resource + logger.debug("Checking for existing entries to delete...") try: - # Delete existing vectors for this resource - existing_ids = [f"{resource_id}_{i}" for i in range(100)] # Assume max 100 chunks - self.index.delete(ids=existing_ids, namespace=self.namespace) + # Delete existing vectors for this resource by filter + self.index.delete( + filter={"resource_id": resource_id}, + namespace=self.namespace + ) + 
logger.debug("Existing entries deleted successfully") except Exception as e: - logger.warning(f"Could not delete existing entries: {e}") + # Ignore namespace not found errors on first upload + if "Namespace not found" not in str(e): + logger.warning(f"Could not delete existing entries: {e}") + else: + logger.debug("Namespace not found - this is expected on first upload") # Prepare data for Pinecone upsert + logger.debug("Preparing records for Pinecone upsert...") records = [] for i, chunk in enumerate(chunks): chunk_metadata = metadata.copy() - chunk_metadata['chunk_index'] = i - chunk_metadata['total_chunks'] = len(chunks) + chunk_metadata['chunk_index'] = int(i) + chunk_metadata['total_chunks'] = int(len(chunks)) - records.append({ - "id": f"{resource_id}_{i}", - "text": chunk, - "metadata": chunk_metadata - }) + # Create record for integrated embedding + record = { + "_id": f"{resource_id}_{i}", + "text": chunk # This field name must match your index's field_map + } + # Add metadata as separate fields (Pinecone will automatically store them as metadata) + record.update(chunk_metadata) + records.append(record) + + logger.debug(f"Prepared {len(records)} records for upsert") # Upsert to Pinecone (embeddings are generated automatically) - self.index.upsert_records( - namespace=self.namespace, - records=records - ) + logger.debug("Starting Pinecone upsert...") + try: + self.index.upsert_records( + self.namespace, # namespace first + records # then records list + ) + logger.debug("Pinecone upsert completed successfully") + except Exception as e: + logger.error(f"Pinecone upsert failed: {e}") + raise logger.info(f"Successfully embedded {len(chunks)} chunks for resource {resource_id}") return True except Exception as e: logger.error(f"Error embedding resource {resource_id}: {e}") + logger.error(f"Exception type: {type(e).__name__}") + import traceback + logger.error(f"Traceback: {traceback.format_exc()}") return False def search_resources(self, query: str, n_results: int = 10, filters: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]: """Search for resources using Pinecone hosted embeddings""" try: - # Build search query for Pinecone - search_query = { - "inputs": {"text": query}, - "top_k": n_results + # Build search parameters for Pinecone integrated embedding + search_params = { + "namespace": self.namespace, + "query": { + "inputs": {"text": query}, + "top_k": n_results + } } # Add metadata filters if provided if filters: - search_query["filter"] = filters + search_params["query"]["filter"] = filters # Search using Pinecone - results = self.index.search( - namespace=self.namespace, - query=search_query - ) + results = self.index.search(**search_params) # Format results formatted_results = [] if results and 'matches' in results: for match in results['matches']: + # For integrated embedding, the original text is in the 'text' field + content = match.get('text', '') + metadata = match.get('metadata', {}) + formatted_results.append({ 'id': match.get('id'), - 'content': match.get('text', ''), - 'metadata': match.get('metadata', {}), + 'content': content, + 'metadata': metadata, 'score': match.get('score', 0.0), 'distance': 1.0 - match.get('score', 0.0) }) @@ -241,28 +330,6 @@ def search_resources(self, query: str, n_results: int = 10, log.error(f"Error searching resources: {e}") return [] - def search_resources_by_year(self, query: str, year_filter: tuple, - n_results: int = 20) -> List[Dict[str, Any]]: - """Search for resources containing specific years""" - try: - # Build filter for 
temporal filtering using Pinecone metadata filters - filters = { - "has_temporal": True, - "temporal_min": {"$lte": year_filter[1]}, - "temporal_max": {"$gte": year_filter[0]} - } - - # Search with year filter - return self.search_resources( - query=f"{query} {year_filter[0]} {year_filter[1]} year temporal data", - n_results=n_results, - filters=filters - ) - - except Exception as e: - log.error(f"Error in year-based search: {e}") - return [] - def _create_resource_profile(self, resource_id: str, resource_metadata: Dict[str, Any], @@ -282,7 +349,6 @@ def _create_resource_profile(self, for tag in dataset_metadata.get('tags', [])], 'dataset_notes': dataset_metadata.get('notes', ''), 'resource_description': resource_metadata.get('description', ''), - 'temporal_coverage': temporal_info, 'stats_summary': stats_data, 'frequency_summary': self._summarize_frequencies(freq_data), 'profiling_timestamp': datetime.now().isoformat() @@ -308,7 +374,6 @@ def _extract_column_info(self, stats_data: Dict[str, Any]) -> Dict[str, Any]: 'non_null_count': int(stats.get('count', 0)), 'null_count': int(stats.get('nullcount', 0)), 'unique_count': int(stats.get('cardinality', 0)), - 'is_temporal': stats.get('type') == 'DateTime', 'is_numeric': stats.get('type') in ['Integer', 'Float'], } @@ -339,7 +404,6 @@ def _is_geographic_column(self, col_name: str) -> bool: def _detect_patterns_from_stats(self, stats_data: Dict[str, Any]) -> Dict[str, Any]: """Detect data patterns from stats""" patterns = { - 'has_temporal': False, 'has_geographic': False, 'has_financial': False, 'data_categories': [] @@ -353,10 +417,6 @@ def _detect_patterns_from_stats(self, stats_data: Dict[str, Any]) -> Dict[str, A if isinstance(field_info, dict): field_lower = field_name.lower() - # Check temporal - if field_info.get('stats', {}).get('type') == 'DateTime': - patterns['has_temporal'] = True - # Check geographic if self._is_geographic_column(field_name): patterns['has_geographic'] = True @@ -404,13 +464,6 @@ def _generate_ai_description(self, profile: Dict[str, Any]) -> str: Resource Description: {profile['resource_description'] or 'Not provided'} """ - # Add temporal coverage - if profile.get('temporal_coverage'): - temp = profile['temporal_coverage'] - context += f"\n\nTemporal Coverage: {temp.get('min')} to {temp.get('max')}" - if temp.get('year_count'): - context += f" ({temp['year_count']} years)" - # Add column information if profile.get('columns_info'): context += "\n\nKey Columns:" @@ -426,38 +479,37 @@ def _generate_ai_description(self, profile: Dict[str, Any]) -> str: if profile.get('data_patterns', {}).get('data_categories'): context += f"\n\nData Categories: {', '.join(profile['data_patterns']['data_categories'])}" + # Limit context length to prevent API issues + if len(context) > 3000: + context = context[:3000] + "..." + log.debug("Truncated context due to length") + prompt = f"""Based on the following resource information, generate a comprehensive description that will help users understand exactly what data this resource contains and how it can be used. {context} Generate a clear, informative description (2-3 sentences) that: 1. Clearly states what specific data this resource contains -2. Mentions the temporal coverage if available -3. Highlights key variables or metrics -4. Suggests potential use cases +2. Highlights key variables or metrics +3. 
Suggests potential use cases Be specific and factual.""" + log.debug("Calling OpenRouter for AI description...") description = self._call_openrouter(prompt) if description: + log.debug("AI description generated successfully") return description else: # Fallback description - fallback = f"Resource containing {profile['resource_name']} data" - if profile.get('temporal_coverage'): - temp = profile['temporal_coverage'] - fallback += f" from {temp.get('min')} to {temp.get('max')}" - return fallback + log.debug("OpenRouter returned empty response, using fallback") + return f"Resource containing {profile['resource_name']} data" except Exception as e: log.error(f"Error generating AI description: {e}") # Fallback description - fallback = f"Resource containing {profile['resource_name']} data" - if profile.get('temporal_coverage'): - temp = profile['temporal_coverage'] - fallback += f" from {temp.get('min')} to {temp.get('max')}" - return fallback + return f"Resource containing {profile['resource_name']} data" def _create_document_content(self, profile: Dict[str, Any]) -> str: """Create searchable document content""" @@ -469,13 +521,6 @@ def _create_document_content(self, profile: Dict[str, Any]) -> str: {profile['ai_description']} """ - # Add temporal coverage - if profile.get('temporal_coverage'): - temp = profile['temporal_coverage'] - doc += f"\n\n**TEMPORAL COVERAGE: {temp.get('min')} to {temp.get('max')}**" - if temp.get('year_count'): - doc += f"\nTotal Years: {temp['year_count']}" - # Add column information if profile.get('columns_info'): doc += "\n\nColumns:" @@ -498,34 +543,28 @@ def _create_document_content(self, profile: Dict[str, Any]) -> str: return doc def _create_metadata(self, profile: Dict[str, Any]) -> Dict[str, Any]: - """Create metadata for ChromaDB""" + """Create metadata for Pinecone (only simple types allowed)""" metadata = { - 'resource_id': profile['resource_id'], - 'resource_name': profile.get('resource_name', ''), - 'dataset_id': profile.get('dataset_id', ''), - 'dataset_title': profile.get('dataset_title', ''), - 'format': profile.get('format', 'unknown'), - 'has_temporal': bool(profile.get('temporal_coverage')), + 'resource_id': str(profile['resource_id']), + 'resource_name': str(profile.get('resource_name', '')), + 'dataset_id': str(profile.get('dataset_id', '')), + 'dataset_title': str(profile.get('dataset_title', '')), + 'format': str(profile.get('format', 'unknown')), 'indexed_at': datetime.now().isoformat() } - # Add temporal metadata - if profile.get('temporal_coverage'): - temp = profile['temporal_coverage'] - metadata['temporal_min'] = int(temp.get('min', 0)) - metadata['temporal_max'] = int(temp.get('max', 0)) - metadata['year_count'] = int(temp.get('year_count', 0)) - else: - metadata['temporal_min'] = 0 - metadata['temporal_max'] = 0 - metadata['year_count'] = 0 - - # Add pattern metadata + # Add pattern metadata (as simple types) if profile.get('data_patterns'): patterns = profile['data_patterns'] - metadata['has_geographic'] = patterns.get('has_geographic', False) - metadata['has_financial'] = patterns.get('has_financial', False) - metadata['data_categories'] = ','.join(patterns.get('data_categories', [])) + metadata['has_geographic'] = bool(patterns.get('has_geographic', False)) + metadata['has_financial'] = bool(patterns.get('has_financial', False)) + # Convert data categories to a single string + categories = patterns.get('data_categories', []) + metadata['data_categories'] = ','.join(categories) if categories else '' + else: + metadata['has_geographic'] 
= False + metadata['has_financial'] = False + metadata['data_categories'] = '' return metadata
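
Usage note (not part of the patch series): a minimal sketch of how the Pinecone-backed store added above might be exercised once the patched extension is installed. It assumes the `ckanext.datapusher_plus.enable_vector_store`, `pinecone_api_key`, `vector_store_index_name`, and `vector_store_namespace` options introduced in PATCH 1/3 are set in the CKAN config, and that the Pinecone index was created with integrated embeddings whose field_map points at the `text` field, as the PATCH 4 comments require. Only methods defined in this diff are used; the query strings, filter values, and resource id are illustrative.

    # Minimal sketch, assuming the patched extension is installed and Pinecone is
    # configured via the ckanext.datapusher_plus.* options added in this series.
    from ckanext.datapusher_plus.vector_store import DataPusherVectorStore

    store = DataPusherVectorStore()
    if store.enabled:
        # Plain semantic search; with integrated embeddings, Pinecone embeds the
        # query text server-side (see search_resources in PATCH 4).
        for hit in store.search_resources("county unemployment rates", n_results=5):
            print(hit["score"], hit["metadata"].get("resource_name"), hit["id"])

        # Metadata-filtered search using the flags written by _create_metadata.
        geo_hits = store.search_resources(
            "median household income by county",
            n_results=5,
            filters={"has_geographic": True, "has_financial": True},
        )

        # Remove a resource's chunks, e.g. from a resource-delete hook.
        store.delete_resource_embeddings("RESOURCE-ID")  # hypothetical resource id

Because PATCH 4 switches to Pinecone's hosted embedding, no sentence-transformers model is loaded locally; search quality and dimensionality are determined by the model attached to the index when it was created, not by anything in this code.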