diff --git a/ann_benchmarks/algorithms/base/Dockerfile b/ann_benchmarks/algorithms/base/Dockerfile
index aab07357b..7a1c38401 100644
--- a/ann_benchmarks/algorithms/base/Dockerfile
+++ b/ann_benchmarks/algorithms/base/Dockerfile
@@ -8,5 +8,3 @@ RUN python3 --version | grep 'Python 3.10.6'
 WORKDIR /home/app
 COPY requirements.txt run_algorithm.py ./
 RUN pip3 install -r requirements.txt
-
-ENTRYPOINT ["python3", "-u", "run_algorithm.py"]
diff --git a/ann_benchmarks/main.py b/ann_benchmarks/main.py
index 2dd73042e..f199d3276 100644
--- a/ann_benchmarks/main.py
+++ b/ann_benchmarks/main.py
@@ -18,7 +18,7 @@
 from .constants import INDEX_DIR
 from .datasets import DATASETS, get_dataset
 from .results import build_result_filepath
-from .runner import run, run_docker
+from .runner import run, run_custom, run_docker
 
 logging.config.fileConfig("logging.conf")
 
@@ -52,28 +52,41 @@ def run_worker(cpu: int, args: argparse.Namespace, queue: multiprocessing.Queue)
     """
     Executes the algorithm based on the provided parameters.
 
-    The algorithm is either executed directly or through a Docker container based on the `args.local`
-    argument. The function runs until the queue is emptied. When running in a docker container, it
-    executes the algorithm in a Docker container.
+    There are three ways the algorithm can be executed:
+    - if `args.custom_container` is set, the algorithm is executed through a user-provided container setup;
+    - otherwise, if `args.local` is set, the algorithm is executed directly;
+    - if neither is set (the default), the algorithm is executed in a Docker container.
+    The function runs until the queue is emptied.
 
     Args:
        cpu (int): The CPU number to be used in the execution.
-        args (argparse.Namespace): User provided arguments for running workers.
+        args (argparse.Namespace): User-provided arguments for running workers.
        queue (multiprocessing.Queue): The multiprocessing queue that contains the algorithm definitions.
 
     Returns:
         None
     """
-    while not queue.empty():
-        definition = queue.get()
-        if args.local:
-            run(definition, args.dataset, args.count, args.runs, args.batch)
-        else:
-            memory_margin = 500e6  # reserve some extra memory for misc stuff
-            mem_limit = int((psutil.virtual_memory().available - memory_margin) / args.parallelism)
-            cpu_limit = str(cpu) if not args.batch else f"0-{multiprocessing.cpu_count() - 1}"
-
-            run_docker(definition, args.dataset, args.count, args.runs, args.timeout, args.batch, cpu_limit, mem_limit)
+
+    if args.custom_container:
+        with open(args.custom_container) as f:
+            custom_cmd = f.read()
+        algorithms = set()
+        while not queue.empty():
+            definition = queue.get()
+            algorithms.add((definition.algorithm, definition.docker_tag))
+        for algorithm, container_tag in algorithms:
+            run_custom(custom_cmd, args.definitions, algorithm, container_tag, args.dataset, args.count, args.runs, args.batch, args.force)
+    else:
+        while not queue.empty():
+            definition = queue.get()
+            if args.local:
+                run(definition, args.dataset, args.count, args.runs, args.batch)
+            else:
+                memory_margin = 500e6  # reserve some extra memory for misc stuff
+                mem_limit = int((psutil.virtual_memory().available - memory_margin) / args.parallelism)
+                cpu_limit = str(cpu) if not args.batch else f"0-{multiprocessing.cpu_count() - 1}"
+
+                run_docker(definition, args.dataset, args.count, args.runs, args.timeout, args.batch, cpu_limit, mem_limit)
 
 
 def parse_arguments() -> argparse.Namespace:
@@ -117,6 +130,10 @@
         action="store_true",
         help="If set, then will run everything locally (inside the same " "process) rather than using Docker",
     )
+    parser.add_argument(
+        '--custom-container',
+        help="Path to a file containing a custom command template; algorithms are then run through that command instead of Docker",
+    )
     parser.add_argument("--batch", action="store_true", help="If set, algorithms get all queries at once")
     parser.add_argument(
         "--max-n-algorithms", type=int, help="Max number of algorithms to run (just used for testing)", default=-1
     )
@@ -131,16 +148,16 @@
 def filter_already_run_definitions(
-    definitions: List[Definition],
-    dataset: str,
-    count: int,
-    batch: bool,
+    definitions: List[Definition],
+    dataset: str,
+    count: int,
+    batch: bool,
     force: bool
 ) -> List[Definition]:
     """Filters out the algorithm definitions based on whether they have already been run or not.
 
-    This function checks if there are existing results for each definition by constructing the
-    result filename from the algorithm definition and the provided arguments. If there are no
+    This function checks if there are existing results for each definition by constructing the
+    result filename from the algorithm definition and the provided arguments. If there are no
     existing results or if the parameter `force=True`, the definition is kept. Otherwise, it is
     discarded.
@@ -153,14 +170,14 @@
         batch (bool): If set, algorithms get all queries at once (only used in file naming convention).
 
     Returns:
-        List[Definition]: A list of algorithm definitions that either have not been run or are
+        List[Definition]: A list of algorithm definitions that either have not been run or are
         forced to be re-run.
""" filtered_definitions = [] for definition in definitions: not_yet_run = [ - query_args + query_args for query_args in (definition.query_argument_groups or [[]]) if force or not os.path.exists(build_result_filepath(dataset, count, definition, query_args, batch)) ] @@ -168,7 +185,7 @@ def filter_already_run_definitions( if not_yet_run: definition = replace(definition, query_argument_groups=not_yet_run) if definition.query_argument_groups else definition filtered_definitions.append(definition) - + return filtered_definitions @@ -176,9 +193,9 @@ def filter_by_available_docker_images(definitions: List[Definition]) -> List[Def """ Filters out the algorithm definitions that do not have an associated, available Docker images. - This function uses the Docker API to list all Docker images available in the system. It - then checks the Docker tags associated with each algorithm definition against the list - of available Docker images, filtering out those that are unavailable. + This function uses the Docker API to list all Docker images available in the system. It + then checks the Docker tags associated with each algorithm definition against the list + of available Docker images, filtering out those that are unavailable. Args: definitions (List[Definition]): A list of algorithm definitions to be filtered. @@ -194,7 +211,7 @@ def filter_by_available_docker_images(definitions: List[Definition]) -> List[Def logger.info(f"not all docker images available, only: {docker_tags}") logger.info(f"missing docker images: {missing_docker_images}") definitions = [d for d in definitions if d.docker_tag in docker_tags] - + return definitions @@ -202,16 +219,16 @@ def check_module_import_and_constructor(df: Definition) -> bool: """ Verifies if the algorithm module can be imported and its constructor exists. - This function checks if the module specified in the definition can be imported. - Additionally, it verifies if the constructor for the algorithm exists within the + This function checks if the module specified in the definition can be imported. + Additionally, it verifies if the constructor for the algorithm exists within the imported module. Args: - df (Definition): A definition object containing the module and constructor + df (Definition): A definition object containing the module and constructor for the algorithm. Returns: - bool: True if the module can be imported and the constructor exists, False + bool: True if the module can be imported and the constructor exists, False otherwise. """ status = algorithm_status(df) @@ -224,7 +241,7 @@ def check_module_import_and_constructor(df: Definition) -> bool: f"{df.module}.{df.constructor}({df.arguments}): the module '{df.module}' could not be loaded; skipping" ) return False - + return True def create_workers_and_execute(definitions: List[Definition], args: argparse.Namespace): @@ -233,10 +250,10 @@ def create_workers_and_execute(definitions: List[Definition], args: argparse.Nam Args: definitions (List[Definition]): List of algorithm definitions to be processed. - args (argparse.Namespace): User provided arguments for running workers. + args (argparse.Namespace): User provided arguments for running workers. Raises: - Exception: If the level of parallelism exceeds the available CPU count or if batch mode is on with more than + Exception: If the level of parallelism exceeds the available CPU count or if batch mode is on with more than one worker. 
""" cpu_count = multiprocessing.cpu_count() @@ -284,7 +301,7 @@ def limit_algorithms(definitions: List[Definition], limit: int) -> List[Definiti """ Limits the number of algorithm definitions based on the given limit. - If the limit is negative, all definitions are returned. For valid + If the limit is negative, all definitions are returned. For valid sampling, `definitions` should be shuffled before `limit_algorithms`. Args: @@ -307,6 +324,9 @@ def main(): if os.path.exists(INDEX_DIR): shutil.rmtree(INDEX_DIR) + if args.custom_container and not os.path.exists(args.custom_container): + raise Exception("Custom container file does not exist.") + dataset, dimension = get_dataset(args.dataset) definitions: List[Definition] = get_definitions( dimension=dimension, @@ -316,10 +336,10 @@ def main(): ) random.shuffle(definitions) - definitions = filter_already_run_definitions(definitions, - dataset=args.dataset, - count=args.count, - batch=args.batch, + definitions = filter_already_run_definitions(definitions, + dataset=args.dataset, + count=args.count, + batch=args.batch, force=args.force, ) @@ -327,9 +347,9 @@ def main(): logger.info(f"running only {args.algorithm}") definitions = [d for d in definitions if d.algorithm == args.algorithm] - if not args.local: + if not args.local and not args.custom_container: definitions = filter_by_available_docker_images(definitions) - else: + elif args.local: definitions = list(filter( check_module_import_and_constructor, definitions )) diff --git a/ann_benchmarks/runner.py b/ann_benchmarks/runner.py index b811651b8..e1c97bae4 100644 --- a/ann_benchmarks/runner.py +++ b/ann_benchmarks/runner.py @@ -2,6 +2,8 @@ import json import logging import os +import re +from string import Template import threading import time from typing import Dict, Optional, Tuple, List, Union @@ -19,7 +21,7 @@ from .results import store_results -def run_individual_query(algo: BaseANN, X_train: numpy.array, X_test: numpy.array, distance: str, count: int, +def run_individual_query(algo: BaseANN, X_train: numpy.array, X_test: numpy.array, distance: str, count: int, run_count: int, batch: bool) -> Tuple[dict, list]: """Run a search query using the provided algorithm and report the results. @@ -53,7 +55,7 @@ def single_query(v: numpy.array) -> Tuple[float, List[Tuple[int, float]]]: Returns: List[Tuple[float, List[Tuple[int, float]]]]: Tuple containing - 1. Total time taken for each query + 1. Total time taken for each query 2. Result pairs consisting of (point index, distance to candidate data ) """ if prepared_queries: @@ -87,7 +89,7 @@ def batch_query(X: numpy.array) -> List[Tuple[float, List[Tuple[int, float]]]]: Returns: List[Tuple[float, List[Tuple[int, float]]]]: List of tuples, each containing - 1. Total time taken for each query + 1. Total time taken for each query 2. Result pairs consisting of (point index, distance to candidate data ) """ # TODO: consider using a dataclass to represent return value. 
diff --git a/ann_benchmarks/runner.py b/ann_benchmarks/runner.py
index b811651b8..e1c97bae4 100644
--- a/ann_benchmarks/runner.py
+++ b/ann_benchmarks/runner.py
@@ -2,6 +2,8 @@
 import json
 import logging
 import os
+import re
+from string import Template
 import threading
 import time
 from typing import Dict, Optional, Tuple, List, Union
@@ -19,7 +21,7 @@
 from .results import store_results
 
 
-def run_individual_query(algo: BaseANN, X_train: numpy.array, X_test: numpy.array, distance: str, count: int,
+def run_individual_query(algo: BaseANN, X_train: numpy.array, X_test: numpy.array, distance: str, count: int,
                          run_count: int, batch: bool) -> Tuple[dict, list]:
     """Run a search query using the provided algorithm and report the results.
@@ -53,7 +55,7 @@ def single_query(v: numpy.array) -> Tuple[float, List[Tuple[int, float]]]:
 
         Returns:
             List[Tuple[float, List[Tuple[int, float]]]]: Tuple containing
-                1. Total time taken for each query
+                1. Total time taken for each query
                 2. Result pairs consisting of (point index, distance to candidate data )
         """
         if prepared_queries:
@@ -87,7 +89,7 @@ def batch_query(X: numpy.array) -> List[Tuple[float, List[Tuple[int, float]]]]:
 
         Returns:
             List[Tuple[float, List[Tuple[int, float]]]]: List of tuples, each containing
-                1. Total time taken for each query
+                1. Total time taken for each query
                 2. Result pairs consisting of (point index, distance to candidate data )
         """
         # TODO: consider using a dataclass to represent return value.
@@ -213,7 +215,7 @@ def run(definition: Definition, dataset_name: str, count: int, run_count: int, b
             print(f"Running query argument group {pos} of {len(query_argument_groups)}...")
             if query_arguments:
                 algo.set_query_arguments(*query_arguments)
-
+
             descriptor, results = run_individual_query(algo, X_train, X_test, distance, count, run_count, batch)
 
             descriptor.update({
@@ -227,8 +229,48 @@ def run(definition: Definition, dataset_name: str, count: int, run_count: int, b
     finally:
         algo.done()
 
+def run_custom(cmd_template: str, definition: str, algo: str, container_tag: str,
+               dataset_name: str, count: int, runs: int, batch: bool, force: bool) -> None:
+    """Run the algorithm benchmarking with a custom runner specified by `cmd_template`.
+
+    Args:
+        cmd_template (str): A templated custom runner string.
+        definition (str): The algorithm definitions path, substituted for `$definition` in the template.
+        algo (str): The name of the algorithm.
+        container_tag (str): A reference to the original Docker container.
+        dataset_name (str): The name of the dataset.
+        count (int): The number of results to return.
+        runs (int): The number of runs.
+        batch (bool): If true, runs in batch mode.
+        force (bool): If true, overwrite existing results.
+    """
+    template = Template(cmd_template)
+
+    additional_cmd = [
+        "--runs",
+        str(runs),
+        "--count",
+        str(count),
+    ]
+    if batch:
+        additional_cmd += ["--batch"]
+    if force:
+        additional_cmd += ["--force"]
+
+    additional_cmd = " ".join(additional_cmd)
+
+    cmd = template.safe_substitute(
+        additional=additional_cmd,
+        algo=re.escape(algo),
+        container=container_tag,
+        definition=definition,
+        ds=dataset_name,
+    )
+
+    os.system(cmd)
+
 def run_from_cmdline():
-    """Calls the function `run` using arguments from the command line. See `ArgumentParser` for
+    """Calls the function `run` using arguments from the command line. See `ArgumentParser` for
     arguments, all run it with `--help`.
     """
     parser = argparse.ArgumentParser(
@@ -293,6 +335,9 @@ def run_docker(
     See `run_from_cmdline` for details on the args.
     """
     cmd = [
+        "python3",
+        "-u",
+        "run_algorithm.py",
         "--dataset",
         dataset,
         "--algorithm",
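
To make the templating concrete, an illustrative walk-through (not taken from the patch): for the algorithm annoy with Docker tag ann-benchmarks-annoy, dataset glove-100-angular, --count 10 and --runs 5, run_custom builds $additional as "--runs 5 --count 10", and the last line of singularity.template (below) expands to roughly

    singularity run -B /run/shm:/run/shm ann-benchmarks-annoy.sif python3 -u run.py --dataset glove-100-angular --algorithm annoy --runs 1 --local --definition <definitions path> --runs 5 --count 10

Because argparse keeps the last occurrence of a repeated flag, the --runs 5 contributed by $additional overrides the template's hard-coded --runs 1.
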
-f "$CONTAINER" ]; then + echo "$CONTAINER does not exist" + exit 1 +fi +singularity run -B /run/shm:/run/shm $CONTAINER python3 -u run.py --dataset $ds --algorithm $algo --runs 1 --local --definition $definition $additional diff --git a/templates/custom_runner/slurm.template b/templates/custom_runner/slurm.template new file mode 100644 index 000000000..021c6a61d --- /dev/null +++ b/templates/custom_runner/slurm.template @@ -0,0 +1,14 @@ +# example slurm run using singularity +# docker images must be converted to singularity containers +# e.g., `docker save ann-benchmarks-$algo -o ann-benchmarks-$algo.tar, and +# `singularity build ann-benchmarks-$algo.sif docker-archive://ann-benchmarks-$algo.tar` + +# need to provide $algo, $container, $ds, $definition, $additional. + +CONTAINER=$container.sif +if [ ! -f "$CONTAINER" ]; then + echo "$CONTAINER does not exist" + exit 1 +fi + +sbatch -J $algo-$ds -o log-$ds-$algo.log --cpus-per-task=1 --time=01:59:59 --partition=red --wrap="singularity run $CONTAINER python3 -u run.py --dataset $ds --algorithm $algo --local --runs 1 --definition $definition $additional"