diff --git a/.codecov.yml b/.codecov.yml index a052f98d..f0984c42 100644 --- a/.codecov.yml +++ b/.codecov.yml @@ -1 +1,15 @@ comment: no + +coverage: + status: + project: + default: + # Target overall coverage percentage + target: 74% + # Allow coverage to drop by this amount without failing + # threshold: 0.5% # Optional: uncomment to allow small drops + patch: + default: + # Target coverage percentage for the changes in the PR/commit + target: 20% # Lower target for patch coverage + # threshold: 1% # Optional: Allow patch coverage to drop diff --git a/.gitignore b/.gitignore index e95d26b6..30bd9b39 100644 --- a/.gitignore +++ b/.gitignore @@ -36,4 +36,5 @@ error_log.txt docs/* !docs/*.rst !docs/conf.py -scratch.py \ No newline at end of file +venv* +.coverage* \ No newline at end of file diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 23d07950..ff21cee7 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -25,4 +25,5 @@ repos: rev: v4.0.0-alpha.8 hooks: - id: prettier + types: [markdown] # Specify file types for prettier exclude: .venv diff --git a/CHANGELOG.MD b/CHANGELOG.MD index f11d10ef..1b6eb1f1 100644 --- a/CHANGELOG.MD +++ b/CHANGELOG.MD @@ -1,5 +1,20 @@ # ChangeLog +## [2025-04-27] + +### Benchmarking + +- Added benchmarking scripts (`scripts/bench_sdk.py`, `scripts/bench_cli.sh`) to compare the performance of PII redaction using the Python SDK versus the CLI. +- Created a sample OpenTelemetry log file (`scripts/sample_otel_log.json`) containing 51 string fields for testing. +- **Methodology:** + - **SDK Benchmark (`bench_sdk.py`):** Reads the entire JSON file, extracts all string values, and times a single call to the SDK's `annotate_text` and `anonymize` functions to process all extracted strings together. The measurement was repeated 10 times, and the fastest run was reported. + - **CLI Benchmark (`bench_cli.sh`):** Uses `jq` to extract all string values from the JSON file and pipes them via `xargs` to the `datafog redact-text` command. This results in a separate invocation of the `datafog` CLI process for _each_ extracted string. The total time for processing all strings in this manner was measured over 5 loops. +- **Results:** + - **SDK:** Demonstrated significantly faster performance due to single process initialization and batch processing. + - Fastest time to process all 51 strings from `sample_otel_log.json`: **~1.15 seconds**. + - **CLI:** Showed much slower performance, primarily attributed to the overhead of starting a new `datafog` process and loading models for each of the 51 strings individually. + - Time per loop (processing all 51 strings via individual CLI calls): **~41-42 seconds** (based on initial runs). + ## [2024-03-25] ### `datafog-python` [2.3.2] diff --git a/README.md b/README.md index cc4be8f2..324572b3 100644 --- a/README.md +++ b/README.md @@ -21,10 +21,25 @@ DataFog can be installed via pip: -``` +```bash pip install datafog ``` +### Optional Features (Extras) + +DataFog uses `extras` to manage dependencies for optional features like specific OCR engines. You can install these as needed: + +```bash +# Install with OCR +pip install "datafog[ocr]" + +# Install with Donut for structured document extraction +pip install "datafog[donut]" + +# Install with ALL optional dependencies +pip install "datafog[all]" +``` + # CLI ## 📚 Quick Reference @@ -178,18 +193,21 @@ datafog list-entities ## Getting Started -To use DataFog, you'll need to create a DataFog client with the desired operations. 
Here's a basic setup: +Initialize the DataFog client. You can specify operations like `SCAN`, `REDACT`, `REPLACE`, `HASH`. ```python from datafog import DataFog -# For text annotation -client = DataFog(operations="scan") - -# For OCR (Optical Character Recognition) -ocr_client = DataFog(operations="extract") +# Initialize with default operations (SCAN) +client = DataFog() ``` +DataFog now includes several performance enhancements: + +- **SpaCy Model Caching:** Loaded spaCy models are cached in memory to speed up subsequent processing requests. +- **Efficient Batch Processing:** The text processing pipeline utilizes spaCy's `nlp.pipe()` internally, processing documents in batches (default batch size is 50, configurable via `DATAFOG_SPACY_BATCH_SIZE` environment variable) and leveraging multiple processes (`n_process=-1`) for improved throughput. +- **Asynchronous Execution:** When using asynchronous methods (`run_text_pipeline`, `run_ocr_pipeline`), potentially blocking operations like spaCy processing and Tesseract OCR are executed in separate threads to avoid blocking the main event loop. + ## Text PII Annotation Here's an example of how to annotate PII in a text document: @@ -300,6 +318,14 @@ Output: You can choose from SHA256 (default), SHA3-256, and MD5 hashing algorithms by specifying the `hash_type` parameter +## Configuration + +### Environment Variables + +- **`DATAFOG_SPACY_BATCH_SIZE`**: Controls the batch size used internally by spaCy's `nlp.pipe()` when processing text documents. Larger batch sizes can improve throughput on systems with sufficient memory but may increase memory usage. + - **Default:** `50` + - **Usage:** Set this environment variable to a positive integer (e.g., `export DATAFOG_SPACY_BATCH_SIZE=100`) to override the default. + ## Examples For more detailed examples, check out our Jupyter notebooks in the `examples/` directory: diff --git a/datafog/__about__.py b/datafog/__about__.py index 88c513ea..76ad18b8 100644 --- a/datafog/__about__.py +++ b/datafog/__about__.py @@ -1 +1 @@ -__version__ = "3.3.0" +__version__ = "4.0.1" diff --git a/datafog/__init__.py b/datafog/__init__.py index 7838dd31..faed52f3 100644 --- a/datafog/__init__.py +++ b/datafog/__init__.py @@ -21,7 +21,6 @@ from .processing.image_processing.pytesseract_processor import PytesseractProcessor from .processing.text_processing.spacy_pii_annotator import SpacyPIIAnnotator from .services.image_service import ImageService -from .services.spark_service import SparkService from .services.text_service import TextService __all__ = [ @@ -29,7 +28,6 @@ "DataFog", "ImageService", "OperationType", - "SparkService", "TextPIIAnnotator", "TextService", "SpacyPIIAnnotator", diff --git a/datafog/main.py b/datafog/main.py index 58224e59..93016f03 100644 --- a/datafog/main.py +++ b/datafog/main.py @@ -17,7 +17,6 @@ from .models.anonymizer import Anonymizer, AnonymizerType, HashType from .processing.text_processing.spacy_pii_annotator import SpacyPIIAnnotator from .services.image_service import ImageService -from .services.spark_service import SparkService from .services.text_service import TextService logger = logging.getLogger("datafog_logger") @@ -33,7 +32,6 @@ class DataFog: Attributes: image_service: Service for image processing and OCR. text_service: Service for text processing and annotation. - spark_service: Optional Spark service for distributed processing. operations: List of operations to perform. anonymizer: Anonymizer for PII redaction, replacement, or hashing. 
""" @@ -42,14 +40,12 @@ def __init__( self, image_service=None, text_service=None, - spark_service=None, operations: List[OperationType] = [OperationType.SCAN], hash_type: HashType = HashType.SHA256, anonymizer_type: AnonymizerType = AnonymizerType.REPLACE, ): self.image_service = image_service or ImageService() self.text_service = text_service or TextService() - self.spark_service: SparkService = spark_service self.operations: List[OperationType] = operations self.anonymizer = Anonymizer( hash_type=hash_type, anonymizer_type=anonymizer_type @@ -60,9 +56,6 @@ def __init__( ) self.logger.info(f"Image Service: {type(self.image_service)}") self.logger.info(f"Text Service: {type(self.text_service)}") - self.logger.info( - f"Spark Service: {type(self.spark_service) if self.spark_service else 'None'}" - ) self.logger.info(f"Operations: {operations}") self.logger.info(f"Hash Type: {hash_type}") self.logger.info(f"Anonymizer Type: {anonymizer_type}") @@ -232,12 +225,10 @@ class TextPIIAnnotator: Attributes: text_annotator: SpacyPIIAnnotator instance for text annotation. - spark_processor: Optional SparkService for distributed processing. """ def __init__(self): self.text_annotator = SpacyPIIAnnotator.create() - self.spark_processor: SparkService = None def run(self, text, output_path=None): try: @@ -249,8 +240,6 @@ def run(self, text, output_path=None): json.dump(annotated_text, f) return annotated_text - - finally: - # Ensure Spark resources are released - if self.spark_processor: - self.spark_processor.stop() + except Exception as e: + logging.error(f"Error in run: {str(e)}") + raise diff --git a/datafog/models/spacy_nlp.py b/datafog/models/spacy_nlp.py index 6ac37c68..5a25e898 100644 --- a/datafog/models/spacy_nlp.py +++ b/datafog/models/spacy_nlp.py @@ -5,14 +5,53 @@ text annotation, entity recognition, and related NLP tasks. """ +import logging +import os +import threading from typing import List, Optional from uuid import uuid4 import spacy from rich.progress import track +from spacy.language import Language # For type hinting from .annotator import AnnotationResult, AnnotatorRequest +# Set up logging +logger = logging.getLogger(__name__) + +_SPACY_MODEL_CACHE = {} +_CACHE_LOCK = threading.Lock() + + +def _get_spacy_model(model_name: str) -> Language: + """Loads a spaCy model, utilizing a cache to avoid redundant loads.""" + with _CACHE_LOCK: + if model_name in _SPACY_MODEL_CACHE: + logger.debug(f"Cache hit for model: {model_name!r}") + return _SPACY_MODEL_CACHE[model_name] + + # Model not in cache, needs loading (lock ensures only one thread loads) + logger.debug(f"Cache miss for model: {model_name!r}. Loading...") + try: + # Show progress for download + if not spacy.util.is_package(model_name): + logger.info(f"Model {model_name!r} not found. Downloading...") + # Use rich.progress.track for visual feedback + # This is a simplified representation; actual progress tracking might need + # library integration or subprocess monitoring. + # For now, just log the download attempt. 
+ spacy.cli.download(model_name) + + # Load the spaCy model after ensuring it's downloaded + nlp = spacy.load(model_name) + _SPACY_MODEL_CACHE[model_name] = nlp + logger.debug(f"Model {model_name!r} loaded and cached.") + return nlp + except Exception as e: + logger.error(f"Failed to load or download spaCy model {model_name}: {e}") + raise + class SpacyAnnotator: """ @@ -24,41 +63,83 @@ class SpacyAnnotator: def __init__(self, model_name: str = "en_core_web_lg"): self.model_name = model_name - self.nlp = None + self.nlp = None # Keep lazy loading def load_model(self): - if not spacy.util.is_package(self.model_name): - spacy.cli.download(self.model_name) - self.nlp = spacy.load(self.model_name) - - def annotate_text(self, text: str, language: str = "en") -> List[AnnotationResult]: + """Ensures the spaCy model is loaded, utilizing the cache.""" if not self.nlp: - self.load_model() + # Use the cached loader function + self.nlp = _get_spacy_model(self.model_name) + + def annotate_text( + self, texts: List[str], language: str = "en" + ) -> List[List[AnnotationResult]]: + """Annotates a batch of texts using spaCy NLP pipeline. + + Args: + texts (List[str]): A list of texts to annotate. + language (str): The language of the text (currently unused). + + Returns: + List[List[AnnotationResult]]: A list where each inner list contains + AnnotationResult objects for the corresponding + input text. + """ + # Ensure the model is loaded + self.load_model() - annotator_request = AnnotatorRequest( - text=text, - language=language, - correlation_id=str(uuid4()), - score_threshold=0.5, - entities=None, - return_decision_process=False, - ad_hoc_recognizers=None, - context=None, - ) - doc = self.nlp(annotator_request.text) results = [] - for ent in track(doc.ents, description="Processing entities"): - result = AnnotationResult( - start=ent.start_char, - end=ent.end_char, - score=0.8, # Placeholder score - entity_type=ent.label_, - recognition_metadata=None, + + # Get batch size from environment variable or use default + default_batch_size = 50 + try: + batch_size_str = os.getenv("DATAFOG_SPACY_BATCH_SIZE") + batch_size = int(batch_size_str) if batch_size_str else default_batch_size + if batch_size <= 0: + logger.warning( + f"Invalid DATAFOG_SPACY_BATCH_SIZE {batch_size_str!r}. " + f"Must be a positive integer. Using default: {default_batch_size}" + ) + batch_size = default_batch_size + except (ValueError, TypeError): + # Handle cases where env var is set but not a valid integer + batch_size_str = os.getenv( + "DATAFOG_SPACY_BATCH_SIZE" + ) # Get it again for logging + logger.warning( + f"Invalid DATAFOG_SPACY_BATCH_SIZE {batch_size_str!r}. " + f"Must be an integer. 
Using default: {default_batch_size}" ) - results.append(result) + batch_size = default_batch_size + + logger.info(f"Using spaCy batch size: {batch_size}") + + # Process texts in batches using nlp.pipe + docs = self.nlp.pipe(texts, batch_size=batch_size, n_process=-1) + + # Wrap with track for progress bar + processed_docs = track(docs, description="Annotating text...", total=len(texts)) + + # Process each doc + for doc in processed_docs: + doc_results: List[AnnotationResult] = ( + [] + ) # Initialize list for current doc's results + for ent in doc.ents: + result = AnnotationResult( + start=ent.start_char, + end=ent.end_char, + score=0.8, # Placeholder score - Consider using actual scores if available + entity_type=ent.label_, + recognition_metadata=None, # Consider adding metadata if needed + ) + doc_results.append(result) + results.append(doc_results) # Add results for this doc to the main list + return results def show_model_path(self) -> str: + # This check now correctly uses the updated load_model -> _get_spacy_model if not self.nlp: self.load_model() return str(self.nlp.path) @@ -73,5 +154,6 @@ def list_models() -> List[str]: @staticmethod def list_entities() -> List[str]: - nlp = spacy.load("en_core_web_lg") + # Use the cached loader function for the default model + nlp = _get_spacy_model("en_core_web_lg") return [ent for ent in nlp.pipe_labels["ner"]] diff --git a/datafog/notes/v4.0.1-tickets.md b/datafog/notes/v4.0.1-tickets.md new file mode 100644 index 00000000..4285d7a1 --- /dev/null +++ b/datafog/notes/v4.0.1-tickets.md @@ -0,0 +1,43 @@ +## Ticket:Spark Dependency Removal (Commented Out) + +- **`datafog/services/spark_service.py`**: Commented out `pyspark` imports (`DataFrame`, `SparkSession`, `udf`, `ArrayType`, `StringType`) and the `ensure_installed("pyspark")` call. +- **`datafog/processing/spark_processing/pyspark_udfs.py`**: Commented out `pyspark` imports and `ensure_installed("pyspark")` calls within the `pii_annotator` and `broadcast_pii_annotator_udf` functions. +- **`datafog/processing/spark_processing/__init__.py`**: Commented out the entire `get_pyspark_udfs` function. +- **`docs/important-concepts.rst`**: Commented out the `.. automodule:: datafog.processing.spark_processing.pyspark_udfs` directive. + +## Ticket: Document dependencies across Spacy, Donut, and Tesseract + +**Status:** DONE + +**Title:** Document dependencies across Spacy, Donut, and Tesseract + +**Description:** Document dependencies across Spacy, Donut, and Tesseract in this file. + +**Tasks:** + +- Add Spacy dependencies to setup.py. +- Add Donut dependencies to setup.py. +- Add Tesseract dependencies to setup.py. + +**Acceptance Criteria:** + +- Spacy, Donut, and Tesseract dependencies are documented in setup.py. + +## Ticket: Centralize Version Definition + +**Title:** Read **version** from datafog/**about**.py in setup.py + +**Description:** Currently, the package version might be duplicated or inconsistently defined. We need to centralize the version definition in datafog/**about**.py. + +**Tasks:** + +- Ensure datafog/**about**.py exists and contains a **version** string variable (e.g., **version** = "4.1.0"). +- Modify setup.py to read this **version** variable from datafog/**about**.py. Common patterns involve reading the file and executing its content in a temporary namespace or using regular expressions. +- Remove any hardcoded version assignment within setup.py itself. +- Verify that pip install . and building distributions (sdist, wheel) correctly pick up the version from **about**.py. 
+ +**Acceptance Criteria:** + +- The package version is defined only in datafog/**about**.py. +- setup.py successfully reads the version from **about**.py during installation and build processes. +- Running import datafog; print(datafog.**version**) (if applicable) shows the correct version. diff --git a/datafog/processing/image_processing/donut_processor.py b/datafog/processing/image_processing/donut_processor.py index b3554140..cc562add 100644 --- a/datafog/processing/image_processing/donut_processor.py +++ b/datafog/processing/image_processing/donut_processor.py @@ -19,6 +19,21 @@ from .image_downloader import ImageDownloader +# Attempt imports and provide helpful error messages +try: + import torch +except ModuleNotFoundError: + raise ModuleNotFoundError( + "torch is not installed. Please install it to use Donut features: pip install 'datafog[donut]'" + ) +try: + from transformers import DonutProcessor as TransformersDonutProcessor + from transformers import VisionEncoderDecoderModel +except ModuleNotFoundError: + raise ModuleNotFoundError( + "transformers is not installed. Please install it to use Donut features: pip install 'datafog[donut]'" + ) + class DonutProcessor: """ @@ -30,13 +45,6 @@ class DonutProcessor: """ def __init__(self, model_path="naver-clova-ix/donut-base-finetuned-cord-v2"): - self.ensure_installed("torch") - self.ensure_installed("transformers") - - import torch - from transformers import DonutProcessor as TransformersDonutProcessor - from transformers import VisionEncoderDecoderModel - self.processor = TransformersDonutProcessor.from_pretrained(model_path) self.model = VisionEncoderDecoderModel.from_pretrained(model_path) self.device = "cuda" if torch.cuda.is_available() else "cpu" @@ -44,14 +52,6 @@ def __init__(self, model_path="naver-clova-ix/donut-base-finetuned-cord-v2"): self.model.eval() self.downloader = ImageDownloader() - def ensure_installed(self, package_name): - try: - importlib.import_module(package_name) - except ImportError: - subprocess.check_call( - [sys.executable, "-m", "pip", "install", package_name] - ) - def preprocess_image(self, image: Image.Image) -> np.ndarray: # Convert to RGB if the image is not already in RGB mode if image.mode != "RGB": diff --git a/datafog/processing/image_processing/pytesseract_processor.py b/datafog/processing/image_processing/pytesseract_processor.py index f7291470..e217a1d9 100644 --- a/datafog/processing/image_processing/pytesseract_processor.py +++ b/datafog/processing/image_processing/pytesseract_processor.py @@ -5,6 +5,7 @@ using the Pytesseract OCR engine. 
""" +import asyncio import logging import pytesseract @@ -21,7 +22,8 @@ class PytesseractProcessor: async def extract_text_from_image(self, image: Image.Image) -> str: try: - return pytesseract.image_to_string(image) + # Run the blocking function in a separate thread + return await asyncio.to_thread(pytesseract.image_to_string, image) except Exception as e: logging.error(f"Pytesseract error: {str(e)}") raise diff --git a/datafog/processing/spark_processing/__init__.py b/datafog/processing/spark_processing/__init__.py deleted file mode 100644 index 8d584957..00000000 --- a/datafog/processing/spark_processing/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -# from .pyspark_udfs import broadcast_pii_annotator_udf, pii_annotator - - -def get_pyspark_udfs(): - from .pyspark_udfs import broadcast_pii_annotator_udf, pii_annotator - - return broadcast_pii_annotator_udf, pii_annotator diff --git a/datafog/processing/spark_processing/pyspark_udfs.py b/datafog/processing/spark_processing/pyspark_udfs.py deleted file mode 100644 index 81d6986f..00000000 --- a/datafog/processing/spark_processing/pyspark_udfs.py +++ /dev/null @@ -1,77 +0,0 @@ -""" -PySpark UDFs for PII annotation and related utilities. - -This module provides functions for PII (Personally Identifiable Information) annotation -using SpaCy models in a PySpark environment. It includes utilities for installing -dependencies, creating and broadcasting PII annotator UDFs, and performing PII annotation -on text data. -""" - -import importlib -import subprocess -import sys - -PII_ANNOTATION_LABELS = ["DATE_TIME", "LOC", "NRP", "ORG", "PER"] -MAXIMAL_STRING_SIZE = 1000000 - - -def pii_annotator(text: str, broadcasted_nlp) -> list[list[str]]: - """Extract features using en_core_web_lg model. - - Returns: - list[list[str]]: Values as arrays in order defined in the PII_ANNOTATION_LABELS. 
- """ - ensure_installed("pyspark") - ensure_installed("spacy") - import spacy - from pyspark.sql import SparkSession - from pyspark.sql.functions import udf - from pyspark.sql.types import ArrayType, StringType, StructField, StructType - - if text: - if len(text) > MAXIMAL_STRING_SIZE: - # Cut the strings for required sizes - text = text[:MAXIMAL_STRING_SIZE] - nlp = broadcasted_nlp.value - doc = nlp(text) - - # Pre-create dictionary with labels matching to expected extracted entities - classified_entities: dict[str, list[str]] = { - _label: [] for _label in PII_ANNOTATION_LABELS - } - for ent in doc.ents: - # Add entities from extracted values - classified_entities[ent.label_].append(ent.text) - - return [_ent for _ent in classified_entities.values()] - else: - return [[] for _ in PII_ANNOTATION_LABELS] - - -def broadcast_pii_annotator_udf( - spark_session=None, spacy_model: str = "en_core_web_lg" -): - """Broadcast PII annotator across Spark cluster and create UDF""" - ensure_installed("pyspark") - ensure_installed("spacy") - import spacy - from pyspark.sql import SparkSession - from pyspark.sql.functions import udf - from pyspark.sql.types import ArrayType, StringType, StructField, StructType - - if not spark_session: - spark_session = SparkSession.builder.getOrCreate() - broadcasted_nlp = spark_session.sparkContext.broadcast(spacy.load(spacy_model)) - - pii_annotation_udf = udf( - lambda text: pii_annotator(text, broadcasted_nlp), - ArrayType(ArrayType(StringType())), - ) - return pii_annotation_udf - - -def ensure_installed(self, package_name): - try: - importlib.import_module(package_name) - except ImportError: - subprocess.check_call([sys.executable, "-m", "pip", "install", package_name]) diff --git a/datafog/services/__init__.py b/datafog/services/__init__.py index 8b47cdfa..3e139ff9 100644 --- a/datafog/services/__init__.py +++ b/datafog/services/__init__.py @@ -1,3 +1,2 @@ from .image_service import ImageService -from .spark_service import SparkService from .text_service import TextService diff --git a/datafog/services/spark_service.py b/datafog/services/spark_service.py deleted file mode 100644 index 04bfcaf4..00000000 --- a/datafog/services/spark_service.py +++ /dev/null @@ -1,49 +0,0 @@ -""" -Spark service for data processing and analysis. - -Provides a wrapper around PySpark functionality, including session creation, -JSON reading, and package management. -""" - -import importlib -import json -import subprocess -import sys -from typing import Any, List - - -class SparkService: - """ - Manages Spark operations and dependencies. - - Initializes a Spark session, handles imports, and provides methods for - data reading and package installation. 
- """ - - def __init__(self): - self.spark = self.create_spark_session() - self.ensure_installed("pyspark") - - from pyspark.sql import DataFrame, SparkSession - from pyspark.sql.functions import udf - from pyspark.sql.types import ArrayType, StringType - - self.SparkSession = SparkSession - self.DataFrame = DataFrame - self.udf = udf - self.ArrayType = ArrayType - self.StringType = StringType - - def create_spark_session(self): - return self.SparkSession.builder.appName("datafog").getOrCreate() - - def read_json(self, path: str) -> List[dict]: - return self.spark.read.json(path).collect() - - def ensure_installed(self, package_name): - try: - importlib.import_module(package_name) - except ImportError: - subprocess.check_call( - [sys.executable, "-m", "pip", "install", package_name] - ) diff --git a/docs/definitions.rst b/docs/definitions.rst index 6ac214f7..7f9c2da8 100644 --- a/docs/definitions.rst +++ b/docs/definitions.rst @@ -15,5 +15,4 @@ Class Definitions generated/datafog.models.spacy_nlp.SpacyAnnotator generated/datafog.services.image_service.ImageDownloader generated/datafog.services.image_service.ImageService - generated/datafog.services.spark_service.SparkService generated/datafog.services.text_service.TextService \ No newline at end of file diff --git a/docs/important-concepts.rst b/docs/important-concepts.rst index 791fa0a0..86482a42 100644 --- a/docs/important-concepts.rst +++ b/docs/important-concepts.rst @@ -32,8 +32,6 @@ Services Core services: * ImageService Image handling and OCR -* SparkService - PySpark wrapper * TextService PII annotation @@ -85,9 +83,6 @@ Processors .. automodule:: datafog.processing.text_processing.spacy_pii_annotator :members: -.. automodule:: datafog.processing.spark_processing.pyspark_udfs - :members: - Services ------------------------- @@ -101,15 +96,6 @@ Services ImageDownloader ImageService -.. automodule:: datafog.services.spark_service - :members: - -.. autosummary:: - :toctree: generated/ - :template: class.rst - SparkService - - .. automodule:: datafog.services.text_service :members: @@ -117,3 +103,7 @@ Services :toctree: generated/ :template: class.rst TextService + +Refer to the TextService section for information on PII annotation. + +Refer to the ImageService section for information on image processing and OCR. diff --git a/docs/python-sdk.rst b/docs/python-sdk.rst index dbf1982d..3a1203c6 100644 --- a/docs/python-sdk.rst +++ b/docs/python-sdk.rst @@ -5,7 +5,7 @@ DataFog Python SDK Overview -------- The main entrypoint for the SDK is through the DataFog class, defined in :mod:`datafog.main`. -Here you can initialize the different services, including TextService, ImageService, and SparkService. +Here you can initialize the different services, including TextService and ImageService. Definitions ----------- diff --git a/notes/ROADMAP.md b/notes/ROADMAP.md new file mode 100644 index 00000000..94f3bd1b --- /dev/null +++ b/notes/ROADMAP.md @@ -0,0 +1,73 @@ +--- + +### **v4.1.0 — Baseline stability** + +- **MUST** read `__version__` from `datafog/__about__.py` and import it in `setup.py`; delete the duplicate there. +- **MUST** remove every `ensure_installed()` runtime `pip install`; fail fast instead. +- **MUST** document OCR/Donut extras in `setup.py[extras]`. + +--- + +### **v4.2.0 — Faster spaCy path** + +- **MUST** hold the spaCy `nlp` object in a module-level cache (singleton). +- **MUST** replace per-doc loops with `nlp.pipe(batch_size=?, n_process=-1)`. 
+- **MUST** run spaCy and Tesseract calls in `asyncio.to_thread()` (or a thread-pool) so the event-loop stays free. +- **SHOULD** expose `PIPE_BATCH_SIZE` env var for tuning. + +--- + +### **v4.3.0 — Strong types, predictable output** + +- **MUST** make `_process_text` always return `Dict[str, Dict]`. +- **MUST** add `mypy --strict` to CI; fix any revealed issues. +- **SHOULD** convert `datafog.config` to a Pydantic v2 `BaseSettings`. + +--- + +### **v4.4.0 — Clean OCR architecture** + +- **MUST** split `ImageService` into `TesseractOCR` and `DonutOCR`, each with `extract_text(Image)->str`. +- **MUST** let users pick via `ImageService(backend="tesseract"|"donut")` or the `DATAFOG_DEFAULT_OCR` env var. +- **SHOULD** add unit tests that stub each backend independently. + +--- + +### **v4.5.0 — Rust-powered pattern matching (optional wheel)** + +- **MUST** create a PyO3 extension `datafog._fastregex` that wraps `aho-corasick` / `regex-automata`. +- **MUST** auto-import it when available; fall back to pure-Python silently. +- **SHOULD** publish platform wheels under `pip install "datafog[fastregex]"`. + +--- + +### **v4.6.0 — Streaming and zero-copy** + +- **MUST** add `async def stream_text_pipeline(iterable[str]) -> AsyncIterator[Result]`. +- **MUST** scan CSV/JSON via `pyarrow.dataset` to avoid reading the whole file into RAM. +- **SHOULD** provide example notebook comparing latency/bandwidth vs. v4.5. + +--- + +### **v4.7.0 — GPU / transformer toggle** + +- **MUST** accept `DataFog(use_gpu=True)` which loads `en_core_web_trf` in half precision if CUDA is present. +- **MUST** fall back gracefully on CPU-only hosts. +- **SHOULD** benchmark and log model choice at INFO level. + +--- + +### **v4.8.0 — Fast anonymizer core** + +- **MUST** rewrite `Anonymizer.replace_pii/redact_pii/hash_pii` in Cython (single-pass over the string). +- **MUST** switch hashing to OpenSSL EVP via `cffi` for SHA-256/SHA3-256. +- **SHOULD** guard with `pip install "datafog[fast]"`. + +--- + +### **v4.9.0 — Edge & CI polish** + +- **MUST** compile the annotator and anonymizer to WebAssembly using `maturin`, package as `_datafog_wasm`. +- **MUST** auto-load WASM build on `wasmtime` when `import datafog.wasm` succeeds. +- **MUST** cache spaCy model artefacts in GitHub Actions with `actions/cache`, keyed by `model-hash`. +- **SHOULD** update docs and `README.md` badges for new extras and WASM support. diff --git a/notes/v4.1.0-tickets.md b/notes/v4.1.0-tickets.md new file mode 100644 index 00000000..b66d119b --- /dev/null +++ b/notes/v4.1.0-tickets.md @@ -0,0 +1,73 @@ +# v4.1.0 Tickets - Baseline Stability + +--- + +## Ticket 1: Centralize Version Definition + +**Title:** Read `__version__` from `datafog/__about__.py` in `setup.py` + +**Description:** +Currently, the package version might be duplicated or inconsistently defined. We need to centralize the version definition in `datafog/__about__.py`. + +**Tasks:** + +1. Ensure `datafog/__about__.py` exists and contains a `__version__` string variable (e.g., `__version__ = "4.1.0"`). +2. Modify `setup.py` to read this `__version__` variable from `datafog/__about__.py`. Common patterns involve reading the file and executing its content in a temporary namespace or using regular expressions. +3. Remove any hardcoded `version` assignment within `setup.py` itself. +4. Verify that `pip install .` and building distributions (`sdist`, `wheel`) correctly pick up the version from `__about__.py`. 
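+
+A minimal sketch of the "read the file and execute it in a temporary namespace" pattern mentioned in Task 2 (illustrative only; the `setup.py` change elsewhere in this change set is the source of truth):
+
+```python
+import os
+
+about = {}
+here = os.path.abspath(os.path.dirname(__file__))
+# Execute __about__.py into an isolated dict so only its names are captured
+with open(os.path.join(here, "datafog", "__about__.py"), "r") as f:
+    exec(f.read(), about)
+
+# about["__version__"] is then passed to setup(version=...)
+```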
+ +**Acceptance Criteria:** + +- The package version is defined _only_ in `datafog/__about__.py`. +- `setup.py` successfully reads the version from `__about__.py` during installation and build processes. +- Running `import datafog; print(datafog.__version__)` (if applicable) shows the correct version. + +--- + +## Ticket 2: Remove Runtime Dependency Installations + +**Title:** Remove `ensure_installed()` runtime `pip install` calls + +**Description:** +The codebase currently uses functions like `ensure_installed()` that attempt to `pip install` missing dependencies at runtime. This practice is unreliable, can hide dependency issues, slow down startup, and interfere with environment management. We must remove this pattern and adopt a "fail fast" approach. + +**Tasks:** + +1. Identify all code locations where runtime `pip install` commands are executed (e.g., calls to `ensure_installed`, `subprocess.run(['pip', 'install', ...])`). +2. Remove these runtime installation calls entirely. +3. Replace them with standard `import` statements. If an `ImportError` occurs, the program should exit gracefully, clearly stating which dependency is missing and how to install it (e.g., "Please install the 'X' package: pip install datafog[feature]"). +4. Ensure all necessary dependencies are listed correctly in `setup.py`'s `install_requires` or `extras_require`. + +**Acceptance Criteria:** + +- No code attempts to install packages using `pip` or similar mechanisms during program execution. +- If an optional dependency (part of an `extra`) is needed but not installed, the program raises an `ImportError` with a helpful message instructing the user how to install the required extra. +- Core dependencies listed in `install_requires` are assumed to be present; missing core dependencies will naturally cause `ImportError` on startup. + +--- + +## Ticket 3: Define and Document Setup Extras for OCR + +**Title:** Document OCR/Donut extras in `setup.py[extras_require]` + +**Description:** +The project offers optional OCR functionality using Tesseract and/or Donut models, which have their own dependencies. These optional dependencies need to be formally defined using `extras_require` in `setup.py` and documented for users. + +**Tasks:** + +1. Identify all dependencies required _only_ for Tesseract functionality. +2. Identify all dependencies required _only_ for Donut functionality. +3. Define appropriate extras in the `extras_require` dictionary within `setup.py`. Suggestions: + - `'ocr': ['pytesseract', 'pillow', ...]` (for Tesseract) + - `'donut': ['transformers[torch]', 'sentencepiece', ...]` (for Donut) + - Optionally, a combined extra: `'all_ocr': ['pytesseract', 'pillow', 'transformers[torch]', 'sentencepiece', ...]` or include dependencies in a general `'ocr'` extra if they don't conflict significantly. +4. Update the `README.md` and any installation documentation (e.g., `docs/installation.md`) to explain these extras and how users can install them (e.g., `pip install "datafog[ocr]"` or `pip install "datafog[donut]"`). + +**Acceptance Criteria:** + +- `setup.py` contains an `extras_require` section defining keys like `ocr` and/or `donut`. +- Installing the package with these extras (e.g., `pip install .[ocr]`) successfully installs the associated dependencies. +- Documentation clearly explains the available extras and the installation commands. +- Core installation (`pip install .`) does _not_ install the OCR-specific dependencies. 
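+
+For reference, a sketch of the shape the `extras_require` block can take (version pins are illustrative; the `setup.py` change in this PR is the source of truth):
+
+```python
+from setuptools import find_packages, setup
+
+setup(
+    name="datafog",
+    packages=find_packages(),
+    extras_require={
+        "ocr": ["pytesseract>=0.3.10", "Pillow>=9.0.0"],
+        "donut": ["torch>=1.8.0", "transformers[torch]>=4.10.0", "sentencepiece", "protobuf"],
+        "all": [
+            "pytesseract>=0.3.10",
+            "Pillow>=9.0.0",
+            "torch>=1.8.0",
+            "transformers[torch]>=4.10.0",
+            "sentencepiece",
+            "protobuf",
+        ],
+    },
+)
+```
+
+Users then opt in with, for example, `pip install "datafog[ocr]"`.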
+ +--- diff --git a/notes/v4.2.0-tickets.md b/notes/v4.2.0-tickets.md new file mode 100644 index 00000000..bf4e7f85 --- /dev/null +++ b/notes/v4.2.0-tickets.md @@ -0,0 +1,41 @@ +# v4.2.0 — Faster spaCy Path: Work Breakdown + +This outlines the specific tasks required to implement the performance improvements for the spaCy processing pipeline planned for v4.2.0. + +## Task 1: Implement Module-Level Cache for spaCy `nlp` Object + +- **Goal:** Avoid reloading spaCy models repeatedly by caching the loaded `nlp` object. +- **Tickets:** + - [x] **TICKET-4.2.1:** Design and implement a singleton or module-level cache mechanism in `datafog.models.spacy_nlp` to store loaded `spacy.Language` objects, keyed by model name/path. + - [x] **TICKET-4.2.2:** Refactor `SpacyAnnotator` (and potentially `SpacyNLP`) to request the `nlp` object from the cache instead of loading it directly. + - [x] **TICKET-4.2.3:** Add/update unit tests to verify that the same `nlp` object is returned for subsequent requests for the same model and that different models are cached separately. + +## Task 2: Batch Processing with `nlp.pipe()` + +- **Goal:** Process multiple documents efficiently using spaCy's `nlp.pipe` method. +- **Tickets:** + - [x] **TICKET-4.2.4:** Modify the core spaCy processing logic (likely in `SpacyAnnotator`) to accept a list of texts and use `nlp.pipe()` for batch annotation. + - [x] **TICKET-4.2.5:** Determine and set a sensible default `batch_size` for `nlp.pipe`. + - [x] **TICKET-4.2.6:** Ensure `n_process=-1` is used within `nlp.pipe` calls to leverage multi-processing. + - [x] **TICKET-4.2.7:** Update relevant unit and integration tests to work with batch inputs and outputs. + +## Task 3: Asynchronous Execution of Blocking Calls + +- **Goal:** Ensure blocking synchronous calls (like spaCy processing or Tesseract OCR) don't halt the asyncio event loop when called from async functions. +- **Method:** Use `asyncio.to_thread()`. +- **Tickets:** + - [x] **TICKET-4.2.8:** Identify synchronous spaCy calls within async functions (e.g., potentially within `run_text_pipeline_async` or similar). + - [x] **TICKET-4.2.9:** Wrap the identified spaCy calls with `await asyncio.to_thread()`. (Note: This was already correctly implemented in `TextService.annotate_text_async`). + - [x] **TICKET-4.2.10:** Identify synchronous Tesseract OCR calls within async functions (e.g., potentially within `run_ocr_pipeline_async` or similar). + - [x] **TICKET-4.2.11:** Wrap the identified Tesseract calls with `await asyncio.to_thread()`. + - [x] **TICKET-4.2.12:** Add/update tests to verify asynchronous execution doesn't block and yields correct results. (Note: Existing async tests in `test_image_service.py` and `test_text_service.py` verify the successful execution of functions using `asyncio.to_thread`). + +## Task 4: Configurable Batch Size via Environment Variable + +- **Goal:** Allow users to tune the `nlp.pipe` batch size for their specific hardware and workload. +- **Tickets:** + - [x] **TICKET-4.2.13:** Implement logic to read an environment variable (e.g., `DATAFOG_SPACY_PIPE_BATCH_SIZE`) when setting the `batch_size` for `nlp.pipe`. + - [x] **TICKET-4.2.14:** Ensure the code falls back gracefully to the default `batch_size` if the environment variable is not set or invalid. + - [x] **TICKET-4.2.15:** Document the `DATAFOG_SPACY_PIPE_BATCH_SIZE` environment variable in the project's README or configuration documentation. 
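+
+A minimal sketch of the fallback logic described by the tickets above (note: the implementation and README in this change set use the name `DATAFOG_SPACY_BATCH_SIZE` rather than `DATAFOG_SPACY_PIPE_BATCH_SIZE`):
+
+```python
+import os
+
+DEFAULT_BATCH_SIZE = 50
+
+
+def resolve_batch_size() -> int:
+    """Read the spaCy pipe batch size from the environment, falling back to the default."""
+    raw = os.getenv("DATAFOG_SPACY_BATCH_SIZE")
+    try:
+        value = int(raw) if raw else DEFAULT_BATCH_SIZE
+    except (TypeError, ValueError):
+        return DEFAULT_BATCH_SIZE
+    # Reject zero/negative values, as the annotator does
+    return value if value > 0 else DEFAULT_BATCH_SIZE
+```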
+ +## Task 5: Documentation Update diff --git a/requirements.txt b/requirements.txt index e7be4549..eb0e8dd0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,6 +6,7 @@ Pillow sentencepiece protobuf pytesseract +torch aiohttp pytest-asyncio numpy diff --git a/scripts/bench_cli.sh b/scripts/bench_cli.sh new file mode 100755 index 00000000..6b1a0ec9 --- /dev/null +++ b/scripts/bench_cli.sh @@ -0,0 +1,116 @@ +#!/bin/bash +# Simple CLI benchmark for DataFog text service + +# --- Configuration --- +NUM_LOOPS=5 +# SAMPLE_TEXT="Redact my name, John Doe, and my organization, Acme Inc." +JSON_FILE="scripts/sample_otel_log.json" # Added +VENV_PATH="./venv_4.0.1" # Assuming venv is in the project root +TIME_CMD="/usr/bin/time" # Use full path for `time` to avoid shell built-in + +# Get project root directory (assuming the script is run from the project root) +PROJECT_DIR=$(pwd) + +# Check if venv exists +if [ ! -d "$VENV_PATH" ]; then + echo "Error: Virtual environment not found at $VENV_PATH" + exit 1 +fi + +# Construct absolute paths +DATAFOG_CLI="$VENV_PATH/bin/datafog" +JSON_FILE_PATH="$PROJECT_DIR/$JSON_FILE" # Added + +# Check if files/commands exist +if [ ! -x "$DATAFOG_CLI" ]; then + echo "Error: DataFog CLI not found or not executable at $DATAFOG_CLI" + exit 1 +fi + +# Check if jq is installed +if ! command -v jq &> /dev/null +then + echo "Error: jq is not installed. Please install jq (e.g., 'brew install jq' or 'sudo apt-get install jq')." + exit 1 +fi + +# Check if JSON file exists +if [ ! -f "$JSON_FILE_PATH" ]; then + echo "Error: JSON file not found at $JSON_FILE_PATH" + exit 1 +fi + + + + +# Function to extract strings using jq +get_json_strings() { # Added + jq -r '.. | strings?' "$JSON_FILE_PATH" +} + + +echo "Starting CLI Benchmark for $JSON_FILE..." # Modified +echo "Working Directory: $(pwd)" +echo "Using DataFog CLI: $DATAFOG_CLI" +echo "Timing $NUM_LOOPS executions." +echo "---" + +# --- Warm-up Run --- +echo "Performing warm-up run..." +# Pipe extracted strings to xargs and then to datafog +get_json_strings | xargs -I {} "$DATAFOG_CLI" redact-text "{}" > /dev/null # Modified +exit_code=$? +if [ $exit_code -ne 0 ]; then + echo "Error: Warm-up run failed with exit code $exit_code." + exit 1 +fi +echo "Warm-up complete." + +# --- Timed Runs --- +echo "Starting timed benchmark loops..." +total_real_time=0 +total_user_time=0 +total_sys_time=0 + +for i in $(seq 1 $NUM_LOOPS) +do + echo "Running loop $i/$NUM_LOOPS..." + + # Execute the command using time and capture stderr (which contains time output) + # Pipe extracted strings to xargs and then to datafog + time_output=$({ $TIME_CMD -p bash -c "jq -r '.. | strings?' \"$JSON_FILE_PATH\" | xargs -I {} \"$DATAFOG_CLI\" redact-text \"{}\" > /dev/null"; } 2>&1) # Modified + exit_code=$? + if [ $exit_code -ne 0 ]; then + echo "Error: Benchmark loop $i failed with exit code $exit_code." 
+ # Decide if you want to exit or continue + # exit 1 + continue # Skipping this loop's result + fi + + # Extract real, user, sys times using awk + real_time=$(echo "$time_output" | awk '/real/ {print $2}') + user_time=$(echo "$time_output" | awk '/user/ {print $2}') + sys_time=$(echo "$time_output" | awk '/sys/ {print $2}') + + # Add to totals using awk for floating point arithmetic + total_real_time=$(awk "BEGIN {print $total_real_time + $real_time}") + total_user_time=$(awk "BEGIN {print $total_user_time + $user_time}") + total_sys_time=$(awk "BEGIN {print $total_sys_time + $sys_time}") + + echo "Loop $i time: Real=${real_time}s User=${user_time}s Sys=${sys_time}s" +done + +# Calculate averages +avg_real_time=$(awk "BEGIN {print $total_real_time / $NUM_LOOPS}") +avg_user_time=$(awk "BEGIN {print $total_user_time / $NUM_LOOPS}") +avg_sys_time=$(awk "BEGIN {print $total_sys_time / $NUM_LOOPS}") + +echo "" +echo "Average Execution Time (over $NUM_LOOPS runs):" +echo " Real: ${avg_real_time}s" +echo " User: ${avg_user_time}s" +echo " Sys: ${avg_sys_time}s" +echo "(Execution = extracting strings from $JSON_FILE with jq, piping each via xargs to 'datafog redact-text')" # Modified + +echo "==================================" +echo "Benchmark complete." \ No newline at end of file diff --git a/scripts/bench_sdk.py b/scripts/bench_sdk.py new file mode 100755 index 00000000..ce70a5c7 --- /dev/null +++ b/scripts/bench_sdk.py @@ -0,0 +1,119 @@ +import json +import os +import sys +import timeit +from typing import Any, List + +# Ensure the project root is in the Python path +project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +sys.path.insert(0, project_root) + +try: + from datafog.models.anonymizer import Anonymizer, AnonymizerType + from datafog.models.spacy_nlp import SpacyAnnotator +except ImportError: + print("Error: Could not import SpacyAnnotator or Anonymizer.") + print("Make sure datafog-python is installed in your environment.") + print(f"Project root added to path: {project_root}") + sys.exit(1) + +# --- Configuration --- +NUM_RUNS = 1 # Run the full file processing once per timeit loop +NUM_LOOPS = 10 # Number of times to repeat the timeit measurement +JSON_FILE_PATH = os.path.join(project_root, "scripts", "sample_otel_log.json") + + +# --- Helper Function to Extract Strings --- +def extract_strings_from_json(data: Any) -> List[str]: + """Recursively extract all string values from a JSON object/list.""" + strings = [] + if isinstance(data, dict): + for key, value in data.items(): + if isinstance(key, str): + strings.append(key) + strings.extend(extract_strings_from_json(value)) + elif isinstance(data, list): + for item in data: + strings.extend(extract_strings_from_json(item)) + elif isinstance(data, str): + strings.append(data) + return strings + + +print(f"Starting SDK Benchmark for {os.path.basename(JSON_FILE_PATH)}...") +print( + f"Timing {NUM_RUNS} full file processing execution(s) per loop, repeated {NUM_LOOPS} times." 
+) +print("---") + +# --- Setup Code for timeit --- +# This code runs once before each timing loop (NUM_LOOPS) +setup_code = """ +import os +import sys +project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +sys.path.insert(0, project_root) +from datafog.models.spacy_nlp import SpacyAnnotator +from datafog.models.anonymizer import Anonymizer, AnonymizerType + +# Instantiate the services +# Model loading happens here or on first call +annotator = SpacyAnnotator() +anonymizer = Anonymizer(anonymizer_type=AnonymizerType.REDACT) +""" + +# --- Get Sample Data --- +try: + with open(JSON_FILE_PATH, "r", encoding="utf-8") as f: + json_data = json.load(f) + sample_texts = extract_strings_from_json(json_data) + if not sample_texts: + print(f"Error: No strings found in {JSON_FILE_PATH}") + sys.exit(1) +except FileNotFoundError: + print(f"Error: JSON file not found at {JSON_FILE_PATH}") + sys.exit(1) +except json.JSONDecodeError as e: + print(f"Error decoding JSON {JSON_FILE_PATH}: {e}") + sys.exit(1) +except Exception as e: + print(f"Error reading or processing JSON file: {e}") + sys.exit(1) + +# --- Statement to Time --- +stmt_to_time = f""" +for text in {sample_texts}: + if text: + annotations = annotator.annotate_text(text) + anonymizer.anonymize(text, annotations) +""" + +# --- Running the Benchmark --- +try: + print(f"Benchmarking processing of {len(sample_texts)} strings from the file:") + # timeit runs the setup code, then runs stmt_to_time 'number' times, + # and repeats this process 'repeat' times (if repeat is used). + # It returns a list of times for each repetition. + # We use number=NUM_RUNS here. + times = timeit.repeat( + stmt=stmt_to_time, + setup=setup_code, + number=NUM_RUNS, # Number of executions per timing loop + repeat=NUM_LOOPS, # Number of timing loops + ) + + # Calculate average time per execution across all loops + min_time_per_loop = min(times) + avg_time_per_run = min_time_per_loop / NUM_RUNS + + print("---") + print(f"Fastest loop time ({NUM_RUNS} runs): {min_time_per_loop:.6f} seconds") + print( + f"Average time per single full file processing (in fastest loop): {avg_time_per_run:.6f} seconds" + ) + print("==================================") + +except Exception as e: + print(f"An error occurred during benchmarking: {e}") + +print("Benchmark complete.") diff --git a/scripts/sample_otel_log.json b/scripts/sample_otel_log.json new file mode 100644 index 00000000..37c28f08 --- /dev/null +++ b/scripts/sample_otel_log.json @@ -0,0 +1,32 @@ +{ + "Timestamp": "2025-04-27T19:50:38.123456789Z", + "ObservedTimestamp": "2025-04-27T19:50:38.987654321Z", + "TraceId": "a1b2c3d4e5f6a7b8c9d0e1f2a3b4c5d6", + "SpanId": "1a2b3c4d5e6f7a8b", + "SeverityText": "ERROR", + "SeverityNumber": 17, + "Body": "User login failed for user jane.doe@example.com. Error processing payment for order #12345 associated with account JD98765. IP address: 192.168.1.101. 
Session ID: sess_abc123.", + "Attributes": { + "http.method": "POST", + "http.url": "https://api.example.com/v1/login", + "http.status_code": 401, + "user.id": "jane.doe@example.com", + "user.name": "Jane Doe", + "customer.id": "JD98765", + "error.message": "Invalid credentials provided by user from 192.168.1.101", + "payment.details.card_last4": "4242", + "session.id": "sess_abc123", + "thread.id": 12, + "thread.name": "worker-3" + }, + "Resource": { + "service.name": "auth-service", + "service.version": "1.2.5", + "service.instance.id": "auth-service-7bdf9f5cf-xyz12", + "deployment.environment": "production", + "host.name": "prod-auth-server-01", + "cloud.provider": "aws", + "cloud.region": "us-west-2", + "cloud.account.id": "123456789012" + } +} diff --git a/setup.py b/setup.py index ffdca1a4..3e86737c 100644 --- a/setup.py +++ b/setup.py @@ -1,11 +1,16 @@ +import os + from setuptools import find_packages, setup # Read README for the long description with open("README.md", "r") as f: long_description = f.read() -# Use a single source of truth for the version -__version__ = "4.0.0" +# Get version from __about__.py +about = {} +here = os.path.abspath(os.path.dirname(__file__)) +with open(os.path.join(here, "datafog", "__about__.py"), "r") as f: + exec(f.read(), about) project_urls = { "Homepage": "https://datafog.ai", @@ -17,7 +22,7 @@ setup( name="datafog", - version=__version__, + version=about["__version__"], author="Sid Mohan", author_email="sid@datafog.ai", description="Scan, redact, and manage PII in your documents before they get uploaded to a Retrieval Augmented Generation (RAG) system.", @@ -25,24 +30,30 @@ long_description_content_type="text/markdown", packages=find_packages(), install_requires=[ + # Core dependencies "pandas", "requests==2.32.3", - "spacy==3.7.5", "pydantic", - "Pillow", - "sentencepiece", + "Pillow", # Image processing "protobuf", - "pytesseract", "aiohttp", - "pytest-asyncio", "numpy", "fastapi", "asyncio", "setuptools", "pydantic-settings==2.3.4", "typer==0.12.3", - "sphinx", + "sphinx", # Documentation "cryptography", + # Spacy dependencies + "spacy==3.7.5", + # Tesseract dependencies + "pytesseract", + # Donut dependencies (requires transformers, torch) + "sentencepiece", + "torch", # Add torch for Donut model support + # Development/Testing dependencies (moved to extras_require ideally) + "pytest-asyncio", ], python_requires=">=3.10,<3.13", entry_points={ @@ -82,6 +93,25 @@ "pytest-cov", "build", "twine", + "ipykernel", + ], + "ocr": [ + "pytesseract>=0.3.10", + "Pillow>=9.0.0", + ], + "donut": [ + "torch>=1.8.0", + "transformers[torch]>=4.10.0", + "sentencepiece", + "protobuf", + ], + "all": [ + "pytesseract>=0.3.10", + "Pillow>=9.0.0", + "torch>=1.8.0", + "transformers[torch]>=4.10.0", + "sentencepiece", + "protobuf", ], }, ) diff --git a/tests/test_main.py b/tests/test_main.py index 7140faa0..c14a36d3 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -88,7 +88,6 @@ def test_datafog_init(): datafog = DataFog() assert isinstance(datafog.image_service, ImageService) assert isinstance(datafog.text_service, TextService) - assert datafog.spark_service is None assert datafog.operations == [OperationType.SCAN] custom_image_service = ImageService() diff --git a/tests/test_spacy_nlp.py b/tests/test_spacy_nlp.py new file mode 100644 index 00000000..312e65f8 --- /dev/null +++ b/tests/test_spacy_nlp.py @@ -0,0 +1,292 @@ +# tests/test_spacy_nlp.py +from unittest.mock import MagicMock, patch +from uuid import UUID + +import pytest + +# Import the function 
and cache we want to test directly +from datafog.models.spacy_nlp import ( + _SPACY_MODEL_CACHE, + AnnotationResult, + SpacyAnnotator, + _get_spacy_model, +) + + +@pytest.fixture(autouse=True) +def clear_spacy_cache_before_test(): + """Fixture to clear the spaCy model cache before each test.""" + _SPACY_MODEL_CACHE.clear() + yield # Test runs here + _SPACY_MODEL_CACHE.clear() # Clean up after test too, just in case + + +@patch("datafog.models.spacy_nlp.spacy.load") +def test_annotate_text_basic(mock_spacy_load): + """ + Test that annotate_text correctly processes a batch of texts using nlp.pipe + and returns a list of lists of AnnotationResult objects. + """ + # Arrange: Mock the spaCy NLP object and its pipe method + mock_nlp = MagicMock() + mock_doc1 = MagicMock() + mock_doc2 = MagicMock() + + # Simulate entities for the first text + mock_ent1_doc1 = MagicMock(start_char=0, end_char=4, label_="PERSON") + mock_ent2_doc1 = MagicMock(start_char=11, end_char=17, label_="LOCATION") + mock_doc1.ents = [mock_ent1_doc1, mock_ent2_doc1] + + # Simulate entities for the second text + mock_ent1_doc2 = MagicMock(start_char=0, end_char=5, label_="PERSON") + mock_ent2_doc2 = MagicMock(start_char=16, end_char=22, label_="ORG") + mock_doc2.ents = [mock_ent1_doc2, mock_ent2_doc2] + + # Mock the return value of nlp.pipe to be an iterator of our mock docs + mock_nlp.pipe.return_value = iter([mock_doc1, mock_doc2]) + mock_spacy_load.return_value = mock_nlp # spacy.load() returns the mock_nlp + + # Instantiate the annotator + annotator = SpacyAnnotator() + + # Act: Call the method under test with a list of texts + test_texts = ["John lives in London.", "Alice works at Google."] + results = annotator.annotate_text(test_texts) + + # Assert: + # Check that spacy.load was called (implicitly tests load_model) + mock_spacy_load.assert_called_once_with(annotator.model_name) + # Check that nlp.pipe was called with the texts and default args + mock_nlp.pipe.assert_called_once_with(test_texts, batch_size=50, n_process=-1) + + # Check the structure of the results (list of lists) + assert isinstance(results, list) + assert len(results) == 2 # One list of results for each input text + assert isinstance(results[0], list) + assert isinstance(results[1], list) + + # --- Check results for the first text --- # + assert len(results[0]) == 2 + # First entity + assert isinstance(results[0][0], AnnotationResult) + assert results[0][0].start == 0 + assert results[0][0].end == 4 + assert results[0][0].entity_type == "PERSON" + assert isinstance(results[0][0].score, float) + # Second entity + assert isinstance(results[0][1], AnnotationResult) + assert results[0][1].start == 11 + assert results[0][1].end == 17 + assert results[0][1].entity_type == "LOCATION" + assert isinstance(results[0][1].score, float) + + # --- Check results for the second text --- # + assert len(results[1]) == 2 + # First entity + assert isinstance(results[1][0], AnnotationResult) + assert results[1][0].start == 0 + assert results[1][0].end == 5 + assert results[1][0].entity_type == "PERSON" + assert isinstance(results[1][0].score, float) + # Second entity + assert isinstance(results[1][1], AnnotationResult) + assert results[1][1].start == 16 + assert results[1][1].end == 22 + # Expect UNKNOWN because 'ORG' is not in EntityTypes enum (validator replaces it) + assert results[1][1].entity_type == "UNKNOWN" + assert isinstance(results[1][1].score, float) + + +# Example of testing other branches (e.g., model already loaded) 
+@patch("datafog.models.spacy_nlp.spacy.load") +def test_annotate_text_model_already_loaded(mock_spacy_load): + """ + Test that annotate_text doesn't reload the model if already loaded + and still calls nlp.pipe correctly. + """ + # Arrange + mock_nlp = MagicMock() + mock_doc = MagicMock() + mock_doc.ents = [] # No entities for simplicity + mock_nlp.pipe.return_value = iter([mock_doc]) # nlp.pipe returns iterator + mock_spacy_load.return_value = mock_nlp # This shouldn't be called if pre-set + + annotator = SpacyAnnotator() + annotator.nlp = mock_nlp # Pre-set the nlp attribute + + # Act + test_texts = ["Some text."] + results = annotator.annotate_text(test_texts) + + # Assert + mock_spacy_load.assert_not_called() # Should not be called again + mock_nlp.pipe.assert_called_once_with(test_texts, batch_size=50, n_process=-1) + assert isinstance(results, list) + assert len(results) == 1 + assert isinstance(results[0], list) + assert len(results[0]) == 0 # No entities expected + + +# --- Tests for _get_spacy_model Caching --- # + + +@patch("datafog.models.spacy_nlp.spacy.util.is_package", return_value=True) +@patch("datafog.models.spacy_nlp.spacy.load") +def test_get_spacy_model_cache_hit(mock_spacy_load, mock_is_package): + """Verify that the model is loaded only once for the same name.""" + # Arrange: Create mock models + mock_nlp_1 = MagicMock(name="model_lg") + mock_spacy_load.return_value = mock_nlp_1 + model_name = "en_core_web_lg" + + # Act: Call twice with the same model name + nlp_obj1 = _get_spacy_model(model_name) + nlp_obj2 = _get_spacy_model(model_name) + + # Assert: spacy.load called only once, returned objects are the same instance + mock_is_package.assert_called_once_with( + model_name + ) # is_package check happens first + mock_spacy_load.assert_called_once_with(model_name) + assert nlp_obj1 is nlp_obj2 + assert model_name in _SPACY_MODEL_CACHE + assert _SPACY_MODEL_CACHE[model_name] is nlp_obj1 + + +@patch("datafog.models.spacy_nlp.spacy.util.is_package", return_value=True) +@patch("datafog.models.spacy_nlp.spacy.load") +def test_get_spacy_model_cache_miss_different_models(mock_spacy_load, mock_is_package): + """Verify that different models are loaded and cached separately.""" + # Arrange: Mock different return values for spacy.load + mock_nlp_lg = MagicMock(name="model_lg") + mock_nlp_sm = MagicMock(name="model_sm") + mock_spacy_load.side_effect = [mock_nlp_lg, mock_nlp_sm] + model_name_lg = "en_core_web_lg" + model_name_sm = "en_core_web_sm" + + # Act: Call with different model names + nlp_obj_lg = _get_spacy_model(model_name_lg) + nlp_obj_sm = _get_spacy_model(model_name_sm) + + # Assert: spacy.load called for each model, objects are different + assert mock_is_package.call_count == 2 + assert mock_spacy_load.call_count == 2 + mock_spacy_load.assert_any_call(model_name_lg) + mock_spacy_load.assert_any_call(model_name_sm) + assert nlp_obj_lg is not nlp_obj_sm + assert nlp_obj_lg is mock_nlp_lg + assert nlp_obj_sm is mock_nlp_sm + assert model_name_lg in _SPACY_MODEL_CACHE + assert model_name_sm in _SPACY_MODEL_CACHE + assert _SPACY_MODEL_CACHE[model_name_lg] is nlp_obj_lg + assert _SPACY_MODEL_CACHE[model_name_sm] is nlp_obj_sm + + +@patch("datafog.models.spacy_nlp.spacy.cli.download") +@patch("datafog.models.spacy_nlp.spacy.load") +@patch("datafog.models.spacy_nlp.spacy.util.is_package", return_value=False) +def test_get_spacy_model_triggers_download( + mock_is_package, mock_spacy_load, mock_download +): + """Verify that spacy.cli.download is called if model package is not 
found.""" + # Arrange + mock_nlp = MagicMock(name="downloaded_model") + mock_spacy_load.return_value = mock_nlp + model_name = "en_new_model" + + # Act + nlp_obj = _get_spacy_model(model_name) + + # Assert + mock_is_package.assert_called_once_with(model_name) + mock_download.assert_called_once_with(model_name) + mock_spacy_load.assert_called_once_with(model_name) + assert nlp_obj is mock_nlp + assert model_name in _SPACY_MODEL_CACHE + + +# Test batch size configuration + + +@patch("datafog.models.spacy_nlp.os.getenv") +@patch("datafog.models.spacy_nlp._get_spacy_model") +def test_annotate_text_uses_default_batch_size(mock_get_model, mock_getenv): + """Verify nlp.pipe is called with default batch_size=50 when env var is not set.""" + # Mock os.getenv to return None (env var not set) + mock_getenv.return_value = None + + # Setup mock spaCy model and its pipe method + mock_nlp = MagicMock() + mock_nlp.pipe.return_value = [] # pipe returns an iterable + mock_get_model.return_value = mock_nlp + + # Instantiate annotator and call the method + annotator = SpacyAnnotator() + annotator.annotate_text(["text1", "text2"]) + + # Assert _get_spacy_model was called + mock_get_model.assert_called_once_with(annotator.model_name) + + # Assert nlp.pipe was called with default batch_size=50 + mock_nlp.pipe.assert_called_once() + args, kwargs = mock_nlp.pipe.call_args + assert kwargs.get("batch_size") == 50 + assert kwargs.get("n_process") == -1 + + +@patch("datafog.models.spacy_nlp.os.getenv") +@patch("datafog.models.spacy_nlp._get_spacy_model") +def test_annotate_text_uses_env_var_batch_size(mock_get_model, mock_getenv): + """Verify nlp.pipe is called with batch_size from env var.""" + # Mock os.getenv to return a specific batch size + mock_getenv.return_value = "10" + + # Setup mock spaCy model and its pipe method + mock_nlp = MagicMock() + mock_nlp.pipe.return_value = [] + mock_get_model.return_value = mock_nlp + + # Instantiate annotator and call the method + annotator = SpacyAnnotator() + annotator.annotate_text(["text1", "text2"]) + + # Assert _get_spacy_model was called + mock_get_model.assert_called_once_with(annotator.model_name) + + # Assert nlp.pipe was called with batch_size=10 + mock_nlp.pipe.assert_called_once() + args, kwargs = mock_nlp.pipe.call_args + assert kwargs.get("batch_size") == 10 + assert kwargs.get("n_process") == -1 + + +@pytest.mark.parametrize("invalid_value", ["abc", "-5", "0", " "]) +@patch("datafog.models.spacy_nlp.os.getenv") +@patch("datafog.models.spacy_nlp._get_spacy_model") +def test_annotate_text_uses_default_on_invalid_env_var( + mock_get_model, mock_getenv, invalid_value +): + """Verify nlp.pipe uses default batch_size=50 when env var is invalid.""" + # Mock os.getenv to return an invalid value + mock_getenv.return_value = invalid_value + + # Setup mock spaCy model and its pipe method + mock_nlp = MagicMock() + mock_nlp.pipe.return_value = [] + mock_get_model.return_value = mock_nlp + + # Instantiate annotator and call the method + annotator = SpacyAnnotator() + annotator.annotate_text(["text1", "text2"]) + + # Assert _get_spacy_model was called + mock_get_model.assert_called_once_with(annotator.model_name) + + # Assert nlp.pipe was called with default batch_size=50 + mock_nlp.pipe.assert_called_once() + args, kwargs = mock_nlp.pipe.call_args + assert kwargs.get("batch_size") == 50 + assert kwargs.get("n_process") == -1 + + +# Existing download/list tests remain unchanged...
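+
+
+# A minimal additional sketch (not part of the suite above): an empty input batch
+# should yield an empty result list while still going through nlp.pipe once.
+# It reuses the mocking style of the tests above.
+@patch("datafog.models.spacy_nlp._get_spacy_model")
+def test_annotate_text_empty_batch_sketch(mock_get_model):
+    mock_nlp = MagicMock()
+    mock_nlp.pipe.return_value = iter([])
+    mock_get_model.return_value = mock_nlp
+
+    annotator = SpacyAnnotator()
+    results = annotator.annotate_text([])
+
+    assert results == []
+    mock_nlp.pipe.assert_called_once()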