From c6eebd7abfe767d04ff2f7795003f8c3e26407d1 Mon Sep 17 00:00:00 2001 From: Minhajuddin Mohammed Date: Tue, 14 Oct 2025 04:01:32 -0400 Subject: [PATCH 1/6] Added dpp_spatial_extent in the package metadata --- ckanext/datapusher_plus/config.py | 5 + ckanext/datapusher_plus/jinja2_helpers.py | 129 ++++++++++++++++++++++ ckanext/datapusher_plus/jobs.py | 57 ++++++++++ 3 files changed, 191 insertions(+) diff --git a/ckanext/datapusher_plus/config.py b/ckanext/datapusher_plus/config.py index 5e5a546..e4b3a4f 100644 --- a/ckanext/datapusher_plus/config.py +++ b/ckanext/datapusher_plus/config.py @@ -146,6 +146,11 @@ "ckanext.datapusher_plus.SPATIAL_SIMPLIFICATION_RELATIVE_TOLERANCE", "0.1" ) +# CSV spatial extent detection settings +AUTO_CSV_SPATIAL_EXTENT = tk.asbool( + tk.config.get("ckanext.datapusher_plus.auto_csv_spatial_extent", True) +) + # Latitude and longitude column names # multiple fields can be specified, separated by commas # matching columns will be from left to right and the jinja2 diff --git a/ckanext/datapusher_plus/jinja2_helpers.py b/ckanext/datapusher_plus/jinja2_helpers.py index e46b46e..d77f1d7 100644 --- a/ckanext/datapusher_plus/jinja2_helpers.py +++ b/ckanext/datapusher_plus/jinja2_helpers.py @@ -887,3 +887,132 @@ def get_column_stats(context, column_name, stat_name=None): return field_stats.get(stat_name, 0) else: return field_stats + + +@jinja2_global +@pass_context +def get_dataset_spatial_extent(context: dict) -> dict: + """Get the overall spatial extent for the dataset (aggregated from all spatial resources). + + Args: + context: The context of the template + (automatically passed by Jinja2 using @pass_context decorator) + + Returns: + dict: Spatial extent data with bounding box coordinates, or empty dict if none found + + Example: + {{ get_dataset_spatial_extent() }} + Returns: { + "type": "BoundingBox", + "coordinates": [[min_lon, min_lat], [max_lon, max_lat]], + "source": "aggregated_from_resources", + "resource_count": 2 + } + """ + package = context.get("package", {}) + dpp_suggestions = package.get("dpp_suggestions", {}) + return dpp_suggestions.get("dpp_spatial_extent", {}) + + +@jinja2_global +@pass_context +def get_spatial_resources(context: dict) -> dict: + """Get all resources in the dataset that have spatial data. + + Args: + context: The context of the template + (automatically passed by Jinja2 using @pass_context decorator) + + Returns: + dict: Dictionary mapping resource IDs to their spatial extent data + + Example: + {{ get_spatial_resources() }} + Returns: { + "resource-id-1": { + "type": "BoundingBox", + "coordinates": [[lon_min, lat_min], [lon_max, lat_max]], + "lat_field": "latitude", + "lon_field": "longitude", + "resource_name": "Data File 1" + }, + "resource-id-2": { ... } + } + """ + package = context.get("package", {}) + dpp_suggestions = package.get("dpp_suggestions", {}) + return dpp_suggestions.get("spatial_resources", {}) + + +@jinja2_global +@pass_context +def has_spatial_data(context: dict, resource_id: str = None) -> bool: + """Check if the dataset or a specific resource has spatial data. + + Args: + context: The context of the template + (automatically passed by Jinja2 using @pass_context decorator) + resource_id: Optional resource ID to check. If None, checks if dataset has any spatial data. 
+ + Returns: + bool: True if spatial data exists, False otherwise + + Examples: + Check if dataset has any spatial data: + {{ has_spatial_data() }} + + Check if specific resource has spatial data: + {{ has_spatial_data("resource-id-123") }} + """ + package = context.get("package", {}) + dpp_suggestions = package.get("dpp_suggestions", {}) + spatial_resources = dpp_suggestions.get("spatial_resources", {}) + + if resource_id: + return resource_id in spatial_resources + else: + return len(spatial_resources) > 0 + + +@jinja2_global +@pass_context +def get_spatial_bounds_wkt(context: dict, resource_id: str = None) -> str: + """Get spatial bounds as WKT (Well-Known Text) polygon format. + + Args: + context: The context of the template + (automatically passed by Jinja2 using @pass_context decorator) + resource_id: Optional resource ID. If None, returns dataset-level bounds. + + Returns: + str: WKT polygon representation of the spatial bounds + + Examples: + Dataset bounds: + {{ get_spatial_bounds_wkt() }} + + Resource bounds: + {{ get_spatial_bounds_wkt("resource-id-123") }} + + Returns: "POLYGON((lon_min lat_min, lon_min lat_max, lon_max lat_max, lon_max lat_min, lon_min lat_min))" + """ + if resource_id: + spatial_resources = get_spatial_resources(context) + extent_data = spatial_resources.get(resource_id, {}) + else: + extent_data = get_dataset_spatial_extent(context) + + if not extent_data or "coordinates" not in extent_data: + return "" + + coords = extent_data["coordinates"] + if len(coords) != 2: + return "" + + lon_min, lat_min = coords[0] + lon_max, lat_max = coords[1] + + # Create WKT polygon (counterclockwise order) + wkt = f"POLYGON(({lon_min} {lat_min}, {lon_min} {lat_max}, {lon_max} {lat_max}, {lon_max} {lat_min}, {lon_min} {lat_min}))" + return wkt diff --git a/ckanext/datapusher_plus/jobs.py b/ckanext/datapusher_plus/jobs.py index a3e6373..7f947d6 100644 --- a/ckanext/datapusher_plus/jobs.py +++ b/ckanext/datapusher_plus/jobs.py @@ -1281,6 +1281,63 @@ def _push_to_datastore( logger, ) + # ============================================================ + # SPATIAL EXTENT DETECTION FOR CSV WITH LAT/LONG COLUMNS + # ============================================================ + # Check if this is a CSV file (not already processed as spatial format) + # and detect latitude/longitude columns to calculate spatial extent + # Use the existing lat/lon detection logic from FormulaProcessor + if (conf.AUTO_CSV_SPATIAL_EXTENT and not spatial_format_flag and + resource_format.upper() in ["CSV", "TSV", "TAB"]): + logger.info("Checking for latitude/longitude columns in CSV using existing detection logic...") + + # Use the detected lat/lon fields from the already initialized FormulaProcessor + lat_column = formula_processor.dpp.get("LAT_FIELD") + lon_column = formula_processor.dpp.get("LON_FIELD") + + # If we found both lat and lon columns, calculate spatial extent + if lat_column and lon_column and not formula_processor.dpp.get("NO_LAT_LON_FIELDS"): + logger.info(f"Found latitude/longitude columns: {lat_column}, {lon_column}") + + try: + # Get min/max values from the stats we already calculated + lat_stats = resource_fields_stats.get(lat_column, {}).get("stats", {}) + lon_stats = resource_fields_stats.get(lon_column, {}).get("stats", {}) + + if lat_stats and lon_stats: + lat_min = float(lat_stats.get("min", 0)) + lat_max = float(lat_stats.get("max", 0)) + lon_min = float(lon_stats.get("min", 0)) + lon_max = float(lon_stats.get("max", 0)) + + # The FormulaProcessor already validated coordinate 
bounds, + # so we can trust these values are within valid ranges + # Add spatial extent to package dpp_suggestions (following DRUF pattern) + spatial_extent_data = { + "type": "BoundingBox", + "coordinates": [ + [lon_min, lat_min], + [lon_max, lat_max], + ], + } + + # Add to package dpp_suggestions like other computed metadata + package.setdefault("dpp_suggestions", {})["dpp_spatial_extent"] = spatial_extent_data + + logger.info( + f"Added dpp_spatial_extent to package dpp_suggestions from CSV lat/lon columns: " + f"lat({lat_min}, {lat_max}), lon({lon_min}, {lon_max})" + ) + logger.info(f"Spatial extent: {spatial_extent_data}") + else: + logger.warning("Could not retrieve min/max statistics for lat/lon columns") + + except (ValueError, TypeError, KeyError) as e: + logger.warning(f"Error calculating spatial extent from lat/lon columns: {e}") + else: + if formula_processor.dpp.get("NO_LAT_LON_FIELDS"): + logger.info("No suitable latitude/longitude column pairs found in CSV") + package.setdefault("dpp_suggestions", {})[ "STATUS" ] = "STARTING FORMULAE PROCESSING..." From 549ece34a32e9749eba69820646e2e993695e146 Mon Sep 17 00:00:00 2001 From: Minhajuddin Mohammed Date: Tue, 28 Oct 2025 14:06:07 -0400 Subject: [PATCH 2/6] correctly parses geojson and the shape files to get wkt and populate the dpp_suggestions --- ckanext/datapusher_plus/config.py | 9 +- ckanext/datapusher_plus/helpers.py | 42 ++++ ckanext/datapusher_plus/jobs.py | 262 ++++++++++----------- ckanext/datapusher_plus/spatial_helpers.py | 207 ++++++++++++++++ 4 files changed, 384 insertions(+), 136 deletions(-) diff --git a/ckanext/datapusher_plus/config.py b/ckanext/datapusher_plus/config.py index e4b3a4f..a8077bc 100644 --- a/ckanext/datapusher_plus/config.py +++ b/ckanext/datapusher_plus/config.py @@ -32,7 +32,7 @@ # Supported formats FORMATS = tk.config.get( "ckanext.datapusher_plus.formats", - ["csv", "tsv", "tab", "ssv", "xls", "xlsx", "ods", "geojson", "shp", "qgis", "zip"], + ["csv", "tsv", "tab", "ssv", "xls", "xlsx", "ods", "geojson", "shp", "qgis", "zip", "dbf"], ) if isinstance(FORMATS, str): FORMATS = FORMATS.split() @@ -176,3 +176,10 @@ AUTO_UNZIP_ONE_FILE = tk.asbool( tk.config.get("ckanext.datapusher_plus.auto_unzip_one_file", True) ) + +# if a zip archive contains multiple files (and does not contain a shapefile), +# automatically create a manifest CSV with metadata about the zip contents +# and pump that into the datastore. If set to False, the zip will be skipped. +AUTO_CREATE_ZIP_MANIFEST = tk.asbool( + tk.config.get("ckanext.datapusher_plus.auto_create_zip_manifest", True) +) diff --git a/ckanext/datapusher_plus/helpers.py b/ckanext/datapusher_plus/helpers.py index 62b1b05..93b25a8 100644 --- a/ckanext/datapusher_plus/helpers.py +++ b/ckanext/datapusher_plus/helpers.py @@ -409,6 +409,8 @@ def extract_zip_or_metadata( Extract metadata from ZIP archive and save to CSV file. If the ZIP file contains only one item of a supported format and AUTO_UNZIP_ONE_FILE is True, extract it directly. + If the ZIP file contains shapefile components (.shp, .dbf, .shx, etc.), + extract the .dbf file for use as the data source. 
Args: zip_path: Path to the ZIP file @@ -437,6 +439,39 @@ def extract_zip_or_metadata( file_list = [info for info in zip_file.infolist() if not info.is_dir()] file_count = len(file_list) + # Check if this ZIP contains shapefile components + shp_files = [f for f in file_list if f.filename.lower().endswith('.shp')] + dbf_files = [f for f in file_list if f.filename.lower().endswith('.dbf')] + + # If we have shapefile components, look for the .dbf file + if shp_files and dbf_files: + # For each .shp file, try to find matching .dbf file + for shp_file in shp_files: + base_name = os.path.splitext(shp_file.filename)[0] + # Look for matching .dbf file (case-insensitive) + matching_dbf = None + for dbf_file in dbf_files: + dbf_base = os.path.splitext(dbf_file.filename)[0] + if dbf_base.lower() == base_name.lower(): + matching_dbf = dbf_file + break + + if matching_dbf: + logger.info( + f"ZIP contains shapefile components. Extracting .dbf file: {matching_dbf.filename}" + ) + # Extract ONLY the .dbf file (not the whole shapefile) + result_path = os.path.join(output_dir, "shapefile_data.dbf") + with zip_file.open(matching_dbf.filename) as source, open( + result_path, "wb" + ) as target: + target.write(source.read()) + logger.info( + f"Successfully extracted shapefile .dbf to '{result_path}'" + ) + # Return DBF format so it will be processed as a DBF file + return file_count, result_path, "DBF" + if file_count == 1 and conf.AUTO_UNZIP_ONE_FILE: file_info = file_list[0] file_name = file_info.filename @@ -461,6 +496,13 @@ def extract_zip_or_metadata( f"ZIP contains a single file that is not supported: {file_name}" ) + # Check if we should create a manifest + if not conf.AUTO_CREATE_ZIP_MANIFEST: + logger.info( + f"ZIP file contains {file_count} file/s, but AUTO_CREATE_ZIP_MANIFEST is disabled. Skipping..." + ) + return 0, "", "" + # Otherwise, write metadata CSV logger.info( f"ZIP file contains {file_count} file/s. Saving ZIP metadata..." diff --git a/ckanext/datapusher_plus/jobs.py b/ckanext/datapusher_plus/jobs.py index 7f947d6..f732b08 100644 --- a/ckanext/datapusher_plus/jobs.py +++ b/ckanext/datapusher_plus/jobs.py @@ -283,6 +283,18 @@ def _push_to_datastore( else: logger.info(f"File format: {resource_format}") + # Check if this is actually a GeoJSON file even if format is JSON + # GeoJSON files often get detected as JSON by MIME type + resource_name = resource.get("name", "").lower() + resource_url_lower = resource_url.lower() + if (resource_format == "JSON" and + (resource_name.endswith('.geojson') or + resource_url_lower.endswith('.geojson') or + 'geojson' in resource_name or + 'geojson' in resource_url_lower)): + logger.info("Detected GeoJSON file (was identified as JSON). Changing format to GEOJSON...") + resource_format = "GEOJSON" + tmp = os.path.join(temp_dir, "tmp." + resource_format) length = 0 # using MD5 for file deduplication only @@ -362,7 +374,10 @@ def _push_to_datastore( tmp, temp_dir, logger ) if not file_count: - logger.error("ZIP file invalid or no files found in ZIP file.") + logger.warning("ZIP file invalid, no files found, or ZIP manifest creation is disabled.") + return + if not extracted_path: + logger.warning("ZIP processing skipped (AUTO_CREATE_ZIP_MANIFEST is disabled).") return logger.info( f"More than one file in the ZIP file ({file_count} files), saving metadata..." 
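
The bounding-box metadata written by the jobs above has the shape sketched below (the coordinates are illustrative, not taken from a real run); get_spatial_bounds_wkt() in jinja2_helpers.py renders it as a closed counterclockwise ring:

    # Illustrative sketch of the "dpp_spatial_extent" entry stored under package["dpp_suggestions"]
    # and of the ring order used when converting it to WKT. Values are made up.
    package = {
        "dpp_suggestions": {
            "dpp_spatial_extent": {
                "type": "BoundingBox",
                "coordinates": [[-74.25, 40.49], [-73.70, 40.92]],  # [[lon_min, lat_min], [lon_max, lat_max]]
            }
        }
    }

    (lon_min, lat_min), (lon_max, lat_max) = package["dpp_suggestions"]["dpp_spatial_extent"]["coordinates"]
    wkt = (
        f"POLYGON(({lon_min} {lat_min}, {lon_min} {lat_max}, "
        f"{lon_max} {lat_max}, {lon_max} {lat_min}, {lon_min} {lat_min}))"
    )
    # First and last vertex are identical, so the polygon ring is closed.
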
@@ -385,7 +400,55 @@ def _push_to_datastore( # flag to check if the file is a spatial format spatial_format_flag = False - simplification_failed_flag = False + spatial_bounds = None # Store spatial bounds from GeoJSON/Shapefile + + # ----------------- is it a DBF file? --------------- + # DBF files need special handling since qsv excel doesn't support them + # We use Fiona which can read DBF files (they're part of shapefiles) + if resource.get("format").upper() == "DBF" or unzipped_format == "DBF": + logger.info("Converting DBF file to CSV using Fiona...") + try: + import fiona + + qsv_dbf_csv = os.path.join(temp_dir, "qsv_dbf.csv") + + # Read DBF file using Fiona + with fiona.open(tmp, 'r') as dbf: + # Get field names from schema + fieldnames = list(dbf.schema['properties'].keys()) + + # Write to CSV + with open(qsv_dbf_csv, 'w', newline='', encoding='utf-8') as csvfile: + writer = csv.DictWriter(csvfile, fieldnames=fieldnames) + writer.writeheader() + + record_count = 0 + # Write all records + for feature in dbf: + # Get properties (attributes) from the feature + row = feature['properties'] + + # Convert None values to empty strings and ensure all values are strings + clean_row = {} + for field in fieldnames: + value = row.get(field) + if value is None: + clean_row[field] = '' + else: + clean_row[field] = str(value) if not isinstance(value, str) else value + + writer.writerow(clean_row) + record_count += 1 + + logger.info(f"Converted DBF to CSV successfully: {record_count} records") + + tmp = qsv_dbf_csv + logger.info("DBF file converted...") + except ImportError: + raise utils.JobError("Fiona library not installed. Cannot process DBF files. Install with: pip install fiona") + except Exception as e: + raise utils.JobError(f"Failed to convert DBF file: {e}") + # ----------------- is it a spreadsheet? --------------- # check content type or file extension if its a spreadsheet spreadsheet_extensions = ["XLS", "XLSX", "ODS", "XLSM", "XLSB"] @@ -431,115 +494,46 @@ def _push_to_datastore( os.link(tmp, qsv_spatial_file) qsv_spatial_csv = os.path.join(temp_dir, "qsv_spatial.csv") - if conf.AUTO_SPATIAL_SIMPLIFICATION: - # Try to convert spatial file to CSV using spatial_helpers - logger.info( - f"Converting spatial file to CSV with a simplification relative tolerance of {conf.SPATIAL_SIMPLIFICATION_RELATIVE_TOLERANCE}..." - ) - - try: - # Use the convert_to_csv function from spatial_helpers - success, error_message, bounds = sh.process_spatial_file( - qsv_spatial_file, - resource_format, - output_csv_path=qsv_spatial_csv, - tolerance=conf.SPATIAL_SIMPLIFICATION_RELATIVE_TOLERANCE, - task_logger=logger, - ) - - if success: - logger.info( - "Spatial file successfully simplified and converted to CSV" - ) - tmp = qsv_spatial_csv - - # Check if the simplified resource already exists - simplified_resource_name = ( - os.path.splitext(resource["name"])[0] - + "_simplified" - + os.path.splitext(resource["name"])[1] - ) - existing_resource, existing_resource_id = dsu.resource_exists( - resource["package_id"], simplified_resource_name - ) - - if existing_resource: - logger.info( - "Simplified resource already exists. Replacing it..." - ) - dsu.delete_resource(existing_resource_id) - else: - logger.info( - "Simplified resource does not exist. Uploading it..." 
- ) - new_simplified_resource = { - "package_id": resource["package_id"], - "name": os.path.splitext(resource["name"])[0] - + "_simplified" - + os.path.splitext(resource["name"])[1], - "url": "", - "format": resource["format"], - "hash": "", - "mimetype": resource["mimetype"], - "mimetype_inner": resource["mimetype_inner"], - } - - # Add bounds information if available - if bounds: - minx, miny, maxx, maxy = bounds - new_simplified_resource.update( - { - "dpp_spatial_extent": { - "type": "BoundingBox", - "coordinates": [ - [minx, miny], - [maxx, maxy], - ], - } - } - ) - logger.info( - f"Added dpp_spatial_extent to resource metadata: {bounds}" - ) - - dsu.upload_resource(new_simplified_resource, qsv_spatial_file) - - # delete the simplified spatial file - os.remove(qsv_spatial_file) - - simplification_failed_flag = False - else: - logger.warning( - f"Upload of simplified spatial file failed: {error_message}" - ) - simplification_failed_flag = True - except Exception as e: - logger.warning(f"Simplification and conversion failed: {str(e)}") - logger.warning( - f"Simplification and conversion failed. Using qsv geoconvert to convert to CSV, truncating large columns to {conf.QSV_STATS_STRING_MAX_LENGTH} characters..." - ) - simplification_failed_flag = True - pass - - # If we are not auto-simplifying or simplification failed, use qsv geoconvert - if not conf.AUTO_SPATIAL_SIMPLIFICATION or simplification_failed_flag: - logger.info("Converting spatial file to CSV using qsv geoconvert...") - - # Run qsv geoconvert - qsv_geoconvert_csv = os.path.join(temp_dir, "qsv_geoconvert.csv") - try: - qsv.geoconvert( - tmp, - resource_format, - "csv", - max_length=conf.QSV_STATS_STRING_MAX_LENGTH, - output_file=qsv_geoconvert_csv, - ) - except utils.JobError as e: - raise utils.JobError(f"qsv geoconvert failed: {e}") + # Convert spatial file to CSV WITHOUT geometry column for datastore + # (WKT can be too large for PostgreSQL tsvector) + # Spatial extent metadata will be preserved in resource metadata + logger.info("Converting spatial file to CSV (attributes only, excluding geometry)...") - tmp = qsv_geoconvert_csv - logger.info("Geoconverted successfully") + try: + success, error_message, bounds = sh.convert_spatial_to_csv( + qsv_spatial_file, + resource_format, + qsv_spatial_csv, + max_wkt_length=None, + task_logger=logger, + exclude_geometry=True, # Exclude geometry to avoid PostgreSQL size limits + ) + + if not success: + raise utils.JobError(f"Spatial to CSV conversion failed: {error_message}") + + tmp = qsv_spatial_csv + logger.info("Converted to CSV successfully (attributes only)") + spatial_format_flag = True + + # Store bounds in resource metadata if available + if bounds: + spatial_bounds = bounds + minx, miny, maxx, maxy = spatial_bounds + resource.update({ + "dpp_spatial_extent": { + "type": "BoundingBox", + "coordinates": [ + [minx, miny], + [maxx, maxy], + ], + } + }) + logger.info(f"Added dpp_spatial_extent to resource metadata: {bounds}") + logger.info("Note: Geometry column excluded from datastore (too large for PostgreSQL). 
Spatial extent preserved in metadata.") + + except Exception as e: + raise utils.JobError(f"Spatial to CSV conversion failed: {e}") else: # --- its not a spreadsheet nor a spatial format, its a CSV/TSV/TAB file ------ @@ -769,34 +763,32 @@ def _push_to_datastore( qsv_stats_csv = os.path.join(temp_dir, "qsv_stats.csv") try: - # If the file is a spatial format, we need to use --max-length + # If the file is a spatial format, we need to set the max string length # to truncate overly long strings from causing issues with # Python's CSV reader and Postgres's limits with the COPY command if spatial_format_flag: - env = os.environ.copy() - env["QSV_STATS_STRING_MAX_LENGTH"] = str(conf.QSV_STATS_STRING_MAX_LENGTH) - qsv_stats = qsv.stats( - tmp, - infer_dates=True, - dates_whitelist=conf.QSV_DATES_WHITELIST, - stats_jsonl=True, - prefer_dmy=conf.PREFER_DMY, - cardinality=bool(conf.AUTO_INDEX_THRESHOLD), - summary_stats_options=conf.SUMMARY_STATS_OPTIONS, - output_file=qsv_stats_csv, - env=env, - ) - else: - qsv_stats = qsv.stats( - tmp, - infer_dates=True, - dates_whitelist=conf.QSV_DATES_WHITELIST, - stats_jsonl=True, - prefer_dmy=conf.PREFER_DMY, - cardinality=bool(conf.AUTO_INDEX_THRESHOLD), - summary_stats_options=conf.SUMMARY_STATS_OPTIONS, - output_file=qsv_stats_csv, - ) + # Set environment variable for qsv stats + original_max_length = os.environ.get("QSV_STATS_STRING_MAX_LENGTH") + os.environ["QSV_STATS_STRING_MAX_LENGTH"] = str(conf.QSV_STATS_STRING_MAX_LENGTH) + + qsv_stats = qsv.stats( + tmp, + infer_dates=True, + dates_whitelist=conf.QSV_DATES_WHITELIST, + stats_jsonl=True, + prefer_dmy=conf.PREFER_DMY, + cardinality=bool(conf.AUTO_INDEX_THRESHOLD), + summary_stats_options=conf.SUMMARY_STATS_OPTIONS, + output_file=qsv_stats_csv, + ) + + # Restore original environment variable if it was set for spatial format + if spatial_format_flag: + if original_max_length is not None: + os.environ["QSV_STATS_STRING_MAX_LENGTH"] = original_max_length + else: + os.environ.pop("QSV_STATS_STRING_MAX_LENGTH", None) + except utils.JobError as e: raise utils.JobError(f"Cannot infer data types and compile statistics: {e}") diff --git a/ckanext/datapusher_plus/spatial_helpers.py b/ckanext/datapusher_plus/spatial_helpers.py index 5344313..4041a13 100644 --- a/ckanext/datapusher_plus/spatial_helpers.py +++ b/ckanext/datapusher_plus/spatial_helpers.py @@ -23,6 +23,213 @@ logger = logging.getLogger(__name__) +def extract_spatial_bounds( + input_path: Union[str, Path], + resource_format: str, + task_logger: Optional[logging.Logger] = None, +) -> Optional[Tuple[float, float, float, float]]: + """ + Extract bounding box from a spatial file without full processing. + + This is a lightweight function that only reads the spatial file metadata + to extract the bounding box coordinates, without performing simplification + or conversion to CSV. + + Args: + input_path (Union[str, Path]): Path to the input spatial file. Can be a zipped Shapefile + or a GeoJSON file. + resource_format (str): The format of the spatial file (e.g., "SHP", "QGIS", "GEOJSON"). + task_logger (Optional[logging.Logger], optional): Logger to use for logging progress and + errors. If not provided, a module-level logger is used. + + Returns: + Optional[Tuple[float, float, float, float]]: Bounding box coordinates (minx, miny, maxx, maxy) + or None if extraction fails. 
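+
+    Example (illustrative; the path and coordinates below are hypothetical):
+        bounds = extract_spatial_bounds("/tmp/roads_shapefile.zip", "SHP")
+        # bounds -> (minx, miny, maxx, maxy), e.g. (-74.25, 40.49, -73.70, 40.92), or None on failure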
+ """ + log = task_logger if task_logger is not None else logger + zip_temp_dir = None + + try: + input_path = Path(input_path) + if not input_path.exists(): + log.warning(f"Input file does not exist: {input_path}") + return None + + # Handle zipped Shapefiles + if resource_format.upper() == "SHP" or resource_format.upper() == "QGIS": + zip_temp_dir = input_path.parent / f"temp_bounds_{uuid.uuid4()}" + zip_temp_dir.mkdir(exist_ok=True) + + with zipfile.ZipFile(input_path, "r") as zip_ref: + zip_ref.extractall(zip_temp_dir) + + shp_files = [ + f for f in os.listdir(zip_temp_dir) if f.lower().endswith(".shp") + ] + + if shp_files: + input_path = zip_temp_dir / shp_files[0] + log.debug(f"Extracting bounds from shapefile: {input_path}") + else: + log.warning("No .shp file found in the zipped Shapefile") + return None + + # Extract bounds using Fiona + with fiona.open(input_path) as src: + bounds = src.bounds + log.info(f"Extracted spatial bounds: {bounds}") + return bounds + + except Exception as e: + log.warning(f"Failed to extract spatial bounds: {str(e)}") + return None + finally: + if zip_temp_dir and zip_temp_dir.exists(): + shutil.rmtree(zip_temp_dir) + + +def convert_spatial_to_csv( + input_path: Union[str, Path], + resource_format: str, + output_csv_path: Union[str, Path], + max_wkt_length: Optional[int] = None, + task_logger: Optional[logging.Logger] = None, + exclude_geometry: bool = False, +) -> Tuple[bool, Optional[str], Optional[Tuple[float, float, float, float]]]: + """ + Convert a spatial file to CSV format with WKT geometry column (no simplification). + + This function provides a reliable alternative to qsv geoconvert, using Fiona and + Shapely to read spatial data and convert geometries to WKT format. + + Args: + input_path (Union[str, Path]): Path to the input spatial file. Can be a zipped Shapefile + or a GeoJSON file. + resource_format (str): The format of the spatial file (e.g., "SHP", "QGIS", "GEOJSON"). + output_csv_path (Union[str, Path]): Path to the output CSV file. + max_wkt_length (Optional[int], optional): Maximum length for WKT strings. If provided, + WKT strings longer than this will be truncated with ellipsis. + task_logger (Optional[logging.Logger], optional): Logger to use for logging progress and + errors. If not provided, a module-level logger is used. + exclude_geometry (bool, optional): If True, exclude the geometry column from output. + Useful when geometry is too large for datastore. Default is False. + + Returns: + Tuple[bool, Optional[str], Optional[Tuple[float, float, float, float]]] + - success (bool): True if the conversion was successful, False otherwise. + - error_message (Optional[str]): Error message if failed, or None if successful. + - bounds (Optional[Tuple[float, float, float, float]]): Bounding box coordinates + (minx, miny, maxx, maxy) or None if failed + + Notes: + - The output CSV will contain all attribute columns plus a "geometry" column with WKT + (unless exclude_geometry is True). + - No simplification is performed - geometries are converted as-is. + - If max_wkt_length is specified, long WKT strings will be truncated. 
+ """ + log = task_logger if task_logger is not None else logger + zip_temp_dir = None + + try: + input_path = Path(input_path) + if not input_path.exists(): + return False, f"Input file does not exist: {input_path}", None + + output_csv_path = Path(output_csv_path) + + log.info(f"Converting spatial file to CSV: {input_path}") + + # Handle zipped Shapefiles + if resource_format.upper() == "SHP" or resource_format.upper() == "QGIS": + zip_temp_dir = input_path.parent / f"temp_convert_{uuid.uuid4()}" + zip_temp_dir.mkdir(exist_ok=True) + + with zipfile.ZipFile(input_path, "r") as zip_ref: + zip_ref.extractall(zip_temp_dir) + + shp_files = [ + f for f in os.listdir(zip_temp_dir) if f.lower().endswith(".shp") + ] + + if shp_files: + input_path = zip_temp_dir / shp_files[0] + log.debug(f"Using shapefile: {input_path}") + else: + return False, "No .shp file found in the zipped Shapefile", None + + # Read spatial features using Fiona + log.debug(f"Reading features from {input_path}") + with fiona.open(input_path) as src: + features = list(src) + bounds = src.bounds + log.info(f"Source CRS: {src.crs}") + log.info(f"Source bounds: {bounds}") + log.info(f"Found {len(features)} features") + + if not features: + return False, "No features found in the input file", None + + # Process features + valid_attributes = [] + wkt_geoms = [] if not exclude_geometry else None + error_count = 0 + + for i, feat in enumerate(features): + try: + # Get attributes + valid_attributes.append(feat["properties"]) + + # Convert to WKT only if not excluding geometry + if not exclude_geometry: + # Convert GeoJSON geometry to Shapely geometry + original_geom = shape(feat["geometry"]) + + # Convert to WKT + wkt = dumps(original_geom) + + # Truncate if needed + if max_wkt_length and len(wkt) > max_wkt_length: + wkt = wkt[:max_wkt_length - 3] + "..." 
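+                        # NOTE: the truncated value ends with "..." and is no longer parseable as WKT;
+                        # it should not be fed back into a geometry parser.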
+ log.debug(f"Truncated WKT for feature {i} to {max_wkt_length} characters") + + wkt_geoms.append(wkt) + + except Exception as e: + error_count += 1 + log.warning(f"Error processing feature {i}: {str(e)}") + continue + + if error_count > 0: + log.warning( + f"Failed to process {error_count} out of {len(features)} features" + ) + + if not valid_attributes: + return False, "No features could be processed", None + + # Create DataFrame + df = pd.DataFrame(valid_attributes) + + # Add geometry column only if not excluding + if not exclude_geometry and wkt_geoms: + df["geometry"] = wkt_geoms + log.info(f"Successfully converted {len(wkt_geoms)} features to CSV with WKT geometry") + else: + log.info(f"Successfully converted {len(valid_attributes)} features to CSV (geometry excluded)") + + # Write to CSV + df.to_csv(output_csv_path, index=False) + + return True, None, bounds + + except Exception as e: + return False, f"Error converting spatial file to CSV: {str(e)}", None + + finally: + if zip_temp_dir and zip_temp_dir.exists(): + shutil.rmtree(zip_temp_dir) + + def simplify_polygon( geom: Union[Polygon, MultiPolygon], relative_tolerance: float, From dd21f01b2fccb80e1bbbdbdc21b92397ca43a87a Mon Sep 17 00:00:00 2001 From: Minhajuddin Mohammed Date: Fri, 31 Oct 2025 16:05:40 -0400 Subject: [PATCH 3/6] updated dpp spatial extent --- ckanext/datapusher_plus/helpers.py | 46 ++++++++++++++++++++++++------ ckanext/datapusher_plus/jobs.py | 42 +++++++++++++++++++++++++-- ckanext/datapusher_plus/plugin.py | 4 +++ 3 files changed, 81 insertions(+), 11 deletions(-) diff --git a/ckanext/datapusher_plus/helpers.py b/ckanext/datapusher_plus/helpers.py index 93b25a8..293d3ac 100644 --- a/ckanext/datapusher_plus/helpers.py +++ b/ckanext/datapusher_plus/helpers.py @@ -420,10 +420,11 @@ def extract_zip_or_metadata( (if not provided, module logger will be used) Returns: - tuple: (int, str, str) - (file_count, result_path, unzipped_format) + tuple: (int, str, str, tuple) - (file_count, result_path, unzipped_format, spatial_bounds) - file_count: Number of files in the ZIP - result_path: Path to the extracted file or metadata CSV - unzipped_format: Format of the extracted file (e.g., "csv", "json", etc.) 
+ - spatial_bounds: Tuple of (minx, miny, maxx, maxy) if shapefile, else None """ import os @@ -469,8 +470,37 @@ def extract_zip_or_metadata( logger.info( f"Successfully extracted shapefile .dbf to '{result_path}'" ) - # Return DBF format so it will be processed as a DBF file - return file_count, result_path, "DBF" + + # Also extract all shapefile components to read spatial bounds + spatial_bounds = None + try: + # Extract all shapefile components for the matching shapefile + shp_base = base_name + shp_dir = os.path.join(output_dir, "shapefile_temp") + os.makedirs(shp_dir, exist_ok=True) + + # Extract all files that match this shapefile base name + for file_info in file_list: + file_base = os.path.splitext(file_info.filename)[0] + if file_base.lower() == shp_base.lower(): + extract_path = os.path.join(shp_dir, os.path.basename(file_info.filename)) + with zip_file.open(file_info.filename) as source, open( + extract_path, "wb" + ) as target: + target.write(source.read()) + + # Read bounds from the extracted shapefile + import fiona + shp_path = os.path.join(shp_dir, os.path.basename(shp_file.filename)) + with fiona.open(shp_path, 'r') as src: + bounds = src.bounds + spatial_bounds = bounds # (minx, miny, maxx, maxy) + logger.info(f"Extracted spatial bounds from shapefile: {bounds}") + except Exception as e: + logger.warning(f"Could not extract spatial bounds from shapefile: {e}") + + # Return DBF format so it will be processed as a DBF file, with spatial bounds + return file_count, result_path, "DBF", spatial_bounds if file_count == 1 and conf.AUTO_UNZIP_ONE_FILE: file_info = file_list[0] @@ -490,7 +520,7 @@ def extract_zip_or_metadata( logger.debug( f"Successfully extracted '{file_name}' to '{result_path}'" ) - return file_count, result_path, file_ext + return file_count, result_path, file_ext, None else: logger.warning( f"ZIP contains a single file that is not supported: {file_name}" @@ -501,7 +531,7 @@ def extract_zip_or_metadata( logger.info( f"ZIP file contains {file_count} file/s, but AUTO_CREATE_ZIP_MANIFEST is disabled. Skipping..." 
) - return 0, "", "" + return 0, "", "", None # Otherwise, write metadata CSV logger.info( @@ -552,14 +582,14 @@ def extract_zip_or_metadata( "compress_type": file_info.compress_type, } ) - return file_count, result_path, "CSV" + return file_count, result_path, "CSV", None except zipfile.BadZipFile: logger.error(f"Error: '{zip_path}' is not a valid ZIP file.") - return 0, "", "" + return 0, "", "", None except Exception as e: logger.error(f"Error: {str(e)}") - return 0, "", "" + return 0, "", "", None def scheming_field_suggestion(field): diff --git a/ckanext/datapusher_plus/jobs.py b/ckanext/datapusher_plus/jobs.py index f732b08..98e708b 100644 --- a/ckanext/datapusher_plus/jobs.py +++ b/ckanext/datapusher_plus/jobs.py @@ -370,9 +370,15 @@ def _push_to_datastore( if resource_format.upper() == "ZIP": logger.info("Processing ZIP file...") - file_count, extracted_path, unzipped_format = dph.extract_zip_or_metadata( + file_count, extracted_path, unzipped_format, zip_spatial_bounds = dph.extract_zip_or_metadata( tmp, temp_dir, logger ) + + # If spatial bounds were extracted from the shapefile, use them + if zip_spatial_bounds: + spatial_bounds = zip_spatial_bounds + logger.debug(f"Using spatial bounds from ZIP extraction: {spatial_bounds}") + if not file_count: logger.warning("ZIP file invalid, no files found, or ZIP manifest creation is disabled.") return @@ -1279,9 +1285,11 @@ def _push_to_datastore( # Check if this is a CSV file (not already processed as spatial format) # and detect latitude/longitude columns to calculate spatial extent # Use the existing lat/lon detection logic from FormulaProcessor + # Note: ZIP and DBF are included because shapefiles (ZIP) contain DBF files + # with lat/lon columns that should be processed for spatial extent if (conf.AUTO_CSV_SPATIAL_EXTENT and not spatial_format_flag and - resource_format.upper() in ["CSV", "TSV", "TAB"]): - logger.info("Checking for latitude/longitude columns in CSV using existing detection logic...") + resource_format.upper() in ["CSV", "TSV", "TAB", "ZIP", "DBF"]): + logger.info("Checking for latitude/longitude columns using existing detection logic...") # Use the detected lat/lon fields from the already initialized FormulaProcessor lat_column = formula_processor.dpp.get("LAT_FIELD") @@ -1329,6 +1337,34 @@ def _push_to_datastore( else: if formula_processor.dpp.get("NO_LAT_LON_FIELDS"): logger.info("No suitable latitude/longitude column pairs found in CSV") + + # ============================================================ + # SPATIAL EXTENT FOR GEOJSON/SHAPEFILE FORMATS + # ============================================================ + # If this was a spatial format (GeoJSON/Shapefile), add the spatial extent + # from resource metadata to package dpp_suggestions for the gazetteer widget + elif spatial_format_flag and spatial_bounds: + logger.info("Adding spatial extent from GeoJSON/Shapefile to package dpp_suggestions...") + try: + minx, miny, maxx, maxy = spatial_bounds + spatial_extent_data = { + "type": "BoundingBox", + "coordinates": [ + [minx, miny], + [maxx, maxy], + ], + } + + # Add to package dpp_suggestions for the gazetteer widget to access + package.setdefault("dpp_suggestions", {})["dpp_spatial_extent"] = spatial_extent_data + + logger.info( + f"Added dpp_spatial_extent to package dpp_suggestions from spatial format: " + f"bounds({minx}, {miny}, {maxx}, {maxy})" + ) + logger.info(f"Spatial extent: {spatial_extent_data}") + except (ValueError, TypeError, KeyError) as e: + logger.warning(f"Error adding spatial extent from 
spatial format to package suggestions: {e}") package.setdefault("dpp_suggestions", {})[ "STATUS" diff --git a/ckanext/datapusher_plus/plugin.py b/ckanext/datapusher_plus/plugin.py index 15a8718..9dc8aaa 100644 --- a/ckanext/datapusher_plus/plugin.py +++ b/ckanext/datapusher_plus/plugin.py @@ -18,6 +18,8 @@ import ckanext.datapusher_plus.logic.action as action import ckanext.datapusher_plus.logic.auth as auth import ckanext.datapusher_plus.cli as cli +import ckanext.datapusher_plus.jinja2_helpers as dphj + tk = p.toolkit @@ -221,6 +223,8 @@ def get_helpers(self) -> dict[str, Callable[..., Any]]: "scheming_get_suggestion_value": dph.scheming_get_suggestion_value, "scheming_is_valid_suggestion": dph.scheming_is_valid_suggestion, "is_preformulated_field": dph.is_preformulated_field, + "spatial_extent_feature_collection": dphj.spatial_extent_feature_collection, + } # IBlueprint From 071bc6138a419959aea80c7f929b06587f6c2b70 Mon Sep 17 00:00:00 2001 From: Minhajuddin Mohammed Date: Fri, 21 Nov 2025 03:21:05 -0500 Subject: [PATCH 4/6] AI suggestions added --- ckanext/datapusher_plus/ai_suggestions.py | 449 ++++++++++++++++++ .../assets/js/scheming-ai-suggestions.js | 420 ++++++++++++++++ ckanext/datapusher_plus/assets/webassets.yml | 8 + ckanext/datapusher_plus/config.py | 35 +- ckanext/datapusher_plus/helpers.py | 156 +++++- ckanext/datapusher_plus/jobs.py | 73 +++ ckanext/datapusher_plus/plugin.py | 6 +- .../scheming/form_snippets/markdown.html | 12 + .../snippets/ai_suggestions_asset.html | 2 + .../snippets/ai_suggestions_button.html | 18 + 10 files changed, 1170 insertions(+), 9 deletions(-) create mode 100644 ckanext/datapusher_plus/ai_suggestions.py create mode 100644 ckanext/datapusher_plus/assets/js/scheming-ai-suggestions.js create mode 100644 ckanext/datapusher_plus/templates/scheming/snippets/ai_suggestions_asset.html create mode 100644 ckanext/datapusher_plus/templates/scheming/snippets/ai_suggestions_button.html diff --git a/ckanext/datapusher_plus/ai_suggestions.py b/ckanext/datapusher_plus/ai_suggestions.py new file mode 100644 index 0000000..c676bdc --- /dev/null +++ b/ckanext/datapusher_plus/ai_suggestions.py @@ -0,0 +1,449 @@ +# encoding: utf-8 +# flake8: noqa: E501 + +""" +AI-powered suggestions generator for DataPusher Plus +Generates intelligent descriptions and tags based on QSV data analysis +""" + +import json +import logging +import requests +from typing import Dict, Any, Optional, List, Tuple +from datetime import datetime + +import ckanext.datapusher_plus.config as conf + +log = logging.getLogger(__name__) + + +class AIDescriptionGenerator: + """Generate AI-powered descriptions and tags using full QSV analysis""" + + def __init__(self, logger=None): + self.enabled = conf.ENABLE_AI_SUGGESTIONS + self.logger = logger or log + + if not self.enabled: + self.logger.info("AI suggestions are disabled") + return + + # Validate API key + if not conf.OPENROUTER_API_KEY: + self.logger.warning("OpenRouter API key is not configured - AI suggestions will be disabled") + self.enabled = False + return + + self.openrouter_api_key = conf.OPENROUTER_API_KEY + self.openrouter_model = conf.OPENROUTER_MODEL + self.openrouter_base_url = conf.OPENROUTER_BASE_URL + + self.logger.info(f"AI suggestions initialized with model: {self.openrouter_model}") + + def _safe_int(self, value, default=0): + """Safely convert value to int with fallback""" + try: + return int(value) if value is not None else default + except (ValueError, TypeError): + return default + + def _safe_float(self, value, 
default=0.0): + """Safely convert value to float with fallback""" + try: + return float(value) if value is not None else default + except (ValueError, TypeError): + return default + + def _call_openrouter(self, prompt: str, system_prompt: str = None, + temperature: float = None, max_tokens: int = None) -> str: + """Call OpenRouter API for text generation""" + if temperature is None: + temperature = float(conf.AI_TEMPERATURE) + if max_tokens is None: + max_tokens = conf.AI_MAX_TOKENS + + headers = { + "Authorization": f"Bearer {self.openrouter_api_key}", + "Content-Type": "application/json", + "HTTP-Referer": "https://github.com/dathere/datapusher-plus", + "X-Title": "DataPusher Plus AI Suggestions" + } + + messages = [] + if system_prompt: + messages.append({"role": "system", "content": system_prompt}) + messages.append({"role": "user", "content": prompt}) + + data = { + "model": self.openrouter_model, + "messages": messages, + "temperature": temperature, + "max_tokens": max_tokens + } + + self.logger.debug(f"OpenRouter request: model={self.openrouter_model}, temp={temperature}, max_tokens={max_tokens}") + self.logger.debug(f"Prompt length: {len(prompt)} characters") + + try: + self.logger.debug("Calling OpenRouter API...") + response = requests.post( + f"{self.openrouter_base_url}/chat/completions", + headers=headers, + json=data, + timeout=conf.AI_TIMEOUT + ) + + if response.status_code == 200: + try: + result = response.json() + content = result.get('choices', [{}])[0].get('message', {}).get('content', '') + if content: + self.logger.debug(f"OpenRouter response received: {len(content)} characters") + return content.strip() + else: + self.logger.warning("OpenRouter returned empty content in response") + self.logger.debug(f"Full response structure: {result}") + return "" + except json.JSONDecodeError as e: + self.logger.error(f"Failed to parse OpenRouter JSON response: {e}") + self.logger.debug(f"Response text: {response.text[:500]}") + return "" + else: + self.logger.error(f"OpenRouter API error {response.status_code}: {response.text[:500]}") + return "" + + except requests.exceptions.Timeout: + self.logger.error(f"OpenRouter API timeout after {conf.AI_TIMEOUT} seconds") + return "" + except requests.exceptions.RequestException as e: + self.logger.error(f"OpenRouter API request error: {e}") + return "" + except Exception as e: + self.logger.error(f"Error calling OpenRouter: {e}") + return "" + + def generate_ai_suggestions( + self, + resource_metadata: Dict[str, Any], + dataset_metadata: Dict[str, Any], + stats_data: Dict[str, Any], + freq_data: Dict[str, Any], + dataset_stats: Dict[str, Any], + sample_data: Optional[str] = None + ) -> Dict[str, Any]: + """ + Generate comprehensive AI suggestions for dataset fields in a single LLM call + + Returns a dictionary with field names as keys and suggestion data as values + """ + if not self.enabled: + return {} + + try: + self.logger.info("Starting AI suggestion generation (single LLM call)...") + start_time = datetime.now() + + # Build comprehensive context for LLM + context = self._build_context( + resource_metadata, dataset_metadata, stats_data, + freq_data, dataset_stats, sample_data + ) + + # Check if we need title and notes + current_title = dataset_metadata.get('title', '') + skip_title = current_title and len(current_title) > 10 + + # Check for geographic data + has_geo_data = self._has_geographic_data(stats_data) + + # Generate ALL suggestions in a single LLM call + suggestions = self._generate_all_suggestions_single_call( + context, + 
dataset_metadata, + skip_title=skip_title, + include_spatial=has_geo_data + ) + + elapsed = (datetime.now() - start_time).total_seconds() + self.logger.info(f"AI suggestions generated: {len(suggestions)} fields in {elapsed:.2f}s") + return suggestions + + except Exception as e: + self.logger.error(f"Error generating AI suggestions: {e}") + if conf.AI_FALLBACK_ON_FAILURE: + return self._generate_fallback_suggestions( + resource_metadata, dataset_metadata, stats_data + ) + return {} + + def _generate_all_suggestions_single_call( + self, + context: str, + dataset_metadata: Dict[str, Any], + skip_title: bool = False, + include_spatial: bool = False + ) -> Dict[str, Any]: + """ + Generate all suggestions in a single LLM call using structured JSON output + """ + + # Build the prompt requesting JSON output with all fields + fields_to_generate = [] + if not skip_title: + fields_to_generate.append('"title": "A clear, concise dataset title (max 100 chars)"') + fields_to_generate.append('"notes": "4-6 sentence comprehensive description explaining what data this contains, key variables, scope/coverage, and notable patterns"') + fields_to_generate.append(f'"tags": ["tag1", "tag2", ...] // Array of {conf.AI_MAX_TAGS} relevant keywords for search"') + if include_spatial: + fields_to_generate.append('"spatial_extent": "2-3 sentence description of geographic coverage"') + fields_to_generate.append('"additional_information": "2-4 sentences about data quality, patterns, use cases, and caveats"') + + fields_json_template = ",\n ".join(fields_to_generate) + + system_prompt = """You are a data cataloging expert. Analyze datasets and generate comprehensive metadata to help users understand and discover the data. Be specific, factual, and professional.""" + + prompt = f"""Analyze this dataset and generate metadata suggestions in JSON format: + +{context} + +Generate a JSON object with the following fields: +{{ + {fields_json_template} +}} + +Important: +- Base suggestions on the actual column names, data types, statistics, and sample values provided +- Be specific about what data is present (e.g., mention specific variables, date ranges if visible, geographic areas if identifiable) +- For tags, focus on subject matter, data types, and potential use cases +- Write professionally and informatively +- Return ONLY valid JSON, nothing else""" + + try: + self.logger.debug("Calling LLM for all suggestions in single call...") + response = self._call_openrouter( + prompt, + system_prompt=system_prompt, + temperature=0.7, + max_tokens=1500 # Increased for comprehensive response + ) + + if not response: + self.logger.warning("Empty response from LLM") + return {} + + # Clean response - remove markdown code fences if present + cleaned_response = response.strip() + if cleaned_response.startswith('```json'): + cleaned_response = cleaned_response[7:] + if cleaned_response.startswith('```'): + cleaned_response = cleaned_response[3:] + if cleaned_response.endswith('```'): + cleaned_response = cleaned_response[:-3] + cleaned_response = cleaned_response.strip() + + # Parse JSON response + try: + result = json.loads(cleaned_response) + except json.JSONDecodeError as e: + self.logger.error(f"Failed to parse LLM JSON response: {e}") + self.logger.debug(f"Response was: {cleaned_response[:500]}") + return {} + + # Convert to our suggestion format + suggestions = {} + + if 'title' in result and result['title']: + title = str(result['title']).strip().strip('"\'') + if len(title) > 5: + suggestions['title'] = { + 'value': title[:100], # 
Enforce max length + 'source': 'AI Generated', + 'confidence': 'high' + } + self.logger.debug(f"Title: {len(title)} chars") + + if 'notes' in result and result['notes']: + notes = str(result['notes']).strip() + if len(notes) >= conf.AI_MIN_DESCRIPTION_LENGTH: + suggestions['notes'] = { + 'value': notes, + 'source': 'AI Generated', + 'confidence': 'high' + } + self.logger.debug(f"Notes: {len(notes)} chars") + + if 'tags' in result and result['tags']: + if isinstance(result['tags'], list): + tags = [str(tag).lower().strip() for tag in result['tags'][:conf.AI_MAX_TAGS]] + if tags: + suggestions['primary_tags'] = { + 'value': ', '.join(tags), + 'source': 'AI Generated', + 'confidence': 'medium' + } + self.logger.debug(f"Tags: {len(tags)} tags") + elif isinstance(result['tags'], str): + # Handle case where LLM returned comma-separated string + tags = [tag.strip().lower() for tag in result['tags'].split(',')[:conf.AI_MAX_TAGS]] + if tags: + suggestions['primary_tags'] = { + 'value': ', '.join(tags), + 'source': 'AI Generated', + 'confidence': 'medium' + } + self.logger.debug(f"Tags: {len(tags)} tags (from string)") + + if include_spatial and 'spatial_extent' in result and result['spatial_extent']: + spatial = str(result['spatial_extent']).strip() + if len(spatial) > 20: + suggestions['spatial_extent'] = { + 'value': spatial, + 'source': 'AI Generated', + 'confidence': 'medium' + } + self.logger.debug(f"Spatial extent: {len(spatial)} chars") + + if 'additional_information' in result and result['additional_information']: + additional = str(result['additional_information']).strip() + if len(additional) > 30: + suggestions['additional_information'] = { + 'value': additional, + 'source': 'AI Generated', + 'confidence': 'medium' + } + self.logger.debug(f"Additional info: {len(additional)} chars") + + self.logger.info(f"Successfully parsed {len(suggestions)} suggestions from single LLM call") + return suggestions + + except Exception as e: + self.logger.error(f"Error in single-call suggestion generation: {e}") + return {} + + def _build_context( + self, + resource_metadata: Dict[str, Any], + dataset_metadata: Dict[str, Any], + stats_data: Dict[str, Any], + freq_data: Dict[str, Any], + dataset_stats: Dict[str, Any], + sample_data: Optional[str] + ) -> str: + """Build comprehensive context for LLM""" + context_parts = [] + + # Basic information + context_parts.extend([ + f"Resource: {resource_metadata.get('name', 'Unknown')} ({resource_metadata.get('format', 'unknown')})", + f"Dataset: {dataset_metadata.get('title', 'Unknown')}", + f"Records: {dataset_stats.get('RECORD_COUNT', 'Unknown')}", + "" + ]) + + # Add existing descriptions if available + if dataset_metadata.get('notes'): + context_parts.extend([ + "Existing Dataset Description:", + dataset_metadata['notes'][:800], # Limit length + "" + ]) + + # Add comprehensive QSV statistics + if stats_data: + context_parts.append("COLUMN STATISTICS:") + for field_name, field_stats in list(stats_data.items())[:20]: # Limit columns + if isinstance(field_stats, dict) and 'stats' in field_stats: + stats = field_stats['stats'] + context_parts.append( + f"- {field_name}: {stats.get('type', 'unknown')} | " + f"NULL: {stats.get('nullcount', 0)} | " + f"Unique: {stats.get('cardinality', 'unknown')}" + ) + + # Add range for numeric columns + if stats.get('type') in ['Integer', 'Float']: + min_val = stats.get('min', '') + max_val = stats.get('max', '') + if min_val and max_val: + context_parts.append(f" Range: {min_val} to {max_val}") + context_parts.append("") + + # 
Add frequency analysis (top values) + if freq_data: + context_parts.append("TOP VALUES BY COLUMN:") + for field_name, frequencies in list(freq_data.items())[:15]: # Limit columns + if isinstance(frequencies, list) and frequencies: + top_values = [str(freq.get('value', '')) for freq in frequencies[:5]] + context_parts.append(f"- {field_name}: {', '.join(top_values)}") + context_parts.append("") + + # Add sample data if available + if sample_data and conf.AI_INCLUDE_SAMPLE_DATA: + context_parts.extend([ + "SAMPLE DATA:", + sample_data[:1000], # Limit sample size + "" + ]) + + # Build the context string + full_context = "\n".join(context_parts) + + # Limit total context to prevent API issues + if len(full_context) > conf.AI_MAX_CONTEXT_LENGTH: + full_context = full_context[:conf.AI_MAX_CONTEXT_LENGTH] + "..." + + return full_context + + def _has_geographic_data(self, stats_data: Dict[str, Any]) -> bool: + """Check if dataset contains geographic data""" + geo_keywords = ['county', 'state', 'city', 'country', 'region', 'latitude', 'longitude', + 'lat', 'lon', 'location', 'address', 'zip', 'postal'] + + for field_name in stats_data.keys(): + field_lower = field_name.lower() + if any(keyword in field_lower for keyword in geo_keywords): + return True + return False + + def _generate_fallback_suggestions( + self, + resource_metadata: Dict[str, Any], + dataset_metadata: Dict[str, Any], + stats_data: Dict[str, Any] + ) -> Dict[str, Any]: + """Generate simple fallback suggestions when AI fails""" + suggestions = {} + + # Simple title based on resource name + resource_name = resource_metadata.get('name', 'Resource') + dataset_title = dataset_metadata.get('title', '') + + if not dataset_title or len(dataset_title) < 10: + suggestions['title'] = { + 'value': f"Data from {resource_name}", + 'source': 'Auto-generated', + 'confidence': 'low' + } + + # Simple description + record_count = resource_metadata.get('record_count', 'unknown') + format_type = resource_metadata.get('format', 'unknown') + + suggestions['notes'] = { + 'value': f"This {format_type} dataset contains {record_count} records with data related to {resource_name}.", + 'source': 'Auto-generated', + 'confidence': 'low' + } + + # Simple tags from column names + if stats_data: + col_tags = [col.lower() for col in list(stats_data.keys())[:5] + if len(col) > 2 and col.replace('_', '').isalpha()] + if col_tags: + suggestions['primary_tags'] = { + 'value': ', '.join(col_tags), + 'source': 'Auto-generated', + 'confidence': 'low' + } + + self.logger.info(f"Generated {len(suggestions)} fallback suggestions") + return suggestions diff --git a/ckanext/datapusher_plus/assets/js/scheming-ai-suggestions.js b/ckanext/datapusher_plus/assets/js/scheming-ai-suggestions.js new file mode 100644 index 0000000..f440dcb --- /dev/null +++ b/ckanext/datapusher_plus/assets/js/scheming-ai-suggestions.js @@ -0,0 +1,420 @@ +// /ckanext/scheming/assets/js/scheming-ai-suggestions.js + +// Global state for AI suggestions (similar to normal suggestions) +if (typeof window._schemingAiSuggestionsGlobalState === 'undefined') { + window._schemingAiSuggestionsGlobalState = { + datasetId: null, + globalInitDone: false, + pollAttempts: 0, + isPolling: false, + aiSuggestions: {} + }; +} + +ckan.module('scheming-ai-suggestions', function($) { + return { + options: { + pollingInterval: 2500, + maxPollAttempts: 40, + terminalStatuses: ['DONE', 'ERROR', 'FAILED'] + }, + + initialize: function() { + console.log("Initializing scheming-ai-suggestions module"); + + var self = this; + var el = 
this.el; + var fieldName = $(el).data('field-name'); + var globalState = window._schemingAiSuggestionsGlobalState; + + // Hide button initially (will show when suggestions are ready) + $(el).hide(); + + // Get dataset ID if not already set + if (!globalState.datasetId) { + var $form = $(el).closest('form.dataset-form, form#dataset-edit'); + + if ($form.length && $form.data('dataset-id')) { + globalState.datasetId = $form.data('dataset-id'); + } else if ($form.length && $form.find('input[name="id"]').val()) { + globalState.datasetId = $form.find('input[name="id"]').val(); + } else if ($('body').data('dataset-id')) { + globalState.datasetId = $('body').data('dataset-id'); + } else { + var pathArray = window.location.pathname.split('/'); + var datasetIndex = pathArray.indexOf('dataset'); + var editIndex = pathArray.indexOf('edit'); + if (datasetIndex !== -1 && editIndex !== -1 && editIndex === datasetIndex + 1 && pathArray.length > editIndex + 1) { + var potentialId = pathArray[editIndex + 1]; + if (potentialId && potentialId.length > 5) { + globalState.datasetId = potentialId; + } + } + } + } + + // Start polling if not already started + if (!globalState.globalInitDone && globalState.datasetId) { + globalState.globalInitDone = true; + if (!globalState.isPolling) { + this._pollForAiSuggestions(); + } + } + + // Create custom popover when clicked + $(el).on('click', function(e) { + e.preventDefault(); + e.stopPropagation(); + console.log("AI suggestion button clicked for field:", fieldName); + + // Hide all other popovers first + $('.ai-suggestion-popover').hide(); + + // Get suggestion from global state (updated by polling) + var suggestion = globalState.aiSuggestions[fieldName]; + var suggestionValue = suggestion ? suggestion.value : ''; + var suggestionSource = suggestion ? 
(suggestion.source + ' (Confidence: ' + (suggestion.confidence || 'N/A') + ')') : 'AI Generated'; + + // Create popover if it doesn't exist yet + var popoverId = 'ai-suggestion-popover-' + fieldName; + if ($('#' + popoverId).length === 0) { + createPopover(fieldName, suggestionValue, suggestionSource, $(el)); + } else { + // Update and show existing popover + updatePopover(popoverId, suggestionValue, suggestionSource); + var $popover = $('#' + popoverId); + positionPopover($popover, $(el)); + $popover.show(); + } + }); + }, + + _pollForAiSuggestions: function() { + var self = this; + var globalState = window._schemingAiSuggestionsGlobalState; + + if (!globalState.datasetId) { + console.warn("AI Suggestions: No dataset ID found, cannot poll"); + return; + } + + if (globalState.pollAttempts >= this.options.maxPollAttempts) { + console.log("AI Suggestions: Max poll attempts reached"); + globalState.isPolling = false; + return; + } + + globalState.isPolling = true; + globalState.pollAttempts++; + + console.log("AI Suggestions: Polling attempt " + globalState.pollAttempts); + + $.ajax({ + url: (ckan.SITE_ROOT || '') + '/api/3/action/package_show', + data: { id: globalState.datasetId, include_tracking: false }, + dataType: 'json', + cache: false, + success: function(response) { + if (response.success && response.result) { + var datasetObject = response.result; + var dppSuggestionsData = datasetObject.dpp_suggestions; + + if (dppSuggestionsData && dppSuggestionsData.ai_suggestions) { + console.log("AI Suggestions: Found AI suggestions", dppSuggestionsData.ai_suggestions); + + // Store suggestions in global state + globalState.aiSuggestions = dppSuggestionsData.ai_suggestions; + + // Show buttons for fields that have suggestions + self._showAiSuggestionButtons(dppSuggestionsData.ai_suggestions); + + // Check if processing is complete + var currentStatus = dppSuggestionsData.STATUS ? 
dppSuggestionsData.STATUS.toUpperCase() : null; + + if (currentStatus && self.options.terminalStatuses.includes(currentStatus)) { + console.log("AI Suggestions: Processing complete with status " + currentStatus); + globalState.isPolling = false; + return; + } + } + + // Continue polling if not done + if (globalState.pollAttempts < self.options.maxPollAttempts) { + setTimeout(function() { self._pollForAiSuggestions(); }, self.options.pollingInterval); + } else { + globalState.isPolling = false; + } + } + }, + error: function(jqXHR, textStatus, errorThrown) { + console.error("AI Suggestions: Poll error", textStatus, errorThrown); + + if (globalState.pollAttempts < self.options.maxPollAttempts) { + var nextPollDelay = self.options.pollingInterval * Math.pow(1.2, Math.min(globalState.pollAttempts, 7)); + setTimeout(function() { self._pollForAiSuggestions(); }, nextPollDelay); + } else { + globalState.isPolling = false; + } + } + }); + }, + + _showAiSuggestionButtons: function(aiSuggestions) { + console.log("AI Suggestions: Showing buttons for available suggestions"); + + // Show buttons for fields that have AI suggestions + Object.keys(aiSuggestions).forEach(function(fieldName) { + var $button = $('.ai-suggestion-btn[data-field-name="' + fieldName + '"]'); + if ($button.length > 0) { + // Update button data attributes with new suggestion data + var suggestion = aiSuggestions[fieldName]; + if (suggestion && suggestion.value) { + $button.attr('data-suggestion-value', suggestion.value); + $button.attr('data-suggestion-source', suggestion.source + ' (Confidence: ' + (suggestion.confidence || 'N/A') + ')'); + $button.show(); + console.log("AI Suggestions: Updated and showing button for field " + fieldName); + } + } + }); + } + }; +}); + +// Add direct click handling outside of the module for buttons that might not have been initialized +$(document).ready(function() { + console.log("Document ready - initializing AI suggestions click handlers"); + + // Direct click handler for all AI suggestion buttons + $(document).on('click', '.ai-suggestion-btn', function(e) { + e.preventDefault(); + e.stopPropagation(); + + var $button = $(this); + var fieldName = $button.data('field-name'); + var globalState = window._schemingAiSuggestionsGlobalState; + + console.log("AI suggestion button clicked for field:", fieldName); + + // Hide all other popovers first + $('.ai-suggestion-popover').hide(); + + // Get suggestion from global state + var suggestion = globalState.aiSuggestions[fieldName]; + var suggestionValue = suggestion ? suggestion.value : ''; + var suggestionSource = suggestion ? 
(suggestion.source + ' (Confidence: ' + (suggestion.confidence || 'N/A') + ')') : 'AI Generated'; + + console.log("Suggestion value length:", suggestionValue.length); + console.log("Suggestion source:", suggestionSource); + + // Create popover if it doesn't exist yet + var popoverId = 'ai-suggestion-popover-' + fieldName; + if ($('#' + popoverId).length === 0) { + console.log("Creating new popover for field:", fieldName); + createPopover(fieldName, suggestionValue, suggestionSource, $button); + } else { + // Update and show existing popover + console.log("Showing existing popover for field:", fieldName); + updatePopover(popoverId, suggestionValue, suggestionSource); + positionPopover($('#' + popoverId), $button); + $('#' + popoverId).show(); + } + }); + + // Close popover when clicking outside + $(document).on('click', function(e) { + if (!$(e.target).closest('.ai-suggestion-btn').length && + !$(e.target).closest('.ai-suggestion-popover').length) { + $('.ai-suggestion-popover').hide(); + } + }); +}); + +// Function to create popover - moved outside the module +function createPopover(fieldName, suggestionValue, suggestionSource, $button) { + var $field = $('#field-' + fieldName); + var isSelect = $field.is('select'); + var isTextarea = $field.is('textarea'); + + // Properly decode escaped newlines from JSON + if (typeof suggestionValue === 'string') { + // Replace escaped newlines with actual newlines + suggestionValue = suggestionValue.replace(/\\n/g, '\n'); + // Also handle any double-escaped newlines + suggestionValue = suggestionValue.replace(/\\\\n/g, '\n'); + } + + // Create popover HTML + var $popover = $('
<div id="ai-suggestion-popover-' + fieldName + '" class="ai-suggestion-popover"></div>');
+
+  // Display value with proper formatting
+  var displayValue = suggestionValue || 'No suggestion available';
+  if (suggestionValue && suggestionValue.length > 300) {
+    displayValue = suggestionValue.substring(0, 300) + '...';
+  }
+
+  // Escape HTML and convert newlines to <br> for display
+  var escapedDisplayValue = displayValue
+    .replace(/&/g, '&amp;')
+    .replace(/</g, '&lt;')
+    .replace(/>/g, '&gt;')
+    .replace(/"/g, '&quot;')
+    .replace(/'/g, '&#39;')
+    .replace(/\n/g, '<br>');
+
+  var popoverContent =
+    '<div>' +
+    '<strong>AI Suggestion</strong> ' +
+    '<span class="ai-suggestion-source">' + suggestionSource + '</span>' +
+    '</div>' +
+    '<div class="suggestion-value">' + escapedDisplayValue + '</div>' +
+    '<button type="button" class="suggestion-apply-btn" data-target="field-' + fieldName + '"' +
+    ' data-is-textarea="' + isTextarea + '">Apply suggestion</button>
'; + + $popover.html(popoverContent); + $('body').append($popover); + + // Store the actual suggestion value (not escaped) in the button's data + $popover.find('.suggestion-apply-btn').data('actualValue', suggestionValue); + + // Position the popover + positionPopover($popover, $button); + + // Add event handler for apply button + $popover.find('.suggestion-apply-btn').on('click', function() { + console.log("Apply button clicked for field:", fieldName); + var targetId = $(this).data('target'); + var value = $(this).data('actualValue'); // Use the stored actual value + var isTextarea = $(this).data('is-textarea'); + var $target = $('#' + targetId); + + if ($target.length === 0) { + console.error("Target field not found:", targetId); + return; + } + + console.log("Applying suggestion to field:", targetId); + console.log("Value length:", value.length); + + // Apply the suggestion + if (isTextarea || $target.is('textarea')) { + $target.val(value); + // Trigger change event + $target.trigger('change'); + // Also trigger input event for any listeners + $target.trigger('input'); + } else if ($target.is('select')) { + // For select fields, try to match the value + $target.val(value); + $target.trigger('change'); + } else { + $target.val(value); + $target.trigger('change'); + $target.trigger('input'); + } + + // Add a success class for animation + $target.addClass('suggestion-applied'); + setTimeout(function() { + $target.removeClass('suggestion-applied'); + }, 1000); + + // Show success message + showSuccessMessage($target); + + // Hide the popover + $popover.hide(); + }); +} + +// Function to update existing popover content +function updatePopover(popoverId, suggestionValue, suggestionSource) { + var $popover = $('#' + popoverId); + if ($popover.length === 0) return; + + // Properly decode escaped newlines from JSON + if (typeof suggestionValue === 'string') { + suggestionValue = suggestionValue.replace(/\\n/g, '\n'); + suggestionValue = suggestionValue.replace(/\\\\n/g, '\n'); + } + + // Display value with proper formatting + var displayValue = suggestionValue || 'No suggestion available'; + if (suggestionValue && suggestionValue.length > 300) { + displayValue = suggestionValue.substring(0, 300) + '...'; + } + + // Escape HTML and convert newlines to
<br> for display
+  var escapedDisplayValue = displayValue
+    .replace(/&/g, '&amp;')
+    .replace(/</g, '&lt;')
+    .replace(/>/g, '&gt;')
+    .replace(/"/g, '&quot;')
+    .replace(/'/g, '&#39;')
+    .replace(/\n/g, '<br>
'); + + // Update popover content + $popover.find('.ai-suggestion-source').text(suggestionSource); + $popover.find('.suggestion-value').html(escapedDisplayValue); + $popover.find('.suggestion-apply-btn').data('actualValue', suggestionValue); +} + +// Function to position popover +function positionPopover($popover, $button) { + var buttonPos = $button.offset(); + var parentWidth = $(window).width(); + var popoverWidth = Math.min(400, parentWidth - 40); + + var leftPos = buttonPos.left; + if (leftPos + popoverWidth > parentWidth - 20) { + leftPos = Math.max(20, parentWidth - popoverWidth - 20); + } + + $popover.css({ + position: 'absolute', + top: buttonPos.top + $button.outerHeight() + 10, + left: leftPos, + maxWidth: popoverWidth + 'px', + zIndex: 1000, + display: 'block' + }); +} + +// Function to show success message +function showSuccessMessage($target) { + var $successMsg = $('
<div>✓ Suggestion applied!</div>
'); + $successMsg.css({ + position: 'absolute', + top: $target.offset().top - 30, + left: $target.offset().left + $target.outerWidth() / 2, + transform: 'translateX(-50%)', + backgroundColor: 'rgba(42, 145, 52, 0.95)', + color: 'white', + padding: '6px 12px', + borderRadius: '4px', + fontSize: '13px', + fontWeight: '600', + zIndex: 1010, + opacity: 0, + transition: 'opacity 0.3s ease', + boxShadow: '0 2px 8px rgba(0,0,0,0.2)' + }); + $('body').append($successMsg); + + setTimeout(function() { + $successMsg.css('opacity', '1'); + }, 10); + + setTimeout(function() { + $successMsg.css('opacity', '0'); + setTimeout(function() { + $successMsg.remove(); + }, 300); + }, 1500); +} \ No newline at end of file diff --git a/ckanext/datapusher_plus/assets/webassets.yml b/ckanext/datapusher_plus/assets/webassets.yml index dc488c5..f347041 100644 --- a/ckanext/datapusher_plus/assets/webassets.yml +++ b/ckanext/datapusher_plus/assets/webassets.yml @@ -14,3 +14,11 @@ suggestions: contents: - js/scheming-suggestions.js +ai-suggestions: + filter: rjsmin + output: datapusher_plus/%(version)s_scheming-ai-suggestions.js + contents: + - js/scheming-ai-suggestions.js + extra: + preload: + - base/main \ No newline at end of file diff --git a/ckanext/datapusher_plus/config.py b/ckanext/datapusher_plus/config.py index a8077bc..dc1900b 100644 --- a/ckanext/datapusher_plus/config.py +++ b/ckanext/datapusher_plus/config.py @@ -32,7 +32,7 @@ # Supported formats FORMATS = tk.config.get( "ckanext.datapusher_plus.formats", - ["csv", "tsv", "tab", "ssv", "xls", "xlsx", "ods", "geojson", "shp", "qgis", "zip", "dbf"], + ["csv", "tsv", "tab", "ssv", "xls", "xlsx", "ods", "geojson", "shp", "qgis", "zip"], ) if isinstance(FORMATS, str): FORMATS = FORMATS.split() @@ -62,7 +62,7 @@ tk.config.get("ckanext.datapusher_plus.max_content_length", "5000000") ) CHUNK_SIZE = tk.asint(tk.config.get("ckanext.datapusher_plus.chunk_size", "1048576")) -DEFAULT_EXCEL_SHEET = tk.asint(tk.config.get("DEFAULT_EXCEL_SHEET", 0)) +DEFAULT_EXCEL_SHEET = tk.asint(tk.config.get("ckanext.datapusher_plus.default_excel_sheet", 0)) SORT_AND_DUPE_CHECK = tk.asbool( tk.config.get("ckanext.datapusher_plus.sort_and_dupe_check", True) ) @@ -177,9 +177,30 @@ tk.config.get("ckanext.datapusher_plus.auto_unzip_one_file", True) ) -# if a zip archive contains multiple files (and does not contain a shapefile), -# automatically create a manifest CSV with metadata about the zip contents -# and pump that into the datastore. If set to False, the zip will be skipped. 
-AUTO_CREATE_ZIP_MANIFEST = tk.asbool( - tk.config.get("ckanext.datapusher_plus.auto_create_zip_manifest", True) +# AI Suggestions Settings +ENABLE_AI_SUGGESTIONS = tk.asbool( + tk.config.get("ckanext.datapusher_plus.enable_ai_suggestions", True) +) +OPENROUTER_API_KEY = tk.config.get("ckanext.datapusher_plus.openrouter_api_key", "") +OPENROUTER_MODEL = tk.config.get( + "ckanext.datapusher_plus.openrouter_model", "anthropic/claude-3.5-sonnet" +) +OPENROUTER_BASE_URL = tk.config.get( + "ckanext.datapusher_plus.openrouter_base_url", "https://openrouter.ai/api/v1" +) +AI_TEMPERATURE = tk.config.get("ckanext.datapusher_plus.ai_temperature", 0.7) +AI_MAX_TOKENS = tk.asint(tk.config.get("ckanext.datapusher_plus.ai_max_tokens", "2000")) +AI_TIMEOUT = tk.asint(tk.config.get("ckanext.datapusher_plus.ai_timeout", "60")) +AI_MAX_CONTEXT_LENGTH = tk.asint( + tk.config.get("ckanext.datapusher_plus.ai_max_context_length", "8000") +) +AI_MIN_DESCRIPTION_LENGTH = tk.asint( + tk.config.get("ckanext.datapusher_plus.ai_min_description_length", "50") +) +AI_MAX_TAGS = tk.asint(tk.config.get("ckanext.datapusher_plus.ai_max_tags", "10")) +AI_INCLUDE_SAMPLE_DATA = tk.asbool( + tk.config.get("ckanext.datapusher_plus.ai_include_sample_data", True) +) +AI_FALLBACK_ON_FAILURE = tk.asbool( + tk.config.get("ckanext.datapusher_plus.ai_fallback_on_failure", True) ) diff --git a/ckanext/datapusher_plus/helpers.py b/ckanext/datapusher_plus/helpers.py index 293d3ac..76c9eb0 100644 --- a/ckanext/datapusher_plus/helpers.py +++ b/ckanext/datapusher_plus/helpers.py @@ -662,4 +662,158 @@ def is_preformulated_field(field): Check if a field is preformulated (has formula attribute) This helper returns True only if the field has a 'formula' key with a non-empty value """ - return bool(field.get('formula', False)) \ No newline at end of file + return bool(field.get('formula', False)) + + + + + +def scheming_has_ai_suggestion_fields(schema): + """ + Check if the schema has any fields that support AI suggestions + + Args: + schema: The schema dictionary + + Returns: + bool: True if any field supports AI suggestions, False otherwise + """ + if not schema: + return False + + if 'dataset_fields' in schema: + for field in schema['dataset_fields']: + if field.get('ai_suggestion', False): + return True + + if 'resource_fields' in schema: + for field in schema['resource_fields']: + if field.get('ai_suggestion', False): + return True + + return False + +def scheming_field_supports_ai_suggestion(field): + """ + Check if a field supports AI suggestions + + Args: + field: The field dictionary from the schema + + Returns: + bool: True if the field supports AI suggestions, False otherwise + """ + return field.get('ai_suggestion', False) + +def scheming_get_ai_suggestion_value(field_name, data=None): + """ + Get AI suggestion value for a field from dpp_suggestions + + Args: + field_name: Name of the field + data: Form data dictionary containing dpp_suggestions + + Returns: + str: AI suggestion value or empty string if not available + """ + if not data: + logger.debug(f"No data provided to scheming_get_ai_suggestion_value for field '{field_name}'") + return "" + + # Get dpp_suggestions from data + dpp_suggestions = data.get('dpp_suggestions', {}) + + # Handle JSON string + if isinstance(dpp_suggestions, str): + try: + import json + dpp_suggestions = json.loads(dpp_suggestions) + except (json.JSONDecodeError, TypeError): + logger.debug(f"Failed to parse dpp_suggestions JSON for field '{field_name}'") + return "" + + # Get AI suggestions + 
ai_suggestions = dpp_suggestions.get('ai_suggestions', {}) + + if not ai_suggestions or not isinstance(ai_suggestions, dict): + logger.debug(f"No AI suggestions found for field '{field_name}'. dpp_suggestions keys: {list(dpp_suggestions.keys())}") + return "" + + # Get suggestion for this field + field_suggestion = ai_suggestions.get(field_name, {}) + + if isinstance(field_suggestion, dict): + value = field_suggestion.get('value', '') + if value: + logger.debug(f"Found AI suggestion for '{field_name}': {len(value)} chars") + return value + + return str(field_suggestion) if field_suggestion else "" + + +def scheming_has_ai_suggestions(data=None): + """ + Check if AI suggestions are available in the data + + Args: + data: Form data dictionary containing dpp_suggestions + + Returns: + bool: True if AI suggestions are available, False otherwise + """ + if not data: + return False + + # Get dpp_suggestions from data + dpp_suggestions = data.get('dpp_suggestions', {}) + + # Handle JSON string + if isinstance(dpp_suggestions, str): + try: + dpp_suggestions = json.loads(dpp_suggestions) + except (json.JSONDecodeError, TypeError): + return False + + # Check if AI suggestions exist + ai_suggestions = dpp_suggestions.get('ai_suggestions', {}) + + return bool(ai_suggestions and isinstance(ai_suggestions, dict)) + + +def scheming_get_ai_suggestion_source(field_name, data=None): + """ + Get the source of AI suggestion for a field + + Args: + field_name: Name of the field + data: Form data dictionary containing dpp_suggestions + + Returns: + str: Source of the suggestion (e.g., "AI Generated", "Auto-generated") + """ + if not data: + return "" + + # Get dpp_suggestions from data + dpp_suggestions = data.get('dpp_suggestions', {}) + + # Handle JSON string + if isinstance(dpp_suggestions, str): + try: + dpp_suggestions = json.loads(dpp_suggestions) + except (json.JSONDecodeError, TypeError): + return "" + + # Get AI suggestions + ai_suggestions = dpp_suggestions.get('ai_suggestions', {}) + + if not ai_suggestions or not isinstance(ai_suggestions, dict): + return "" + + # Get suggestion for this field + field_suggestion = ai_suggestions.get(field_name, {}) + + if isinstance(field_suggestion, dict): + return field_suggestion.get('source', 'AI Generated') + + return "" \ No newline at end of file diff --git a/ckanext/datapusher_plus/jobs.py b/ckanext/datapusher_plus/jobs.py index 98e708b..c49bcec 100644 --- a/ckanext/datapusher_plus/jobs.py +++ b/ckanext/datapusher_plus/jobs.py @@ -1194,6 +1194,79 @@ def _push_to_datastore( f'...indexing/vacuum analysis done. Indexed {index_count} column/s in "{resource_id}" in {index_elapsed:,.2f} seconds.' 
) + + # ============================================================ + # GENERATE AI SUGGESTIONS + # ============================================================ + # Generate AI-powered suggestions for dataset fields using QSV analysis + ai_suggestions_start = time.perf_counter() + + # Fetch the scheming_yaml and package first (needed for both AI and DRUF) + package_id = resource["package_id"] + scheming_yaml, package = dsu.get_scheming_yaml( + package_id, scheming_yaml_type="dataset" + ) + + if conf.ENABLE_AI_SUGGESTIONS: + logger.info("Generating AI suggestions...") + try: + from ckanext.datapusher_plus.ai_suggestions import AIDescriptionGenerator + + ai_generator = AIDescriptionGenerator(logger=logger) + + # Get sample data for context + sample_data = None + try: + qsv_sample = qsv.slice( + tmp, + start=0, + end=10, # Get first 10 rows as sample + ) + sample_data = str(qsv_sample.stdout) + except Exception as e: + logger.warning(f"Could not get sample data for AI: {e}") + + # Generate AI suggestions + ai_suggestions = ai_generator.generate_ai_suggestions( + resource_metadata=resource, + dataset_metadata=package, + stats_data=resource_fields_stats, + freq_data=resource_fields_freqs, + dataset_stats=dataset_stats, + sample_data=sample_data + ) + + # Store AI suggestions in dpp_suggestions field + if ai_suggestions: + logger.info(f"Generated {len(ai_suggestions)} AI suggestions") + + # Ensure dpp_suggestions field exists in package + if "dpp_suggestions" not in package: + package["dpp_suggestions"] = {} + + # Store AI suggestions under 'ai_suggestions' key + package["dpp_suggestions"]["ai_suggestions"] = ai_suggestions + + # Update package with AI suggestions + try: + dsu.patch_package(package) + logger.info("AI suggestions stored in package") + except Exception as e: + logger.error(f"Error storing AI suggestions: {e}") + else: + logger.info("No AI suggestions generated") + + ai_suggestions_elapsed = time.perf_counter() - ai_suggestions_start + logger.info(f"AI suggestions generation completed in {ai_suggestions_elapsed:,.2f} seconds") + + except ImportError as e: + logger.warning(f"Could not import AI suggestions module: {e}") + except Exception as e: + logger.error(f"Error generating AI suggestions: {e}") + logger.debug(f"AI suggestions error details: {traceback.format_exc()}") + else: + logger.debug("AI suggestions are disabled") + # ============================================================ # PROCESS DRUF JINJA2 FORMULAE # ============================================================ diff --git a/ckanext/datapusher_plus/plugin.py b/ckanext/datapusher_plus/plugin.py index 9dc8aaa..798c9a4 100644 --- a/ckanext/datapusher_plus/plugin.py +++ b/ckanext/datapusher_plus/plugin.py @@ -224,7 +224,11 @@ def get_helpers(self) -> dict[str, Callable[..., Any]]: "scheming_is_valid_suggestion": dph.scheming_is_valid_suggestion, "is_preformulated_field": dph.is_preformulated_field, "spatial_extent_feature_collection": dphj.spatial_extent_feature_collection, - + "scheming_has_ai_suggestion_fields": dph.scheming_has_ai_suggestion_fields, + "scheming_field_supports_ai_suggestion": dph.scheming_field_supports_ai_suggestion, + "scheming_get_ai_suggestion_value": dph.scheming_get_ai_suggestion_value, + "scheming_has_ai_suggestions": dph.scheming_has_ai_suggestions, + "scheming_get_ai_suggestion_source": dph.scheming_get_ai_suggestion_source, } # IBlueprint diff --git a/ckanext/datapusher_plus/templates/scheming/form_snippets/markdown.html 
b/ckanext/datapusher_plus/templates/scheming/form_snippets/markdown.html index d9e158a..ef6d201 100644 --- a/ckanext/datapusher_plus/templates/scheming/form_snippets/markdown.html +++ b/ckanext/datapusher_plus/templates/scheming/form_snippets/markdown.html @@ -1,8 +1,11 @@ {% import 'macros/form.html' as form %} {% include 'scheming/snippets/suggestions_asset.html' %} +{% include 'scheming/snippets/ai_suggestions_asset.html' %} + {# Check if this is a preformulated field #} {% set is_preformulated = h.is_preformulated_field(field) %} +{% set supports_ai_suggestion = h.scheming_field_supports_ai_suggestion(field) %} {# Create a custom label with suggestion button(s) and icons #} {% set suggestion = h.scheming_field_suggestion(field) %} @@ -12,6 +15,15 @@ {%- if suggestion -%} {%- snippet 'scheming/snippets/suggestion_button.html', field=field, data=data -%} {%- endif -%} + {%- if supports_ai_suggestion -%} + {% set ai_suggestion_value = h.scheming_get_ai_suggestion_value(field.field_name, data) %} + {% set ai_suggestion_source = h.scheming_get_ai_suggestion_source(field.field_name, data) %} + + {%- endif -%} {%- if is_preformulated -%} + Get AI Suggestions + + + +{% endif %} \ No newline at end of file From 2e83c3bfe201fdea3d923437f00bb51c4d58a387 Mon Sep 17 00:00:00 2001 From: Minhajuddin Mohammed Date: Mon, 1 Dec 2025 13:41:21 -0500 Subject: [PATCH 5/6] made DP+ resubmit more robust --- DATASTORE_RECOVERY_FIX.md | 99 +++++++++++++++++++++++++++++ ckanext/datapusher_plus/jobs.py | 108 +++++++++++++++++++++++--------- 2 files changed, 177 insertions(+), 30 deletions(-) create mode 100644 DATASTORE_RECOVERY_FIX.md diff --git a/DATASTORE_RECOVERY_FIX.md b/DATASTORE_RECOVERY_FIX.md new file mode 100644 index 0000000..615ed7c --- /dev/null +++ b/DATASTORE_RECOVERY_FIX.md @@ -0,0 +1,99 @@ +# DataPusher+ Datastore Recovery Fix + +## Problem +When DataPusher+ resubmits a resource (e.g., via `ckan datastore resubmit`), if there are failures during metadata updates (such as SOLR being down), the entire datastore would be deleted including any manual Data Dictionary edits. This resulted in permanent data loss. + +## Root Cause +The original code flow was: +1. Back up Data Dictionary info +2. Delete existing datastore completely +3. Create new datastore table +4. Copy data to datastore +5. Update resource metadata (can fail if SOLR is down) +6. Update package metadata (can fail if SOLR is down) + +If steps 5 or 6 failed, the old datastore was already deleted and couldn't be recovered. + +## Solution +The fix implements a robust error handling strategy with three layers of protection: + +### 1. Complete Field Backup Before Deletion +```python +existing_fields_backup = None # Backup complete field definitions for recovery +if existing: + existing_info = dict(...) + # Backup complete field definitions including Data Dictionary edits + existing_fields_backup = existing.get("fields", []) +``` + +This captures the complete field schema including all Data Dictionary edits (labels, types, notes, etc.). + +### 2. Rollback on Datastore Creation Failure +```python +try: + # Create datastore table + dsu.send_resource_to_datastore(...) +except Exception as e: + # If we deleted an existing datastore and creation fails, restore it + if datastore_deleted and existing_fields_backup: + logger.warning("Attempting to restore previous datastore structure...") + dsu.send_resource_to_datastore( + resource_id=resource["id"], + headers=existing_fields_backup, # Restore with Data Dictionary + ... 
+ ) +``` + +If creating the new datastore table fails, immediately restore the previous structure with all Data Dictionary edits. + +### 3. Preserve Datastore on Metadata Update Failures +```python +try: + dsu.update_resource(resource) + dsu.send_resource_to_datastore(...) + dsu.patch_package(package) +except Exception as e: + logger.error(f"Failed to update resource/package metadata (possibly SOLR issue): {e}") + logger.warning( + f"Datastore table '{resource_id}' with Data Dictionary was successfully created, " + f"but metadata updates failed. The datastore and Data Dictionary are preserved." + ) + raise utils.JobError( + f"Datastore created successfully but metadata update failed: {e}. " + f"Data Dictionary is preserved in datastore." + ) +``` + +If SOLR or other CKAN metadata updates fail, the datastore (with all data and Data Dictionary) remains intact. The job fails with a clear error message, but no data is lost. + +## Benefits +1. **Data Dictionary Preservation**: Manual Data Dictionary edits are never lost +2. **Graceful Degradation**: If metadata updates fail (SOLR down), datastore remains functional +3. **Clear Error Messages**: Users know exactly what succeeded and what failed +4. **Atomic Operations**: Either everything succeeds or the previous state is preserved +5. **Retry-Friendly**: Failed jobs can be retried without losing previous work + +## Testing Recommendations +1. **SOLR Failure Test**: + - Set up a dataset with edited Data Dictionary + - Stop SOLR pod + - Run `ckan datastore resubmit --yes` + - Verify Data Dictionary is preserved + +2. **Database Connection Test**: + - Create resource with Data Dictionary edits + - Simulate database connection issues during update + - Verify rollback restores original structure + +3. **Normal Operation Test**: + - Verify normal resubmit still works correctly + - Verify Data Dictionary edits are properly merged + +## Files Modified +- `/usr/lib/ckan/default/src/datapusher-plus/ckanext/datapusher_plus/jobs.py` + - Added `existing_fields_backup` variable to capture complete field definitions + - Added try-except around datastore table creation with rollback logic + - Added try-except around metadata updates to preserve successful datastore creation + +## Backward Compatibility +This fix is fully backward compatible. It adds error handling without changing the API or normal operation flow. diff --git a/ckanext/datapusher_plus/jobs.py b/ckanext/datapusher_plus/jobs.py index c49bcec..bc63aef 100644 --- a/ckanext/datapusher_plus/jobs.py +++ b/ckanext/datapusher_plus/jobs.py @@ -821,10 +821,15 @@ def _push_to_datastore( # Get the field stats for each field in the headers list existing = dsu.datastore_resource_exists(resource_id) existing_info = None + existing_fields_backup = None # Backup complete field definitions for recovery if existing: existing_info = dict( (f["id"], f["info"]) for f in existing.get("fields", []) if "info" in f ) + # Backup complete field definitions including Data Dictionary edits + # This allows recovery if datastore operations fail + existing_fields_backup = existing.get("fields", []) + logger.info(f"Backed up {len(existing_fields_backup)} field definitions from existing datastore.") # if this is an existing resource # override with types user requested in Data Dictionary @@ -839,9 +844,16 @@ def _push_to_datastore( ] # Delete existing datastore resource before proceeding. 
+ # We'll wrap subsequent operations in try-except to restore on failure + datastore_deleted = False if existing: logger.info(f'Deleting existing resource "{resource_id}" from datastore.') - dsu.delete_datastore_resource(resource_id) + try: + dsu.delete_datastore_resource(resource_id) + datastore_deleted = True + except Exception as e: + logger.error(f"Failed to delete existing datastore: {e}") + raise utils.JobError(f"Failed to delete existing datastore: {e}") # 1st pass of building headers_dict # here we map inferred types to postgresql data types @@ -1033,15 +1045,35 @@ def _push_to_datastore( else: logger.info(f"COPYING {rows_to_copy} rows to Datastore...") - # first, let's create an empty datastore table w/ guessed types - dsu.send_resource_to_datastore( - resource=None, - resource_id=resource["id"], - headers=headers_dicts, - records=None, - aliases=None, - calculate_record_count=False, - ) + # Wrap datastore operations in try-except to restore backup on failure + try: + # first, let's create an empty datastore table w/ guessed types + dsu.send_resource_to_datastore( + resource=None, + resource_id=resource["id"], + headers=headers_dicts, + records=None, + aliases=None, + calculate_record_count=False, + ) + except Exception as e: + logger.error(f"Failed to create datastore table: {e}") + # If we deleted an existing datastore and creation fails, try to restore it + if datastore_deleted and existing_fields_backup: + logger.warning("Attempting to restore previous datastore structure with Data Dictionary...") + try: + dsu.send_resource_to_datastore( + resource=None, + resource_id=resource["id"], + headers=existing_fields_backup, + records=None, + aliases=None, + calculate_record_count=False, + ) + logger.info("Successfully restored previous datastore structure with Data Dictionary.") + except Exception as restore_error: + logger.error(f"Failed to restore previous datastore structure: {restore_error}") + raise utils.JobError(f"Failed to create datastore table: {e}") copied_count = 0 try: @@ -1743,29 +1775,45 @@ def _push_to_datastore( resource["preview"] = False resource["preview_rows"] = None resource["partial_download"] = False - dsu.update_resource(resource) - - # tell CKAN to calculate_record_count and set alias if set - dsu.send_resource_to_datastore( - resource=None, - resource_id=resource["id"], - headers=headers_dicts, - records=None, - aliases=alias, - calculate_record_count=True, - ) + + # Wrap metadata updates in try-except to preserve datastore on SOLR/CKAN failures + try: + dsu.update_resource(resource) - if alias: - logger.info(f'Created alias "{alias}" for "{resource_id}"...') + # tell CKAN to calculate_record_count and set alias if set + dsu.send_resource_to_datastore( + resource=None, + resource_id=resource["id"], + headers=headers_dicts, + records=None, + aliases=alias, + calculate_record_count=True, + ) - metadata_elapsed = time.perf_counter() - metadata_start - logger.info( - f"RESOURCE METADATA UPDATES DONE! Resource metadata updated in {metadata_elapsed:,.2f} seconds." - ) + if alias: + logger.info(f'Created alias "{alias}" for "{resource_id}"...') - # -------------------- DONE -------------------- - package.setdefault("dpp_suggestions", {})["STATUS"] = "DONE" - dsu.patch_package(package) + metadata_elapsed = time.perf_counter() - metadata_start + logger.info( + f"RESOURCE METADATA UPDATES DONE! Resource metadata updated in {metadata_elapsed:,.2f} seconds." 
+ ) + + # -------------------- DONE -------------------- + package.setdefault("dpp_suggestions", {})["STATUS"] = "DONE" + dsu.patch_package(package) + except Exception as e: + logger.error(f"Failed to update resource/package metadata (possibly SOLR issue): {e}") + logger.warning( + f"Datastore table '{resource_id}' with Data Dictionary was successfully created, " + f"but metadata updates failed. The datastore and Data Dictionary are preserved. " + f"You may need to retry or check SOLR status." + ) + # Don't delete the datastore - it's successfully created with data + # Just propagate the error so the job is marked as failed + raise utils.JobError( + f"Datastore created successfully but metadata update failed: {e}. " + f"Data Dictionary is preserved in datastore." + ) total_elapsed = time.perf_counter() - timer_start newline_var = "\n" From 6a9488805df90f25cea2138032694c4717184e01 Mon Sep 17 00:00:00 2001 From: Minhajuddin Mohammed Date: Mon, 1 Dec 2025 12:43:07 -0600 Subject: [PATCH 6/6] Delete DATASTORE_RECOVERY_FIX.md --- DATASTORE_RECOVERY_FIX.md | 99 --------------------------------------- 1 file changed, 99 deletions(-) delete mode 100644 DATASTORE_RECOVERY_FIX.md diff --git a/DATASTORE_RECOVERY_FIX.md b/DATASTORE_RECOVERY_FIX.md deleted file mode 100644 index 615ed7c..0000000 --- a/DATASTORE_RECOVERY_FIX.md +++ /dev/null @@ -1,99 +0,0 @@ -# DataPusher+ Datastore Recovery Fix - -## Problem -When DataPusher+ resubmits a resource (e.g., via `ckan datastore resubmit`), if there are failures during metadata updates (such as SOLR being down), the entire datastore would be deleted including any manual Data Dictionary edits. This resulted in permanent data loss. - -## Root Cause -The original code flow was: -1. Back up Data Dictionary info -2. Delete existing datastore completely -3. Create new datastore table -4. Copy data to datastore -5. Update resource metadata (can fail if SOLR is down) -6. Update package metadata (can fail if SOLR is down) - -If steps 5 or 6 failed, the old datastore was already deleted and couldn't be recovered. - -## Solution -The fix implements a robust error handling strategy with three layers of protection: - -### 1. Complete Field Backup Before Deletion -```python -existing_fields_backup = None # Backup complete field definitions for recovery -if existing: - existing_info = dict(...) - # Backup complete field definitions including Data Dictionary edits - existing_fields_backup = existing.get("fields", []) -``` - -This captures the complete field schema including all Data Dictionary edits (labels, types, notes, etc.). - -### 2. Rollback on Datastore Creation Failure -```python -try: - # Create datastore table - dsu.send_resource_to_datastore(...) -except Exception as e: - # If we deleted an existing datastore and creation fails, restore it - if datastore_deleted and existing_fields_backup: - logger.warning("Attempting to restore previous datastore structure...") - dsu.send_resource_to_datastore( - resource_id=resource["id"], - headers=existing_fields_backup, # Restore with Data Dictionary - ... - ) -``` - -If creating the new datastore table fails, immediately restore the previous structure with all Data Dictionary edits. - -### 3. Preserve Datastore on Metadata Update Failures -```python -try: - dsu.update_resource(resource) - dsu.send_resource_to_datastore(...) 
- dsu.patch_package(package) -except Exception as e: - logger.error(f"Failed to update resource/package metadata (possibly SOLR issue): {e}") - logger.warning( - f"Datastore table '{resource_id}' with Data Dictionary was successfully created, " - f"but metadata updates failed. The datastore and Data Dictionary are preserved." - ) - raise utils.JobError( - f"Datastore created successfully but metadata update failed: {e}. " - f"Data Dictionary is preserved in datastore." - ) -``` - -If SOLR or other CKAN metadata updates fail, the datastore (with all data and Data Dictionary) remains intact. The job fails with a clear error message, but no data is lost. - -## Benefits -1. **Data Dictionary Preservation**: Manual Data Dictionary edits are never lost -2. **Graceful Degradation**: If metadata updates fail (SOLR down), datastore remains functional -3. **Clear Error Messages**: Users know exactly what succeeded and what failed -4. **Atomic Operations**: Either everything succeeds or the previous state is preserved -5. **Retry-Friendly**: Failed jobs can be retried without losing previous work - -## Testing Recommendations -1. **SOLR Failure Test**: - - Set up a dataset with edited Data Dictionary - - Stop SOLR pod - - Run `ckan datastore resubmit --yes` - - Verify Data Dictionary is preserved - -2. **Database Connection Test**: - - Create resource with Data Dictionary edits - - Simulate database connection issues during update - - Verify rollback restores original structure - -3. **Normal Operation Test**: - - Verify normal resubmit still works correctly - - Verify Data Dictionary edits are properly merged - -## Files Modified -- `/usr/lib/ckan/default/src/datapusher-plus/ckanext/datapusher_plus/jobs.py` - - Added `existing_fields_backup` variable to capture complete field definitions - - Added try-except around datastore table creation with rollback logic - - Added try-except around metadata updates to preserve successful datastore creation - -## Backward Compatibility -This fix is fully backward compatible. It adds error handling without changing the API or normal operation flow.
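
Note: the AI-suggestion patches and the recovery patch both read and write the `dpp_suggestions` package extra, but its schema is never spelled out in the diffs. The sketch below only illustrates the keys the code shown here actually touches (`STATUS`, `ai_suggestions`, and per-field `value`/`source`/`confidence`); the field name `notes` and all values are hypothetical.

```python
# Illustrative sketch only -- not a documented schema.
# "STATUS" is what jobs.py sets to "DONE" and what the JS poller checks;
# "value", "source" and "confidence" are the keys read by the Jinja2/template
# helpers and by scheming-ai-suggestions.js when it builds the popover.
dpp_suggestions = {
    "STATUS": "DONE",
    "ai_suggestions": {
        "notes": {                      # hypothetical field name
            "value": "Example description generated for the dataset.",
            "source": "AI Generated",
            "confidence": "high",       # hypothetical value; key is read by the JS
        },
    },
}

# Roughly how scheming_get_ai_suggestion_value()/..._source() resolve a field:
suggestion = dpp_suggestions.get("ai_suggestions", {}).get("notes", {})
print(suggestion.get("value", ""))
print(suggestion.get("source", "AI Generated"))
```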