1- """
2- Text processing service for PII annotation.
1+ """Text processing service for PII annotation.
32
4- Provides synchronous and asynchronous methods for annotating text with personally identifiable information (PII) using SpaCy. Supports chunking long texts and batch processing.
3+ Provides synchronous and asynchronous methods for annotating text with personally identifiable information (PII) using SpaCy or regex patterns . Supports chunking long texts and batch processing.
54"""
65
76import asyncio
8- from typing import Dict , List
7+ from typing import Dict , List , Optional , Union
98
9+ from datafog .processing .text_processing .regex_annotator .regex_annotator import (
10+ AnnotationResult ,
11+ RegexAnnotator ,
12+ Span ,
13+ )
1014from datafog .processing .text_processing .spacy_pii_annotator import SpacyPIIAnnotator
1115
1216
1317class TextService :
1418 """
15- Manages text annotation operations .
19+ Service for annotating text with PII entities .
1620
17- Handles text chunking, PII annotation, and result combination for both single texts and batches. Offers both synchronous and asynchronous interfaces.
21+ This service provides methods to detect and annotate personally identifiable information (PII)
22+ in text using different annotation engines. It supports chunking long texts for efficient processing
23+ and combining annotations from multiple chunks.
1824 """
1925
20- def __init__ (self , text_chunk_length : int = 1000 ):
21- self .annotator = SpacyPIIAnnotator .create ()
26+ def __init__ (self , text_chunk_length : int = 1000 , engine : str = "auto" ):
27+ """
28+ Initialize the TextService with specified chunk length and annotation engine.
29+
30+ Args:
31+ text_chunk_length: Maximum length of text chunks for processing. Default is 1000 characters.
32+ engine: The annotation engine to use. Options are:
33+ - "regex": Use only the RegexAnnotator for pattern-based entity detection
34+ - "spacy": Use only the SpacyPIIAnnotator for NLP-based entity detection
35+ - "auto": (Default) Try RegexAnnotator first and fall back to SpacyPIIAnnotator if no entities are found
36+
37+ Raises:
38+ AssertionError: If an invalid engine type is provided
39+ """
40+ assert engine in {"regex" , "spacy" , "auto" }, "Invalid engine"
41+ self .engine = engine
42+ self .spacy_annotator = SpacyPIIAnnotator .create ()
43+ self .regex_annotator = RegexAnnotator ()
2244 self .text_chunk_length = text_chunk_length
2345
2446 def _chunk_text (self , text : str ) -> List [str ]:
@@ -38,36 +60,232 @@ def _combine_annotations(self, annotations: List[Dict]) -> Dict:
3860 combined [key ].extend (value )
3961 return combined
4062
41- def annotate_text_sync (self , text : str ) -> Dict :
42- """Synchronously annotate a text string."""
63+ def _annotate_with_engine (
64+ self , text : str , structured : bool = False
65+ ) -> Union [Dict [str , List [str ]], List [Span ]]:
66+ """
67+ Annotate text using the selected engine based on the engine parameter.
68+
69+ This method implements the engine selection logic:
70+ - For "regex" mode: Uses only the RegexAnnotator
71+ - For "spacy" mode: Uses only the SpacyPIIAnnotator
72+ - For "auto" mode: Tries RegexAnnotator first and falls back to SpacyPIIAnnotator if no entities are found
73+
74+ Args:
75+ text: The text to annotate
76+ structured: If True, return structured output (list of Span objects)
77+
78+ Returns:
79+ If structured=False: Dictionary of annotations by entity type where keys are entity types (e.g., "EMAIL", "PERSON", "ORG")
80+ and values are lists of detected entities of that type
81+ If structured=True: List of Span objects with entity information
82+ """
83+ if structured :
84+ # Handle structured output mode
85+ if self .engine == "regex" :
86+ _ , annotation_result = self .regex_annotator .annotate_with_spans (text )
87+ return annotation_result .spans
88+ elif self .engine == "spacy" :
89+ # For spaCy, we need to convert the dictionary format to spans
90+ spacy_dict = self .spacy_annotator .annotate (text )
91+ spans = []
92+ for label , entities in spacy_dict .items ():
93+ for entity in entities :
94+ # Find the entity in the text to get its position
95+ start = text .find (entity )
96+ if start >= 0 :
97+ end = start + len (entity )
98+ spans .append (
99+ Span (label = label , start = start , end = end , text = entity )
100+ )
101+ return spans
102+ else : # auto mode
103+ # Try regex first
104+ regex_dict , annotation_result = (
105+ self .regex_annotator .annotate_with_spans (text )
106+ )
107+
108+ # Check if any entities were found
109+ has_entities = any (
110+ len (entities ) > 0 for entities in regex_dict .values ()
111+ )
112+
113+ # If regex found entities, return those results
114+ if has_entities :
115+ return annotation_result .spans
116+
117+ # Otherwise, fall back to spaCy and convert to spans
118+ spacy_dict = self .spacy_annotator .annotate (text )
119+ spans = []
120+ for label , entities in spacy_dict .items ():
121+ for entity in entities :
122+ # Find the entity in the text to get its position
123+ start = text .find (entity )
124+ if start >= 0 :
125+ end = start + len (entity )
126+ spans .append (
127+ Span (label = label , start = start , end = end , text = entity )
128+ )
129+ return spans
130+ else :
131+ # Handle legacy dictionary output mode
132+ if self .engine == "regex" :
133+ return self .regex_annotator .annotate (text )
134+ elif self .engine == "spacy" :
135+ return self .spacy_annotator .annotate (text )
136+ else : # auto mode
137+ # Try regex first
138+ regex_results = self .regex_annotator .annotate (text )
139+
140+ # Check if any entities were found
141+ has_entities = any (
142+ len (entities ) > 0 for entities in regex_results .values ()
143+ )
144+
145+ # If regex found entities, return those results
146+ if has_entities :
147+ return regex_results
148+
149+ # Otherwise, fall back to spaCy
150+ return self .spacy_annotator .annotate (text )
151+
152+ def annotate_text_sync (
153+ self , text : str , structured : bool = False
154+ ) -> Union [Dict [str , List [str ]], List [Span ]]:
155+ """
156+ Synchronously annotate a text string.
157+
158+ Args:
159+ text: The text to annotate
160+ structured: If True, return structured output (list of Span objects)
161+
162+ Returns:
163+ If structured=False: Dictionary mapping entity types to lists of strings
164+ If structured=True: List of Span objects with entity information
165+ """
43166 if not text :
44- return {}
167+ return [] if structured else {}
168+
45169 print (f"Starting on { text .split ()[0 ]} " )
46170 chunks = self ._chunk_text (text )
47- annotations = []
48- for chunk in chunks :
49- res = self .annotator .annotate (chunk )
50- annotations .append (res )
51- combined = self ._combine_annotations (annotations )
52- print (f"Done processing { text .split ()[0 ]} " )
53- return combined
54171
55- def batch_annotate_text_sync (self , texts : List [str ]) -> Dict [str , Dict ]:
56- """Synchronously annotate a list of text input."""
57- results = [self .annotate_text_sync (text ) for text in texts ]
172+ if structured :
173+ # Handle structured output mode
174+ all_spans = []
175+ chunk_offset = 0 # Track the offset for each chunk in the original text
176+
177+ for chunk in chunks :
178+ # Get spans for this chunk
179+ chunk_spans = self ._annotate_with_engine (chunk , structured = True )
180+
181+ # Adjust span positions based on chunk offset in the original text
182+ for span in chunk_spans :
183+ span .start += chunk_offset
184+ span .end += chunk_offset
185+ # Verify the span text matches the text at the adjusted position
186+ # This helps catch any positioning errors
187+ if span .start < len (text ) and span .end <= len (text ):
188+ span .text = text [span .start : span .end ]
189+ all_spans .append (span )
190+
191+ # Update offset for the next chunk
192+ chunk_offset += len (chunk )
193+
194+ print (f"Done processing { text .split ()[0 ]} " )
195+ return all_spans
196+ else :
197+ # Handle legacy dictionary output mode
198+ annotations = []
199+ for chunk in chunks :
200+ res = self ._annotate_with_engine (chunk )
201+ annotations .append (res )
202+ combined = self ._combine_annotations (annotations )
203+ print (f"Done processing { text .split ()[0 ]} " )
204+ return combined
205+
206+ def batch_annotate_text_sync (
207+ self , texts : List [str ], structured : bool = False
208+ ) -> Dict [str , Union [Dict [str , List [str ]], List [Span ]]]:
209+ """
210+ Synchronously annotate a list of text input.
211+
212+ Args:
213+ texts: List of text strings to annotate
214+ structured: If True, return structured output (list of Span objects) for each text
215+
216+ Returns:
217+ Dictionary mapping each input text to its annotation result
218+ """
219+ results = [
220+ self .annotate_text_sync (text , structured = structured ) for text in texts
221+ ]
58222 return dict (zip (texts , results , strict = True ))
59223
60- async def annotate_text_async (self , text : str ) -> Dict :
61- """Asynchronously annotate a text string."""
224+ async def annotate_text_async (
225+ self , text : str , structured : bool = False
226+ ) -> Union [Dict [str , List [str ]], List [Span ]]:
227+ """
228+ Asynchronously annotate a text string.
229+
230+ Args:
231+ text: The text to annotate
232+ structured: If True, return structured output (list of Span objects)
233+
234+ Returns:
235+ If structured=False: Dictionary mapping entity types to lists of strings
236+ If structured=True: List of Span objects with entity information
237+ """
62238 if not text :
63- return {}
239+ return [] if structured else {}
240+
64241 chunks = self ._chunk_text (text )
65- tasks = [asyncio .to_thread (self .annotator .annotate , chunk ) for chunk in chunks ]
66- annotations = await asyncio .gather (* tasks )
67- return self ._combine_annotations (annotations )
68242
69- async def batch_annotate_text_async (self , texts : List [str ]) -> Dict [str , Dict ]:
70- """Asynchronously annotate a list of text input."""
71- tasks = [self .annotate_text_async (txt ) for txt in texts ]
243+ if structured :
244+ # Handle structured output mode asynchronously
245+ all_spans = []
246+ chunk_offset = 0 # Track the offset for each chunk in the original text
247+
248+ for chunk in chunks :
249+ # We can't easily parallelize this due to the need to track offsets sequentially
250+ # In a production environment, you might want a more sophisticated approach
251+ chunk_spans = self ._annotate_with_engine (chunk , structured = True )
252+
253+ # Adjust span positions based on chunk offset in the original text
254+ for span in chunk_spans :
255+ span .start += chunk_offset
256+ span .end += chunk_offset
257+ # Verify the span text matches the text at the adjusted position
258+ if span .start < len (text ) and span .end <= len (text ):
259+ span .text = text [span .start : span .end ]
260+ all_spans .append (span )
261+
262+ # Update offset for the next chunk
263+ chunk_offset += len (chunk )
264+
265+ return all_spans
266+ else :
267+ # Handle legacy dictionary output mode asynchronously
268+ tasks = [
269+ asyncio .to_thread (self ._annotate_with_engine , chunk ) for chunk in chunks
270+ ]
271+ annotations = await asyncio .gather (* tasks )
272+ return self ._combine_annotations (annotations )
273+
274+ async def batch_annotate_text_async (
275+ self , texts : List [str ], structured : bool = False
276+ ) -> Dict [str , Union [Dict [str , List [str ]], List [Span ]]]:
277+ """
278+ Asynchronously annotate a list of text input.
279+
280+ Args:
281+ texts: List of text strings to annotate
282+ structured: If True, return structured output (list of Span objects) for each text
283+
284+ Returns:
285+ Dictionary mapping each input text to its annotation result
286+ """
287+ tasks = [
288+ self .annotate_text_async (text , structured = structured ) for text in texts
289+ ]
72290 results = await asyncio .gather (* tasks )
73291 return dict (zip (texts , results , strict = True ))
0 commit comments