Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions ckanext/datapusher_plus/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,39 @@
# Automatically extract a zip archive's contents when it contains
# exactly one file (default: enabled).
AUTO_UNZIP_ONE_FILE = tk.asbool(
    tk.config.get("ckanext.datapusher_plus.auto_unzip_one_file", True)
)

# Master switch for embedding pushed resources into the vector store
# (default: enabled).
ENABLE_VECTOR_STORE = tk.asbool(
    tk.config.get("ckanext.datapusher_plus.enable_vector_store", True)
)

# Pinecone configuration
# API key for the Pinecone service; deliberately has no default —
# it must be supplied via CKAN configuration.
PINECONE_API_KEY = tk.config.get(
    "ckanext.datapusher_plus.pinecone_api_key"
)
# Name of the index that resource embeddings are written to.
VECTOR_STORE_INDEX_NAME = tk.config.get(
    "ckanext.datapusher_plus.vector_store_index_name", "datapusher-resources"
)
# Namespace within the index used for this CKAN instance's vectors.
VECTOR_STORE_NAMESPACE = tk.config.get(
    "ckanext.datapusher_plus.vector_store_namespace", "default"
)

# OpenRouter API Key.
# SECURITY: intentionally no default value — the key must come from
# CKAN configuration (or the environment that populates it). A live
# API key was previously hard-coded here as the fallback; secrets must
# never be committed to source.
OPENROUTER_API_KEY = tk.config.get(
    "ckanext.datapusher_plus.openrouter_api_key"
)
# OpenRouter Model used for LLM calls (default: Gemini 2.0 Flash via
# OpenRouter's model catalog).
OPENROUTER_MODEL = tk.config.get(
    "ckanext.datapusher_plus.openrouter_model", "google/gemini-2.0-flash-001"
)

# Text chunking settings for vector store
# Maximum size of each text chunk produced for embedding — presumably
# measured in characters; confirm against the vector_store splitter.
# The default is a string because tk.asint parses string config values.
VECTOR_CHUNK_SIZE = tk.asint(
    tk.config.get("ckanext.datapusher_plus.vector_chunk_size", "1000")
)
# Overlap between consecutive chunks — presumably to preserve context
# across chunk boundaries; TODO confirm units match VECTOR_CHUNK_SIZE.
CHUNK_OVERLAP = tk.asint(
    tk.config.get("ckanext.datapusher_plus.chunk_overlap", "400")
)
# Temporal coverage detection
# Whether the push job attempts to detect a dataset's temporal
# coverage from its date/year columns (default: enabled).
DETECT_TEMPORAL_COVERAGE = tk.asbool(
    tk.config.get("ckanext.datapusher_plus.detect_temporal_coverage", True)
)
20 changes: 19 additions & 1 deletion ckanext/datapusher_plus/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,4 +590,22 @@ def is_preformulated_field(field):
Check if a field is preformulated (has formula attribute)
This helper returns True only if the field has a 'formula' key with a non-empty value
"""
return bool(field.get('formula', False))
return bool(field.get('formula', False))


def get_resource_embedding_status(resource_id):
    """Check whether a resource has been embedded in the vector store.

    :param resource_id: id of the CKAN resource to look up
    :returns: True if at least one stored vector is tagged with this
        resource_id; False otherwise — including when the vector store
        is disabled, its dependencies are missing, or the query fails.
        This is a best-effort template helper and must never raise.
    """
    import logging

    try:
        # Imported lazily so this helper degrades gracefully when the
        # vector store dependencies are not installed.
        from ckanext.datapusher_plus.vector_store import DataPusherVectorStore

        vector_store = DataPusherVectorStore()
        if vector_store.enabled:
            # We only care whether ANY record matches this resource_id,
            # not similarity — so query with an all-zero dummy embedding
            # and ask for a single result filtered by metadata.
            # NOTE(review): `collection.query(query_embeddings=..., where=...)`
            # looks like a ChromaDB API, while config.py speaks of
            # Pinecone — confirm which backend is actually in use.
            results = vector_store.collection.query(
                query_embeddings=[[0.0] * 384],  # dummy vector; dim assumed 384 — TODO confirm
                n_results=1,
                where={"resource_id": resource_id},
            )
            return bool(results and results.get('ids') and results['ids'][0])
    except Exception:
        # Best-effort: never let a vector-store failure break page
        # rendering, but record the cause instead of silently passing.
        logging.getLogger(__name__).debug(
            "Could not determine embedding status for resource %s",
            resource_id,
            exc_info=True,
        )
    return False
91 changes: 90 additions & 1 deletion ckanext/datapusher_plus/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@
from ckanext.datapusher_plus.qsv_utils import QSVCommand
from ckanext.datapusher_plus.pii_screening import screen_for_pii

# Vector store support is optional: if its module (or any dependency it
# pulls in) is not installed, record that fact instead of failing the
# whole jobs module at import time. Consumers must check
# VECTOR_STORE_AVAILABLE before using DataPusherVectorStore.
try:
    from ckanext.datapusher_plus.vector_store import DataPusherVectorStore
    VECTOR_STORE_AVAILABLE = True
except ImportError:
    VECTOR_STORE_AVAILABLE = False

if locale.getdefaultlocale()[0]:
lang, encoding = locale.getdefaultlocale()
locale.setlocale(locale.LC_ALL, locale=(lang, encoding))
Expand Down Expand Up @@ -1584,4 +1590,87 @@ def _push_to_datastore(
  Resource metadata updates: {metadata_elapsed:,.2f}
TOTAL ELAPSED TIME: {total_elapsed:,.2f}
"""
logger.info(end_msg)

# ============================================================
# VECTOR STORE EMBEDDING
# ============================================================

if conf.ENABLE_VECTOR_STORE and VECTOR_STORE_AVAILABLE:
vector_start = time.perf_counter()
logger.info("STARTING VECTOR STORE EMBEDDING...")

try:
# Initialize vector store
vector_store = DataPusherVectorStore()

if vector_store.enabled:
# Prepare temporal information if available
temporal_info = None
if dataset_stats.get('IS_SORTED') and datetimecols_list:
# Extract temporal coverage from the data
temporal_years = set()

# If we have year columns, extract years
for col in datetimecols_list:
if 'year' in col.lower():
# Get year range from min/max
col_idx = headers.index(col)
if col_idx < len(headers_min) and col_idx < len(headers_max):
try:
min_year = int(float(headers_min[col_idx]))
max_year = int(float(headers_max[col_idx]))
temporal_years.update(range(min_year, max_year + 1))
except (ValueError, TypeError):
pass

# Also check for date columns
if not temporal_years and datetimecols_list:
# Sample approach - get min/max dates
for idx, col in enumerate(headers):
if col in datetimecols_list:
try:
# Parse dates from min/max values
min_date = headers_min[idx]
max_date = headers_max[idx]
if min_date and max_date:
# Extract years from dates
min_year = parsedate(str(min_date)).year
max_year = parsedate(str(max_date)).year
temporal_years.update(range(min_year, max_year + 1))
except Exception as e:
logger.warning(f"Could not parse dates from column {col}: {e}")

if temporal_years:
temporal_info = {
'min': min(temporal_years),
'max': max(temporal_years),
'all_years': sorted(list(temporal_years)),
'year_count': len(temporal_years),
'temporal_columns': {col: True for col in datetimecols_list}
}
logger.info(f"Detected temporal coverage: {temporal_info['min']} to {temporal_info['max']}")

# Embed the resource
success = vector_store.embed_resource(
resource_id=resource_id,
resource_metadata=resource,
dataset_metadata=package,
stats_data=resource_fields_stats,
freq_data=resource_fields_freqs,
temporal_info=temporal_info,
logger=logger
)

if success:
logger.info("Vector store embedding completed successfully")
else:
logger.warning("Vector store embedding failed")

except Exception as e:
logger.error(f"Error during vector store embedding: {e}")
# Don't fail the job if embedding fails

vector_elapsed = time.perf_counter() - vector_start
logger.info(f"VECTOR STORE EMBEDDING completed in {vector_elapsed:,.2f} seconds")

logger.info(end_msg)
2 changes: 1 addition & 1 deletion ckanext/datapusher_plus/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,4 +215,4 @@ def resource_save_redirect(
'{}_resource.new'.format(package_type), id=package_name,
)
# edit dataset page after resource
return h.url_for(u'{}.edit'.format(package_type), id=package_name)
return h.url_for(u'{}.edit'.format(package_type), id=package_name)
Loading