From a608e572ab9cac58f56ad16378a876e6a0d1f2b3 Mon Sep 17 00:00:00 2001 From: Fabian Wallwitz Date: Mon, 4 Nov 2024 08:17:36 +0100 Subject: [PATCH 01/20] add scan option --- scripts/adlsgen2setup.py | 126 +++++++++++++++++++++++++++++++++++---- 1 file changed, 113 insertions(+), 13 deletions(-) diff --git a/scripts/adlsgen2setup.py b/scripts/adlsgen2setup.py index 1deccdf199..7f9aee55d5 100644 --- a/scripts/adlsgen2setup.py +++ b/scripts/adlsgen2setup.py @@ -1,8 +1,10 @@ import argparse import asyncio +import datetime import json import logging import os +import hashlib from typing import Any, Optional import aiohttp @@ -56,7 +58,7 @@ def __init__( self.data_access_control_format = data_access_control_format self.graph_headers: Optional[dict[str, str]] = None - async def run(self): + async def run(self, scandirs: bool = False): async with self.create_service_client() as service_client: logger.info(f"Ensuring {self.filesystem_name} exists...") async with service_client.get_file_system_client(self.filesystem_name) as filesystem_client: @@ -80,15 +82,17 @@ async def run(self): ) directories[directory] = directory_client + logger.info("Uploading scanned files...") + if scandirs and directory != "/": + await self.scan_and_upload_directories(directories, filesystem_client) + logger.info("Uploading files...") for file, file_info in self.data_access_control_format["files"].items(): directory = file_info["directory"] if directory not in directories: logger.error(f"File {file} has unknown directory {directory}, exiting...") return - await self.upload_file( - directory_client=directories[directory], file_path=os.path.join(self.data_directory, file) - ) + await self.upload_file(directory_client=directories[directory], file_path=os.path.join(self.data_directory, file)) logger.info("Setting access control...") for directory, access_control in self.data_access_control_format["directories"].items(): @@ -100,8 +104,7 @@ async def run(self): f"Directory {directory} has unknown group {group_name} in access control list, exiting" ) return - await directory_client.update_access_control_recursive( - acl=f"group:{groups[group_name]}:r-x" + await directory_client.update_access_control_recursive(acl=f"group:{groups[group_name]}:r-x" ) if "oids" in access_control: for oid in access_control["oids"]: @@ -110,15 +113,110 @@ async def run(self): for directory_client in directories.values(): await directory_client.close() + async def walk_files(self, src_filepath = "."): + filepath_list = [] + + #This for loop uses the os.walk() function to walk through the files and directories + #and records the filepaths of the files to a list + for root, dirs, files in os.walk(src_filepath): + + #iterate through the files currently obtained by os.walk() and + #create the filepath string for that file and add it to the filepath_list list + for file in files: + #Checks to see if the root is '.' and changes it to the correct current + #working directory by calling os.getcwd(). Otherwise root_path will just be the root variable value. + if root == '.': + root_path = os.getcwd() + "/" + else: + root_path = root + + filepath = os.path.join(root_path, file) + + #Appends filepath to filepath_list if filepath does not currently exist in filepath_list + if filepath not in filepath_list: + filepath_list.append(filepath) + + #Return filepath_list + return filepath_list + + async def scan_and_upload_directories(self, directories: dict[str, DataLakeDirectoryClient], filesystem_client): + logger.info("Scanning and uploading files from directories recursively...") + for directory, directory_client in directories.items(): + directory_path = os.path.join(self.data_directory, directory) + + # Überprüfen, ob 'scandir' existiert und auf False gesetzt ist + if not self.data_access_control_format["directories"][directory].get("scandir", True): + logger.info(f"Skipping directory {directory} as 'scandir' is set to False") + continue + + groups = self.data_access_control_format["directories"][directory].get("groups", []) + + # Check if the directory exists before walking it + if not os.path.exists(directory_path): + logger.warning(f"Directory does not exist: {directory_path}") + continue + + # Get all file paths using the walk_files function + file_paths = await self.walk_files(directory_path) + + # Upload each file collected + for file_path in file_paths: + await self.upload_file(directory_client, file_path) + logger.info(f"Uploaded {file_path} to {directory}") + def create_service_client(self): return DataLakeServiceClient( account_url=f"https://{self.storage_account_name}.dfs.core.windows.net", credential=self.credentials ) - async def upload_file(self, directory_client: DataLakeDirectoryClient, file_path: str): + async def calc_md5(self, path: str) -> str: + with open(path, "rb") as file: + return hashlib.md5(file.read()).hexdigest() + + async def check_md5(self, path: str, md5_hash: str) -> bool: + # if filename ends in .md5 skip + if path.endswith(".md5"): + return True + + # if there is a file called .md5 in this directory, see if its updated + stored_hash = None + hash_path = f"{path}.md5" + os.path.exists(hash_path): + with open(hash_path, encoding="utf-8") as md5_f: + stored_hash = md5_f.read() + + if stored_hash and stored_hash.strip() == md5_hash.strip(): + logger.info("Skipping %s, no changes detected.", path) + return True + + # Write the hash + with open(hash_path, "w", encoding="utf-8") as md5_f: + md5_f.write(md5_hash) + + return False + + async def upload_file(self, directory_client: DataLakeDirectoryClient, file_path: str, category: str): + # Calculate MD5 hash once + md5_hash = await self.calc_md5(file_path) + + # Check if the file has been uploaded or if it has changed + if await self.check_md5(file_path, md5_hash): + logger.info("File %s has already been uploaded, skipping upload.", file_path) + return # Skip uploading if the MD5 check indicates no changes + + # Proceed with the upload since the file has changed with open(file=file_path, mode="rb") as f: file_client = directory_client.get_file_client(file=os.path.basename(file_path)) - await file_client.upload_data(f, overwrite=True) + last_modified = datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat() + title = os.path.splitext(os.path.basename(file_path))[0] + metadata = { + "md5": md5_hash, + "category": category, + "updated": last_modified, + "title": title + } + await file_client.upload_data(f, overwrite=True, metadata=metadata) + logger.info("File %s uploaded with metadata %s.", file_path, metadata) async def create_or_get_group(self, group_name: str): group_id = None @@ -144,6 +242,7 @@ async def create_or_get_group(self, group_name: str): # If Unified does not work for you, then you may need the following settings instead: # "mailEnabled": False, # "mailNickname": group_name, + } async with session.post("https://graph.microsoft.com/v1.0/groups", json=group) as response: content = await response.json() @@ -165,19 +264,19 @@ async def main(args: Any): data_access_control_format = json.load(f) command = AdlsGen2Setup( data_directory=args.data_directory, - storage_account_name=os.environ["AZURE_ADLS_GEN2_STORAGE_ACCOUNT"], - filesystem_name="gptkbcontainer", + storage_account_name=os.environ["AZURE_ADLS_GEN2_STORAGE_ACCOUNT"], + filesystem_name=os.environ["AZURE_ADLS_GEN2_FILESYSTEM"], security_enabled_groups=args.create_security_enabled_groups, credentials=credentials, data_access_control_format=data_access_control_format, ) - await command.run() + await command.run(args.scandirs) if __name__ == "__main__": parser = argparse.ArgumentParser( - description="Upload sample data to a Data Lake Storage Gen2 account and associate sample access control lists with it using sample groups", - epilog="Example: ./scripts/adlsgen2setup.py ./data --data-access-control ./scripts/sampleacls.json --create-security-enabled-groups ", + description="Upload data to a Data Lake Storage Gen2 account and associate access control lists with it using sample groups", + epilog="Example: ./scripts/adlsgen2setup.py ./data --data-access-control .azure/${AZURE_ENV_NAME}/docs_acls.json --create-security-enabled-groups --scandirs", ) parser.add_argument("data_directory", help="Data directory that contains sample PDFs") parser.add_argument( @@ -190,6 +289,7 @@ async def main(args: Any): "--data-access-control", required=True, help="JSON file describing access control for the sample data" ) parser.add_argument("--verbose", "-v", required=False, action="store_true", help="Verbose output") + parser.add_argument("--scandirs", required=False, action="store_true", help="Scan and upload all files from directories recursively") args = parser.parse_args() if args.verbose: logging.basicConfig() From 3206092d7f44bf03a56b6001c166fb5c32acacca Mon Sep 17 00:00:00 2001 From: Fabian Wallwitz Date: Mon, 4 Nov 2024 08:22:59 +0100 Subject: [PATCH 02/20] add scandir option an example folder --- scripts/sampleacls.json | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/scripts/sampleacls.json b/scripts/sampleacls.json index dd2d4888fa..b83ca3708c 100644 --- a/scripts/sampleacls.json +++ b/scripts/sampleacls.json @@ -21,10 +21,16 @@ }, "directories": { "employeeinfo": { - "groups": ["GPTKB_HRTest"] + "groups": ["GPTKB_HRTest"], + "scandir": false }, "benefitinfo": { - "groups": ["GPTKB_EmployeeTest", "GPTKB_HRTest"] + "groups": ["GPTKB_EmployeeTest", "GPTKB_HRTest"], + "scandir": false + }, + "GPT4V_Examples": { + "groups": ["GPTKB_EmployeeTest", "GPTKB_HRTest"], + "scandir": true }, "/": { "groups": ["GPTKB_AdminTest"] From a482213ddfdd5411ea0a2870eaa4d62d20321c35 Mon Sep 17 00:00:00 2001 From: Fabian Wallwitz Date: Mon, 4 Nov 2024 08:32:17 +0100 Subject: [PATCH 03/20] update docs --- docs/login_and_acl.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/login_and_acl.md b/docs/login_and_acl.md index 1af26dd249..333b571b1c 100644 --- a/docs/login_and_acl.md +++ b/docs/login_and_acl.md @@ -233,7 +233,9 @@ The script performs the following steps: - Creates example [groups](https://learn.microsoft.com/entra/fundamentals/how-to-manage-groups) listed in the [sampleacls.json](/scripts/sampleacls.json) file. - Creates a filesystem / container `gptkbcontainer` in the storage account. - Creates the directories listed in the [sampleacls.json](/scripts/sampleacls.json) file. -- Uploads the sample PDFs referenced in the [sampleacls.json](/scripts/sampleacls.json) file into the appropriate directories. +- Scans the directories for files recursively if you add the option '--scandirs' (default false) cto the argument list (default off) and you don't have '"scandir": false' (default true) below the directory element in the sampleacls.json file. +- Caluclates md5 checksuk of each file refrenced anc compares with existing 'filename.ext.md5' file. Skip upload if same else upload and storenew md5 value in 'filename.ext.md5' +- Uploads the sample PDFs referenced in the [sampleacls.json](/scripts/sampleacls.json) file or files found in the folders with scandir option set to true into the appropriate directories. - [Recursively sets Access Control Lists (ACLs)](https://learn.microsoft.com/azure/storage/blobs/data-lake-storage-acl-cli) using the information from the [sampleacls.json](/scripts/sampleacls.json) file. In order to use the sample access control, you need to join these groups in your Microsoft Entra tenant. From ce4bffee1da7972d7743fd83625370e30027b6c0 Mon Sep 17 00:00:00 2001 From: Fabian Wallwitz Date: Mon, 4 Nov 2024 08:49:26 +0100 Subject: [PATCH 04/20] fix if issue --- scripts/adlsgen2setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/adlsgen2setup.py b/scripts/adlsgen2setup.py index 7f9aee55d5..719538ff8b 100644 --- a/scripts/adlsgen2setup.py +++ b/scripts/adlsgen2setup.py @@ -181,7 +181,7 @@ async def check_md5(self, path: str, md5_hash: str) -> bool: # if there is a file called .md5 in this directory, see if its updated stored_hash = None hash_path = f"{path}.md5" - os.path.exists(hash_path): + if os.path.exists(hash_path): with open(hash_path, encoding="utf-8") as md5_f: stored_hash = md5_f.read() From 285ae7dee1d88a398cc2d00da2b36fea5af6bce0 Mon Sep 17 00:00:00 2001 From: Fabian Wallwitz Date: Wed, 6 Nov 2024 19:11:50 +0100 Subject: [PATCH 05/20] fix findings, add metadata like md5 --- scripts/adlsgen2setup.py | 35 +++++++++++++++++++---------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/scripts/adlsgen2setup.py b/scripts/adlsgen2setup.py index 719538ff8b..c9f2729f8c 100644 --- a/scripts/adlsgen2setup.py +++ b/scripts/adlsgen2setup.py @@ -1,6 +1,6 @@ import argparse import asyncio -import datetime +from datetime import datetime import json import logging import os @@ -83,7 +83,7 @@ async def run(self, scandirs: bool = False): directories[directory] = directory_client logger.info("Uploading scanned files...") - if scandirs and directory != "/": + if scandirs: await self.scan_and_upload_directories(directories, filesystem_client) logger.info("Uploading files...") @@ -122,35 +122,37 @@ async def walk_files(self, src_filepath = "."): #iterate through the files currently obtained by os.walk() and #create the filepath string for that file and add it to the filepath_list list + root_found: bool = False for file in files: #Checks to see if the root is '.' and changes it to the correct current - #working directory by calling os.getcwd(). Otherwise root_path will just be the root variable value. - if root == '.': - root_path = os.getcwd() + "/" - else: - root_path = root - - filepath = os.path.join(root_path, file) + #working directory by calling os.getcwd(). Otherwise root_path will just be the root variable value. + if not root_found and root == '.': + filepath =os.path.join(os.getcwd() + "/", file) + root_found = True + else: + filepath = os.path.join(root, file) + #Appends filepath to filepath_list if filepath does not currently exist in filepath_list if filepath not in filepath_list: - filepath_list.append(filepath) + filepath_list.append(filepath) #Return filepath_list return filepath_list async def scan_and_upload_directories(self, directories: dict[str, DataLakeDirectoryClient], filesystem_client): logger.info("Scanning and uploading files from directories recursively...") + for directory, directory_client in directories.items(): directory_path = os.path.join(self.data_directory, directory) + if directory == "/": + continue - # Überprüfen, ob 'scandir' existiert und auf False gesetzt ist + # Check if 'scandir' exists and is set to False if not self.data_access_control_format["directories"][directory].get("scandir", True): logger.info(f"Skipping directory {directory} as 'scandir' is set to False") continue - groups = self.data_access_control_format["directories"][directory].get("groups", []) - # Check if the directory exists before walking it if not os.path.exists(directory_path): logger.warning(f"Directory does not exist: {directory_path}") @@ -161,7 +163,7 @@ async def scan_and_upload_directories(self, directories: dict[str, DataLakeDirec # Upload each file collected for file_path in file_paths: - await self.upload_file(directory_client, file_path) + await self.upload_file(directory_client, file_path, directory) logger.info(f"Uploaded {file_path} to {directory}") def create_service_client(self): @@ -195,7 +197,7 @@ async def check_md5(self, path: str, md5_hash: str) -> bool: return False - async def upload_file(self, directory_client: DataLakeDirectoryClient, file_path: str, category: str): + async def upload_file(self, directory_client: DataLakeDirectoryClient, file_path: str, category: str = ""): # Calculate MD5 hash once md5_hash = await self.calc_md5(file_path) @@ -207,7 +209,8 @@ async def upload_file(self, directory_client: DataLakeDirectoryClient, file_path # Proceed with the upload since the file has changed with open(file=file_path, mode="rb") as f: file_client = directory_client.get_file_client(file=os.path.basename(file_path)) - last_modified = datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat() + tmtime = os.path.getmtime(file_path) + last_modified = datetime.fromtimestamp(tmtime).isoformat() title = os.path.splitext(os.path.basename(file_path))[0] metadata = { "md5": md5_hash, From bfb95abe775390694cd209332622c2958bd2ec64 Mon Sep 17 00:00:00 2001 From: Fabian Wallwitz Date: Wed, 6 Nov 2024 19:15:36 +0100 Subject: [PATCH 06/20] Update requirements --- app/backend/requirements.txt | 3 +++ 1 file changed, 3 insertions(+) diff --git a/app/backend/requirements.txt b/app/backend/requirements.txt index 51df00e14b..b1af1878b9 100644 --- a/app/backend/requirements.txt +++ b/app/backend/requirements.txt @@ -433,5 +433,8 @@ yarl==1.9.4 zipp==3.20.0 # via importlib-metadata +# used for adlsgen2setup.py +datetime==4.3.0 + # via -r requirements.in # The following packages are considered to be unsafe in a requirements file: # setuptools From d07c9eb26c389e39c5677905c0a36a906f1db146 Mon Sep 17 00:00:00 2001 From: Fabian Wallwitz Date: Wed, 6 Nov 2024 19:15:54 +0100 Subject: [PATCH 07/20] Update requirements --- app/backend/requirements.in | 1 + 1 file changed, 1 insertion(+) diff --git a/app/backend/requirements.in b/app/backend/requirements.in index 99cb44e678..0a1d35a257 100644 --- a/app/backend/requirements.in +++ b/app/backend/requirements.in @@ -30,3 +30,4 @@ types-beautifulsoup4 msgraph-sdk==1.1.0 openai-messages-token-helper python-dotenv +datetime From 4769133b1e2032816fb0269d45d10b3347f1cad6 Mon Sep 17 00:00:00 2001 From: Fabian Wallwitz Date: Wed, 6 Nov 2024 21:15:50 +0100 Subject: [PATCH 08/20] conditional upload - check blob md5 before upload --- scripts/adlsgen2setup.py | 93 +++++++++++++++++++++------------------- 1 file changed, 48 insertions(+), 45 deletions(-) diff --git a/scripts/adlsgen2setup.py b/scripts/adlsgen2setup.py index c9f2729f8c..90a12eca86 100644 --- a/scripts/adlsgen2setup.py +++ b/scripts/adlsgen2setup.py @@ -18,7 +18,9 @@ from load_azd_env import load_azd_env logger = logging.getLogger("scripts") - +# Set the logging level for the azure package to DEBUG +logging.getLogger("azure").setLevel(logging.DEBUG) +logging.getLogger('azure.core.pipeline.policies.http_logging_policy').setLevel(logging.DEBUG) class AdlsGen2Setup: """ @@ -162,9 +164,12 @@ async def scan_and_upload_directories(self, directories: dict[str, DataLakeDirec file_paths = await self.walk_files(directory_path) # Upload each file collected + count =0 + num = len(file_paths) for file_path in file_paths: await self.upload_file(directory_client, file_path, directory) - logger.info(f"Uploaded {file_path} to {directory}") + count=+1 + logger.info(f"Uploaded [{count}/{num}] {directory}/{file_path}") def create_service_client(self): return DataLakeServiceClient( @@ -172,54 +177,52 @@ def create_service_client(self): ) async def calc_md5(self, path: str) -> str: + hash_md5 = hashlib.md5() with open(path, "rb") as file: - return hashlib.md5(file.read()).hexdigest() - - async def check_md5(self, path: str, md5_hash: str) -> bool: - # if filename ends in .md5 skip - if path.endswith(".md5"): - return True - - # if there is a file called .md5 in this directory, see if its updated - stored_hash = None - hash_path = f"{path}.md5" - if os.path.exists(hash_path): - with open(hash_path, encoding="utf-8") as md5_f: - stored_hash = md5_f.read() - - if stored_hash and stored_hash.strip() == md5_hash.strip(): - logger.info("Skipping %s, no changes detected.", path) - return True - - # Write the hash - with open(hash_path, "w", encoding="utf-8") as md5_f: - md5_f.write(md5_hash) - - return False + for chunk in iter(lambda: file.read(4096), b""): + hash_md5.update(chunk) + return hash_md5.hexdigest() + async def get_blob_md5(self, directory_client: DataLakeDirectoryClient, filename: str) -> Optional[str]: + """ + Retrieves the MD5 checksum from the metadata of the specified blob. + """ + file_client = directory_client.get_file_client(filename) + try: + properties = await file_client.get_file_properties() + return properties.metadata.get('md5') + except Exception as e: + logger.error(f"Error getting blob properties for {filename}: {e}") + return None + async def upload_file(self, directory_client: DataLakeDirectoryClient, file_path: str, category: str = ""): # Calculate MD5 hash once md5_hash = await self.calc_md5(file_path) - # Check if the file has been uploaded or if it has changed - if await self.check_md5(file_path, md5_hash): - logger.info("File %s has already been uploaded, skipping upload.", file_path) - return # Skip uploading if the MD5 check indicates no changes - - # Proceed with the upload since the file has changed - with open(file=file_path, mode="rb") as f: - file_client = directory_client.get_file_client(file=os.path.basename(file_path)) - tmtime = os.path.getmtime(file_path) - last_modified = datetime.fromtimestamp(tmtime).isoformat() - title = os.path.splitext(os.path.basename(file_path))[0] - metadata = { - "md5": md5_hash, - "category": category, - "updated": last_modified, - "title": title - } - await file_client.upload_data(f, overwrite=True, metadata=metadata) - logger.info("File %s uploaded with metadata %s.", file_path, metadata) + # Get the filename + filename = os.path.basename(file_path) + + # Get the MD5 checksum from the blob metadata + blob_md5 = await self.get_blob_md5(directory_client, filename) + + # Upload the file if it does not exist or the checksum differs + if blob_md5 is None or md5_hash != blob_md5: + with open(file_path, "rb") as f: + file_client = directory_client.get_file_client(filename) + tmtime = os.path.getmtime(file_path) + last_modified = datetime.fromtimestamp(tmtime).isoformat() + title = os.path.splitext(filename)[0] + metadata = { + "md5": md5_hash, + "category": category, + "updated": last_modified, + "title": title + } + await file_client.upload_data(f, overwrite=True) + await file_client.set_metadata(metadata) + logger.info(f"Uploaded and updated metadata for {filename}") + else: + logger.info(f"No upload needed for {filename}, checksums match") async def create_or_get_group(self, group_name: str): group_id = None @@ -296,6 +299,6 @@ async def main(args: Any): args = parser.parse_args() if args.verbose: logging.basicConfig() - logging.getLogger().setLevel(logging.INFO) + logging.getLogger().setLevel(logging.INFO) asyncio.run(main(args)) From f4a504cff603c05fcef0e891285f6d0602864816 Mon Sep 17 00:00:00 2001 From: Fabian Wallwitz Date: Sun, 10 Nov 2024 11:26:18 +0100 Subject: [PATCH 09/20] add checksum, reduce log verbose --- app/backend/prepdocslib/filestrategy.py | 63 ++++++++++++++++++++----- 1 file changed, 50 insertions(+), 13 deletions(-) diff --git a/app/backend/prepdocslib/filestrategy.py b/app/backend/prepdocslib/filestrategy.py index 55b24b6f3a..69183f3220 100644 --- a/app/backend/prepdocslib/filestrategy.py +++ b/app/backend/prepdocslib/filestrategy.py @@ -1,6 +1,10 @@ import logging +import asyncio +from concurrent.futures import ThreadPoolExecutor from typing import List, Optional - +from concurrent.futures import ThreadPoolExecutor +from typing import List, Optional +from tqdm.asyncio import tqdm from .blobmanager import BlobManager from .embeddings import ImageEmbeddings, OpenAIEmbeddings from .fileprocessor import FileProcessor @@ -22,17 +26,16 @@ async def parse_file( if processor is None: logger.info("Skipping '%s', no parser found.", file.filename()) return [] - logger.info("Ingesting '%s'", file.filename()) + logger.debug("Ingesting '%s'", file.filename()) pages = [page async for page in processor.parser.parse(content=file.content)] - logger.info("Splitting '%s' into sections", file.filename()) + logger.debug("Splitting '%s' into sections", file.filename()) if image_embeddings: - logger.warning("Each page will be split into smaller chunks of text, but images will be of the entire page.") + logger.debug("Each page will be split into smaller chunks of text, but images will be of the entire page.") sections = [ Section(split_page, content=file, category=category) for split_page in processor.splitter.split_pages(pages) ] return sections - class FileStrategy(Strategy): """ Strategy for ingesting documents into a search service from files stored either locally or in a data lake storage account @@ -44,6 +47,7 @@ def __init__( blob_manager: BlobManager, search_info: SearchInfo, file_processors: dict[str, FileProcessor], + ignore_checksum: bool, document_action: DocumentAction = DocumentAction.Add, embeddings: Optional[OpenAIEmbeddings] = None, image_embeddings: Optional[ImageEmbeddings] = None, @@ -55,6 +59,7 @@ def __init__( self.blob_manager = blob_manager self.file_processors = file_processors self.document_action = document_action + self.ignore_checksum = ignore_checksum self.embeddings = embeddings self.image_embeddings = image_embeddings self.search_analyzer_name = search_analyzer_name @@ -77,29 +82,61 @@ async def run(self): search_manager = SearchManager( self.search_info, self.search_analyzer_name, self.use_acls, False, self.embeddings ) + doccount = self.list_file_strategy.count_docs() + logger.info(f"Processing {doccount} files") + processed_count = 0 if self.document_action == DocumentAction.Add: files = self.list_file_strategy.list() async for file in files: try: - sections = await parse_file(file, self.file_processors, self.category, self.image_embeddings) - if sections: - blob_sas_uris = await self.blob_manager.upload_blob(file) - blob_image_embeddings: Optional[List[List[float]]] = None - if self.image_embeddings and blob_sas_uris: - blob_image_embeddings = await self.image_embeddings.create_embeddings(blob_sas_uris) - await search_manager.update_content(sections, blob_image_embeddings, url=file.url) + if self.ignore_checksum or not await search_manager.file_exists(file): + sections = await parse_file(file, self.file_processors, self.category, self.image_embeddings) + if sections: + blob_sas_uris = await self.blob_manager.upload_blob(file) + blob_image_embeddings: Optional[List[List[float]]] = None + if self.image_embeddings and blob_sas_uris: + blob_image_embeddings = await self.image_embeddings.create_embeddings(blob_sas_uris) + await search_manager.update_content(sections=sections, file=file, image_embeddings=blob_image_embeddings) finally: if file: file.close() + processed_count += 1 + if processed_count % 10 == 0: + remaining = max(doccount - processed_count, 1) + logger.info(f"{processed_count} processed, {remaining} documents remaining") + elif self.document_action == DocumentAction.Remove: + doccount = self.list_file_strategy.count_docs() paths = self.list_file_strategy.list_paths() async for path in paths: await self.blob_manager.remove_blob(path) await search_manager.remove_content(path) + processed_count += 1 + if processed_count % 10 == 0: + remaining = max(doccount - processed_count, 1) + logger.info(f"{processed_count} removed, {remaining} documents remaining") + elif self.document_action == DocumentAction.RemoveAll: await self.blob_manager.remove_blob() await search_manager.remove_content() + async def process_file(self, file, search_manager): + try: + sections = await parse_file(file, self.file_processors, self.category, self.image_embeddings) + if sections: + blob_sas_uris = await self.blob_manager.upload_blob(file) + blob_image_embeddings: Optional[List[List[float]]] = None + if self.image_embeddings and blob_sas_uris: + blob_image_embeddings = await self.image_embeddings.create_embeddings(blob_sas_uris) + await search_manager.update_content(sections=sections, file=file, image_embeddings=blob_image_embeddings) + finally: + if file: + file.close() + + async def remove_file(self, path, search_manager): + await self.blob_manager.remove_blob(path) + await search_manager.remove_content(path) + class UploadUserFileStrategy: """ @@ -124,7 +161,7 @@ async def add_file(self, file: File): logging.warning("Image embeddings are not currently supported for the user upload feature") sections = await parse_file(file, self.file_processors) if sections: - await self.search_manager.update_content(sections, url=file.url) + await self.search_manager.update_content(sections=sections, file=file) async def remove_file(self, filename: str, oid: str): if filename is None or filename == "": From 4ac3780c39959885b556c6ab90afa6790176129a Mon Sep 17 00:00:00 2001 From: Fabian Wallwitz Date: Sun, 10 Nov 2024 11:27:04 +0100 Subject: [PATCH 10/20] add datetime --- app/backend/requirements.txt | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/app/backend/requirements.txt b/app/backend/requirements.txt index e3670630bf..239aea761f 100644 --- a/app/backend/requirements.txt +++ b/app/backend/requirements.txt @@ -350,7 +350,7 @@ python-dateutil==2.9.0.post0 # time-machine python-dotenv==1.0.1 # via -r requirements.in -quart==0.19.7 +quart==0.19.6 # via # -r requirements.in # quart-cors @@ -403,7 +403,6 @@ typing-extensions==4.12.2 # azure-ai-documentintelligence # azure-core # azure-identity - # azure-search-documents # azure-storage-blob # azure-storage-file-datalake # openai @@ -416,7 +415,7 @@ urllib3==2.2.2 # via requests uvicorn==0.30.6 # via -r requirements.in -werkzeug==3.0.6 +werkzeug==3.0.4 # via # flask # quart @@ -434,8 +433,9 @@ yarl==1.9.4 zipp==3.20.0 # via importlib-metadata +# The following packages are considered to be unsafe in a requirements file: +# setuptools + # used for adlsgen2setup.py datetime==4.3.0 # via -r requirements.in -# The following packages are considered to be unsafe in a requirements file: -# setuptools From 283d6d239f0e57af669976de0a5ecfb44e0e0fe0 Mon Sep 17 00:00:00 2001 From: Fabian Wallwitz Date: Sun, 10 Nov 2024 11:27:26 +0100 Subject: [PATCH 11/20] add datetime From 797347c1c78410185abbbca36286c118fc9b0d77 Mon Sep 17 00:00:00 2001 From: Fabian Wallwitz Date: Sun, 10 Nov 2024 11:32:26 +0100 Subject: [PATCH 12/20] support checksum --- app/backend/prepdocslib/blobmanager.py | 90 ++++++++++++++++++++------ 1 file changed, 69 insertions(+), 21 deletions(-) diff --git a/app/backend/prepdocslib/blobmanager.py b/app/backend/prepdocslib/blobmanager.py index e9f18e795a..78b9d5b0b9 100644 --- a/app/backend/prepdocslib/blobmanager.py +++ b/app/backend/prepdocslib/blobmanager.py @@ -3,14 +3,15 @@ import logging import os import re -from typing import List, Optional, Union +from typing import List, Optional, Union, NamedTuple, Tuple import fitz # type: ignore from azure.core.credentials_async import AsyncTokenCredential from azure.storage.blob import ( BlobSasPermissions, UserDelegationKey, - generate_blob_sas, + generate_blob_sas, + BlobClient ) from azure.storage.blob.aio import BlobServiceClient, ContainerClient from PIL import Image, ImageDraw, ImageFont @@ -20,7 +21,6 @@ logger = logging.getLogger("scripts") - class BlobManager: """ Class to manage uploading and deleting blobs containing citation information from a blob storage account @@ -45,29 +45,63 @@ def __init__( self.subscriptionId = subscriptionId self.user_delegation_key: Optional[UserDelegationKey] = None + #async def upload_blob(self, file: File, container_client:ContainerClient) -> Optional[List[str]]: + + async def _create_new_blob(self, file: File, container_client:ContainerClient) -> BlobClient: + with open(file.content.name, "rb") as reopened_file: + blob_name = BlobManager.blob_name_from_file_name(file.content.name) + logger.info("Uploading blob for whole file -> %s", blob_name) + return await container_client.upload_blob(blob_name, reopened_file, overwrite=True, metadata=file.metadata) + + async def _file_blob_update_needed(self, blob_client: BlobClient, file : File) -> bool: + md5_check : int = 0 # 0= not done, 1, positive,. 2 negative + # Get existing blob properties + blob_properties = await blob_client.get_blob_properties() + blob_metadata = blob_properties.metadata + + # Check if the md5 values are the same + file_md5 = file.metadata.get('md5') + blob_md5 = blob_metadata.get('md5') + + # Remove md5 from file metadata if it matches the blob metadata + if file_md5 and file_md5 != blob_md5: + return True + else: + return False + async def upload_blob(self, file: File) -> Optional[List[str]]: async with BlobServiceClient( account_url=self.endpoint, credential=self.credential, max_single_put_size=4 * 1024 * 1024 ) as service_client, service_client.get_container_client(self.container) as container_client: if not await container_client.exists(): await container_client.create_container() - - # Re-open and upload the original file + + # Re-open and upload the original file + md5_check : int = 0 # 0= not done, 1, positive,. 2 negative + + # upload the file local storage zu azure storage + # file.url is only None if files are not uploaded yet, for datalake it is set if file.url is None: - with open(file.content.name, "rb") as reopened_file: - blob_name = BlobManager.blob_name_from_file_name(file.content.name) - logger.info("Uploading blob for whole file -> %s", blob_name) - blob_client = await container_client.upload_blob(blob_name, reopened_file, overwrite=True) - file.url = blob_client.url + blob_client = container_client.get_blob_client(file.url) - if self.store_page_images: + if not await blob_client.exists(): + blob_client = await self._create_new_blob(file, container_client) + else: + if self._blob_update_needed(blob_client, file): + md5_check = 2 + # Upload the file with the updated metadata + with open(file.content.name, "rb") as data: + await blob_client.upload_blob(data, overwrite=True, metadata=file.metadata) + else: + md5_check = 1 + file.url = blob_client.url + + if md5_check!=1 and self.store_page_images: if os.path.splitext(file.content.name)[1].lower() == ".pdf": return await self.upload_pdf_blob_images(service_client, container_client, file) else: logger.info("File %s is not a PDF, skipping image upload", file.content.name) - return None - def get_managedidentity_connectionstring(self): return f"ResourceId=/subscriptions/{self.subscriptionId}/resourceGroups/{self.resourceGroup}/providers/Microsoft.Storage/storageAccounts/{self.account};" @@ -93,7 +127,21 @@ async def upload_pdf_blob_images( for i in range(page_count): blob_name = BlobManager.blob_image_name_from_file_page(file.content.name, i) - logger.info("Converting page %s to image and uploading -> %s", i, blob_name) + + blob_client = container_client.get_blob_client(blob_name) + do_upload : bool = True + if await blob_client.exists(): + # Get existing blob properties + blob_properties = await blob_client.get_blob_properties() + blob_metadata = blob_properties.metadata + + # Check if the md5 values are the same + file_md5 = file.metadata.get('md5') + blob_md5 = blob_metadata.get('md5') + if file_md5 == blob_md5: + continue # documemt already uploaded + + logger.debug("Converting page %s to image and uploading -> %s", i, blob_name) doc = fitz.open(file.content.name) page = doc.load_page(i) @@ -119,21 +167,21 @@ async def upload_pdf_blob_images( output = io.BytesIO() new_img.save(output, format="PNG") output.seek(0) - - blob_client = await container_client.upload_blob(blob_name, output, overwrite=True) + + await blob_client.upload_blob(data=output, overwrite=True, metadata=file.metadata) if not self.user_delegation_key: self.user_delegation_key = await service_client.get_user_delegation_key(start_time, expiry_time) - if blob_client.account_name is not None: + if container_client.account_name is not None: sas_token = generate_blob_sas( - account_name=blob_client.account_name, - container_name=blob_client.container_name, - blob_name=blob_client.blob_name, + account_name=container_client.account_name, + container_name=container_client.container_name, + blob_name=blob_name, user_delegation_key=self.user_delegation_key, permission=BlobSasPermissions(read=True), expiry=expiry_time, start=start_time, - ) + ) sas_uris.append(f"{blob_client.url}?{sas_token}") return sas_uris From 66438f2659dd65eabd720c958fc9017f02bd7427 Mon Sep 17 00:00:00 2001 From: Fabian Wallwitz Date: Sun, 10 Nov 2024 11:33:31 +0100 Subject: [PATCH 13/20] reduce verbose --- app/backend/prepdocslib/htmlparser.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/backend/prepdocslib/htmlparser.py b/app/backend/prepdocslib/htmlparser.py index a42579f640..d75594f71c 100644 --- a/app/backend/prepdocslib/htmlparser.py +++ b/app/backend/prepdocslib/htmlparser.py @@ -38,7 +38,7 @@ async def parse(self, content: IO) -> AsyncGenerator[Page, None]: Returns: Page: The parsed html Page. """ - logger.info("Extracting text from '%s' using local HTML parser (BeautifulSoup)", content.name) + logger.debug("Extracting text from '%s' using local HTML parser (BeautifulSoup)", content.name) data = content.read() soup = BeautifulSoup(data, "html.parser") From 15f34a5df03b44e461e87370755f4c33404f50f4 Mon Sep 17 00:00:00 2001 From: Fabian Wallwitz Date: Sun, 10 Nov 2024 11:35:30 +0100 Subject: [PATCH 14/20] support checksum --- app/backend/prepdocslib/listfilestrategy.py | 55 +++++++++++++++++++-- 1 file changed, 50 insertions(+), 5 deletions(-) diff --git a/app/backend/prepdocslib/listfilestrategy.py b/app/backend/prepdocslib/listfilestrategy.py index 3c8fcd27b0..8d232e0b54 100644 --- a/app/backend/prepdocslib/listfilestrategy.py +++ b/app/backend/prepdocslib/listfilestrategy.py @@ -1,3 +1,5 @@ +from azure.storage.filedatalake import DataLakeServiceClient +from azure.storage.blob import BlobServiceClient import base64 import hashlib import logging @@ -8,6 +10,8 @@ from glob import glob from typing import IO, AsyncGenerator, Dict, List, Optional, Union +from azure.identity import DefaultAzureCredential + from azure.core.credentials_async import AsyncTokenCredential from azure.storage.filedatalake.aio import ( DataLakeServiceClient, @@ -22,10 +26,11 @@ class File: This file might contain access control information about which users or groups can access it """ - def __init__(self, content: IO, acls: Optional[dict[str, list]] = None, url: Optional[str] = None): + def __init__(self, content: IO, acls: Optional[dict[str, list]] = None, url: Optional[str] = None, metadata : Dict[str, str]= None): self.content = content self.acls = acls or {} self.url = url + self.metadata = metadata def filename(self): return os.path.basename(self.content.name) @@ -58,7 +63,10 @@ async def list(self) -> AsyncGenerator[File, None]: async def list_paths(self) -> AsyncGenerator[str, None]: if False: # pragma: no cover - this is necessary for mypy to type check yield - + + def count_docs(self) -> int: + if False: # pragma: no cover - this is necessary for mypy to type check + yield class LocalListFileStrategy(ListFileStrategy): """ @@ -109,7 +117,23 @@ def check_md5(self, path: str) -> bool: md5_f.write(existing_hash) return False - + + + def count_docs(self) -> int: + """ + Return the number of files that match the path pattern. + """ + return sum(1 for _ in self._list_paths_sync(self.path_pattern)) + + def _list_paths_sync(self, path_pattern: str): + """ + Synchronous version of _list_paths to be used for counting files. + """ + for path in glob(path_pattern): + if os.path.isdir(path): + yield from self._list_paths_sync(f"{path}/*") + else: + yield path class ADLSGen2ListFileStrategy(ListFileStrategy): """ @@ -167,11 +191,32 @@ async def list(self) -> AsyncGenerator[File, None]: if acl_parts[0] == "user" and "r" in acl_parts[2]: acls["oids"].append(acl_parts[1]) if acl_parts[0] == "group" and "r" in acl_parts[2]: - acls["groups"].append(acl_parts[1]) - yield File(content=open(temp_file_path, "rb"), acls=acls, url=file_client.url) + acls["groups"].append(acl_parts[1]) + properties = await file_client.get_file_properties() + yield File(content=open(temp_file_path, "rb"), acls=acls, url=file_client.url, metadata=properties.metadata) except Exception as data_lake_exception: logger.error(f"\tGot an error while reading {path} -> {data_lake_exception} --> skipping file") try: os.remove(temp_file_path) except Exception as file_delete_exception: logger.error(f"\tGot an error while deleting {temp_file_path} -> {file_delete_exception}") + + def count_docs(self) -> int: + """ + Return the number of blobs in the specified folder within the Azure Blob Storage container. + """ + + # Create a BlobServiceClient using account URL and credentials + service_client = BlobServiceClient( + account_url=f"https://{self.data_lake_storage_account}.blob.core.windows.net", + credential=DefaultAzureCredential()) + + # Get the container client + container_client = service_client.get_container_client(self.data_lake_filesystem) + + # Count blobs within the specified folder + if self.data_lake_path != "/": + return sum(1 for _ in container_client.list_blobs(name_starts_with= self.data_lake_path)) + else: + return sum(1 for _ in container_client.list_blobs()) + From 2375e70cf79978cc6947297a558c762452275392 Mon Sep 17 00:00:00 2001 From: Fabian Wallwitz Date: Sun, 10 Nov 2024 11:35:59 +0100 Subject: [PATCH 15/20] reduce verbose --- app/backend/prepdocslib/pdfparser.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/backend/prepdocslib/pdfparser.py b/app/backend/prepdocslib/pdfparser.py index 6604110020..8f0340cf6b 100644 --- a/app/backend/prepdocslib/pdfparser.py +++ b/app/backend/prepdocslib/pdfparser.py @@ -21,7 +21,7 @@ class LocalPdfParser(Parser): """ async def parse(self, content: IO) -> AsyncGenerator[Page, None]: - logger.info("Extracting text from '%s' using local PDF parser (pypdf)", content.name) + logger.debug("Extracting text from '%s' using local PDF parser (pypdf)", content.name) reader = PdfReader(content) pages = reader.pages @@ -46,7 +46,7 @@ def __init__( self.credential = credential async def parse(self, content: IO) -> AsyncGenerator[Page, None]: - logger.info("Extracting text from '%s' using Azure Document Intelligence", content.name) + logger.debug("Extracting text from '%s' using Azure Document Intelligence", content.name) async with DocumentIntelligenceClient( endpoint=self.endpoint, credential=self.credential From 9d2dbf10f3745d93550c27b9c4f2ba4aeb91b33b Mon Sep 17 00:00:00 2001 From: Fabian Wallwitz Date: Sun, 10 Nov 2024 11:37:17 +0100 Subject: [PATCH 16/20] support checksum --- app/backend/prepdocslib/searchmanager.py | 229 ++++++++++++++--------- 1 file changed, 138 insertions(+), 91 deletions(-) diff --git a/app/backend/prepdocslib/searchmanager.py b/app/backend/prepdocslib/searchmanager.py index f75af03514..15f7d836d8 100644 --- a/app/backend/prepdocslib/searchmanager.py +++ b/app/backend/prepdocslib/searchmanager.py @@ -1,7 +1,9 @@ import asyncio +import datetime +import dateutil.parser as parser import logging import os -from typing import List, Optional +from typing import Dict, List, Optional from azure.search.documents.indexes.models import ( AzureOpenAIVectorizer, @@ -70,92 +72,107 @@ async def create_index(self, vectorizers: Optional[List[VectorSearchVectorizer]] logger.info("Checking whether search index %s exists...", self.search_info.index_name) async with self.search_info.create_search_index_client() as search_index_client: - - if self.search_info.index_name not in [name async for name in search_index_client.list_index_names()]: - logger.info("Creating new search index %s", self.search_info.index_name) - fields = [ - ( - SimpleField(name="id", type="Edm.String", key=True) - if not self.use_int_vectorization - else SearchField( - name="id", + fields = [ + ( + SimpleField(name="id", type="Edm.String", key=True) + if not self.use_int_vectorization + else SearchField( + name="id", + type="Edm.String", + key=True, + sortable=True, + filterable=True, + facetable=True, + analyzer_name="keyword", + ) + ), + SearchableField( + name="content", + type="Edm.String", + analyzer_name=self.search_analyzer_name, + ), + SearchField( + name="embedding", + type=SearchFieldDataType.Collection(SearchFieldDataType.Single), + hidden=False, + searchable=True, + filterable=False, + sortable=False, + facetable=False, + vector_search_dimensions=self.embedding_dimensions, + vector_search_profile_name="embedding_config", + ), + SimpleField(name="category", type="Edm.String", - key=True, - sortable=True, filterable=True, - facetable=True, - analyzer_name="keyword", - ) - ), - SearchableField( - name="content", - type="Edm.String", - analyzer_name=self.search_analyzer_name, - ), + facetable=True), + SimpleField(name="md5", + type="Edm.String", + filterable=True, + facetable=True), + SimpleField(name="deeplink", + type="Edm.String", + filterable=True, + facetable=False), + SimpleField(name="updated", + type="Edm.DateTimeOffset", + filterable=True, + facetable=True), + SimpleField( + name="sourcepage", + type="Edm.String", + filterable=True, + facetable=True, + ), + SimpleField( + name="sourcefile", + type="Edm.String", + filterable=True, + facetable=True, + ), + SimpleField( + name="storageUrl", + type="Edm.String", + filterable=True, + facetable=False, + ), + ] + if self.use_acls: + fields.append( + SimpleField( + name="oids", + type=SearchFieldDataType.Collection(SearchFieldDataType.String), + filterable=True, + ) + ) + fields.append( + SimpleField( + name="groups", + type=SearchFieldDataType.Collection(SearchFieldDataType.String), + filterable=True, + ) + ) + if self.use_int_vectorization: + logger.info("Including parent_id field in new index %s", self.search_info.index_name) + fields.append(SearchableField(name="parent_id", type="Edm.String", filterable=True)) + if self.search_images: + logger.info("Including imageEmbedding field in new index %s", self.search_info.index_name) + fields.append( SearchField( - name="embedding", + name="imageEmbedding", type=SearchFieldDataType.Collection(SearchFieldDataType.Single), hidden=False, searchable=True, filterable=False, sortable=False, facetable=False, - vector_search_dimensions=self.embedding_dimensions, + vector_search_dimensions=1024, vector_search_profile_name="embedding_config", ), - SimpleField(name="category", type="Edm.String", filterable=True, facetable=True), - SimpleField( - name="sourcepage", - type="Edm.String", - filterable=True, - facetable=True, - ), - SimpleField( - name="sourcefile", - type="Edm.String", - filterable=True, - facetable=True, - ), - SimpleField( - name="storageUrl", - type="Edm.String", - filterable=True, - facetable=False, - ), - ] - if self.use_acls: - fields.append( - SimpleField( - name="oids", - type=SearchFieldDataType.Collection(SearchFieldDataType.String), - filterable=True, - ) - ) - fields.append( - SimpleField( - name="groups", - type=SearchFieldDataType.Collection(SearchFieldDataType.String), - filterable=True, - ) - ) - if self.use_int_vectorization: - logger.info("Including parent_id field in new index %s", self.search_info.index_name) - fields.append(SearchableField(name="parent_id", type="Edm.String", filterable=True)) - if self.search_images: - logger.info("Including imageEmbedding field in new index %s", self.search_info.index_name) - fields.append( - SearchField( - name="imageEmbedding", - type=SearchFieldDataType.Collection(SearchFieldDataType.Single), - hidden=False, - searchable=True, - filterable=False, - sortable=False, - facetable=False, - vector_search_dimensions=1024, - vector_search_profile_name="embedding_config", - ), - ) + ) + + if self.search_info.index_name not in [name async for name in search_index_client.list_index_names()]: + logger.info("Creating new search index %s", self.search_info.index_name) vectorizers = [] if self.embeddings and isinstance(self.embeddings, AzureOpenAIEmbeddingService): @@ -217,16 +234,13 @@ async def create_index(self, vectorizers: Optional[List[VectorSearchVectorizer]] else: logger.info("Search index %s already exists", self.search_info.index_name) existing_index = await search_index_client.get_index(self.search_info.index_name) - if not any(field.name == "storageUrl" for field in existing_index.fields): - logger.info("Adding storageUrl field to index %s", self.search_info.index_name) - existing_index.fields.append( - SimpleField( - name="storageUrl", - type="Edm.String", - filterable=True, - facetable=False, - ), - ) + existing_field_names = {field.name for field in existing_index.fields} + + # Check and add missing fields + missing_fields = [field for field in fields if field.name not in existing_field_names] + if missing_fields: + logger.info("Adding missing fields to index %s: %s", self.search_info.index_name, [field.name for field in missing_fields]) + existing_index.fields.extend(missing_fields) await search_index_client.create_or_update_index(existing_index) if existing_index.vector_search is not None and ( @@ -252,19 +266,52 @@ async def create_index(self, vectorizers: Optional[List[VectorSearchVectorizer]] self.search_info, ) + async def file_exists(self, file : File ) -> bool: + async with self.search_info.create_search_client() as search_client: + ## make sure that we don't update unchanged sections, by if sourcefile and md5 are the same + if file.metadata.get('md5')!= None: + filter = None + assert file.filename() is not None + filter = f"sourcefile eq '{str(file.filename())}' and md5 eq '{file.metadata.get('md5')}'" + + # make sure (when applicable) that we don't skip if different categories have same file.filename() + #TODO: refactoring: check if using file.filename() as primary for blob is a good idea, or better use sha256(instead as md5) as reliable for blob and index primary key + if file.metadata.get('category') is not None: + filter = filter + f" and category eq '{file.metadata.get('category')}'" + max_results = 1 + result = await search_client.search( + search_text="", filter=filter, top=max_results, include_total_count=True + ) + result_count = await result.get_count() + if result_count > 0: + logger.debug("Skipping %s, no changes detected.", file.filename()) + return True + else: + return False + ## -- end of check + async def update_content( - self, sections: List[Section], image_embeddings: Optional[List[List[float]]] = None, url: Optional[str] = None - ): + self, sections: List[Section], file : File ,image_embeddings: Optional[List[List[float]]] = None): MAX_BATCH_SIZE = 1000 section_batches = [sections[i : i + MAX_BATCH_SIZE] for i in range(0, len(sections), MAX_BATCH_SIZE)] async with self.search_info.create_search_client() as search_client: + + ## caluclate a (default) updated timestamp in format of index + if file.metadata.get('updated') is None: + docdate = datetime.now(datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z' + else: + docdate = parser.isoparse(file.metadata.get('updated')).strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z' + for batch_index, batch in enumerate(section_batches): documents = [ { "id": f"{section.content.filename_to_id()}-page-{section_index + batch_index * MAX_BATCH_SIZE}", "content": section.split_page.text, - "category": section.category, + "category": file.metadata.get('category'), + "md5": file.metadata.get('md5'), + "deeplink": file.metadata.get('deeplink'), # optional deel link original doc source for citiation,inline view + "updated": docdate, "sourcepage": ( BlobManager.blob_image_name_from_file_page( filename=section.content.filename(), @@ -281,9 +328,9 @@ async def update_content( } for section_index, section in enumerate(batch) ] - if url: + if file.url: for document in documents: - document["storageUrl"] = url + document["storageUrl"] = file.url if self.embeddings: embeddings = await self.embeddings.create_embeddings( texts=[section.split_page.text for section in batch] From 4ef562285b4aa9406d1af77386bb2b1fbfb3fd13 Mon Sep 17 00:00:00 2001 From: Pamela Fox Date: Wed, 20 Nov 2024 09:12:32 -0800 Subject: [PATCH 17/20] Addressed some of my comments --- app/backend/prepdocslib/blobmanager.py | 81 ++++++++++---------- app/backend/prepdocslib/filestrategy.py | 15 ++-- app/backend/prepdocslib/listfilestrategy.py | 39 +++++----- app/backend/prepdocslib/searchmanager.py | 65 ++++++++-------- scripts/adlsgen2setup.py | 82 ++++++++++----------- 5 files changed, 143 insertions(+), 139 deletions(-) diff --git a/app/backend/prepdocslib/blobmanager.py b/app/backend/prepdocslib/blobmanager.py index 78b9d5b0b9..eefbad5efe 100644 --- a/app/backend/prepdocslib/blobmanager.py +++ b/app/backend/prepdocslib/blobmanager.py @@ -3,15 +3,16 @@ import logging import os import re -from typing import List, Optional, Union, NamedTuple, Tuple +from enum import Enum +from typing import List, Optional, Union import fitz # type: ignore from azure.core.credentials_async import AsyncTokenCredential from azure.storage.blob import ( + BlobClient, BlobSasPermissions, UserDelegationKey, - generate_blob_sas, - BlobClient + generate_blob_sas, ) from azure.storage.blob.aio import BlobServiceClient, ContainerClient from PIL import Image, ImageDraw, ImageFont @@ -21,6 +22,7 @@ logger = logging.getLogger("scripts") + class BlobManager: """ Class to manage uploading and deleting blobs containing citation information from a blob storage account @@ -45,58 +47,60 @@ def __init__( self.subscriptionId = subscriptionId self.user_delegation_key: Optional[UserDelegationKey] = None - #async def upload_blob(self, file: File, container_client:ContainerClient) -> Optional[List[str]]: - - async def _create_new_blob(self, file: File, container_client:ContainerClient) -> BlobClient: + async def _create_new_blob(self, file: File, container_client: ContainerClient) -> BlobClient: with open(file.content.name, "rb") as reopened_file: - blob_name = BlobManager.blob_name_from_file_name(file.content.name) - logger.info("Uploading blob for whole file -> %s", blob_name) - return await container_client.upload_blob(blob_name, reopened_file, overwrite=True, metadata=file.metadata) + blob_name = BlobManager.blob_name_from_file_name(file.content.name) + logger.info("Uploading blob for whole file -> %s", blob_name) + return await container_client.upload_blob(blob_name, reopened_file, overwrite=True, metadata=file.metadata) - async def _file_blob_update_needed(self, blob_client: BlobClient, file : File) -> bool: - md5_check : int = 0 # 0= not done, 1, positive,. 2 negative + async def _file_blob_update_needed(self, blob_client: BlobClient, file: File) -> bool: # Get existing blob properties blob_properties = await blob_client.get_blob_properties() blob_metadata = blob_properties.metadata - + # Check if the md5 values are the same - file_md5 = file.metadata.get('md5') - blob_md5 = blob_metadata.get('md5') - - # Remove md5 from file metadata if it matches the blob metadata - if file_md5 and file_md5 != blob_md5: - return True - else: - return False - + file_md5 = file.metadata.get("md5") + blob_md5 = blob_metadata.get("md5") + + # If the file has an md5 value, check if it is different from the blob + return file_md5 and file_md5 != blob_md5 + async def upload_blob(self, file: File) -> Optional[List[str]]: async with BlobServiceClient( account_url=self.endpoint, credential=self.credential, max_single_put_size=4 * 1024 * 1024 ) as service_client, service_client.get_container_client(self.container) as container_client: if not await container_client.exists(): await container_client.create_container() - - # Re-open and upload the original file - md5_check : int = 0 # 0= not done, 1, positive,. 2 negative - - # upload the file local storage zu azure storage + + # Re-open and upload the original file if the blob does not exist or the md5 values do not match + class MD5Check(Enum): + NOT_DONE = 0 + MATCH = 1 + NO_MATCH = 2 + + md5_check = MD5Check.NOT_DONE + + # Upload the file to Azure Storage # file.url is only None if files are not uploaded yet, for datalake it is set if file.url is None: blob_client = container_client.get_blob_client(file.url) if not await blob_client.exists(): + logger.info("Blob %s does not exist, uploading", file.url) blob_client = await self._create_new_blob(file, container_client) else: if self._blob_update_needed(blob_client, file): - md5_check = 2 + logger.info("Blob %s exists but md5 values do not match, updating", file.url) + md5_check = MD5Check.NO_MATCH # Upload the file with the updated metadata with open(file.content.name, "rb") as data: await blob_client.upload_blob(data, overwrite=True, metadata=file.metadata) else: - md5_check = 1 + logger.info("Blob %s exists and md5 values match, skipping upload", file.url) + md5_check = MD5Check.MATCH file.url = blob_client.url - - if md5_check!=1 and self.store_page_images: + + if md5_check != MD5Check.MATCH and self.store_page_images: if os.path.splitext(file.content.name)[1].lower() == ".pdf": return await self.upload_pdf_blob_images(service_client, container_client, file) else: @@ -127,20 +131,19 @@ async def upload_pdf_blob_images( for i in range(page_count): blob_name = BlobManager.blob_image_name_from_file_page(file.content.name, i) - + blob_client = container_client.get_blob_client(blob_name) - do_upload : bool = True if await blob_client.exists(): # Get existing blob properties blob_properties = await blob_client.get_blob_properties() blob_metadata = blob_properties.metadata - + # Check if the md5 values are the same - file_md5 = file.metadata.get('md5') - blob_md5 = blob_metadata.get('md5') + file_md5 = file.metadata.get("md5") + blob_md5 = blob_metadata.get("md5") if file_md5 == blob_md5: - continue # documemt already uploaded - + continue # documemt already uploaded + logger.debug("Converting page %s to image and uploading -> %s", i, blob_name) doc = fitz.open(file.content.name) @@ -167,7 +170,7 @@ async def upload_pdf_blob_images( output = io.BytesIO() new_img.save(output, format="PNG") output.seek(0) - + await blob_client.upload_blob(data=output, overwrite=True, metadata=file.metadata) if not self.user_delegation_key: self.user_delegation_key = await service_client.get_user_delegation_key(start_time, expiry_time) @@ -181,7 +184,7 @@ async def upload_pdf_blob_images( permission=BlobSasPermissions(read=True), expiry=expiry_time, start=start_time, - ) + ) sas_uris.append(f"{blob_client.url}?{sas_token}") return sas_uris diff --git a/app/backend/prepdocslib/filestrategy.py b/app/backend/prepdocslib/filestrategy.py index 69183f3220..fbef639ff2 100644 --- a/app/backend/prepdocslib/filestrategy.py +++ b/app/backend/prepdocslib/filestrategy.py @@ -1,10 +1,6 @@ import logging -import asyncio -from concurrent.futures import ThreadPoolExecutor from typing import List, Optional -from concurrent.futures import ThreadPoolExecutor -from typing import List, Optional -from tqdm.asyncio import tqdm + from .blobmanager import BlobManager from .embeddings import ImageEmbeddings, OpenAIEmbeddings from .fileprocessor import FileProcessor @@ -36,6 +32,7 @@ async def parse_file( ] return sections + class FileStrategy(Strategy): """ Strategy for ingesting documents into a search service from files stored either locally or in a data lake storage account @@ -96,7 +93,9 @@ async def run(self): blob_image_embeddings: Optional[List[List[float]]] = None if self.image_embeddings and blob_sas_uris: blob_image_embeddings = await self.image_embeddings.create_embeddings(blob_sas_uris) - await search_manager.update_content(sections=sections, file=file, image_embeddings=blob_image_embeddings) + await search_manager.update_content( + sections=sections, file=file, image_embeddings=blob_image_embeddings + ) finally: if file: file.close() @@ -128,7 +127,9 @@ async def process_file(self, file, search_manager): blob_image_embeddings: Optional[List[List[float]]] = None if self.image_embeddings and blob_sas_uris: blob_image_embeddings = await self.image_embeddings.create_embeddings(blob_sas_uris) - await search_manager.update_content(sections=sections, file=file, image_embeddings=blob_image_embeddings) + await search_manager.update_content( + sections=sections, file=file, image_embeddings=blob_image_embeddings + ) finally: if file: file.close() diff --git a/app/backend/prepdocslib/listfilestrategy.py b/app/backend/prepdocslib/listfilestrategy.py index 8d232e0b54..eed61d452e 100644 --- a/app/backend/prepdocslib/listfilestrategy.py +++ b/app/backend/prepdocslib/listfilestrategy.py @@ -1,5 +1,3 @@ -from azure.storage.filedatalake import DataLakeServiceClient -from azure.storage.blob import BlobServiceClient import base64 import hashlib import logging @@ -10,12 +8,10 @@ from glob import glob from typing import IO, AsyncGenerator, Dict, List, Optional, Union -from azure.identity import DefaultAzureCredential - from azure.core.credentials_async import AsyncTokenCredential -from azure.storage.filedatalake.aio import ( - DataLakeServiceClient, -) +from azure.identity import DefaultAzureCredential +from azure.storage.blob import BlobServiceClient +from azure.storage.filedatalake.aio import DataLakeServiceClient logger = logging.getLogger("scripts") @@ -26,11 +22,17 @@ class File: This file might contain access control information about which users or groups can access it """ - def __init__(self, content: IO, acls: Optional[dict[str, list]] = None, url: Optional[str] = None, metadata : Dict[str, str]= None): + def __init__( + self, + content: IO, + acls: Optional[dict[str, list]] = None, + url: Optional[str] = None, + metadata: Dict[str, str] = None, + ): self.content = content self.acls = acls or {} self.url = url - self.metadata = metadata + self.metadata = metadata def filename(self): return os.path.basename(self.content.name) @@ -63,11 +65,12 @@ async def list(self) -> AsyncGenerator[File, None]: async def list_paths(self) -> AsyncGenerator[str, None]: if False: # pragma: no cover - this is necessary for mypy to type check yield - + def count_docs(self) -> int: if False: # pragma: no cover - this is necessary for mypy to type check yield + class LocalListFileStrategy(ListFileStrategy): """ Concrete strategy for listing files that are located in a local filesystem @@ -117,7 +120,6 @@ def check_md5(self, path: str) -> bool: md5_f.write(existing_hash) return False - def count_docs(self) -> int: """ @@ -135,6 +137,7 @@ def _list_paths_sync(self, path_pattern: str): else: yield path + class ADLSGen2ListFileStrategy(ListFileStrategy): """ Concrete strategy for listing files that are located in a data lake storage account @@ -191,9 +194,11 @@ async def list(self) -> AsyncGenerator[File, None]: if acl_parts[0] == "user" and "r" in acl_parts[2]: acls["oids"].append(acl_parts[1]) if acl_parts[0] == "group" and "r" in acl_parts[2]: - acls["groups"].append(acl_parts[1]) + acls["groups"].append(acl_parts[1]) properties = await file_client.get_file_properties() - yield File(content=open(temp_file_path, "rb"), acls=acls, url=file_client.url, metadata=properties.metadata) + yield File( + content=open(temp_file_path, "rb"), acls=acls, url=file_client.url, metadata=properties.metadata + ) except Exception as data_lake_exception: logger.error(f"\tGot an error while reading {path} -> {data_lake_exception} --> skipping file") try: @@ -205,18 +210,18 @@ def count_docs(self) -> int: """ Return the number of blobs in the specified folder within the Azure Blob Storage container. """ - + # Create a BlobServiceClient using account URL and credentials service_client = BlobServiceClient( account_url=f"https://{self.data_lake_storage_account}.blob.core.windows.net", - credential=DefaultAzureCredential()) + credential=DefaultAzureCredential(), + ) # Get the container client container_client = service_client.get_container_client(self.data_lake_filesystem) # Count blobs within the specified folder if self.data_lake_path != "/": - return sum(1 for _ in container_client.list_blobs(name_starts_with= self.data_lake_path)) + return sum(1 for _ in container_client.list_blobs(name_starts_with=self.data_lake_path)) else: return sum(1 for _ in container_client.list_blobs()) - diff --git a/app/backend/prepdocslib/searchmanager.py b/app/backend/prepdocslib/searchmanager.py index 15f7d836d8..47945786a5 100644 --- a/app/backend/prepdocslib/searchmanager.py +++ b/app/backend/prepdocslib/searchmanager.py @@ -1,10 +1,10 @@ import asyncio import datetime -import dateutil.parser as parser import logging import os -from typing import Dict, List, Optional +from typing import List, Optional +import dateutil.parser as parser from azure.search.documents.indexes.models import ( AzureOpenAIVectorizer, AzureOpenAIVectorizerParameters, @@ -102,22 +102,10 @@ async def create_index(self, vectorizers: Optional[List[VectorSearchVectorizer]] vector_search_dimensions=self.embedding_dimensions, vector_search_profile_name="embedding_config", ), - SimpleField(name="category", - type="Edm.String", - filterable=True, - facetable=True), - SimpleField(name="md5", - type="Edm.String", - filterable=True, - facetable=True), - SimpleField(name="deeplink", - type="Edm.String", - filterable=True, - facetable=False), - SimpleField(name="updated", - type="Edm.DateTimeOffset", - filterable=True, - facetable=True), + SimpleField(name="category", type="Edm.String", filterable=True, facetable=True), + SimpleField(name="md5", type="Edm.String", filterable=True, facetable=True), + SimpleField(name="deeplink", type="Edm.String", filterable=True, facetable=False), + SimpleField(name="updated", type="Edm.DateTimeOffset", filterable=True, facetable=True), SimpleField( name="sourcepage", type="Edm.String", @@ -239,7 +227,11 @@ async def create_index(self, vectorizers: Optional[List[VectorSearchVectorizer]] # Check and add missing fields missing_fields = [field for field in fields if field.name not in existing_field_names] if missing_fields: - logger.info("Adding missing fields to index %s: %s", self.search_info.index_name, [field.name for field in missing_fields]) + logger.info( + "Adding missing fields to index %s: %s", + self.search_info.index_name, + [field.name for field in missing_fields], + ) existing_index.fields.extend(missing_fields) await search_index_client.create_or_update_index(existing_index) @@ -266,17 +258,17 @@ async def create_index(self, vectorizers: Optional[List[VectorSearchVectorizer]] self.search_info, ) - async def file_exists(self, file : File ) -> bool: + async def file_exists(self, file: File) -> bool: async with self.search_info.create_search_client() as search_client: ## make sure that we don't update unchanged sections, by if sourcefile and md5 are the same - if file.metadata.get('md5')!= None: + if file.metadata.get("md5") is not None: filter = None - assert file.filename() is not None + assert file.filename() is not None filter = f"sourcefile eq '{str(file.filename())}' and md5 eq '{file.metadata.get('md5')}'" - + # make sure (when applicable) that we don't skip if different categories have same file.filename() - #TODO: refactoring: check if using file.filename() as primary for blob is a good idea, or better use sha256(instead as md5) as reliable for blob and index primary key - if file.metadata.get('category') is not None: + # TODO: refactoring: check if using file.filename() as primary for blob is a good idea, or better use sha256(instead as md5) as reliable for blob and index primary key + if file.metadata.get("category") is not None: filter = filter + f" and category eq '{file.metadata.get('category')}'" max_results = 1 result = await search_client.search( @@ -285,32 +277,35 @@ async def file_exists(self, file : File ) -> bool: result_count = await result.get_count() if result_count > 0: logger.debug("Skipping %s, no changes detected.", file.filename()) - return True + return True else: return False ## -- end of check async def update_content( - self, sections: List[Section], file : File ,image_embeddings: Optional[List[List[float]]] = None): + self, sections: List[Section], file: File, image_embeddings: Optional[List[List[float]]] = None + ): MAX_BATCH_SIZE = 1000 section_batches = [sections[i : i + MAX_BATCH_SIZE] for i in range(0, len(sections), MAX_BATCH_SIZE)] async with self.search_info.create_search_client() as search_client: - + ## caluclate a (default) updated timestamp in format of index - if file.metadata.get('updated') is None: - docdate = datetime.now(datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z' + if file.metadata.get("updated") is None: + docdate = datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" else: - docdate = parser.isoparse(file.metadata.get('updated')).strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z' - + docdate = parser.isoparse(file.metadata.get("updated")).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" + for batch_index, batch in enumerate(section_batches): documents = [ { "id": f"{section.content.filename_to_id()}-page-{section_index + batch_index * MAX_BATCH_SIZE}", "content": section.split_page.text, - "category": file.metadata.get('category'), - "md5": file.metadata.get('md5'), - "deeplink": file.metadata.get('deeplink'), # optional deel link original doc source for citiation,inline view + "category": file.metadata.get("category"), + "md5": file.metadata.get("md5"), + "deeplink": file.metadata.get( + "deeplink" + ), # optional deel link original doc source for citiation,inline view "updated": docdate, "sourcepage": ( BlobManager.blob_image_name_from_file_page( diff --git a/scripts/adlsgen2setup.py b/scripts/adlsgen2setup.py index 90a12eca86..10a5cd59a0 100644 --- a/scripts/adlsgen2setup.py +++ b/scripts/adlsgen2setup.py @@ -1,10 +1,10 @@ import argparse import asyncio -from datetime import datetime +import hashlib import json import logging import os -import hashlib +from datetime import datetime from typing import Any, Optional import aiohttp @@ -20,7 +20,8 @@ logger = logging.getLogger("scripts") # Set the logging level for the azure package to DEBUG logging.getLogger("azure").setLevel(logging.DEBUG) -logging.getLogger('azure.core.pipeline.policies.http_logging_policy').setLevel(logging.DEBUG) +logging.getLogger("azure.core.pipeline.policies.http_logging_policy").setLevel(logging.DEBUG) + class AdlsGen2Setup: """ @@ -94,7 +95,9 @@ async def run(self, scandirs: bool = False): if directory not in directories: logger.error(f"File {file} has unknown directory {directory}, exiting...") return - await self.upload_file(directory_client=directories[directory], file_path=os.path.join(self.data_directory, file)) + await self.upload_file( + directory_client=directories[directory], file_path=os.path.join(self.data_directory, file) + ) logger.info("Setting access control...") for directory, access_control in self.data_access_control_format["directories"].items(): @@ -106,7 +109,8 @@ async def run(self, scandirs: bool = False): f"Directory {directory} has unknown group {group_name} in access control list, exiting" ) return - await directory_client.update_access_control_recursive(acl=f"group:{groups[group_name]}:r-x" + await directory_client.update_access_control_recursive( + acl=f"group:{groups[group_name]}:r-x" ) if "oids" in access_control: for oid in access_control["oids"]: @@ -115,60 +119,60 @@ async def run(self, scandirs: bool = False): for directory_client in directories.values(): await directory_client.close() - async def walk_files(self, src_filepath = "."): + async def walk_files(self, src_filepath="."): filepath_list = [] - - #This for loop uses the os.walk() function to walk through the files and directories - #and records the filepaths of the files to a list + + # This for loop uses the os.walk() function to walk through the files and directories + # and records the filepaths of the files to a list for root, dirs, files in os.walk(src_filepath): - - #iterate through the files currently obtained by os.walk() and - #create the filepath string for that file and add it to the filepath_list list + + # iterate through the files currently obtained by os.walk() and + # create the filepath string for that file and add it to the filepath_list list root_found: bool = False for file in files: - #Checks to see if the root is '.' and changes it to the correct current - #working directory by calling os.getcwd(). Otherwise root_path will just be the root variable value. - - if not root_found and root == '.': - filepath =os.path.join(os.getcwd() + "/", file) + # Checks to see if the root is '.' and changes it to the correct current + # working directory by calling os.getcwd(). Otherwise root_path will just be the root variable value. + + if not root_found and root == ".": + filepath = os.path.join(os.getcwd() + "/", file) root_found = True else: filepath = os.path.join(root, file) - - #Appends filepath to filepath_list if filepath does not currently exist in filepath_list + + # Appends filepath to filepath_list if filepath does not currently exist in filepath_list if filepath not in filepath_list: - filepath_list.append(filepath) - - #Return filepath_list + filepath_list.append(filepath) + + # Return filepath_list return filepath_list async def scan_and_upload_directories(self, directories: dict[str, DataLakeDirectoryClient], filesystem_client): logger.info("Scanning and uploading files from directories recursively...") - + for directory, directory_client in directories.items(): directory_path = os.path.join(self.data_directory, directory) if directory == "/": continue - - # Check if 'scandir' exists and is set to False + + # Check if 'scandir' exists and is set to False if not self.data_access_control_format["directories"][directory].get("scandir", True): logger.info(f"Skipping directory {directory} as 'scandir' is set to False") continue - + # Check if the directory exists before walking it if not os.path.exists(directory_path): logger.warning(f"Directory does not exist: {directory_path}") continue - + # Get all file paths using the walk_files function file_paths = await self.walk_files(directory_path) # Upload each file collected - count =0 + count = 0 num = len(file_paths) for file_path in file_paths: await self.upload_file(directory_client, file_path, directory) - count=+1 + count = +1 logger.info(f"Uploaded [{count}/{num}] {directory}/{file_path}") def create_service_client(self): @@ -190,11 +194,11 @@ async def get_blob_md5(self, directory_client: DataLakeDirectoryClient, filename file_client = directory_client.get_file_client(filename) try: properties = await file_client.get_file_properties() - return properties.metadata.get('md5') + return properties.metadata.get("md5") except Exception as e: logger.error(f"Error getting blob properties for {filename}: {e}") return None - + async def upload_file(self, directory_client: DataLakeDirectoryClient, file_path: str, category: str = ""): # Calculate MD5 hash once md5_hash = await self.calc_md5(file_path) @@ -212,12 +216,7 @@ async def upload_file(self, directory_client: DataLakeDirectoryClient, file_path tmtime = os.path.getmtime(file_path) last_modified = datetime.fromtimestamp(tmtime).isoformat() title = os.path.splitext(filename)[0] - metadata = { - "md5": md5_hash, - "category": category, - "updated": last_modified, - "title": title - } + metadata = {"md5": md5_hash, "category": category, "updated": last_modified, "title": title} await file_client.upload_data(f, overwrite=True) await file_client.set_metadata(metadata) logger.info(f"Uploaded and updated metadata for {filename}") @@ -248,7 +247,6 @@ async def create_or_get_group(self, group_name: str): # If Unified does not work for you, then you may need the following settings instead: # "mailEnabled": False, # "mailNickname": group_name, - } async with session.post("https://graph.microsoft.com/v1.0/groups", json=group) as response: content = await response.json() @@ -270,7 +268,7 @@ async def main(args: Any): data_access_control_format = json.load(f) command = AdlsGen2Setup( data_directory=args.data_directory, - storage_account_name=os.environ["AZURE_ADLS_GEN2_STORAGE_ACCOUNT"], + storage_account_name=os.environ["AZURE_ADLS_GEN2_STORAGE_ACCOUNT"], filesystem_name=os.environ["AZURE_ADLS_GEN2_FILESYSTEM"], security_enabled_groups=args.create_security_enabled_groups, credentials=credentials, @@ -295,10 +293,12 @@ async def main(args: Any): "--data-access-control", required=True, help="JSON file describing access control for the sample data" ) parser.add_argument("--verbose", "-v", required=False, action="store_true", help="Verbose output") - parser.add_argument("--scandirs", required=False, action="store_true", help="Scan and upload all files from directories recursively") + parser.add_argument( + "--scandirs", required=False, action="store_true", help="Scan and upload all files from directories recursively" + ) args = parser.parse_args() if args.verbose: logging.basicConfig() - logging.getLogger().setLevel(logging.INFO) + logging.getLogger().setLevel(logging.INFO) asyncio.run(main(args)) From 7a1c07fa49e50df52d47623083de31853368a250 Mon Sep 17 00:00:00 2001 From: Pamela Fox Date: Wed, 20 Nov 2024 09:28:25 -0800 Subject: [PATCH 18/20] Make diff more minimal --- app/backend/prepdocslib/blobmanager.py | 5 +-- app/backend/prepdocslib/filestrategy.py | 42 +++++++++++++------------ app/backend/prepdocslib/pdfparser.py | 4 +-- 3 files changed, 27 insertions(+), 24 deletions(-) diff --git a/app/backend/prepdocslib/blobmanager.py b/app/backend/prepdocslib/blobmanager.py index eefbad5efe..4d5b37772c 100644 --- a/app/backend/prepdocslib/blobmanager.py +++ b/app/backend/prepdocslib/blobmanager.py @@ -142,9 +142,10 @@ async def upload_pdf_blob_images( file_md5 = file.metadata.get("md5") blob_md5 = blob_metadata.get("md5") if file_md5 == blob_md5: - continue # documemt already uploaded + logger.info("Blob %s exists and md5 values match, skipping upload", blob_name) + continue # document already uploaded - logger.debug("Converting page %s to image and uploading -> %s", i, blob_name) + logger.info("Converting page %s to image and uploading -> %s", i, blob_name) doc = fitz.open(file.content.name) page = doc.load_page(i) diff --git a/app/backend/prepdocslib/filestrategy.py b/app/backend/prepdocslib/filestrategy.py index fbef639ff2..1b1e094b7e 100644 --- a/app/backend/prepdocslib/filestrategy.py +++ b/app/backend/prepdocslib/filestrategy.py @@ -22,11 +22,11 @@ async def parse_file( if processor is None: logger.info("Skipping '%s', no parser found.", file.filename()) return [] - logger.debug("Ingesting '%s'", file.filename()) + logger.info("Ingesting '%s'", file.filename()) pages = [page async for page in processor.parser.parse(content=file.content)] - logger.debug("Splitting '%s' into sections", file.filename()) + logger.info("Splitting '%s' into sections", file.filename()) if image_embeddings: - logger.debug("Each page will be split into smaller chunks of text, but images will be of the entire page.") + logger.info("Each page will be split into smaller chunks of text, but images will be of the entire page.") sections = [ Section(split_page, content=file, category=category) for split_page in processor.splitter.split_pages(pages) ] @@ -79,41 +79,43 @@ async def run(self): search_manager = SearchManager( self.search_info, self.search_analyzer_name, self.use_acls, False, self.embeddings ) - doccount = self.list_file_strategy.count_docs() - logger.info(f"Processing {doccount} files") + doc_count = self.list_file_strategy.count_docs() + logger.info("Processing %s files", doc_count) processed_count = 0 if self.document_action == DocumentAction.Add: files = self.list_file_strategy.list() async for file in files: try: - if self.ignore_checksum or not await search_manager.file_exists(file): - sections = await parse_file(file, self.file_processors, self.category, self.image_embeddings) - if sections: - blob_sas_uris = await self.blob_manager.upload_blob(file) - blob_image_embeddings: Optional[List[List[float]]] = None - if self.image_embeddings and blob_sas_uris: - blob_image_embeddings = await self.image_embeddings.create_embeddings(blob_sas_uris) - await search_manager.update_content( - sections=sections, file=file, image_embeddings=blob_image_embeddings - ) + if not self.ignore_checksum and await search_manager.file_exists(file): + logger.info("'%s' has already been processed", file.filename()) + continue + sections = await parse_file(file, self.file_processors, self.category, self.image_embeddings) + if sections: + blob_sas_uris = await self.blob_manager.upload_blob(file) + blob_image_embeddings: Optional[List[List[float]]] = None + if self.image_embeddings and blob_sas_uris: + blob_image_embeddings = await self.image_embeddings.create_embeddings(blob_sas_uris) + await search_manager.update_content( + sections=sections, file=file, image_embeddings=blob_image_embeddings + ) finally: if file: file.close() processed_count += 1 if processed_count % 10 == 0: - remaining = max(doccount - processed_count, 1) - logger.info(f"{processed_count} processed, {remaining} documents remaining") + remaining = max(doc_count - processed_count, 1) + logger.info("%s processed, %s documents remaining", processed_count, remaining) elif self.document_action == DocumentAction.Remove: - doccount = self.list_file_strategy.count_docs() + doc_count = self.list_file_strategy.count_docs() paths = self.list_file_strategy.list_paths() async for path in paths: await self.blob_manager.remove_blob(path) await search_manager.remove_content(path) processed_count += 1 if processed_count % 10 == 0: - remaining = max(doccount - processed_count, 1) - logger.info(f"{processed_count} removed, {remaining} documents remaining") + remaining = max(doc_count - processed_count, 1) + logger.info("%s removed, %s documents remaining", processed_count, remaining) elif self.document_action == DocumentAction.RemoveAll: await self.blob_manager.remove_blob() diff --git a/app/backend/prepdocslib/pdfparser.py b/app/backend/prepdocslib/pdfparser.py index 8f0340cf6b..6604110020 100644 --- a/app/backend/prepdocslib/pdfparser.py +++ b/app/backend/prepdocslib/pdfparser.py @@ -21,7 +21,7 @@ class LocalPdfParser(Parser): """ async def parse(self, content: IO) -> AsyncGenerator[Page, None]: - logger.debug("Extracting text from '%s' using local PDF parser (pypdf)", content.name) + logger.info("Extracting text from '%s' using local PDF parser (pypdf)", content.name) reader = PdfReader(content) pages = reader.pages @@ -46,7 +46,7 @@ def __init__( self.credential = credential async def parse(self, content: IO) -> AsyncGenerator[Page, None]: - logger.debug("Extracting text from '%s' using Azure Document Intelligence", content.name) + logger.info("Extracting text from '%s' using Azure Document Intelligence", content.name) async with DocumentIntelligenceClient( endpoint=self.endpoint, credential=self.credential From 0b438127e8d1d0dbd1b0852eb44da08bbbbcb2d5 Mon Sep 17 00:00:00 2001 From: Pamela Fox Date: Wed, 20 Nov 2024 09:31:42 -0800 Subject: [PATCH 19/20] Remove unused functions --- app/backend/prepdocslib/filestrategy.py | 21 +-------------------- app/backend/prepdocslib/htmlparser.py | 2 +- 2 files changed, 2 insertions(+), 21 deletions(-) diff --git a/app/backend/prepdocslib/filestrategy.py b/app/backend/prepdocslib/filestrategy.py index 1b1e094b7e..fc8b8f41af 100644 --- a/app/backend/prepdocslib/filestrategy.py +++ b/app/backend/prepdocslib/filestrategy.py @@ -26,7 +26,7 @@ async def parse_file( pages = [page async for page in processor.parser.parse(content=file.content)] logger.info("Splitting '%s' into sections", file.filename()) if image_embeddings: - logger.info("Each page will be split into smaller chunks of text, but images will be of the entire page.") + logger.warning("Each page will be split into smaller chunks of text, but images will be of the entire page.") sections = [ Section(split_page, content=file, category=category) for split_page in processor.splitter.split_pages(pages) ] @@ -121,25 +121,6 @@ async def run(self): await self.blob_manager.remove_blob() await search_manager.remove_content() - async def process_file(self, file, search_manager): - try: - sections = await parse_file(file, self.file_processors, self.category, self.image_embeddings) - if sections: - blob_sas_uris = await self.blob_manager.upload_blob(file) - blob_image_embeddings: Optional[List[List[float]]] = None - if self.image_embeddings and blob_sas_uris: - blob_image_embeddings = await self.image_embeddings.create_embeddings(blob_sas_uris) - await search_manager.update_content( - sections=sections, file=file, image_embeddings=blob_image_embeddings - ) - finally: - if file: - file.close() - - async def remove_file(self, path, search_manager): - await self.blob_manager.remove_blob(path) - await search_manager.remove_content(path) - class UploadUserFileStrategy: """ diff --git a/app/backend/prepdocslib/htmlparser.py b/app/backend/prepdocslib/htmlparser.py index d75594f71c..a42579f640 100644 --- a/app/backend/prepdocslib/htmlparser.py +++ b/app/backend/prepdocslib/htmlparser.py @@ -38,7 +38,7 @@ async def parse(self, content: IO) -> AsyncGenerator[Page, None]: Returns: Page: The parsed html Page. """ - logger.debug("Extracting text from '%s' using local HTML parser (BeautifulSoup)", content.name) + logger.info("Extracting text from '%s' using local HTML parser (BeautifulSoup)", content.name) data = content.read() soup = BeautifulSoup(data, "html.parser") From d7f1c807b19749254c476afab0ea72f04f3645c2 Mon Sep 17 00:00:00 2001 From: Pamela Fox Date: Wed, 20 Nov 2024 09:40:09 -0800 Subject: [PATCH 20/20] Remove unneeded datetime module --- app/backend/prepdocslib/listfilestrategy.py | 3 +-- app/backend/requirements.in | 1 - app/backend/requirements.txt | 4 ---- 3 files changed, 1 insertion(+), 7 deletions(-) diff --git a/app/backend/prepdocslib/listfilestrategy.py b/app/backend/prepdocslib/listfilestrategy.py index eed61d452e..dd426c55f9 100644 --- a/app/backend/prepdocslib/listfilestrategy.py +++ b/app/backend/prepdocslib/listfilestrategy.py @@ -9,7 +9,6 @@ from typing import IO, AsyncGenerator, Dict, List, Optional, Union from azure.core.credentials_async import AsyncTokenCredential -from azure.identity import DefaultAzureCredential from azure.storage.blob import BlobServiceClient from azure.storage.filedatalake.aio import DataLakeServiceClient @@ -214,7 +213,7 @@ def count_docs(self) -> int: # Create a BlobServiceClient using account URL and credentials service_client = BlobServiceClient( account_url=f"https://{self.data_lake_storage_account}.blob.core.windows.net", - credential=DefaultAzureCredential(), + credential=self.credential, ) # Get the container client diff --git a/app/backend/requirements.in b/app/backend/requirements.in index 0a1d35a257..99cb44e678 100644 --- a/app/backend/requirements.in +++ b/app/backend/requirements.in @@ -30,4 +30,3 @@ types-beautifulsoup4 msgraph-sdk==1.1.0 openai-messages-token-helper python-dotenv -datetime diff --git a/app/backend/requirements.txt b/app/backend/requirements.txt index 239aea761f..51df00e14b 100644 --- a/app/backend/requirements.txt +++ b/app/backend/requirements.txt @@ -435,7 +435,3 @@ zipp==3.20.0 # The following packages are considered to be unsafe in a requirements file: # setuptools - -# used for adlsgen2setup.py -datetime==4.3.0 - # via -r requirements.in