14 changes: 14 additions & 0 deletions .codecov.yml
@@ -1 +1,15 @@
comment: no

coverage:
status:
project:
default:
# Target overall coverage percentage
target: 74%
# Allow coverage to drop by this amount without failing
# threshold: 0.5% # Optional: uncomment to allow small drops
patch:
default:
# Target coverage percentage for the changes in the PR/commit
target: 20% # Lower target for patch coverage
# threshold: 1% # Optional: Allow patch coverage to drop
3 changes: 2 additions & 1 deletion .gitignore
@@ -36,4 +36,5 @@ error_log.txt
docs/*
!docs/*.rst
!docs/conf.py
scratch.py
venv*
.coverage*
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
@@ -25,4 +25,5 @@ repos:
rev: v4.0.0-alpha.8
hooks:
- id: prettier
types: [markdown] # Specify file types for prettier
exclude: .venv
15 changes: 15 additions & 0 deletions CHANGELOG.MD
@@ -1,5 +1,20 @@
# ChangeLog

## [2025-04-27]

### Benchmarking

- Added benchmarking scripts (`scripts/bench_sdk.py`, `scripts/bench_cli.sh`) to compare the performance of PII redaction using the Python SDK versus the CLI.
- Created a sample OpenTelemetry log file (`scripts/sample_otel_log.json`) containing 51 string fields for testing.
- **Methodology:**
- **SDK Benchmark (`bench_sdk.py`):** Reads the entire JSON file, extracts all string values, and times a single call to the SDK's `annotate_text` and `anonymize` functions to process all extracted strings together. The measurement was repeated 10 times, and the fastest run was reported (a sketch of this measurement follows the results below).
- **CLI Benchmark (`bench_cli.sh`):** Uses `jq` to extract all string values from the JSON file and pipes them via `xargs` to the `datafog redact-text` command. This results in a separate invocation of the `datafog` CLI process for _each_ extracted string. The total time for processing all strings in this manner was measured over 5 loops.
- **Results:**
- **SDK:** Demonstrated significantly faster performance due to single process initialization and batch processing.
- Fastest time to process all 51 strings from `sample_otel_log.json`: **~1.15 seconds**.
- **CLI:** Showed much slower performance, primarily attributed to the overhead of starting a new `datafog` process and loading models for each of the 51 strings individually.
- Time per loop (processing all 51 strings via individual CLI calls): **~41-42 seconds** (based on initial runs).
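
For reference, here is a minimal sketch of the SDK-side measurement. The file path matches the sample log above, but the `extract_strings` helper and the exact timing harness are illustrative assumptions; the actual `bench_sdk.py` may differ.

```python
import json
import timeit

from datafog.models.spacy_nlp import SpacyAnnotator

def extract_strings(node):
    """Recursively collect every string value from parsed JSON (illustrative helper)."""
    if isinstance(node, str):
        yield node
    elif isinstance(node, dict):
        for value in node.values():
            yield from extract_strings(value)
    elif isinstance(node, list):
        for item in node:
            yield from extract_strings(item)

with open("scripts/sample_otel_log.json") as f:
    texts = list(extract_strings(json.load(f)))

annotator = SpacyAnnotator()

# Repeat the single batched call 10 times and report the fastest run,
# mirroring the methodology above.
best = min(timeit.repeat(lambda: annotator.annotate_text(texts), number=1, repeat=10))
print(f"Fastest of 10 runs: {best:.2f}s for {len(texts)} strings")
```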

## [2024-03-25]

### `datafog-python` [2.3.2]
40 changes: 33 additions & 7 deletions README.md
@@ -21,10 +21,25 @@

DataFog can be installed via pip:

```
```bash
pip install datafog
```

### Optional Features (Extras)

DataFog uses `extras` to manage dependencies for optional features like specific OCR engines. You can install these as needed:

```bash
# Install with OCR
pip install "datafog[ocr]"

# Install with Donut for structured document extraction
pip install "datafog[donut]"

# Install with ALL optional dependencies
pip install "datafog[all]"
```

# CLI

## 📚 Quick Reference
@@ -178,18 +193,21 @@ datafog list-entities

## Getting Started

To use DataFog, you'll need to create a DataFog client with the desired operations. Here's a basic setup:
Initialize the DataFog client. You can specify operations such as `SCAN`, `REDACT`, `REPLACE`, and `HASH`.

```python
from datafog import DataFog

# For text annotation
client = DataFog(operations="scan")

# For OCR (Optical Character Recognition)
ocr_client = DataFog(operations="extract")
# Initialize with default operations (SCAN)
client = DataFog()
```

DataFog now includes several performance enhancements:

- **SpaCy Model Caching:** Loaded spaCy models are cached in memory to speed up subsequent processing requests.
- **Efficient Batch Processing:** The text processing pipeline utilizes spaCy's `nlp.pipe()` internally, processing documents in batches (the default batch size is 50, configurable via the `DATAFOG_SPACY_BATCH_SIZE` environment variable) and leveraging multiple processes (`n_process=-1`) for improved throughput.
- **Asynchronous Execution:** When using asynchronous methods (`run_text_pipeline`, `run_ocr_pipeline`), potentially blocking operations like spaCy processing and Tesseract OCR are executed in separate threads to avoid blocking the main event loop.
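
A minimal usage sketch tying these together is shown below. It assumes `run_text_pipeline` accepts the list of input strings as a `str_list` keyword argument; check the API reference for the exact signature.

```python
import asyncio

from datafog import DataFog

# Reusing a single client keeps the cached spaCy model warm across calls.
client = DataFog()

async def main():
    # The spaCy batching work runs off the event loop, so other
    # coroutines are not blocked while this call is awaited.
    results = await client.run_text_pipeline(
        str_list=["Contact Jane Doe at jane@example.com."]
    )
    print(results)

asyncio.run(main())
```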

## Text PII Annotation

Here's an example of how to annotate PII in a text document:
@@ -300,6 +318,14 @@ Output:

You can choose from SHA256 (default), SHA3-256, and MD5 hashing algorithms by specifying the `hash_type` parameter.
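
For example, a minimal sketch selecting a different algorithm (the `HashType` member name below is an assumption based on the options above):

```python
from datafog import DataFog
from datafog.models.anonymizer import HashType

# Hash detected PII with SHA3-256 instead of the default SHA256.
client = DataFog(hash_type=HashType.SHA3_256)
```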

## Configuration

### Environment Variables

- **`DATAFOG_SPACY_BATCH_SIZE`**: Controls the batch size used internally by spaCy's `nlp.pipe()` when processing text documents. Larger batch sizes can improve throughput on systems with sufficient memory but may increase memory usage.
- **Default:** `50`
- **Usage:** Set this environment variable to a positive integer (e.g., `export DATAFOG_SPACY_BATCH_SIZE=100`) to override the default.

## Examples

For more detailed examples, check out our Jupyter notebooks in the `examples/` directory:
2 changes: 1 addition & 1 deletion datafog/__about__.py
@@ -1 +1 @@
__version__ = "3.3.0"
__version__ = "4.0.1"
2 changes: 0 additions & 2 deletions datafog/__init__.py
@@ -21,15 +21,13 @@
from .processing.image_processing.pytesseract_processor import PytesseractProcessor
from .processing.text_processing.spacy_pii_annotator import SpacyPIIAnnotator
from .services.image_service import ImageService
from .services.spark_service import SparkService
from .services.text_service import TextService

__all__ = [
"DonutProcessor",
"DataFog",
"ImageService",
"OperationType",
"SparkService",
"TextPIIAnnotator",
"TextService",
"SpacyPIIAnnotator",
17 changes: 3 additions & 14 deletions datafog/main.py
@@ -17,7 +17,6 @@
from .models.anonymizer import Anonymizer, AnonymizerType, HashType
from .processing.text_processing.spacy_pii_annotator import SpacyPIIAnnotator
from .services.image_service import ImageService
from .services.spark_service import SparkService
from .services.text_service import TextService

logger = logging.getLogger("datafog_logger")
@@ -33,7 +32,6 @@
Attributes:
image_service: Service for image processing and OCR.
text_service: Service for text processing and annotation.
spark_service: Optional Spark service for distributed processing.
operations: List of operations to perform.
anonymizer: Anonymizer for PII redaction, replacement, or hashing.
"""
@@ -42,14 +40,12 @@
self,
image_service=None,
text_service=None,
spark_service=None,
operations: List[OperationType] = [OperationType.SCAN],
hash_type: HashType = HashType.SHA256,
anonymizer_type: AnonymizerType = AnonymizerType.REPLACE,
):
self.image_service = image_service or ImageService()
self.text_service = text_service or TextService()
self.spark_service: SparkService = spark_service
self.operations: List[OperationType] = operations
self.anonymizer = Anonymizer(
hash_type=hash_type, anonymizer_type=anonymizer_type
@@ -60,9 +56,6 @@
)
self.logger.info(f"Image Service: {type(self.image_service)}")
self.logger.info(f"Text Service: {type(self.text_service)}")
self.logger.info(
f"Spark Service: {type(self.spark_service) if self.spark_service else 'None'}"
)
self.logger.info(f"Operations: {operations}")
self.logger.info(f"Hash Type: {hash_type}")
self.logger.info(f"Anonymizer Type: {anonymizer_type}")
@@ -232,12 +225,10 @@

Attributes:
text_annotator: SpacyPIIAnnotator instance for text annotation.
spark_processor: Optional SparkService for distributed processing.
"""

def __init__(self):
self.text_annotator = SpacyPIIAnnotator.create()
self.spark_processor: SparkService = None

def run(self, text, output_path=None):
try:
@@ -249,8 +240,6 @@
json.dump(annotated_text, f)

return annotated_text

finally:
# Ensure Spark resources are released
if self.spark_processor:
self.spark_processor.stop()
except Exception as e:
logging.error(f"Error in run: {str(e)}")
raise

Codecov (codecov/patch) warning: added lines datafog/main.py#L243-L245 were not covered by tests.
136 changes: 109 additions & 27 deletions datafog/models/spacy_nlp.py
@@ -5,14 +5,53 @@
text annotation, entity recognition, and related NLP tasks.
"""

import logging
import os
import threading
from typing import List, Optional
from uuid import uuid4

import spacy
from rich.progress import track
from spacy.language import Language # For type hinting

from .annotator import AnnotationResult, AnnotatorRequest

# Set up logging
logger = logging.getLogger(__name__)

_SPACY_MODEL_CACHE = {}
_CACHE_LOCK = threading.Lock()


def _get_spacy_model(model_name: str) -> Language:
"""Loads a spaCy model, utilizing a cache to avoid redundant loads."""
with _CACHE_LOCK:
if model_name in _SPACY_MODEL_CACHE:
logger.debug(f"Cache hit for model: {model_name!r}")
return _SPACY_MODEL_CACHE[model_name]

# Model not in cache, needs loading (lock ensures only one thread loads)
logger.debug(f"Cache miss for model: {model_name!r}. Loading...")
try:
# Download the model if it is not already installed
if not spacy.util.is_package(model_name):
logger.info(f"Model {model_name!r} not found. Downloading...")
# Note: no progress bar is wired in here; spacy.cli.download
# handles its own console output. Richer progress tracking would
# need library integration or subprocess monitoring.
spacy.cli.download(model_name)

# Load the spaCy model after ensuring it's downloaded
nlp = spacy.load(model_name)
_SPACY_MODEL_CACHE[model_name] = nlp
logger.debug(f"Model {model_name!r} loaded and cached.")
return nlp
except Exception as e:
logger.error(f"Failed to load or download spaCy model {model_name}: {e}")
raise


class SpacyAnnotator:
"""
@@ -24,41 +63,83 @@ class SpacyAnnotator:

def __init__(self, model_name: str = "en_core_web_lg"):
self.model_name = model_name
self.nlp = None
self.nlp = None # Keep lazy loading

def load_model(self):
if not spacy.util.is_package(self.model_name):
spacy.cli.download(self.model_name)
self.nlp = spacy.load(self.model_name)

def annotate_text(self, text: str, language: str = "en") -> List[AnnotationResult]:
"""Ensures the spaCy model is loaded, utilizing the cache."""
if not self.nlp:
self.load_model()
# Use the cached loader function
self.nlp = _get_spacy_model(self.model_name)

def annotate_text(
self, texts: List[str], language: str = "en"
) -> List[List[AnnotationResult]]:
"""Annotates a batch of texts using spaCy NLP pipeline.

Args:
texts (List[str]): A list of texts to annotate.
language (str): The language of the text (currently unused).

Returns:
List[List[AnnotationResult]]: A list where each inner list contains
AnnotationResult objects for the corresponding
input text.
"""
# Ensure the model is loaded
self.load_model()

annotator_request = AnnotatorRequest(
text=text,
language=language,
correlation_id=str(uuid4()),
score_threshold=0.5,
entities=None,
return_decision_process=False,
ad_hoc_recognizers=None,
context=None,
)
doc = self.nlp(annotator_request.text)
results = []
for ent in track(doc.ents, description="Processing entities"):
result = AnnotationResult(
start=ent.start_char,
end=ent.end_char,
score=0.8, # Placeholder score
entity_type=ent.label_,
recognition_metadata=None,

# Get batch size from environment variable or use default
default_batch_size = 50
try:
batch_size_str = os.getenv("DATAFOG_SPACY_BATCH_SIZE")
batch_size = int(batch_size_str) if batch_size_str else default_batch_size
if batch_size <= 0:
logger.warning(
f"Invalid DATAFOG_SPACY_BATCH_SIZE {batch_size_str!r}. "
f"Must be a positive integer. Using default: {default_batch_size}"
)
batch_size = default_batch_size
except (ValueError, TypeError):
# Handle cases where env var is set but not a valid integer
batch_size_str = os.getenv(
"DATAFOG_SPACY_BATCH_SIZE"
) # Get it again for logging
logger.warning(
f"Invalid DATAFOG_SPACY_BATCH_SIZE {batch_size_str!r}. "
f"Must be an integer. Using default: {default_batch_size}"
)
results.append(result)
batch_size = default_batch_size

logger.info(f"Using spaCy batch size: {batch_size}")

# Process texts in batches using nlp.pipe
docs = self.nlp.pipe(texts, batch_size=batch_size, n_process=-1)

# Wrap with track for progress bar
processed_docs = track(docs, description="Annotating text...", total=len(texts))

# Process each doc
for doc in processed_docs:
doc_results: List[AnnotationResult] = []  # Results for the current doc
for ent in doc.ents:
result = AnnotationResult(
start=ent.start_char,
end=ent.end_char,
score=0.8, # Placeholder score - Consider using actual scores if available
entity_type=ent.label_,
recognition_metadata=None, # Consider adding metadata if needed
)
doc_results.append(result)
results.append(doc_results) # Add results for this doc to the main list

return results

def show_model_path(self) -> str:
# This check now correctly uses the updated load_model -> _get_spacy_model
if not self.nlp:
self.load_model()
return str(self.nlp.path)
@@ -73,5 +154,6 @@ def list_models() -> List[str]:

@staticmethod
def list_entities() -> List[str]:
nlp = spacy.load("en_core_web_lg")
# Use the cached loader function for the default model
nlp = _get_spacy_model("en_core_web_lg")
return [ent for ent in nlp.pipe_labels["ner"]]