22 changes: 16 additions & 6 deletions README.md
@@ -199,18 +199,20 @@ datafog list-entities

## Getting Started

To use DataFog, you'll need to create a DataFog client with the desired operations. Here's a basic setup:
Initialize the DataFog client. You can specify operations like `SCAN`, `REDACT`, `REPLACE`, `HASH`.

```python
from datafog import DataFog

# For text annotation
client = DataFog(operations="scan")

# For OCR (Optical Character Recognition)
ocr_client = DataFog(operations="extract")
# Initialize with default operations (SCAN)
client = DataFog()
```

DataFog now includes several performance enhancements:
- **spaCy Model Caching:** Loaded spaCy models are cached in memory to speed up subsequent processing requests.
- **Efficient Batch Processing:** The text processing pipeline uses spaCy's `nlp.pipe()` internally, processing documents in batches (default batch size 50, configurable via the `DATAFOG_SPACY_BATCH_SIZE` environment variable) and leveraging multiple processes (`n_process=-1`) for higher throughput.
- **Asynchronous Execution:** When using the asynchronous methods (`run_text_pipeline`, `run_ocr_pipeline`), potentially blocking operations such as spaCy processing and Tesseract OCR run in separate threads so the main event loop is never blocked (see the sketch below).
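
For example, here is a minimal sketch of the async text pipeline (the exact `run_text_pipeline` signature may vary by version; the input list and sample text are illustrative):

```python
import asyncio

from datafog import DataFog


async def main():
    client = DataFog()  # default operations (SCAN)
    # spaCy inference is dispatched to a worker thread internally,
    # so other tasks on the event loop keep running while we await.
    annotations = await client.run_text_pipeline(["My name is John Doe."])
    print(annotations)


asyncio.run(main())
```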

## Text PII Annotation

Here's an example of how to annotate PII in a text document:
@@ -321,6 +323,14 @@ Output:

You can choose from SHA256 (default), SHA3-256, and MD5 hashing algorithms by specifying the `hash_type` parameter.

## Configuration

### Environment Variables

- **`DATAFOG_SPACY_BATCH_SIZE`**: Controls the batch size used internally by spaCy's `nlp.pipe()` when processing text documents. Larger batch sizes can improve throughput on systems with sufficient memory but may increase memory usage.
- **Default:** `50`
- **Usage:** Set this environment variable to a positive integer (e.g., `export DATAFOG_SPACY_BATCH_SIZE=100`) to override the default; a Python equivalent is sketched below.
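
The same override can be applied from Python, for example in a test harness; this is equivalent to the shell export above and must run before the pipeline first reads the variable:

```python
import os

# Equivalent to `export DATAFOG_SPACY_BATCH_SIZE=100`.
os.environ["DATAFOG_SPACY_BATCH_SIZE"] = "100"
```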

## Examples

For more detailed examples, check out our Jupyter notebooks in the `examples/` directory:
136 changes: 109 additions & 27 deletions datafog/models/spacy_nlp.py
@@ -5,14 +5,53 @@
text annotation, entity recognition, and related NLP tasks.
"""

import logging
import os
import threading
from typing import List, Optional
from uuid import uuid4

import spacy
from rich.progress import track
from spacy.language import Language # For type hinting

from .annotator import AnnotationResult, AnnotatorRequest

# Set up logging
logger = logging.getLogger(__name__)

_SPACY_MODEL_CACHE = {}
_CACHE_LOCK = threading.Lock()


def _get_spacy_model(model_name: str) -> Language:
"""Loads a spaCy model, utilizing a cache to avoid redundant loads."""
with _CACHE_LOCK:
if model_name in _SPACY_MODEL_CACHE:
logger.debug(f"Cache hit for model: {model_name!r}")
return _SPACY_MODEL_CACHE[model_name]

# Model not in cache, needs loading (lock ensures only one thread loads)
logger.debug(f"Cache miss for model: {model_name!r}. Loading...")
try:
# Download the model if it isn't installed yet.
if not spacy.util.is_package(model_name):
logger.info(f"Model {model_name!r} not found. Downloading...")
# spacy.cli.download streams its own progress output; we only log the attempt here.
spacy.cli.download(model_name)

# Load the spaCy model after ensuring it's downloaded
nlp = spacy.load(model_name)
_SPACY_MODEL_CACHE[model_name] = nlp
logger.debug(f"Model {model_name!r} loaded and cached.")
return nlp
except Exception as e:
logger.error(f"Failed to load or download spaCy model {model_name}: {e}")
raise

[Codecov: added lines 51-53 in datafog/models/spacy_nlp.py were not covered by tests]


class SpacyAnnotator:
"""
@@ -24,41 +63,83 @@

def __init__(self, model_name: str = "en_core_web_lg"):
self.model_name = model_name
self.nlp = None
self.nlp = None # Keep lazy loading

def load_model(self):
if not spacy.util.is_package(self.model_name):
spacy.cli.download(self.model_name)
self.nlp = spacy.load(self.model_name)

def annotate_text(self, text: str, language: str = "en") -> List[AnnotationResult]:
"""Ensures the spaCy model is loaded, utilizing the cache."""
if not self.nlp:
self.load_model()
# Use the cached loader function
self.nlp = _get_spacy_model(self.model_name)

def annotate_text(
self, texts: List[str], language: str = "en"
) -> List[List[AnnotationResult]]:
"""Annotates a batch of texts using spaCy NLP pipeline.

Args:
texts (List[str]): A list of texts to annotate.
language (str): The language of the text (currently unused).

Returns:
List[List[AnnotationResult]]: A list where each inner list contains
AnnotationResult objects for the corresponding
input text.
"""
# Ensure the model is loaded
self.load_model()

annotator_request = AnnotatorRequest(
text=text,
language=language,
correlation_id=str(uuid4()),
score_threshold=0.5,
entities=None,
return_decision_process=False,
ad_hoc_recognizers=None,
context=None,
)
doc = self.nlp(annotator_request.text)
results = []
for ent in track(doc.ents, description="Processing entities"):
result = AnnotationResult(
start=ent.start_char,
end=ent.end_char,
score=0.8, # Placeholder score
entity_type=ent.label_,
recognition_metadata=None,

# Get batch size from environment variable or use default
default_batch_size = 50
try:
batch_size_str = os.getenv("DATAFOG_SPACY_BATCH_SIZE")
batch_size = int(batch_size_str) if batch_size_str else default_batch_size
if batch_size <= 0:
logger.warning(
f"Invalid DATAFOG_SPACY_BATCH_SIZE {batch_size_str!r}. "
f"Must be a positive integer. Using default: {default_batch_size}"
)
batch_size = default_batch_size
except (ValueError, TypeError):
# Handle cases where env var is set but not a valid integer
batch_size_str = os.getenv(
"DATAFOG_SPACY_BATCH_SIZE"
) # Get it again for logging
logger.warning(
f"Invalid DATAFOG_SPACY_BATCH_SIZE {batch_size_str!r}. "
f"Must be an integer. Using default: {default_batch_size}"
)
results.append(result)
batch_size = default_batch_size

logger.info(f"Using spaCy batch size: {batch_size}")

# Process texts in batches using nlp.pipe
docs = self.nlp.pipe(texts, batch_size=batch_size, n_process=-1)

# Wrap with track for progress bar
processed_docs = track(docs, description="Annotating text...", total=len(texts))

# Process each doc
for doc in processed_docs:
doc_results: List[AnnotationResult] = []  # Results for the current doc
for ent in doc.ents:
result = AnnotationResult(
start=ent.start_char,
end=ent.end_char,
score=0.8, # Placeholder score - Consider using actual scores if available
entity_type=ent.label_,
recognition_metadata=None, # Consider adding metadata if needed
)
doc_results.append(result)
results.append(doc_results) # Add results for this doc to the main list

return results

def show_model_path(self) -> str:
# This check now correctly uses the updated load_model -> _get_spacy_model
if not self.nlp:
self.load_model()
return str(self.nlp.path)
@@ -73,5 +154,6 @@

@staticmethod
def list_entities() -> List[str]:
nlp = spacy.load("en_core_web_lg")
# Use the cached loader function for the default model
nlp = _get_spacy_model("en_core_web_lg")

[Codecov: added line 158 in datafog/models/spacy_nlp.py was not covered by tests]
return [ent for ent in nlp.pipe_labels["ner"]]
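
Taken together, the new batch-oriented API can be exercised as follows (a sketch; the texts are illustrative, and the default `en_core_web_lg` model must be available):

```python
from datafog.models.spacy_nlp import SpacyAnnotator

annotator = SpacyAnnotator()  # defaults to "en_core_web_lg"
texts = [
    "Satya Nadella spoke in Redmond.",
    "Contact Jane Doe at Acme Corp in Berlin.",
]
# annotate_text returns one inner list of AnnotationResult per input text.
batch_results = annotator.annotate_text(texts)
for text, doc_results in zip(texts, batch_results):
    for r in doc_results:
        print(text[r.start:r.end], r.entity_type)
```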
4 changes: 3 additions & 1 deletion datafog/processing/image_processing/pytesseract_processor.py
@@ -5,6 +5,7 @@
using the Pytesseract OCR engine.
"""

import asyncio
import logging

import pytesseract
@@ -21,7 +22,8 @@ class PytesseractProcessor:

async def extract_text_from_image(self, image: Image.Image) -> str:
try:
return pytesseract.image_to_string(image)
# Run the blocking function in a separate thread
return await asyncio.to_thread(pytesseract.image_to_string, image)
except Exception as e:
logging.error(f"Pytesseract error: {str(e)}")
raise
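
From the calling side, the non-blocking behavior looks like this (a sketch; `sample.png` is a hypothetical local file, and `PytesseractProcessor` is assumed to take no constructor arguments):

```python
import asyncio

from PIL import Image

from datafog.processing.image_processing.pytesseract_processor import (
    PytesseractProcessor,
)


async def main():
    processor = PytesseractProcessor()
    image = Image.open("sample.png")  # hypothetical input image
    # Tesseract runs in a worker thread via asyncio.to_thread, so the
    # event loop stays free to service other coroutines meanwhile.
    text = await processor.extract_text_from_image(image)
    print(text)


asyncio.run(main())
```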
8 changes: 3 additions & 5 deletions notes/ROADMAP.md
@@ -2,9 +2,9 @@

### **v4.1.0 — Baseline stability**

* **MUST** read `__version__` from `datafog/__about__.py` and import it in `setup.py`; delete the duplicate there.
* **MUST** remove every `ensure_installed()` runtime `pip install`; fail fast instead.
* **MUST** document OCR/Donut extras in `setup.py[extras]`.
- **MUST** read `__version__` from `datafog/__about__.py` and import it in `setup.py`; delete the duplicate there.
- **MUST** remove every `ensure_installed()` runtime `pip install`; fail fast instead.
- **MUST** document OCR/Donut extras in `setup.py[extras]`.

---

@@ -72,6 +72,4 @@
- **MUST** cache spaCy model artefacts in GitHub Actions with `actions/cache`, keyed by `model-hash`.
- **SHOULD** update docs and `README.md` badges for new extras and WASM support.

---

Use this ladder as-is, bumping **only the minor version** each time, so v4.0.x callers never break.
41 changes: 41 additions & 0 deletions notes/v4.2.0-tickets.md
@@ -0,0 +1,41 @@
# v4.2.0 — Faster spaCy Path: Work Breakdown

This document outlines the tasks required to implement the spaCy pipeline performance improvements planned for v4.2.0.

## Task 1: Implement Module-Level Cache for spaCy `nlp` Object

* **Goal:** Avoid reloading spaCy models repeatedly by caching the loaded `nlp` object.
* **Tickets:**
* [x] **TICKET-4.2.1:** Design and implement a singleton or module-level cache mechanism in `datafog.models.spacy_nlp` to store loaded `spacy.Language` objects, keyed by model name/path.
* [x] **TICKET-4.2.2:** Refactor `SpacyAnnotator` (and potentially `SpacyNLP`) to request the `nlp` object from the cache instead of loading it directly.
* [x] **TICKET-4.2.3:** Add/update unit tests to verify that the same `nlp` object is returned for subsequent requests for the same model and that different models are cached separately (see the sketch below).
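
A minimal sketch of the identity check TICKET-4.2.3 calls for (the test name is illustrative, and the test assumes the model is installed):

```python
from datafog.models.spacy_nlp import _get_spacy_model


def test_same_model_returns_cached_object():
    # Two requests for the same model name must hit the cache
    # and return the very same Language instance.
    nlp_a = _get_spacy_model("en_core_web_lg")
    nlp_b = _get_spacy_model("en_core_web_lg")
    assert nlp_a is nlp_b
```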

## Task 2: Batch Processing with `nlp.pipe()`

* **Goal:** Process multiple documents efficiently using spaCy's `nlp.pipe` method.
* **Tickets:**
* [x] **TICKET-4.2.4:** Modify the core spaCy processing logic (likely in `SpacyAnnotator`) to accept a list of texts and use `nlp.pipe()` for batch annotation.
* [x] **TICKET-4.2.5:** Determine and set a sensible default `batch_size` for `nlp.pipe`.
* [x] **TICKET-4.2.6:** Ensure `n_process=-1` is used within `nlp.pipe` calls to leverage multi-processing.
* [x] **TICKET-4.2.7:** Update relevant unit and integration tests to work with batch inputs and outputs.

## Task 3: Asynchronous Execution of Blocking Calls

* **Goal:** Ensure blocking synchronous calls (like spaCy processing or Tesseract OCR) don't halt the asyncio event loop when called from async functions.
* **Method:** Use `asyncio.to_thread()`.
* **Tickets:**
* [x] **TICKET-4.2.8:** Identify synchronous spaCy calls within async functions (e.g., potentially within `run_text_pipeline_async` or similar).
* [x] **TICKET-4.2.9:** Wrap the identified spaCy calls with `await asyncio.to_thread()`. (Note: This was already correctly implemented in `TextService.annotate_text_async`).
* [x] **TICKET-4.2.10:** Identify synchronous Tesseract OCR calls within async functions (e.g., potentially within `run_ocr_pipeline_async` or similar).
* [x] **TICKET-4.2.11:** Wrap the identified Tesseract calls with `await asyncio.to_thread()`.
* [x] **TICKET-4.2.12:** Add/update tests to verify asynchronous execution doesn't block and yields correct results. (Note: Existing async tests in `test_image_service.py` and `test_text_service.py` verify the successful execution of functions using `asyncio.to_thread`).

## Task 4: Configurable Batch Size via Environment Variable

* **Goal:** Allow users to tune the `nlp.pipe` batch size for their specific hardware and workload.
* **Tickets:**
* [x] **TICKET-4.2.13:** Implement logic to read the `DATAFOG_SPACY_BATCH_SIZE` environment variable when setting the `batch_size` for `nlp.pipe`.
* [x] **TICKET-4.2.14:** Ensure the code falls back gracefully to the default `batch_size` if the environment variable is not set or invalid.
* [x] **TICKET-4.2.15:** Document the `DATAFOG_SPACY_BATCH_SIZE` environment variable in the project's README or configuration documentation.

## Task 5: Documentation Update