66import re
77from concurrent .futures import ThreadPoolExecutor , as_completed
88from pathlib import Path
9- from typing import List , Optional , Tuple
9+ from typing import List , Literal , Optional , Tuple
1010
11+ import fitz # PyMuPDF
1112import polars as pl
13+ from pydantic import ConfigDict , validate_call
1214
1315from fenic ._backends .local .utils .io_utils import PathScheme , get_path_scheme
16+ from fenic .core ._utils .schema import convert_custom_schema_to_polars_schema
1417from fenic .core .error import FileLoaderError , ValidationError
1518from fenic .core .types import ColumnField , Schema
1619from fenic .core .types .datatypes import (
20+ BooleanType ,
21+ IntegerType ,
1722 StringType ,
1823)
1924
@@ -30,9 +35,10 @@ class DocFolderLoader:
3035 """
3136
3237 @staticmethod
38+ @validate_call (config = ConfigDict (strict = True ))
3339 def load_docs_from_folder (
3440 paths : list [str ],
35- valid_file_extension : str ,
41+ valid_file_extension : Literal [ "md" , "json" , "pdf" ] ,
3642 exclude_pattern : Optional [str ] = None ,
3743 recursive : bool = False ,
3844 ) -> pl .DataFrame :
@@ -65,28 +71,47 @@ def load_docs_from_folder(
6571
6672 if not files :
6773 logger .debug (f"No files found in { paths } " )
68- return DocFolderLoader ._build_no_files_dataframe ()
74+ return DocFolderLoader ._build_no_files_dataframe (file_extension = valid_file_extension )
6975
7076 # Calculate the batch size to ensure that each worker gets at least one file.
7177 max_workers = os .cpu_count () + 4
72- return DocFolderLoader ._process_files (files , max_workers )
78+
79+ # Process files with the appropriate handler based on extension
80+ return DocFolderLoader ._process_files (files , max_workers , valid_file_extension )
7381
7482 @staticmethod
75- def get_schema () -> Schema :
83+ def get_schema (file_extension : str = None ) -> Schema :
7684 """Get the schema for the data type.
7785
7886 Args:
79- data_type : The data type of the files to load
87+ file_extension : The file extension to determine schema
8088
8189 Returns:
8290 Schema: The schema for the data type
8391 """
92+ column_fields = [
93+ ColumnField (name = "file_path" , data_type = StringType ),
94+ ColumnField (name = "error" , data_type = StringType ),
95+ ]
96+ if file_extension == "pdf" :
97+ column_fields .extend ([
98+ # additional file metadata fields
99+ ColumnField (name = "size" , data_type = IntegerType ),
100+ # PDF metadata fields
101+ ColumnField (name = "title" , data_type = StringType ),
102+ ColumnField (name = "author" , data_type = StringType ),
103+ ColumnField (name = "creation_date" , data_type = StringType ),
104+ ColumnField (name = "mod_date" , data_type = StringType ),
105+ ColumnField (name = "page_count" , data_type = IntegerType ),
106+ ColumnField (name = "has_forms" , data_type = BooleanType ),
107+ ColumnField (name = "has_signature_fields" , data_type = BooleanType ),
108+ ColumnField (name = "image_count" , data_type = IntegerType ),
109+ ColumnField (name = "is_encrypted" , data_type = BooleanType ),
110+ ])
111+ else : # load file content directly
112+ column_fields .append (ColumnField (name = "content" , data_type = StringType ))
84113 return Schema (
85- column_fields = [
86- ColumnField (name = "file_path" , data_type = StringType ),
87- ColumnField (name = "error" , data_type = StringType ),
88- ColumnField (name = "content" , data_type = StringType ),
89- ]
114+ column_fields = column_fields
90115 )
91116
92117 @staticmethod
@@ -143,19 +168,29 @@ def _enumerate_files(
143168 def _process_files (
144169 files : List [str ],
145170 max_workers : int ,
171+ file_extension : str = None ,
146172 ) -> pl .DataFrame :
147173 """Process files in parallel using a thread pool.
148174
149175 Args:
150176 files: List of file paths to process
151177 max_workers: Number of worker threads
178+ file_extension: File extension to determine processing type
152179
153180 Returns:
154181 DataFrame: A dataframe containing the files in the folder.
155182 """
183+ # Determine which processing function and schema to use
184+
185+ schema = convert_custom_schema_to_polars_schema (DocFolderLoader .get_schema (file_extension = file_extension ))
186+ if file_extension == "pdf" :
187+ process_func = DocFolderLoader ._process_single_pdf_metadata
188+ else :
189+ process_func = DocFolderLoader ._process_single_file
190+
156191 with ThreadPoolExecutor (max_workers = max_workers ) as executor :
157192 it = iter (files )
158- pending = {executor .submit (DocFolderLoader . _process_single_file , f )
193+ pending = {executor .submit (process_func , f )
159194 for _ , f in zip (range (max_workers ), it , strict = False )}
160195
161196 def results_generator ():
@@ -164,12 +199,12 @@ def results_generator():
164199 pending .remove (future )
165200 yield future .result ()
166201 try :
167- pending .add (executor .submit (DocFolderLoader . _process_single_file , next (it )))
202+ pending .add (executor .submit (process_func , next (it )))
168203 except StopIteration :
169204 pass
170205
171206 # Uses the iterator over the results to build the dataframe.
172- return pl .DataFrame (results_generator (), schema = DocFolderLoader . _get_polars_schema () )
207+ return pl .DataFrame (results_generator (), schema = schema )
173208
174209 @staticmethod
175210 def _process_single_file (
@@ -203,17 +238,9 @@ def _process_single_file(
203238 return file_path , string_error , file_content
204239
205240 @staticmethod
206- def _get_polars_schema () -> pl .Schema :
207- return pl .Schema ({
208- "file_path" : pl .Utf8 ,
209- "error" : pl .Utf8 ,
210- "content" : pl .Utf8 ,
211- })
212-
213- @staticmethod
214- def _build_no_files_dataframe () -> pl .DataFrame :
215- """Build a dataframe from the file content."""
216- return pl .DataFrame ({}, schema = DocFolderLoader ._get_polars_schema ())
241+ def _build_no_files_dataframe (file_extension : str ) -> pl .DataFrame :
242+ """Build an empty dataframe with the appropriate schema."""
243+ return pl .DataFrame ({}, schema = convert_custom_schema_to_polars_schema (DocFolderLoader .get_schema (file_extension = file_extension )))
217244
218245 @staticmethod
219246 def _enumerate_files_s3 (
@@ -302,3 +329,96 @@ def _load_file_s3(file_path: str) -> str:
302329 def _load_file_hf (file_path : str ) -> str :
303330 """Load a file from HuggingFace."""
304331 raise NotImplementedError ("HF file loading is not implemented yet." )
332+
333+ @staticmethod
334+ def _process_single_pdf_metadata (file_path : str ) -> dict :
335+ """Process a single PDF file to extract metadata.
336+
337+ Args:
338+ file_path: The path to the PDF file to process
339+
340+ Returns:
341+ dict: A dictionary containing PDF metadata and error information.
342+ """
343+
344+ path_scheme = get_path_scheme (file_path )
345+ logger .debug (f"Processing PDF: { file_path } - { path_scheme } " )
346+
347+ # Initialize the flat result dict with default values
348+ result = {
349+ "file_path" : file_path ,
350+ "error" : None ,
351+ "size" : 0 ,
352+ "title" : None ,
353+ "author" : None ,
354+ "creation_date" : None ,
355+ "mod_date" : None ,
356+ "page_count" : 0 ,
357+ "has_forms" : False ,
358+ "has_signature_fields" : False ,
359+ "image_count" : 0 ,
360+ "is_encrypted" : False ,
361+ }
362+
363+ try :
364+ if path_scheme == PathScheme .S3 :
365+ raise NotImplementedError ("S3 PDF processing not implemented yet." )
366+ elif path_scheme == PathScheme .HF :
367+ raise NotImplementedError ("HF PDF processing not implemented yet." )
368+ else :
369+ result ["size" ] = os .path .getsize (file_path )
370+ doc = fitz .open (file_path )
371+
372+ # Extract basic document info
373+ doc_metadata = doc .metadata
374+ result .update ({
375+ "title" : doc_metadata .get ("title" ) or "" ,
376+ "author" : doc_metadata .get ("author" ) or "" ,
377+ "creation_date" : doc_metadata .get ("creationDate" ) or "" ,
378+ "mod_date" : doc_metadata .get ("modDate" ) or "" ,
379+ "page_count" : len (doc ),
380+ "is_encrypted" : doc .needs_pass ,
381+ })
382+
383+ # Analyze document structure
384+ image_count = 0
385+ has_forms = False
386+ has_signature_fields = False
387+
388+ for page_num in range (len (doc )):
389+ page = doc [page_num ]
390+
391+ # Count raster images
392+ page_images = page .get_images ()
393+ if page_images :
394+ image_count += len (page_images )
395+
396+ # Vector drawings are represented as drawings in PyMuPDF
397+ drawings = page .get_drawings ()
398+ if drawings :
399+ image_count += len (drawings )
400+
401+ # Check for forms and signature fields
402+ if not has_forms or not has_signature_fields :
403+ widgets = list (page .widgets ())
404+ if len (widgets ) > 0 :
405+ has_forms = True
406+ for widget in widgets :
407+ if widget .field_type == fitz .PDF_WIDGET_TYPE_SIGNATURE :
408+ has_signature_fields = True
409+ break
410+
411+ result .update ({
412+ "has_forms" : has_forms ,
413+ "has_signature_fields" : has_signature_fields ,
414+ "image_count" : image_count ,
415+ })
416+
417+ doc .close ()
418+ logger .debug (f"PDF processed successfully: { file_path } " )
419+
420+ except Exception as e :
421+ logger .warning (f"Error processing PDF { file_path } : { str (e )} " )
422+ result ["error" ] = str (e )
423+
424+ return result