From 8eff307c3af54676070ab60096702367f69572cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20L=C3=B3pez?= Date: Mon, 31 Mar 2025 10:42:28 +0200 Subject: [PATCH 1/3] Refactoring of the code --- test_data_model/master_tests.py | 139 ++++++++++-------- test_data_model/multiple_tests.py | 82 +++++------ test_data_model/requirements.txt | 6 + .../tests/test_array_object_structure.py | 61 ++++---- .../tests/test_duplicated_attributes.py | 45 +++--- test_data_model/tests/test_file_exists.py | 7 +- .../tests/test_schema_descriptions.py | 65 ++++---- test_data_model/tests/test_schema_metadata.py | 40 ++--- .../tests/test_string_incorrect.py | 6 +- test_data_model/tests/test_valid_json.py | 24 ++- .../tests/test_valid_keyvalues_examples.py | 33 ++--- test_data_model/tests/test_valid_ngsild.py | 12 +- test_data_model/tests/test_valid_ngsiv2.py | 6 +- test_data_model/tests/test_yaml_files.py | 18 +-- 14 files changed, 288 insertions(+), 256 deletions(-) create mode 100644 test_data_model/requirements.txt diff --git a/test_data_model/master_tests.py b/test_data_model/master_tests.py index 46df44077f..5f0b5d4d65 100644 --- a/test_data_model/master_tests.py +++ b/test_data_model/master_tests.py @@ -16,15 +16,15 @@ ################################################################################# # version 26/02/25 - 1 -import json -import importlib -import sys -import os -import requests -import shutil +from json import dump, dumps +from importlib import import_module +from os.path import join, dirname, exists +from os import makedirs +from requests import get +from shutil import copy, rmtree from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime -import argparse # Import argparse for command-line argument parsing +from argparse import ArgumentParser # Import argparse for command-line argument parsing def is_url(path): """ @@ -54,24 +54,22 @@ def convert_github_url_to_raw(repo_url): # Handle master branch URLs if "/blob/master/" in repo_url: - # Replace "github.com" with "raw.githubusercontent.com" - raw_url = repo_url.replace("github.com", "raw.githubusercontent.com") - # Replace "/blob/master/" with "/refs/heads/master/" - raw_url = raw_url.replace("/blob/master/", "/refs/heads/master/") - return raw_url - - # Handle PR branch URLs + return _extracted_from_convert_github_url_to_raw( + repo_url, "/blob/master/", "/refs/heads/master/" + ) elif "/tree/" in repo_url: - # Replace "github.com" with "raw.githubusercontent.com" - raw_url = repo_url.replace("github.com", "raw.githubusercontent.com") - # Replace "/tree/" with "/" - raw_url = raw_url.replace("/tree/", "/") - return raw_url - + return _extracted_from_convert_github_url_to_raw(repo_url, "/tree/", "/") else: raise ValueError("Unsupported GitHub URL format.") except Exception as e: - raise ValueError(f"Error converting GitHub URL to raw URL: {e}") + raise ValueError(f"Error converting GitHub URL to raw URL: {e}") from e + +# TODO Rename this here and in `convert_github_url_to_raw` +def _extracted_from_convert_github_url_to_raw(repo_url: str, arg1: str, arg2: str) -> str: + # Replace "github.com" with "raw.githubusercontent.com" + raw_url = repo_url.replace("github.com", "raw.githubusercontent.com") + + return raw_url.replace(arg1, arg2) def download_file(url, file_path): """ @@ -86,18 +84,19 @@ def download_file(url, file_path): """ try: # Ensure the directory structure exists - os.makedirs(os.path.dirname(file_path), exist_ok=True) + makedirs(dirname(file_path), exist_ok=True) # Download the file - response 
= requests.get(url) - if response.status_code == 200: - with open(file_path, "wb") as f: - f.write(response.content) - return (file_path, True, "Download successful") - else: - return (file_path, False, f"Failed to download {url}: HTTP {response.status_code}") + response = get(url) + if response.status_code != 200: + return file_path, False, f"Failed to download {url}: HTTP {response.status_code}" + + with open(file_path, "wb") as f: + f.write(response.content) + + return file_path, True, "Download successful" except Exception as e: - return (file_path, False, f"Error downloading {url}: {e}") + return file_path, False, f"Error downloading {url}: {e}" def download_files(base_url_or_path, download_dir): """ @@ -112,7 +111,7 @@ def download_files(base_url_or_path, download_dir): """ try: # Ensure the download directory exists - os.makedirs(download_dir, exist_ok=True) + makedirs(download_dir, exist_ok=True) # List of files to download/copy (adjust as needed) files_to_download = [ @@ -131,7 +130,7 @@ def download_files(base_url_or_path, download_dir): futures = [] for file in files_to_download: file_url = f"{base_url_or_path.rstrip('/')}/{file}" - file_path = os.path.join(download_dir, file) + file_path = join(download_dir, file) futures.append(executor.submit(download_file, file_url, file_path)) # Wait for all downloads to complete and check for errors @@ -142,15 +141,15 @@ def download_files(base_url_or_path, download_dir): else: # Copy files from a local directory (no parallelization needed) for file in files_to_download: - src_path = os.path.join(base_url_or_path, file) - dest_path = os.path.join(download_dir, file) + src_path = join(base_url_or_path, file) + dest_path = join(download_dir, file) # Ensure the directory structure exists - os.makedirs(os.path.dirname(dest_path), exist_ok=True) + makedirs(dirname(dest_path), exist_ok=True) # Copy the file - if os.path.exists(src_path): - shutil.copy(src_path, dest_path) + if exists(src_path): + copy(src_path, dest_path) else: raise Exception(f"File not found: {src_path}") @@ -158,7 +157,7 @@ def download_files(base_url_or_path, download_dir): except Exception as e: raise Exception(f"Error downloading/copying files: {e}") -def run_tests(test_files, repo_to_test, only_report_errors, options): +def run_tests(test_files: list, repo_to_test: str, only_report_errors: bool, options: dict) -> dict: """ Run a series of tests on a file. 
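
# A minimal sketch of the per-module contract that run_tests relies on, assuming
# a local tests/ package on sys.path: each tests/test_foo.py exposes a callable
# named exactly like the module (test_foo) and returns (test_name, success,
# messages). The helper name below is illustrative, not part of the repository.
from importlib import import_module

def run_single_test(test_file: str, repo_path: str, options: dict) -> tuple:
    module = import_module(f"tests.{test_file}")  # e.g. tests.test_file_exists
    test_function = getattr(module, test_file)    # function name mirrors the module name
    return test_function(repo_path, options)      # -> (test_name, success, messages)
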
@@ -175,7 +174,7 @@ def run_tests(test_files, repo_to_test, only_report_errors, options): for test_file in test_files: try: # Import the test module - module = importlib.import_module(f"tests.{test_file}") + module = import_module(f"tests.{test_file}") # Run the test function (assumes the function name is the same as the module name without 'test_') test_function = getattr(module, test_file) test_name, success, message = test_function(repo_to_test, options) @@ -195,10 +194,7 @@ def run_tests(test_files, repo_to_test, only_report_errors, options): return results def main(): - # Set up argument parser - # results_dir = "/var/www/html/extra/test2/results" - results_dir = "/home/aabella/PycharmProjects/data-models/test_data_model/results" - parser = argparse.ArgumentParser(description="Run tests on a repository.") + parser = ArgumentParser(description="Run tests on a repository.") # Mandatory arguments parser.add_argument("repo_url_or_local_path", type=str, help="The repository URL or local path.") @@ -222,27 +218,43 @@ def main(): # Validate the email (basic check) if not args.email or "@" not in args.email: print("Error: Missing or invalid email address.") - sys.exit(1) + exit(1) + + quality_analysis(repo_url_or_local_path=args.repo_url_or_local_path, + published=published, + private=private, + only_report_errors=only_report_errors, + email=args.email, + output_file=output_file) + + +def quality_analysis(repo_url_or_local_path: str, email: str, only_report_errors: bool, published: bool =False, + private: bool =False, output_file: str =None) -> str | None: + # Set up argument parser + # results_dir = "/var/www/html/extra/test2/results" + # results_dir = "/home/aabella/PycharmProjects/data-models/test_data_model/results" + results_dir = "/tmp/test_data_model/results" + if not exists(results_dir): + makedirs(results_dir) # Temporary directory to download/copy the files # download_dir = "/var/html/www/extra/test2/repo_to_test" - download_dir = "/home/aabella/transparentia/CLIENTES/EU/FIWARE/GITHUB/repo_to_test" + # download_dir = "/home/aabella/transparentia/CLIENTES/EU/FIWARE/GITHUB/repo_to_test" + download_dir = "/tmp/test_data_model/repo_to_test" + + results = str() + try: # If the input is a URL, convert it to a raw file base URL - if is_url(args.repo_url_or_local_path): - raw_base_url = convert_github_url_to_raw(args.repo_url_or_local_path) + if is_url(repo_url_or_local_path): + raw_base_url = convert_github_url_to_raw(repo_url_or_local_path) else: - raw_base_url = args.repo_url_or_local_path + raw_base_url = repo_url_or_local_path # Download or copy the files repo_path = download_files(raw_base_url, download_dir) # List of test files to run -# test_files = [ -# "test_valid_json", "test_file_exists", "test_schema_descriptions", -# "test_schema_metadata", "test_duplicated_attributes", "test_yaml_files", -# "test_valid_keyvalues_examples", "test_valid_ngsiv2", "test_valid_ngsild" -# ] test_files = ["test_file_exists", "test_valid_json", "test_yaml_files", @@ -267,29 +279,32 @@ def main(): test_results = run_tests(test_files, repo_path, only_report_errors, options) # Add email to the results - test_results["email"] = args.email + test_results["email"] = email # Display the results - print(json.dumps(test_results, indent=4)) + results = dumps(test_results, indent=4) + # print(results) # Save a file with the results - email_name = args.email.replace("@", "_at_") - time_name = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - filename = f"{results_dir}/{time_name}_{email_name}.json" + email_name = 
email.replace("@", "_at_") + time_name = datetime.now().strftime("%Y%m%d-%H%M%S") + filename = f"{results_dir}/test_results_{time_name}_{email_name}.json" with open(filename, "w") as f: - json.dump(test_results, f, indent=4) + dump(test_results, f, indent=4) # Save an additional copy of the results if --output is provided if output_file: with open(output_file, "w") as f: - json.dump(test_results, f, indent=4) + dump(test_results, f, indent=4) except Exception as e: print(f"Error: {e}") finally: # Clean up the temporary directory - if os.path.exists(download_dir): - shutil.rmtree(download_dir) + if exists(download_dir): + rmtree(download_dir) + + return results if __name__ == "__main__": main() diff --git a/test_data_model/multiple_tests.py b/test_data_model/multiple_tests.py index 66247ffc95..cc50694de7 100644 --- a/test_data_model/multiple_tests.py +++ b/test_data_model/multiple_tests.py @@ -16,11 +16,11 @@ ################################################################################# # version 26/02/25 - 1 -import sys -import json -import subprocess -import requests +from sys import argv +from json import loads, dump +from requests import get from datetime import datetime +from master_tests import quality_analysis def get_subdirectories(repo_url, root_directory): """ @@ -42,18 +42,16 @@ def get_subdirectories(repo_url, root_directory): api_url = f"https://api.github.com/repos/{owner}/{repo}/contents/{root_directory}" try: - response = requests.get(api_url) - if response.status_code == 200: - contents = response.json() - # Filter out only directories - subdirectories = [item['name'] for item in contents if item['type'] == 'dir'] - return subdirectories - else: + response = get(api_url) + if response.status_code != 200: raise Exception(f"Failed to fetch directory contents: HTTP {response.status_code}") + + contents = response.json() + return [item['name'] for item in contents if item['type'] == 'dir'] except Exception as e: - raise Exception(f"Error fetching subdirectories: {e}") + raise Exception(f"Error fetching subdirectories: {e}") from e -def run_master_tests(repo_url, subdirectory, email, only_report_errors): +def run_master_tests(repo_url: str, subdirectory: str, email:str, only_report_errors: bool): """ Run the master_tests.py script for a specific subdirectory. 
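
# A self-contained sketch of the GitHub contents API call wrapped by
# get_subdirectories above; owner, repo and path are illustrative, and
# unauthenticated requests are subject to GitHub's rate limits.
from requests import get

def list_subdirectories(owner: str, repo: str, path: str) -> list:
    api_url = f"https://api.github.com/repos/{owner}/{repo}/contents/{path}"
    response = get(api_url, timeout=10)
    response.raise_for_status()  # raise instead of returning a bad listing
    return [item["name"] for item in response.json() if item["type"] == "dir"]

# Example call: list_subdirectories("smart-data-models", "dataModel.Weather", "")
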
@@ -70,55 +68,55 @@ def run_master_tests(repo_url, subdirectory, email, only_report_errors): # Construct the full URL to the subdirectory subdirectory_url = f"{repo_url}/tree/master/{subdirectory}" print(subdirectory_url) + # Run the master_tests.py script - result = subprocess.run( - [ - "python3", "master_tests.py", - subdirectory_url, - email, - "true" if only_report_errors else "false" - ], - capture_output=True, - text=True - ) + # result = run( + # [ + # "python3", "master_tests.py", + # subdirectory_url, + # email, + # "true" if only_report_errors else "false" + # ], + # capture_output=True, + # text=True + # ) + + # only_report_errors = "true" if only_report_errors else "false" + + result = quality_analysis(repo_url_or_local_path=subdirectory_url, email=email, only_report_errors=only_report_errors) # Parse the output as JSON - return json.loads(result.stdout) + return loads(result) except Exception as e: print("hemos tenido un error") return {"error": str(e)} def main(): - if len(sys.argv) != 5: + if len(argv) != 5: print("Usage: python3 multiple_tests.py ") - sys.exit(1) + exit(1) - repo_url = sys.argv[1] - root_directory = sys.argv[2] - email = sys.argv[3] - only_report_errors = sys.argv[4].lower() == "true" + repo_url = argv[1] + root_directory = argv[2] + email = argv[3] + only_report_errors = argv[4].lower() == "true" # Get the list of subdirectories subdirectories = get_subdirectories(repo_url, root_directory) print(subdirectories) # Run tests for each subdirectory and collect results results = [] - for subdirectory in subdirectories: - print(f"Running tests for {subdirectory}...") - test_result = run_master_tests(repo_url, root_directory + "/" + subdirectory, email, only_report_errors) - for item in test_result: - print(item) - item["datamodel"] = subdirectory - results.append({ - "datamodel": subdirectory, - "result": test_result - }) + + results = \ + [{"datamodel": subdirectory, + "result": run_master_tests(repo_url, f"{root_directory}/{subdirectory}", email, only_report_errors)} + for subdirectory in subdirectories] # Save the results to a JSON file - timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_filename = f"test_results_{timestamp}.json" with open(output_filename, "w") as f: - json.dump(results, f, indent=4) + dump(results, f, indent=4) print(f"Test results saved to {output_filename}") diff --git a/test_data_model/requirements.txt b/test_data_model/requirements.txt new file mode 100644 index 0000000000..c52406d2e8 --- /dev/null +++ b/test_data_model/requirements.txt @@ -0,0 +1,6 @@ +# Python3.13 project +requests==2.32.3 +pyyaml==6.0.2 +jsonpointer==3.0.0 +jsonschema==4.23.0 +jsonref==1.1.0 diff --git a/test_data_model/tests/test_array_object_structure.py b/test_data_model/tests/test_array_object_structure.py index 26f3110b94..e0eb06ed9d 100644 --- a/test_data_model/tests/test_array_object_structure.py +++ b/test_data_model/tests/test_array_object_structure.py @@ -15,9 +15,9 @@ # Author: Alberto Abella # ################################################################################# # version 26/02/25 - 1 -import json -import os -import requests +from json import load, JSONDecodeError +from os.path import join +from requests import get from urllib.parse import urljoin from jsonpointer import resolve_pointer @@ -34,10 +34,7 @@ def resolve_ref(repo_path, ref, base_uri=""): dict: The resolved schema fragment. 
""" try: - if "#" in ref: - url_part, pointer_part = ref.split("#", 1) - else: - url_part, pointer_part = ref, "" + url_part, pointer_part = ref.split("#", 1) if "#" in ref else (ref, "") if url_part.startswith("http"): # External reference (absolute URL) @@ -48,28 +45,24 @@ def resolve_ref(repo_path, ref, base_uri=""): else: # Local reference within the same file # Use the base URI to determine the file name - if base_uri: - resolved_url = base_uri - else: - # Fallback to the primary schema file in the repo path - resolved_url = os.path.join(repo_path, "schema.json") - + resolved_url = base_uri or join(repo_path, "schema.json") + # Fetch the schema if resolved_url.startswith("http"): - response = requests.get(resolved_url) + response = get(resolved_url) if response.status_code != 200: raise ValueError(f"Failed to fetch external schema from {resolved_url}") schema = response.json() else: with open(resolved_url, 'r') as file: - schema = json.load(file) + schema = load(file) # Resolve the JSON Pointer if it exists if pointer_part: try: schema = resolve_pointer(schema, pointer_part) except Exception as e: - raise ValueError(f"Failed to resolve JSON Pointer '{pointer_part}' in schema: {e}") + raise ValueError(f"Failed to resolve JSON Pointer '{pointer_part}' in schema: {e}") from e # Recursively resolve any nested $refs in the resolved schema # Use the resolved URL as the base URI for nested $refs @@ -77,7 +70,7 @@ def resolve_ref(repo_path, ref, base_uri=""): return schema except Exception as e: - raise ValueError(f"Error resolving reference {ref}: {e}") + raise ValueError(f"Error resolving reference {ref}: {e}") from e def resolve_nested_refs(schema, base_uri): """ @@ -86,16 +79,16 @@ def resolve_nested_refs(schema, base_uri): if isinstance(schema, dict): if "$ref" in schema: return resolve_ref("", schema["$ref"], base_uri) - else: - for key, value in schema.items(): - schema[key] = resolve_nested_refs(value, base_uri) + + for key, value in schema.items(): + schema[key] = resolve_nested_refs(value, base_uri) elif isinstance(schema, list): for i, item in enumerate(schema): schema[i] = resolve_nested_refs(item, base_uri) return schema -def validate_properties(repo_path, properties, base_uri, path="", success=True, output=[]): +def validate_properties(repo_path, properties, base_uri, path="", success=True, output=None): """ Recursively validate properties in the schema, ensuring that arrays have 'items' and objects have 'properties'. 
@@ -110,6 +103,9 @@ def validate_properties(repo_path, properties, base_uri, path="", success=True, Returns: tuple: (success: bool, output: list) """ + if output is None: + output = [] + for key, value in properties.items(): current_path = f"{path}.{key}" if path else key @@ -135,9 +131,24 @@ def validate_properties(repo_path, properties, base_uri, path="", success=True, # Recursively check nested properties if "properties" in value and isinstance(value["properties"], dict): - success, output = validate_properties(repo_path, value["properties"], base_uri, current_path + ".", success, output) + success, output = validate_properties( + repo_path, + value["properties"], + base_uri, + f"{current_path}.", + success, + output, + ) + if "items" in value and isinstance(value["items"], dict): - success, output = validate_properties(repo_path, value["items"], base_uri, current_path + ".", success, output) + success, output = validate_properties( + repo_path, + value["items"], + base_uri, + f"{current_path}.", + success, + output, + ) return success, output @@ -159,7 +170,7 @@ def test_array_object_structure(repo_path, options): try: with open(f"{repo_path}/schema.json", 'r') as file: - schema = json.load(file) + schema = load(file) base_uri = schema.get("$id", "") # Use $id as the base URI for resolving relative $refs @@ -171,7 +182,7 @@ def test_array_object_structure(repo_path, options): elif "properties" in schema and isinstance(schema["properties"], dict): success, output = validate_properties(repo_path, schema["properties"], base_uri, "", success, output) - except json.JSONDecodeError: + except JSONDecodeError: success = False output.append("*** schema.json is not a valid JSON file") except FileNotFoundError: diff --git a/test_data_model/tests/test_duplicated_attributes.py b/test_data_model/tests/test_duplicated_attributes.py index 0d8f9c7635..b1db49321b 100644 --- a/test_data_model/tests/test_duplicated_attributes.py +++ b/test_data_model/tests/test_duplicated_attributes.py @@ -15,11 +15,11 @@ # Author: Alberto Abella # ################################################################################# # version 26/02/25 - 1 -import json -import os -import jsonref -import urllib.request -import urllib.parse +from json import dumps, load +from os.path import join, exists, dirname, abspath +from jsonref import loads +from urllib.request import pathname2url +from urllib.parse import urljoin def extract_attributes_from_payload(payload, parent_path=""): @@ -87,22 +87,22 @@ def test_duplicated_attributes(repo_to_test, options): success (bool): True if all attributes are defined, False otherwise. output (list): List of messages describing the results of the test. 
""" - schema_file = os.path.join(repo_to_test, "schema.json") - payload_file = os.path.join(repo_to_test, "examples/example.json") + schema_file = join(repo_to_test, "schema.json") + payload_file = join(repo_to_test, "examples/example.json") - if not os.path.exists(schema_file): + if not exists(schema_file): return "Checking that all payload attributes are defined in the schema", False, ["Schema file not found."] - if not os.path.exists(payload_file): + if not exists(payload_file): return "Checking that all payload attributes are defined in the schema", False, ["Payload file not found."] # Normalize the base URI to ensure proper resolution of references - schema_dir = os.path.dirname(os.path.abspath(schema_file)) - base_uri = urllib.parse.urljoin('file:', urllib.request.pathname2url(schema_dir)) + schema_dir = dirname(abspath(schema_file)) + base_uri = urljoin('file:', pathname2url(schema_dir)) # Load the schema and fully resolve all $ref references using jsonref with open(schema_file, 'r') as f: - schema = jsonref.loads( - json.dumps(json.load(f)), + schema = loads( + dumps(load(f)), base_uri=base_uri, lazy_load=False, load_on_repr=True @@ -110,7 +110,7 @@ def test_duplicated_attributes(repo_to_test, options): # Load the payload with open(payload_file, 'r') as f: - payload = json.load(f) + payload = load(f) output = [] @@ -124,17 +124,20 @@ def test_duplicated_attributes(repo_to_test, options): # Check for attributes in the payload that are not in the schema undefined_attributes = [] - for attribute in payload_attributes: - if attribute not in schema_attributes: - undefined_attributes.append(attribute) + undefined_attributes.extend( + attribute + for attribute in payload_attributes + if attribute not in schema_attributes + ) if undefined_attributes: output.append("The following attributes in the payload are not defined in the schema:") - for attribute in sorted(undefined_attributes): - output.append(f"*** Attribute '{attribute}' in the payload is not defined in the schema.") - + output.extend( + f"*** Attribute '{attribute}' in the payload is not defined in the schema." 
+ for attribute in sorted(undefined_attributes) + ) # Determine if the test was successful - success = len(undefined_attributes) == 0 + success = not undefined_attributes test_name = "Checking that all payload attributes are defined in the schema" return test_name, success, output \ No newline at end of file diff --git a/test_data_model/tests/test_file_exists.py b/test_data_model/tests/test_file_exists.py index 4b4d2d2413..2c3a4da2ee 100644 --- a/test_data_model/tests/test_file_exists.py +++ b/test_data_model/tests/test_file_exists.py @@ -15,7 +15,8 @@ # Author: Alberto Abella # ################################################################################# # version 26/02/25 - 1 -import os +#import os +from os.path import join, exists def test_file_exists(repo_path, options): """ @@ -52,8 +53,8 @@ def test_file_exists(repo_path, options): # Check if each mandatory file exists for file in mandatory_files: - path_to_file = os.path.join(repo_path, file) - exist_file = os.path.exists(path_to_file) + path_to_file = join(repo_path, file) + exist_file = exists(path_to_file) success = success and exist_file if exist_file: diff --git a/test_data_model/tests/test_schema_descriptions.py b/test_data_model/tests/test_schema_descriptions.py index a18b18e667..1a6e5293e4 100644 --- a/test_data_model/tests/test_schema_descriptions.py +++ b/test_data_model/tests/test_schema_descriptions.py @@ -15,11 +15,12 @@ # Author: Alberto Abella # ################################################################################# # version 26/02/25 - 1 -import json -import os -import requests +from json import load +from os.path import join, exists +from requests import get from urllib.parse import urljoin from jsonpointer import resolve_pointer +from itertools import product def validate_description(description): """ @@ -32,7 +33,7 @@ def validate_description(description): if len(description) < 15: return False, "*** Description must be at least 15 characters long." - parts = [part for part in description.split(". ")] + parts = list(description.split(". ")) valid_ngsi_types = ["Property", "GeoProperty", "Relationship", "LanguageProperty", "ListProperty"] ngsi_type_found = None @@ -42,23 +43,28 @@ def validate_description(description): break if not ngsi_type_found: - for part in parts: - for ngsi_type in valid_ngsi_types: - if ngsi_type in part and part != ngsi_type: - return False, f"NGSI type '{part}' contains extra characters." - return False, "*** NGSI type is not described. Must be one of: Property, GeoProperty, Relationship, LanguageProperty, ListProperty" - + return next( + ( + (False, f"NGSI type '{part}' contains extra characters.") + for part, ngsi_type in product(parts, valid_ngsi_types) + if ngsi_type in part and part != ngsi_type + ), + ( + False, + "*** NGSI type is not described. Must be one of: Property, GeoProperty, Relationship, LanguageProperty, ListProperty", + ), + ) + if ngsi_type_found.strip() != ngsi_type_found: return False, f"*** NGSI type '{ngsi_type_found}' contains extra spaces." optional_keys = ["Model:", "Units:", "Enum:", "Privacy:", "Multilingual"] - for part in parts: - for key in optional_keys: - if part.startswith(key): - if not part[len(key):].startswith("'"): - return False, f"*** Invalid format for '{key}'. Expected format: {key}'value'." - if not part.endswith("'"): - return False, f"*** Invalid format for '{key}'. Expected format: {key}'value'." 
+ for part, key in product(parts, optional_keys): + if part.startswith(key): + if not part[len(key):].startswith("'"): + return False, f"*** Invalid format for '{key}'. Expected format: {key}'value'." + if not part.endswith("'"): + return False, f"*** Invalid format for '{key}'. Expected format: {key}'value'." return True, "Description is valid." @@ -68,17 +74,13 @@ def resolve_ref(ref, base_uri): Handles both remote URLs and JSON Pointers, and recursively resolves nested $refs. JSON Pointers (starting with #) are resolved relative to the schema being referenced. """ - if "#" in ref: - url_part, pointer_part = ref.split("#", 1) - else: - url_part, pointer_part = ref, "" - + url_part, pointer_part = ref.split("#", 1) if "#" in ref else (ref, "") if url_part.startswith("http"): resolved_url = url_part else: resolved_url = urljoin(base_uri, url_part) - response = requests.get(resolved_url) + response = get(resolved_url) if response.status_code != 200: raise ValueError(f"*** Failed to fetch external schema from {resolved_url}") @@ -89,7 +91,9 @@ def resolve_ref(ref, base_uri): # Resolve the JSON Pointer relative to the fetched schema schema = resolve_pointer(schema, pointer_part) except Exception as e: - raise ValueError(f"*** Failed to resolve JSON Pointer '{pointer_part}' in schema: {e}") + raise ValueError( + f"*** Failed to resolve JSON Pointer '{pointer_part}' in schema: {e}" + ) from e # Recursively resolve any nested $refs in the resolved schema schema = resolve_nested_refs(schema, resolved_url if url_part else base_uri) @@ -103,15 +107,16 @@ def resolve_nested_refs(schema, base_uri): if isinstance(schema, dict): if "$ref" in schema: return resolve_ref(schema["$ref"], base_uri) - else: - for key, value in schema.items(): - schema[key] = resolve_nested_refs(value, base_uri) + + for key, value in schema.items(): + schema[key] = resolve_nested_refs(value, base_uri) elif isinstance(schema, list): for i, item in enumerate(schema): schema[i] = resolve_nested_refs(item, base_uri) return schema + def check_property_descriptions(properties, base_uri, output, path=""): """ Recursively check descriptions for all properties, including nested ones and arrays. @@ -201,12 +206,12 @@ def test_schema_descriptions(repo_to_test, options): success (bool): True if all descriptions are valid, False otherwise. output (list): List of messages describing the results of the test. 
""" - schema_file = os.path.join(repo_to_test, "schema.json") - if not os.path.exists(schema_file): + schema_file = join(repo_to_test, "schema.json") + if not exists(schema_file): return "Checking that the schema is properly described in all its attributes", False, ["Schema file not found."] with open(schema_file, 'r') as f: - schema = json.load(f) + schema = load(f) output = [] base_uri = schema.get("$id", "") diff --git a/test_data_model/tests/test_schema_metadata.py b/test_data_model/tests/test_schema_metadata.py index 255c1aa8d2..72214c8a10 100644 --- a/test_data_model/tests/test_schema_metadata.py +++ b/test_data_model/tests/test_schema_metadata.py @@ -16,9 +16,9 @@ ################################################################################# # version 26/02/25 - 1 -import json -import re -import requests +from json import load, JSONDecodeError +from re import compile +from requests import get, RequestException def test_schema_metadata(repo_path, options): """ @@ -35,7 +35,8 @@ def test_schema_metadata(repo_path, options): - it has a license (even if it is empty) just a warning Parameters: - file_path (str): The path to the schema.json file. + repo_path (str): The path to the schema.json file. + options (dict): The options passed to the requests library. Returns: tuple: (success: bool, message: str) @@ -55,7 +56,7 @@ def test_schema_metadata(repo_path, options): try: with open(f"{repo_path}/schema.json", 'r') as file: - schema = json.load(file) + schema = load(file) # Check for $schema and validate its value if "$schema" not in schema: @@ -64,7 +65,8 @@ def test_schema_metadata(repo_path, options): else: if schema["$schema"] != "https://json-schema.org/draft/2020-12/schema": success = False - output.append(f"*** $schema is not pointing to https://json-schema.org/draft/2020-12/schema (found: {schema['$schema']})") + output.append(f"*** $schema is not pointing to https://json-schema.org/draft/2020-12/schema " + f"(found: {schema['$schema']})") else: output.append("$schema is valid") @@ -83,7 +85,7 @@ def test_schema_metadata(repo_path, options): success = False output.append("*** $schemaVersion is missing") else: - version_pattern = re.compile(r"^\d{1,2}\.\d{1,2}\.\d{1,2}$") + version_pattern = compile(r"^\d{1,2}\.\d{1,2}\.\d{1,2}$") if not version_pattern.match(schema["$schemaVersion"]): success = False output.append(f"*** $schemaVersion is not in the correct format (XX.XX.XX) (found: {schema['$schemaVersion']})") @@ -94,23 +96,21 @@ def test_schema_metadata(repo_path, options): if "title" not in schema: success = False output.append("*** title is missing") + elif len(schema["title"]) < minTitleLength: + success = False + output.append(f"*** title is too short (minimum {minTitleLength} characters) (found: {len(schema['title'])} characters)") else: - if len(schema["title"]) < minTitleLength: - success = False - output.append(f"*** title is too short (minimum {minTitleLength} characters) (found: {len(schema['title'])} characters)") - else: - output.append("title is valid") + output.append("title is valid") # Check for description and ensure it is at least 50 characters long if "description" not in schema: success = False output.append("*** description is missing") + elif len(schema["description"]) < minDescriptionLength: + success = False + output.append(f"*** description is too short (minimum {minDescriptionLength} characters) (found: {len(schema['description'])} characters)") else: - if len(schema["description"]) < minDescriptionLength: - success = False - output.append(f"*** 
description is too short (minimum {minDescriptionLength} characters) (found: {len(schema['description'])} characters)") - else: - output.append("description is valid") + output.append("description is valid") # Check for $id and validate that it points to a real site if "$id" not in schema: @@ -118,7 +118,7 @@ def test_schema_metadata(repo_path, options): output.append("*** $id is missing") else: try: - response = requests.get(schema["$id"]) + response = get(schema["$id"]) if response.status_code != 200: if unpublished: success = True @@ -129,7 +129,7 @@ def test_schema_metadata(repo_path, options): output.append(f"*** $id does not point to a valid site (status code: {response.status_code})") else: output.append("$id is valid and points to a real site") - except requests.RequestException as e: + except RequestException as e: success = False output.append(f"*** $id is not reachable: {e}") @@ -171,7 +171,7 @@ def test_schema_metadata(repo_path, options): else: output.append("license is present and not empty") - except json.JSONDecodeError: + except JSONDecodeError: success = False output.append("*** schema.json is not a valid JSON file") except FileNotFoundError: diff --git a/test_data_model/tests/test_string_incorrect.py b/test_data_model/tests/test_string_incorrect.py index b2dd57831d..ed80ebb545 100644 --- a/test_data_model/tests/test_string_incorrect.py +++ b/test_data_model/tests/test_string_incorrect.py @@ -16,7 +16,7 @@ ################################################################################# # version 26/02/25 - 1 -import json +from json import load, JSONDecodeError def test_string_incorrect(repo_path, options): """ @@ -40,7 +40,7 @@ def test_string_incorrect(repo_path, options): try: with open(f"{repo_path}/schema.json", 'r') as file: - schema = json.load(file) + schema = load(file) def validate_properties(properties, path=""): nonlocal success @@ -58,7 +58,7 @@ def validate_properties(properties, path=""): if "properties" in schema and isinstance(schema["properties"], dict): validate_properties(schema["properties"]) - except json.JSONDecodeError: + except JSONDecodeError: success = False output.append("*** schema.json is not a valid JSON file") except FileNotFoundError: diff --git a/test_data_model/tests/test_valid_json.py b/test_data_model/tests/test_valid_json.py index 78dc790f84..481bb2d113 100644 --- a/test_data_model/tests/test_valid_json.py +++ b/test_data_model/tests/test_valid_json.py @@ -16,7 +16,7 @@ ################################################################################# # version 26/02/25 - 1 -import json +from json import load, JSONDecodeError def test_valid_json(file_path, options): """ @@ -29,29 +29,25 @@ def test_valid_json(file_path, options): tuple: (success: bool, message: str) """ test_name = "Checking that the mandatory json files are valid json files" - mandatory_json_files = ["schema.json", "examples/example.json", "examples/example-normalized.json", "examples/example.jsonld", "examples/example-normalized.jsonld"] + mandatory_json_files = ["schema.json", + "examples/example.json", + "examples/example-normalized.json", + "examples/example.jsonld", + "examples/example-normalized.jsonld" + ] success = True output = [] - # Example usage of the options parameter (optional, for future flexibility) -# if options.get("published", False): -# unpublished = True -# if options.get("private", False): -# output.append("This is a private model.") - - - for file in mandatory_json_files: - try: - local_path = file_path + "/" + file + local_path = 
f"{file_path}/{file}" # print(f"The local path to the file is {local_path}") with open(local_path, 'r') as local_file: - json.load(local_file) + load(local_file) success = success and True output.append(f"file {file} is a valid json") - except json.JSONDecodeError as e: + except JSONDecodeError as e: success = success and False output.append(f"*** file {file} is NOT a valid json") diff --git a/test_data_model/tests/test_valid_keyvalues_examples.py b/test_data_model/tests/test_valid_keyvalues_examples.py index 948fadabfa..646e8551d1 100644 --- a/test_data_model/tests/test_valid_keyvalues_examples.py +++ b/test_data_model/tests/test_valid_keyvalues_examples.py @@ -16,9 +16,9 @@ ################################################################################# # version 26/02/25 - 1 -import json -import os -import requests +from json import load +from os.path import join, exists +from requests import get from jsonschema import validate, ValidationError def validate_json_against_schema(json_data, schema): @@ -56,7 +56,7 @@ def check_context_url(context): if isinstance(context, str): # Single URL case try: - response = requests.get(context) + response = get(context) if response.status_code == 200: return True, f"The @context URL '{context}' is valid." else: @@ -68,7 +68,7 @@ def check_context_url(context): warnings = [] for url in context: try: - response = requests.get(url) + response = get(url) if response.status_code != 200: warnings.append(f"*** The @context URL '{url}' does not return a valid response (HTTP {response.status_code}).") except Exception as e: @@ -96,9 +96,9 @@ def test_valid_keyvalues_examples(repo_to_test, options): output (list): List of messages describing the results of the test. """ # Paths to the files - schema_file = os.path.join(repo_to_test, "schema.json") - example_json_file = os.path.join(repo_to_test, "examples", "example.json") - example_jsonld_file = os.path.join(repo_to_test, "examples", "example.jsonld") + schema_file = join(repo_to_test, "schema.json") + example_json_file = join(repo_to_test, "examples", "example.json") + example_jsonld_file = join(repo_to_test, "examples", "example.jsonld") output = [] success = True @@ -111,17 +111,17 @@ def test_valid_keyvalues_examples(repo_to_test, options): # Check if the schema file exists - if not os.path.exists(schema_file): + if not exists(schema_file): return "Checking that example files are valid against the schema", False, ["Schema file not found."] # Load the schema with open(schema_file, 'r') as f: - schema = json.load(f) + schema = load(f) # Validate example.json - if os.path.exists(example_json_file): + if exists(example_json_file): with open(example_json_file, 'r') as f: - example_json = json.load(f) + example_json = load(f) is_valid, message = validate_json_against_schema(example_json, schema) output.append(f"example.json: {message}") if not is_valid: @@ -131,9 +131,9 @@ def test_valid_keyvalues_examples(repo_to_test, options): success = False # Validate example.jsonld - if os.path.exists(example_jsonld_file): + if exists(example_jsonld_file): with open(example_jsonld_file, 'r') as f: - example_jsonld = json.load(f) + example_jsonld = load(f) is_valid, message = validate_json_against_schema(example_jsonld, schema) output.append(f"example.jsonld: {message}") if not is_valid: @@ -143,10 +143,7 @@ def test_valid_keyvalues_examples(repo_to_test, options): if "@context" in example_jsonld: context = example_jsonld["@context"] is_context_valid, context_message = check_context_url(context) - if not is_context_valid: 
- output.append(context_message) # Warning message - else: - output.append(context_message) + output.append(context_message) # Warning message else: output.append("*** example.jsonld is missing the mandatory '@context' attribute.") success = False diff --git a/test_data_model/tests/test_valid_ngsild.py b/test_data_model/tests/test_valid_ngsild.py index f536979e21..10b7a56143 100644 --- a/test_data_model/tests/test_valid_ngsild.py +++ b/test_data_model/tests/test_valid_ngsild.py @@ -16,8 +16,8 @@ ################################################################################# # version 26/02/25 - 1 -import json -import requests +from json import load, JSONDecodeError +from requests import get def check_context_url(context): @@ -36,7 +36,7 @@ def check_context_url(context): if isinstance(context, str): # Single URL case try: - response = requests.get(context) + response = get(context) if response.status_code == 200: return True, f"The @context URL '{context}' is valid." else: @@ -48,7 +48,7 @@ def check_context_url(context): warnings = [] for url in context: try: - response = requests.get(url) + response = get(url) if response.status_code != 200: warnings.append( f"*** The @context URL '{url}' does not return a valid response (HTTP {response.status_code}).") @@ -83,7 +83,7 @@ def test_valid_ngsild(repo_path, options): try: # Load the example-normalized.jsonld file with open(f"{repo_path}/examples/example-normalized.jsonld", 'r') as file: - entity = json.load(file) + entity = load(file) # Validate that the root element is a single entity (a dictionary) if not isinstance(entity, dict): @@ -152,7 +152,7 @@ def test_valid_ngsild(repo_path, options): output.append(f"*** Property '{key}' is missing the 'value' field") - except json.JSONDecodeError: + except JSONDecodeError: success = False output.append("*** example-normalized.jsonld is not a valid JSON file") except FileNotFoundError: diff --git a/test_data_model/tests/test_valid_ngsiv2.py b/test_data_model/tests/test_valid_ngsiv2.py index bd5159e6d0..338a22590c 100644 --- a/test_data_model/tests/test_valid_ngsiv2.py +++ b/test_data_model/tests/test_valid_ngsiv2.py @@ -16,7 +16,7 @@ ################################################################################# # version 26/02/25 - 1 -import json +from json import load, JSONDecodeError def validate_entity(entity): """ @@ -77,7 +77,7 @@ def test_valid_ngsiv2(repo_path, options): try: # Load the example-normalized.json file with open(f"{repo_path}/examples/example-normalized.json", 'r') as file: - data = json.load(file) + data = load(file) success, output = validate_entity(data) @@ -97,7 +97,7 @@ def test_valid_ngsiv2(repo_path, options): if "value" not in data [entity]: success = False output.append(f"*** {entity} has not value") - except json.JSONDecodeError: + except JSONDecodeError: success = False output.append("*** example-normalized.json is not a valid JSON file") except FileNotFoundError: diff --git a/test_data_model/tests/test_yaml_files.py b/test_data_model/tests/test_yaml_files.py index d93279876b..290cd1060b 100644 --- a/test_data_model/tests/test_yaml_files.py +++ b/test_data_model/tests/test_yaml_files.py @@ -16,8 +16,8 @@ ################################################################################# # version 26/02/25 - 1 -import os -import yaml +from os.path import join, exists, basename +from yaml import safe_load, YAMLError def validate_yaml_file(file_path): """ @@ -33,17 +33,17 @@ def validate_yaml_file(file_path): """ try: with open(file_path, 'r') as file: - 
yaml.safe_load(file)
+            safe_load(file)
         # Extract only the filename from the full path
-        file_name = os.path.basename(file_path)
+        file_name = basename(file_path)
         return True, f"The file '{file_name}' is a valid YAML file."
-    except yaml.YAMLError as e:
+    except YAMLError as e:
         # Extract only the filename from the full path
-        file_name = os.path.basename(file_path)
+        file_name = basename(file_path)
         return False, f"*** The file '{file_name}' is not a valid YAML file: {e}"
     except Exception as e:
         # Extract only the filename from the full path
-        file_name = os.path.basename(file_path)
+        file_name = basename(file_path)
         return False, f"*** An error occurred while reading '{file_name}': {e}"
 
 def test_yaml_files(repo_to_test, options):
@@ -72,8 +72,8 @@ def test_yaml_files(repo_to_test, options):
 
 
     for yaml_file in yaml_files:
-        file_path = os.path.join(repo_to_test, yaml_file)
-        if not os.path.exists(file_path):
+        file_path = join(repo_to_test, yaml_file)
+        if not exists(file_path):
             output.append(f"*** The file '{yaml_file}' does not exist.")
             success = False
         else:

From 35f3d4f9a138e6362881583d78698c02293ab2b4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Fernando=20L=C3=B3pez?=
Date: Mon, 31 Mar 2025 14:07:05 +0200
Subject: [PATCH 2/3] Resolve merge problems

---
 test_data_model/config.json                   |   6 +
 test_data_model/master_tests.py               | 235 +++++++++++-------
 test_data_model/multiple_tests.py             |  67 +++--
 .../tests/test_schema_descriptions.py         | 150 ++++++++---
 4 files changed, 298 insertions(+), 160 deletions(-)
 create mode 100644 test_data_model/config.json

diff --git a/test_data_model/config.json b/test_data_model/config.json
new file mode 100644
index 0000000000..453eb8e52f
--- /dev/null
+++ b/test_data_model/config.json
@@ -0,0 +1,6 @@
+{
+    "results_dir": "",
+    "results_dir_help": "This directory will store the results of the tests, either one or multiple. It has to be writable by the script",
+    "download_dir": "",
+    "download_dir_help": "This directory is used for the temporary download of files and is removed once finished. Don't point it to any directory with valuable content"
+}
diff --git a/test_data_model/master_tests.py b/test_data_model/master_tests.py
index 5f0b5d4d65..1b24945376 100644
--- a/test_data_model/master_tests.py
+++ b/test_data_model/master_tests.py
@@ -16,7 +16,7 @@
 #################################################################################
 # version 26/02/25 - 1
 
-from json import dump, dumps
+from json import dump, dumps, load
 from importlib import import_module
 from os.path import join, dirname, exists
 from os import makedirs
@@ -25,6 +25,53 @@ from concurrent.futures import ThreadPoolExecutor, as_completed
 from datetime import datetime
 from argparse import ArgumentParser # Import argparse for command-line argument parsing
+from pathlib import Path
+from typing import Dict, Any
+
+
+def load_config(config_path: str = None) -> Dict[str, Any]:
+    """
+    Load configuration from a JSON file.
+
+    Loads configuration data from a JSON file, searching default locations if a path isn't provided.
+    It also validates that required keys are present and converts specific paths to absolute paths.
+
+    Parameters:
+        config_path (str, optional): The path to the configuration file. If None, default locations are searched.
+
+    Returns:
+        Dict[str, Any]: The loaded configuration data.
+
+    Raises:
+        FileNotFoundError: If no configuration file is found.
+        ValueError: If required keys are missing in the configuration.
+    
+ """ + default_locations = [ + Path("config.json"), + Path.home() / ".your_package_config.json", + Path(__file__).parent / "config.json" + ] + + if config_path is None: + for location in default_locations: + if location.exists(): + config_path = location + break + else: + raise FileNotFoundError("No configuration file found in default locations") + + with open(config_path, 'r') as f: + config = load(f) + + required_keys = ['results_dir', 'download_dir'] + for key in required_keys: + if key not in config: + raise ValueError(f"Missing required config key: {key}") + + config['results_dir'] = str(Path(config['results_dir']).expanduser().absolute()) + config['download_dir'] = str(Path(config['download_dir']).expanduser().absolute()) + + return config def is_url(path): """ @@ -36,42 +83,40 @@ def is_url(path): Returns: bool: True if the path is a URL, False otherwise. """ - return path.startswith("http://") or path.startswith("https://") + return path.startswith(("http://", "https://")) -def convert_github_url_to_raw(repo_url): + +def convert_github_url_to_raw(subject_root): """ Convert a GitHub repository URL to the corresponding raw file URL. Parameters: - repo_url (str): The GitHub repository URL (e.g., https://github.com/smart-data-models/dataModel.Weather/blob/master/WeatherObserved/schema.json). + subject_root (str): The GitHub repository URL (e.g., https://github.com/smart-data-models/dataModel.Weather/blob/master/WeatherObserved/schema.json). Returns: str: The raw file base URL (e.g., https://raw.githubusercontent.com/smart-data-models/dataModel.Weather/refs/heads/master/WeatherObserved/). """ try: - if "github.com" not in repo_url: + if "github.com" not in subject_root: raise ValueError("Invalid GitHub repository URL.") # Handle master branch URLs - if "/blob/master/" in repo_url: + if "/blob/" in subject_root: return _extracted_from_convert_github_url_to_raw( - repo_url, "/blob/master/", "/refs/heads/master/" + subject_root, "/blob/", "/" ) - elif "/tree/" in repo_url: - return _extracted_from_convert_github_url_to_raw(repo_url, "/tree/", "/") + elif "/tree/" in subject_root: + return _extracted_from_convert_github_url_to_raw(subject_root, "/tree/", "/") else: - raise ValueError("Unsupported GitHub URL format.") + return f"{subject_root.replace("github.com", "raw.githubusercontent.com")}/master" except Exception as e: raise ValueError(f"Error converting GitHub URL to raw URL: {e}") from e -# TODO Rename this here and in `convert_github_url_to_raw` def _extracted_from_convert_github_url_to_raw(repo_url: str, arg1: str, arg2: str) -> str: - # Replace "github.com" with "raw.githubusercontent.com" raw_url = repo_url.replace("github.com", "raw.githubusercontent.com") - return raw_url.replace(arg1, arg2) -def download_file(url, file_path): +def download_file(url: str, file_path): """ Download a single file from a URL and save it to the specified path. 
@@ -84,12 +129,11 @@ def download_file(url, file_path): """ try: # Ensure the directory structure exists - makedirs(dirname(file_path), exist_ok=True) + makedirs(name=dirname(p=file_path), exist_ok=True) # Download the file - response = get(url) - if response.status_code != 200: - return file_path, False, f"Failed to download {url}: HTTP {response.status_code}" + response = get(url=url, timeout=10) + response.raise_for_status() with open(file_path, "wb") as f: f.write(response.content) @@ -98,16 +142,22 @@ def download_file(url, file_path): except Exception as e: return file_path, False, f"Error downloading {url}: {e}" -def download_files(base_url_or_path, download_dir): +def download_files(subject_root: str, download_dir: str): """ - Download files from a raw GitHub base URL or copy files from a local directory using parallel threads. + Download or copy files from a URL or local directory. + + Downloads or copies a predefined set of files from a given URL or local directory to a specified download directory. + If the source is a URL, parallel downloads are used. If it's a local path, files are copied. Parameters: - base_url_or_path (str): The base URL for raw files or the local directory path. - download_dir (str): The directory to download/copy the files into. + subject_root (str): The URL or local path to download/copy files from. + download_dir (str): The directory to save the downloaded/copied files. Returns: - str: The path to the downloaded/copied files. + str: The path to the download directory. + + Raises: + Exception: If any error occurs during download or copying. """ try: # Ensure the download directory exists @@ -124,28 +174,28 @@ def download_files(base_url_or_path, download_dir): "notes.yaml", ] - if is_url(base_url_or_path): + if is_url(subject_root): # Download files from a URL using parallel threads with ThreadPoolExecutor(max_workers=5) as executor: # Adjust max_workers as needed futures = [] for file in files_to_download: - file_url = f"{base_url_or_path.rstrip('/')}/{file}" + file_url = f"{subject_root.rstrip('/')}/{file}" file_path = join(download_dir, file) futures.append(executor.submit(download_file, file_url, file_path)) # Wait for all downloads to complete and check for errors for future in as_completed(futures): file_path, success, message = future.result() - if not success: + if not success and message: raise Exception(message) else: # Copy files from a local directory (no parallelization needed) for file in files_to_download: - src_path = join(base_url_or_path, file) + src_path = join(subject_root, file) dest_path = join(download_dir, file) # Ensure the directory structure exists - makedirs(dirname(dest_path), exist_ok=True) + makedirs(name=dirname(p=dest_path), exist_ok=True) # Copy the file if exists(src_path): @@ -194,63 +244,68 @@ def run_tests(test_files: list, repo_to_test: str, only_report_errors: bool, opt return results def main(): - parser = ArgumentParser(description="Run tests on a repository.") - - # Mandatory arguments - parser.add_argument("repo_url_or_local_path", type=str, help="The repository URL or local path.") - parser.add_argument("email", type=str, help="The email address for reporting results.") - parser.add_argument("only_report_errors", type=str, help="Whether to report only errors (true/false or 1/0).") - - # Optional arguments - parser.add_argument("--published", type=str, help="Whether the model is officially published (true/false or 1/0).", default="false") - parser.add_argument("--private", type=str, help="Whether the model is 
private (true/false or 1/0).", default="false")
-    parser.add_argument("--output", type=str, help="Additional output file path for the test results.", default=None)
-
-    # Parse arguments
-    args = parser.parse_args()
-
-    # Convert string arguments to appropriate types
-    only_report_errors = args.only_report_errors.lower() == "true" or args.only_report_errors == "1"
-    published = args.published.lower() == "true" or args.published == "1"
-    private = args.private.lower() == "true" or args.private == "1"
-    output_file = args.output
-
-    # Validate the email (basic check)
-    if not args.email or "@" not in args.email:
-        print("Error: Missing or invalid email address.")
-        exit(1)
-
-    quality_analysis(repo_url_or_local_path=args.repo_url_or_local_path,
-                     published=published,
-                     private=private,
-                     only_report_errors=only_report_errors,
-                     email=args.email,
-                     output_file=output_file)
-
-
-def quality_analysis(repo_url_or_local_path: str, email: str, only_report_errors: bool, published: bool =False,
+    try:
+        parser = ArgumentParser(description="Run tests on a repository.")
+        # https://github.com/smart-data-models/dataModel.DCAT-AP/tree/a0b2ee1a86be25fa896103c10c0a943558a7d6d2/Agent alberto.abella@fiware.org 0
+        # Mandatory arguments
+        parser.add_argument("subject_root", type=str, help="The subject root of the repository to check.")
+        parser.add_argument("email", type=str, help="The email address for reporting results.")
+        parser.add_argument("only_report_errors", type=str, help="Whether to report only errors (true/false or 1/0).")
+
+        # Optional arguments
+        parser.add_argument("--published", type=str, help="Whether the model is officially published (true/false or 1/0).", default="false")
+        parser.add_argument("--private", type=str, help="Whether the model is private (true/false or 1/0).", default="false")
+        parser.add_argument("--output", type=str, help="Additional output file path for the test results.", default=None)
+
+        # Parse arguments
+        args = parser.parse_args()
+
+        # Convert string arguments to appropriate types
+        only_report_errors = args.only_report_errors.lower() in ("true", "1")
+        published = args.published.lower() in ("true", "1")
+        private = args.private.lower() in ("true", "1")
+        output_file = args.output
+
+        # Validate the email (basic check)
+        if not args.email or "@" not in args.email:
+            raise ValueError("Missing or invalid email address")
+
+        # Validate the subject_root, if the input is a URL, convert it to a raw file base URL
+        if is_url(args.subject_root):
+            raw_base_url = convert_github_url_to_raw(args.subject_root)
+        else:
+            raw_base_url = args.subject_root
+
+        return quality_analysis(raw_base_url=raw_base_url,
+                                published=published,
+                                private=private,
+                                only_report_errors=only_report_errors,
+                                email=args.email,
+                                output_file=output_file)
+    except Exception as e:
+        raise Exception(f"Error analyzing the data model: {e}") from e
+
+def quality_analysis(raw_base_url: str, email: str, only_report_errors: bool, published: bool =False,
                      private: bool =False, output_file: str =None) -> str | None:
-    # Set up argument parser
-    # results_dir = "/var/www/html/extra/test2/results"
-    # results_dir = "/home/aabella/PycharmProjects/data-models/test_data_model/results"
-    results_dir = "/tmp/test_data_model/results"
-    if not exists(results_dir):
-        makedirs(results_dir)
-
-    # Temporary directory to download/copy the files
-    # download_dir = "/var/html/www/extra/test2/repo_to_test"
-    # download_dir = "/home/aabella/transparentia/CLIENTES/EU/FIWARE/GITHUB/repo_to_test"
-    download_dir = 
"/tmp/test_data_model/repo_to_test" + result = { + "success": False, + "error": None, + "test_results": None, + "metadata": { + "timestamp": datetime.now().isoformat() + } + } + + config = load_config() + results_dir = config['results_dir'] + download_dir = config['download_dir'] + + Path(results_dir).mkdir(parents=True, exist_ok=True) + Path(download_dir).mkdir(parents=True, exist_ok=True) results = str() try: - # If the input is a URL, convert it to a raw file base URL - if is_url(repo_url_or_local_path): - raw_base_url = convert_github_url_to_raw(repo_url_or_local_path) - else: - raw_base_url = repo_url_or_local_path - # Download or copy the files repo_path = download_files(raw_base_url, download_dir) @@ -273,22 +328,23 @@ def quality_analysis(repo_url_or_local_path: str, email: str, only_report_errors "published": published, "private": private } - # print(options) # Run the tests with the options object - test_results = run_tests(test_files, repo_path, only_report_errors, options) + test_results = run_tests(test_files=test_files, + repo_to_test=repo_path, + only_report_errors=only_report_errors, + options=options) # Add email to the results test_results["email"] = email # Display the results - results = dumps(test_results, indent=4) - # print(results) + result |= {"success": True, "test_results": test_results} # Save a file with the results email_name = email.replace("@", "_at_") - time_name = datetime.now().strftime("%Y%m%d-%H%M%S") - filename = f"{results_dir}/test_results_{time_name}_{email_name}.json" + timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") + filename = f"{results_dir}/test_results_{timestamp}_{email_name}.json" with open(filename, "w") as f: dump(test_results, f, indent=4) @@ -298,12 +354,15 @@ def quality_analysis(repo_url_or_local_path: str, email: str, only_report_errors dump(test_results, f, indent=4) except Exception as e: - print(f"Error: {e}") + result["error"] = str(e) finally: # Clean up the temporary directory - if exists(download_dir): + if 'download_dir' in locals() and exists(download_dir): rmtree(download_dir) + # Ensure we always print valid JSON + print(dumps(result, indent=2)) + return results if __name__ == "__main__": diff --git a/test_data_model/multiple_tests.py b/test_data_model/multiple_tests.py index cc50694de7..793dc974ef 100644 --- a/test_data_model/multiple_tests.py +++ b/test_data_model/multiple_tests.py @@ -22,7 +22,7 @@ from datetime import datetime from master_tests import quality_analysis -def get_subdirectories(repo_url, root_directory): +def get_subdirectories(subject_root): """ Get the list of first-level subdirectories in the specified root directory of a GitHub repository. @@ -34,14 +34,21 @@ def get_subdirectories(repo_url, root_directory): list: List of subdirectory names. """ # Extract the owner and repo name from the URL - parts = repo_url.strip("/").split("/") - owner = parts[-2] - repo = parts[-1] + # TODO: Only work with tree structure and not normal url to a data model + parts = subject_root.strip("/").split("/") + if len(parts) < 7: + raise ValueError("Invalid subject_root URL. 
It must include owner, repo, branch, and root directory.") + + owner = parts[3] # e.g., "smart-data-models" + repo = parts[4] # e.g., "incubated" + branch = parts[6] # e.g., "d7b7b48f03b9b221d141e074e1d311985ab04f25" + root_directory = "/".join(parts[7:]) # e.g., "SMARTMANUFACTURING/dataModel.PredictiveMaintenance" # GitHub API URL to list contents of the root directory - api_url = f"https://api.github.com/repos/{owner}/{repo}/contents/{root_directory}" + api_url = f"https://api.github.com/repos/{owner}/{repo}/contents/{root_directory}?ref={branch}" try: + # Fetch the contents of the root directory response = get(api_url) if response.status_code != 200: raise Exception(f"Failed to fetch directory contents: HTTP {response.status_code}") @@ -51,12 +58,12 @@ def get_subdirectories(repo_url, root_directory): except Exception as e: raise Exception(f"Error fetching subdirectories: {e}") from e -def run_master_tests(repo_url: str, subdirectory: str, email:str, only_report_errors: bool): +def run_master_tests(subject_root: str, subdirectory: str, email:str, only_report_errors: bool): """ Run the master_tests.py script for a specific subdirectory. Parameters: - repo_url (str): The URL of the GitHub repository. + subject_root (str): The full path to the root directory in the GitHub repository. subdirectory (str): The subdirectory to test. email (str): The email address for reporting results. only_report_errors (bool): Whether to report only errors. @@ -65,51 +72,39 @@ def run_master_tests(repo_url: str, subdirectory: str, email:str, only_report_er dict: The results from master_tests.py. """ try: - # Construct the full URL to the subdirectory - subdirectory_url = f"{repo_url}/tree/master/{subdirectory}" - print(subdirectory_url) - - # Run the master_tests.py script - # result = run( - # [ - # "python3", "master_tests.py", - # subdirectory_url, - # email, - # "true" if only_report_errors else "false" - # ], - # capture_output=True, - # text=True - # ) - - # only_report_errors = "true" if only_report_errors else "false" - - result = quality_analysis(repo_url_or_local_path=subdirectory_url, email=email, only_report_errors=only_report_errors) + # Remove any trailing slashes and append the subdirectory + subject_root = subject_root.rstrip("/") + subdirectory_url = f"{subject_root}/{subdirectory}" + print(f"Testing subdirectory: {subdirectory_url}") + + result = quality_analysis(raw_base_url=subdirectory_url, + email=email, + only_report_errors=only_report_errors) # Parse the output as JSON return loads(result) except Exception as e: - print("hemos tenido un error") + print(f"Error running tests for {subdirectory}: {e}") return {"error": str(e)} def main(): - if len(argv) != 5: - print("Usage: python3 multiple_tests.py ") + if len(argv) != 4: + print("Usage: python3 multiple_tests.py ") exit(1) - repo_url = argv[1] - root_directory = argv[2] - email = argv[3] - only_report_errors = argv[4].lower() == "true" + subject_root = argv[1] + email = argv[2] + only_report_errors = argv[3].lower() == "true" # Get the list of subdirectories - subdirectories = get_subdirectories(repo_url, root_directory) - print(subdirectories) + subdirectories = get_subdirectories(subject_root) + # Run tests for each subdirectory and collect results results = [] results = \ [{"datamodel": subdirectory, - "result": run_master_tests(repo_url, f"{root_directory}/{subdirectory}", email, only_report_errors)} + "result": run_master_tests(subject_root, subdirectory, email, only_report_errors)} for subdirectory in subdirectories] # Save the 
results to a JSON file diff --git a/test_data_model/tests/test_schema_descriptions.py b/test_data_model/tests/test_schema_descriptions.py index 1a6e5293e4..58c048cd63 100644 --- a/test_data_model/tests/test_schema_descriptions.py +++ b/test_data_model/tests/test_schema_descriptions.py @@ -75,6 +75,7 @@ def resolve_ref(ref, base_uri): JSON Pointers (starting with #) are resolved relative to the schema being referenced. """ url_part, pointer_part = ref.split("#", 1) if "#" in ref else (ref, "") + if url_part.startswith("http"): resolved_url = url_part else: @@ -117,18 +118,36 @@ def resolve_nested_refs(schema, base_uri): return schema -def check_property_descriptions(properties, base_uri, output, path=""): +def check_property_descriptions(properties, base_uri, output, path="", processed_refs=None): """ Recursively check descriptions for all properties, including nested ones and arrays. + Keeps track of processed references to avoid duplicate processing. """ + if processed_refs is None: + processed_refs = set() + for prop_name, prop_details in properties.items(): current_path = f"{path}.{prop_name}" if path else prop_name + # Handle $ref properties if "$ref" in prop_details: + ref = prop_details["$ref"] + ref_id = f"{current_path}:{ref}" + + # Skip if this reference has already been processed for this path + if ref_id in processed_refs: + continue + + processed_refs.add(ref_id) + try: - ref_schema = resolve_ref(prop_details["$ref"], base_uri) + ref_schema = resolve_ref(ref, base_uri) if "properties" in ref_schema: - check_property_descriptions(ref_schema["properties"], base_uri, output, current_path) + check_property_descriptions(properties=ref_schema["properties"], + base_uri=base_uri, + output=output, + path=current_path, + processed_refs=processed_refs) if "description" in ref_schema: description = ref_schema["description"] is_valid, message = validate_description(description) @@ -140,44 +159,90 @@ def check_property_descriptions(properties, base_uri, output, path=""): output.append(f"*** The attribute '{current_path}' is missing a description.") except ValueError as e: output.append(f"*** Error resolving $ref for property '{current_path}': {e}") - elif "properties" in prop_details: - check_property_descriptions(prop_details["properties"], base_uri, output, current_path) - elif "items" in prop_details: + + continue + + # Check description for the current property + if "description" not in prop_details: + # Only report missing description if it's not a container that will have its items checked separately + if not ("properties" in prop_details or "items" in prop_details): + output.append(f"*** The attribute '{current_path}' is missing a description.") + else: + # For arrays and objects, explicitly note that the container itself needs a description + if "properties" in prop_details: + output.append(f"*** The attribute '{current_path}' (object) is missing a description.") + elif "items" in prop_details: + output.append(f"*** The attribute '{current_path}' (array) is missing a description.") + else: + description = prop_details["description"] + is_valid, message = validate_description(description) + if not is_valid: + output.append(f"*** The attribute '{current_path}' has an invalid description: {message}") + else: + output.append(f"The attribute '{current_path}' is properly documented.") + + # Check nested properties (for objects) + if "properties" in prop_details: + check_property_descriptions(prop_details["properties"], base_uri, output, current_path, processed_refs) + + # Check items (for 
arrays) + if "items" in prop_details: items = prop_details["items"] + if "$ref" in items: try: - ref_schema = resolve_ref(items["$ref"], base_uri) - if "description" in ref_schema: - description = ref_schema["description"] - is_valid, message = validate_description(description) - if not is_valid: - output.append(f"*** The attribute '{current_path}.items' has an invalid description: {message}") + items_ref = items["$ref"] + items_ref_id = f"{current_path}.items:{items_ref}" + + if items_ref_id not in processed_refs: + processed_refs.add(items_ref_id) + ref_schema = resolve_ref(items_ref, base_uri) + + if "description" in ref_schema: + description = ref_schema["description"] + is_valid, message = validate_description(description) + if not is_valid: + output.append( + f"*** The attribute '{current_path}.items' has an invalid description: {message}") + else: + output.append(f"The attribute '{current_path}.items' is properly documented.") else: - output.append(f"The attribute '{current_path}.items' is properly documented.") - else: - output.append(f"*** The attribute '{current_path}.items' is missing a description.") + output.append(f"*** The attribute '{current_path}.items' is missing a description.") + + if "properties" in ref_schema: + check_property_descriptions(ref_schema["properties"], base_uri, output, + f"{current_path}.items", processed_refs) except ValueError as e: - output.append(f"*** Error resolving $ref for property '{current_path}.items': {e}") + output.append(f"*** Error resolving $ref for items in '{current_path}': {e}") elif "anyOf" in items: for idx, any_of_item in enumerate(items["anyOf"]): if "properties" in any_of_item: - check_property_descriptions(any_of_item["properties"], base_uri, output, f"{current_path}.items.anyOf[{idx}]") + check_property_descriptions(any_of_item["properties"], base_uri, output, + f"{current_path}.items.anyOf[{idx}]", processed_refs) elif "items" in any_of_item: - check_property_descriptions({"items": any_of_item["items"]}, base_uri, output, f"{current_path}.items.anyOf[{idx}]") + nested_items_path = f"{current_path}.items.anyOf[{idx}]" + if "description" not in any_of_item: + output.append(f"*** The attribute '{nested_items_path}' is missing a description.") + check_property_descriptions({"items": any_of_item["items"]}, base_uri, output, + nested_items_path, processed_refs) else: if "description" not in any_of_item: - output.append(f"*** The attribute '{current_path}.items.anyOf[{idx}]' is missing a description.") + output.append( + f"*** The attribute '{current_path}.items.anyOf[{idx}]' is missing a description.") else: description = any_of_item["description"] is_valid, message = validate_description(description) if not is_valid: - output.append(f"*** The attribute '{current_path}.items.anyOf[{idx}]' has an invalid description: {message}") + output.append( + f"*** The attribute '{current_path}.items.anyOf[{idx}]' has an invalid description: {message}") else: - output.append(f"The attribute '{current_path}.items.anyOf[{idx}]' is properly documented.") + output.append( + f"The attribute '{current_path}.items.anyOf[{idx}]' is properly documented.") elif "properties" in items: - check_property_descriptions(items["properties"], base_uri, output, f"{current_path}.items") + check_property_descriptions(items["properties"], base_uri, output, f"{current_path}.items", + processed_refs) elif "items" in items: - check_property_descriptions({"items": items["items"]}, base_uri, output, current_path) + check_property_descriptions({"items": items["items"]}, 
base_uri, output, current_path, processed_refs) else: if "description" not in items: output.append(f"*** The attribute '{current_path}.items' is missing a description.") @@ -188,15 +253,6 @@ def check_property_descriptions(properties, base_uri, output, path=""): output.append(f"*** The attribute '{current_path}.items' has an invalid description: {message}") else: output.append(f"The attribute '{current_path}.items' is properly documented.") - elif "description" not in prop_details: - output.append(f"*** The attribute '{current_path}' is missing a description.") - else: - description = prop_details["description"] - is_valid, message = validate_description(description) - if not is_valid: - output.append(f"*** The attribute '{current_path}' has an invalid description: {message}") - else: - output.append(f"The attribute '{current_path}' is properly documented.") def test_schema_descriptions(repo_to_test, options): """ @@ -216,15 +272,37 @@ def test_schema_descriptions(repo_to_test, options): output = [] base_uri = schema.get("$id", "") + # Check the schema description itself - but don't validate it with the NGSI requirements + if "description" not in schema: + output.append("*** The schema is missing a root description.") + else: + # For the root schema, we only check that a description exists, not its format + output.append("The schema has a root description.") + if "properties" in schema: check_property_descriptions(schema["properties"], base_uri, output) if "allOf" in schema: - for item in schema["allOf"]: - if "properties" in item: - check_property_descriptions(item["properties"], base_uri, output) + for idx, item in enumerate(schema["allOf"]): + if "$ref" in item: + try: + ref_schema = resolve_ref(item["$ref"], base_uri) + if "properties" in ref_schema: + check_property_descriptions(ref_schema["properties"], base_uri, output, f"allOf[{idx}]") + except ValueError as e: + output.append(f"*** Error resolving $ref in allOf[{idx}]: {e}") + elif "properties" in item: + check_property_descriptions(item["properties"], base_uri, output, f"allOf[{idx}]") + + # Filter out duplicate messages + unique_output = [] + seen = set() + for message in output: + if message not in seen: + seen.add(message) + unique_output.append(message) - success = not any("invalid" in message or "missing" in message for message in output) + success = not any("invalid" in message or "missing" in message for message in unique_output) test_name = "Checking that the schema is properly described in all its attributes" return test_name, success, output \ No newline at end of file From 213579d679bd7596bb37e74f657881ba38125220 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20L=C3=B3pez?= Date: Mon, 31 Mar 2025 16:03:13 +0200 Subject: [PATCH 3/3] Resolve merge problems --- test_data_model/master_tests.py | 34 ++++++++++++-------- test_data_model/multiple_tests.py | 52 +++++++++++++++++++++---------- 2 files changed, 57 insertions(+), 29 deletions(-) diff --git a/test_data_model/master_tests.py b/test_data_model/master_tests.py index 1b24945376..907e2568ec 100644 --- a/test_data_model/master_tests.py +++ b/test_data_model/master_tests.py @@ -73,6 +73,7 @@ def load_config(config_path: str = None) -> Dict[str, Any]: return config + def is_url(path): """ Check if the provided path is a URL. 
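The body of is_url is not visible in the hunk above. For readers following along, a minimal implementation consistent with that docstring might look like the sketch below; this is an assumption based on urllib.parse, not the repository's verified code.

from urllib.parse import urlparse

def is_url(path):
    # A path counts as a URL when it parses with an http(s) scheme and a host;
    # local filesystem paths fail both checks and fall through to the copy branch.
    parsed = urlparse(path)
    return parsed.scheme in ("http", "https") and bool(parsed.netloc)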
@@ -108,14 +109,18 @@ def convert_github_url_to_raw(subject_root):
         elif "/tree/" in subject_root:
             return _extracted_from_convert_github_url_to_raw(subject_root, "/tree/", "/")
         else:
-            return f"{subject_root.replace("github.com", "raw.githubusercontent.com")}/master"
+            parts = subject_root.split('/')
+            url = '/'.join(parts[:-1]) + '/refs/heads/master/' + parts[-1]
+            return url.replace("github.com", "raw.githubusercontent.com")
     except Exception as e:
         raise ValueError(f"Error converting GitHub URL to raw URL: {e}") from e

+
 def _extracted_from_convert_github_url_to_raw(repo_url: str, arg1: str, arg2: str) -> str:
     raw_url = repo_url.replace("github.com", "raw.githubusercontent.com")

     return raw_url.replace(arg1, arg2)

+
 def download_file(url: str, file_path):
     """
     Download a single file from a URL and save it to the specified path.
@@ -142,6 +147,7 @@ def download_file(url: str, file_path):
     except Exception as e:
         return file_path, False, f"Error downloading {url}: {e}"

+
 def download_files(subject_root: str, download_dir: str):
     """
     Download or copy files from a URL or local directory.
@@ -207,6 +213,7 @@ def download_files(subject_root: str, download_dir: str):
     except Exception as e:
         raise Exception(f"Error downloading/copying files: {e}")

+
 def run_tests(test_files: list, repo_to_test: str, only_report_errors: bool, options: dict) -> dict:
     """
     Run a series of tests on a file.
@@ -243,6 +250,7 @@ def run_tests(test_files: list, repo_to_test: str, only_report_errors: bool, opt
     }
     return results

+
 def main():
     try:
         parser = ArgumentParser(description="Run tests on a repository.")
@@ -270,13 +278,7 @@ def main():
         if not args.email or "@" not in args.email:
             raise ValueError("Missing or invalid email address")

-        # Validate the subject_root, if the input is a URL, convert it to a raw file base URL
-        if is_url(args.subject_root):
-            raw_base_url = convert_github_url_to_raw(args.subject_root)
-        else:
-            raw_base_url = args.subject_root
-
-        return quality_analysis(raw_base_url=raw_base_url,
+        return quality_analysis(base_url=args.subject_root,
                                 published=published,
                                 private=private,
                                 only_report_errors=only_report_errors,
@@ -285,8 +287,9 @@ def main():
     except Exception as e:
         raise Exception(f"Error analyzing the data model: {e}") from e

-def quality_analysis(raw_base_url: str, email: str, only_report_errors: bool, published: bool =False,
-                     private: bool =False, output_file: str =None) -> str | None:
+
+def quality_analysis(base_url: str, email: str, only_report_errors: bool, published: bool =False,
+                     private: bool =False, output_file: str =None) -> dict | None:
     result = {
         "success": False,
         "error": None,
@@ -296,6 +299,12 @@ def quality_analysis(raw_base_url: str, email: str, only_report_errors: bool, pu
         }
     }

+    # Validate the subject_root; if the input is a URL, convert it to a raw file base URL
+    if is_url(base_url):
+        raw_base_url = convert_github_url_to_raw(base_url)
+    else:
+        raw_base_url = base_url
+
     config = load_config()
     results_dir = config['results_dir']
     download_dir = config['download_dir']
@@ -303,8 +312,6 @@ def quality_analysis(raw_base_url: str, email: str, only_report_errors: bool, pu
     Path(results_dir).mkdir(parents=True, exist_ok=True)
     Path(download_dir).mkdir(parents=True, exist_ok=True)

-    results = str()
-
     try:
         # Download or copy the files
         repo_path = download_files(raw_base_url, download_dir)
@@ -363,7 +370,8 @@ def quality_analysis(raw_base_url: str, email: str, only_report_errors: bool, pu
     # Ensure we always print valid JSON
     print(dumps(result, indent=2))

-    return results
+    return result
+

 if __name__ == "__main__":
     main()
diff --git a/test_data_model/multiple_tests.py b/test_data_model/multiple_tests.py
index 793dc974ef..a55c58c16e 100644
--- a/test_data_model/multiple_tests.py
+++ b/test_data_model/multiple_tests.py
@@ -22,6 +22,7 @@
 from datetime import datetime
 from master_tests import quality_analysis

+
 def get_subdirectories(subject_root):
     """
     Get the list of first-level subdirectories in the specified root directory of a GitHub repository.
@@ -34,18 +35,7 @@ def get_subdirectories(subject_root):
     list: List of subdirectory names.
     """
     # Extract the owner and repo name from the URL
-    # TODO: Only work with tree structure and not normal url to a data model
-    parts = subject_root.strip("/").split("/")
-    if len(parts) < 7:
-        raise ValueError("Invalid subject_root URL. It must include owner, repo, branch, and root directory.")
-
-    owner = parts[3]  # e.g., "smart-data-models"
-    repo = parts[4]  # e.g., "incubated"
-    branch = parts[6]  # e.g., "d7b7b48f03b9b221d141e074e1d311985ab04f25"
-    root_directory = "/".join(parts[7:])  # e.g., "SMARTMANUFACTURING/dataModel.PredictiveMaintenance"
-
-    # GitHub API URL to list contents of the root directory
-    api_url = f"https://api.github.com/repos/{owner}/{repo}/contents/{root_directory}?ref={branch}"
+    api_url = get_api_url(subject_root=subject_root)

     try:
         # Fetch the contents of the root directory
@@ -58,7 +48,36 @@ def get_subdirectories(subject_root):
     except Exception as e:
         raise Exception(f"Error fetching subdirectories: {e}") from e

-def run_master_tests(subject_root: str, subdirectory: str, email:str, only_report_errors: bool):
+
+def get_api_url(subject_root: str) -> str:
+    # Extract the owner and repo name from the URL
+    parts = subject_root.strip("/").split("/")
+
+    owner = parts[3]  # e.g., "smart-data-models"
+    repo = parts[4]  # e.g., "incubated"
+
+    if 'tree' in parts:
+        if len(parts) < 7:
+            raise ValueError("Invalid subject_root URL. It must include owner, repo, branch, and root directory.")
+
+        branch = parts[6]  # e.g., "d7b7b48f03b9b221d141e074e1d311985ab04f25"
+        root_directory = "/".join(parts[7:])  # e.g., "SMARTMANUFACTURING/dataModel.PredictiveMaintenance"
+
+        # GitHub API URL to list contents of the root directory
+        api_url = f"https://api.github.com/repos/{owner}/{repo}/contents/{root_directory}?ref={branch}"
+    else:
+        if len(parts) < 5:
+            raise ValueError("Invalid subject_root URL. It must include owner, repo, branch, and root directory.")
+
+        root_directory = "/".join(parts[5:])  # e.g., "SMARTMANUFACTURING/dataModel.PredictiveMaintenance"
+
+        # GitHub API URL to list contents of the root directory
+        api_url = f"https://api.github.com/repos/{owner}/{repo}/contents/{root_directory}?ref=master"
+
+    return api_url
+
+
+def run_master_tests(subject_root: str, subdirectory: str, email:str, only_report_errors: bool) -> dict:
     """
     Run the master_tests.py script for a specific subdirectory.
@@ -77,16 +96,16 @@ def run_master_tests(subject_root: str, subdirectory: str, email:str, only_repor
         subdirectory_url = f"{subject_root}/{subdirectory}"
         print(f"Testing subdirectory: {subdirectory_url}")

-        result = quality_analysis(raw_base_url=subdirectory_url,
+        result = quality_analysis(base_url=subdirectory_url,
                                   email=email,
                                   only_report_errors=only_report_errors)

-        # Parse the output as JSON
-        return loads(result)
+        return result
     except Exception as e:
         print(f"Error running tests for {subdirectory}: {e}")
         return {"error": str(e)}

+
 def main():
     if len(argv) != 4:
         print("Usage: python3 multiple_tests.py <subject_root> <email> <only_report_errors>")
         exit(1)
@@ -115,5 +134,6 @@ def main():

     print(f"Test results saved to {output_filename}")

+
 if __name__ == "__main__":
     main()
\ No newline at end of file
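To make the URL-to-API mapping of get_api_url concrete, the snippet below feeds it the tree-style subject_root quoted in its own inline comments and checks the resulting GitHub API URL. The import and the assert are illustrative only; they are not part of the patch.

from multiple_tests import get_api_url

# owner sits at parts[3], repo at parts[4], the branch SHA at parts[6],
# and everything after that is the root directory inside the repository
subject_root = ("https://github.com/smart-data-models/incubated/tree/"
                "d7b7b48f03b9b221d141e074e1d311985ab04f25/SMARTMANUFACTURING/dataModel.PredictiveMaintenance")
expected = ("https://api.github.com/repos/smart-data-models/incubated/contents/"
            "SMARTMANUFACTURING/dataModel.PredictiveMaintenance"
            "?ref=d7b7b48f03b9b221d141e074e1d311985ab04f25")
assert get_api_url(subject_root=subject_root) == expected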