Custom runners (singularity/slurm/...) #448

Open · wants to merge 2 commits into main
2 changes: 0 additions & 2 deletions ann_benchmarks/algorithms/base/Dockerfile
@@ -8,5 +8,3 @@ RUN python3 --version | grep 'Python 3.10.6'
WORKDIR /home/app
COPY requirements.txt run_algorithm.py ./
RUN pip3 install -r requirements.txt

ENTRYPOINT ["python3", "-u", "run_algorithm.py"]
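Note: the ENTRYPOINT is removed from the base image and the `python3 -u run_algorithm.py` invocation moves into the command built by `run_docker` in runner.py below, so custom runners can launch the container with a command line of their own.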
104 changes: 62 additions & 42 deletions ann_benchmarks/main.py
@@ -18,7 +18,7 @@
from .constants import INDEX_DIR
from .datasets import DATASETS, get_dataset
from .results import build_result_filepath
from .runner import run, run_docker
from .runner import run, run_custom, run_docker


logging.config.fileConfig("logging.conf")
@@ -52,28 +52,41 @@ def run_worker(cpu: int, args: argparse.Namespace, queue: multiprocessing.Queue)
"""
Executes the algorithm based on the provided parameters.

The algorithm is either executed directly or through a Docker container based on the `args.local`
argument. The function runs until the queue is emptied. When running in a docker container, it
executes the algorithm in a Docker container.
There are three ways the algorithm can be executed:
- if `args.custom_container` is set, the algorithm is executed through a user-provided container setup
- otherwise, if `args.local` is set, the algorithm is executed directly
- if neither is set (default), the algorithm is executed in a Docker container.
The function runs until the queue is emptied.

Args:
cpu (int): The CPU number to be used in the execution.
args (argparse.Namespace): User provided arguments for running workers.
queue (multiprocessing.Queue): The multiprocessing queue that contains the algorithm definitions.

Returns:
None
"""
while not queue.empty():
definition = queue.get()
if args.local:
run(definition, args.dataset, args.count, args.runs, args.batch)
else:
memory_margin = 500e6 # reserve some extra memory for misc stuff
mem_limit = int((psutil.virtual_memory().available - memory_margin) / args.parallelism)
cpu_limit = str(cpu) if not args.batch else f"0-{multiprocessing.cpu_count() - 1}"

run_docker(definition, args.dataset, args.count, args.runs, args.timeout, args.batch, cpu_limit, mem_limit)

if args.custom_container:
with open(args.custom_container) as f:
custom_cmd = f.read()
algorithms = set()
while not queue.empty():
definition = queue.get()
algorithms.add((definition.algorithm, definition.docker_tag))
for algorithm, container_tag in algorithms:
run_custom(custom_cmd, args.definitions, algorithm, container_tag, args.dataset, args.count, args.runs, args.batch, args.force)
else:
while not queue.empty():
definition = queue.get()
if args.local:
run(definition, args.dataset, args.count, args.runs, args.batch)
else:
memory_margin = 500e6 # reserve some extra memory for misc stuff
mem_limit = int((psutil.virtual_memory().available - memory_margin) / args.parallelism)
cpu_limit = str(cpu) if not args.batch else f"0-{multiprocessing.cpu_count() - 1}"

run_docker(definition, args.dataset, args.count, args.runs, args.timeout, args.batch, cpu_limit, mem_limit)


def parse_arguments() -> argparse.Namespace:
@@ -117,6 +130,10 @@ def parse_arguments() -> argparse.Namespace:
action="store_true",
help="If set, then will run everything locally (inside the same " "process) rather than using Docker",
)
parser.add_argument(
"--custom-container",
help="Path to a file with a custom command template; if set, algorithms are run through this custom runner instead of Docker",
)
parser.add_argument("--batch", action="store_true", help="If set, algorithms get all queries at once")
parser.add_argument(
"--max-n-algorithms", type=int, help="Max number of algorithms to run (just used for testing)", default=-1
@@ -131,16 +148,16 @@ def parse_arguments() -> argparse.Namespace:


def filter_already_run_definitions(
definitions: List[Definition],
dataset: str,
count: int,
batch: bool,
force: bool
) -> List[Definition]:
"""Filters out the algorithm definitions based on whether they have already been run or not.

This function checks if there are existing results for each definition by constructing the
result filename from the algorithm definition and the provided arguments. If there are no
existing results or if the parameter `force=True`, the definition is kept. Otherwise, it is
discarded.

@@ -153,32 +170,32 @@ def filter_already_run_definitions(
batch (bool): If set, algorithms get all queries at once (only used in file naming convention).

Returns:
List[Definition]: A list of algorithm definitions that either have not been run or are
forced to be re-run.
"""
filtered_definitions = []

for definition in definitions:
not_yet_run = [
query_args
for query_args in (definition.query_argument_groups or [[]])
if force or not os.path.exists(build_result_filepath(dataset, count, definition, query_args, batch))
]

if not_yet_run:
definition = replace(definition, query_argument_groups=not_yet_run) if definition.query_argument_groups else definition
filtered_definitions.append(definition)

return filtered_definitions


def filter_by_available_docker_images(definitions: List[Definition]) -> List[Definition]:
"""
Filters out the algorithm definitions that do not have an associated, available Docker image.

This function uses the Docker API to list all Docker images available in the system. It
then checks the Docker tags associated with each algorithm definition against the list
of available Docker images, filtering out those that are unavailable.

Args:
definitions (List[Definition]): A list of algorithm definitions to be filtered.
@@ -194,24 +211,24 @@ def filter_by_available_docker_images(definitions: List[Definition]) -> List[Definition]:
logger.info(f"not all docker images available, only: {docker_tags}")
logger.info(f"missing docker images: {missing_docker_images}")
definitions = [d for d in definitions if d.docker_tag in docker_tags]

return definitions


def check_module_import_and_constructor(df: Definition) -> bool:
"""
Verifies if the algorithm module can be imported and its constructor exists.

This function checks if the module specified in the definition can be imported.
Additionally, it verifies if the constructor for the algorithm exists within the
imported module.

Args:
df (Definition): A definition object containing the module and constructor
for the algorithm.

Returns:
bool: True if the module can be imported and the constructor exists, False
otherwise.
"""
status = algorithm_status(df)
@@ -224,7 +241,7 @@ def check_module_import_and_constructor(df: Definition) -> bool:
f"{df.module}.{df.constructor}({df.arguments}): the module '{df.module}' could not be loaded; skipping"
)
return False

return True

def create_workers_and_execute(definitions: List[Definition], args: argparse.Namespace):
@@ -233,10 +250,10 @@ def create_workers_and_execute(definitions: List[Definition], args: argparse.Namespace):

Args:
definitions (List[Definition]): List of algorithm definitions to be processed.
args (argparse.Namespace): User provided arguments for running workers.

Raises:
Exception: If the level of parallelism exceeds the available CPU count or if batch mode is on with more than
one worker.
"""
cpu_count = multiprocessing.cpu_count()
@@ -284,7 +301,7 @@ def limit_algorithms(definitions: List[Definition], limit: int) -> List[Definition]:
"""
Limits the number of algorithm definitions based on the given limit.

If the limit is negative, all definitions are returned. For valid
sampling, `definitions` should be shuffled before `limit_algorithms`.

Args:
@@ -307,6 +324,9 @@ def main():
if os.path.exists(INDEX_DIR):
shutil.rmtree(INDEX_DIR)

if args.custom_container and not os.path.exists(args.custom_container):
raise Exception("Custom container file does not exist.")

dataset, dimension = get_dataset(args.dataset)
definitions: List[Definition] = get_definitions(
dimension=dimension,
@@ -316,20 +336,20 @@
)
random.shuffle(definitions)

definitions = filter_already_run_definitions(definitions,
dataset=args.dataset,
count=args.count,
batch=args.batch,
force=args.force,
)

if args.algorithm:
logger.info(f"running only {args.algorithm}")
definitions = [d for d in definitions if d.algorithm == args.algorithm]

if not args.local:
if not args.local and not args.custom_container:
definitions = filter_by_available_docker_images(definitions)
else:
elif args.local:
definitions = list(filter(
check_module_import_and_constructor, definitions
))
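With these changes in place, a typical invocation of the new path would look something like `python run.py --dataset glove-100-angular --algorithm hnswlib --custom-container templates/custom_runner/singularity.template` (dataset and algorithm names are placeholders). Note that the custom branch drains the entire queue in a single worker and deduplicates `(algorithm, docker_tag)` pairs, so the custom runner is launched once per algorithm; iterating over the individual parameter settings is left to the `--local` run inside the container.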
55 changes: 50 additions & 5 deletions ann_benchmarks/runner.py
@@ -2,6 +2,8 @@
import json
import logging
import os
import re
from string import Template
import threading
import time
from typing import Dict, Optional, Tuple, List, Union
@@ -19,7 +21,7 @@
from .results import store_results


def run_individual_query(algo: BaseANN, X_train: numpy.array, X_test: numpy.array, distance: str, count: int,
run_count: int, batch: bool) -> Tuple[dict, list]:
"""Run a search query using the provided algorithm and report the results.

@@ -53,7 +55,7 @@ def single_query(v: numpy.array) -> Tuple[float, List[Tuple[int, float]]]:

Returns:
List[Tuple[float, List[Tuple[int, float]]]]: Tuple containing
1. Total time taken for each query
2. Result pairs consisting of (point index, distance to candidate data )
"""
if prepared_queries:
@@ -87,7 +89,7 @@ def batch_query(X: numpy.array) -> List[Tuple[float, List[Tuple[int, float]]]]:

Returns:
List[Tuple[float, List[Tuple[int, float]]]]: List of tuples, each containing
1. Total time taken for each query
2. Result pairs consisting of (point index, distance to candidate data )
"""
# TODO: consider using a dataclass to represent return value.
@@ -213,7 +215,7 @@ def run(definition: Definition, dataset_name: str, count: int, run_count: int, batch: bool)
print(f"Running query argument group {pos} of {len(query_argument_groups)}...")
if query_arguments:
algo.set_query_arguments(*query_arguments)

descriptor, results = run_individual_query(algo, X_train, X_test, distance, count, run_count, batch)

descriptor.update({
@@ -227,8 +229,48 @@ def run(definition: Definition, dataset_name: str, count: int, run_count: int, batch: bool)
finally:
algo.done()

def run_custom(cmd_template: str, definition: str, algo: str, container_tag: str,
dataset_name: str, count: int, runs: int, batch: bool, force: bool) -> None:
"""Run the algorithm benchmarking with a custom runner specified by `cmd_template`.

Args:
cmd_template (str): A templated custom runner string.
definition (str): The definitions path (taken from `args.definitions`), passed through to the template.
algo (str): The name of the algorithm.
container_tag (str): A reference to the original docker container.
dataset_name (str): The name of the dataset.
count (int): The number of results to return.
runs (int): The number of runs.
batch (bool): If true, runs in batch mode.
force (bool): If true, overwrite existing results.
"""
template = Template(cmd_template)

additional_cmd = [
"--runs",
str(runs),
"--count",
str(count),
]
if batch:
additional_cmd += ["--batch"]
if force:
additional_cmd += ["--force"]

additional_cmd = " ".join(additional_cmd)

cmd = template.safe_substitute(
additional=additional_cmd,
algo=re.escape(algo),
container=container_tag,
definition=definition,
ds=dataset_name,
)

os.system(cmd)

def run_from_cmdline():
"""Calls the function `run` using arguments from the command line. See `ArgumentParser` for
"""Calls the function `run` using arguments from the command line. See `ArgumentParser` for
arguments, all run it with `--help`.
"""
parser = argparse.ArgumentParser(
@@ -293,6 +335,9 @@ def run_docker(
See `run_from_cmdline` for details on the args.
"""
cmd = [
"python3",
"-u",
"run_algorithm.py",
"--dataset",
dataset,
"--algorithm",
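As a reviewer aid (not part of the diff): a minimal sketch of the substitution `run_custom` performs. The template string, image tag, and paths below are hypothetical stand-ins for what is actually read from the `--custom-container` file and the worker queue.

from string import Template

# Hypothetical template; the real text comes from the --custom-container file.
template = Template(
    "singularity run $container.sif python3 -u run.py "
    "--dataset $ds --algorithm $algo --local --definition $definition $additional"
)
cmd = template.safe_substitute(
    container="ann-benchmarks-hnswlib",  # the definition's docker_tag
    algo="hnswlib",                      # run_custom passes this through re.escape()
    ds="glove-100-angular",
    definition="ann_benchmarks/algorithms",  # hypothetical definitions path
    additional="--runs 1 --count 10",  # assembled from --runs/--count/--batch/--force
)
print(cmd)
# singularity run ann-benchmarks-hnswlib.sif python3 -u run.py --dataset glove-100-angular
# --algorithm hnswlib --local --definition ann_benchmarks/algorithms --runs 1 --count 10

Because `safe_substitute` is used, a `$name` the runner does not provide is left in place rather than raising `KeyError`; this matters because the templates also contain ordinary shell variables such as `$CONTAINER`, which must survive the substitution untouched.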
9 changes: 9 additions & 0 deletions templates/custom_runner/convert_docker_to_singularity.py
@@ -0,0 +1,9 @@
# Convert every local "ann-benchmarks-*" Docker image into a Singularity
# container (one .sif file per image) for use with the custom runner templates.
import docker
import os

docker_client = docker.from_env()
# Image names, with the ":<version>" suffix stripped from each tag.
docker_tags = {tag.split(":")[0] for image in docker_client.images.list() for tag in image.tags}
for tag in [tag for tag in docker_tags if tag.startswith("ann-benchmarks-")]:
    os.system(f'docker save {tag} -o {tag}.tar')  # export the image as a tar archive
    os.system(f'singularity build {tag}.sif docker-archive://{tag}.tar')  # build the .sif
    os.system(f'rm {tag}.tar')  # remove the intermediate archive
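A usage note (not part of the file): run this once on a machine where the Docker images have already been built, e.g. `python templates/custom_runner/convert_docker_to_singularity.py`. It leaves one `ann-benchmarks-<algo>.sif` per image, and the templates below reference `$container.sif` by relative path, so the benchmark should be launched from the directory holding the `.sif` files.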
13 changes: 13 additions & 0 deletions templates/custom_runner/singularity.template
@@ -0,0 +1,13 @@
# example singularity run
# docker images must be converted to singularity containers
# e.g., `docker save ann-benchmarks-$algo -o ann-benchmarks-$algo.tar`, and
# `singularity build ann-benchmarks-$algo.sif docker-archive://ann-benchmarks-$algo.tar`

# the runner substitutes $algo, $container, $ds, $definition and $additional; nothing else is exposed.

CONTAINER=$container.sif
if [ ! -f "$CONTAINER" ]; then
echo "$CONTAINER does not exist"
exit 1
fi
singularity run -B /run/shm:/run/shm $CONTAINER python3 -u run.py --dataset $ds --algorithm $algo --runs 1 --local --definition $definition $additional
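To try this template, point the new flag at it, e.g. `python run.py --dataset glove-100-angular --algorithm hnswlib --custom-container templates/custom_runner/singularity.template` (placeholder names again). `$additional` then expands to the `--runs`/`--count` values plus the optional `--batch`/`--force` flags, as assembled in `run_custom`.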
14 changes: 14 additions & 0 deletions templates/custom_runner/slurm.template
@@ -0,0 +1,14 @@
# example slurm run using singularity
# docker images must be converted to singularity containers
# e.g., `docker save ann-benchmarks-$algo -o ann-benchmarks-$algo.tar`, and
# `singularity build ann-benchmarks-$algo.sif docker-archive://ann-benchmarks-$algo.tar`

# the runner substitutes $algo, $container, $ds, $definition and $additional; nothing else is exposed.

CONTAINER=$container.sif
if [ ! -f "$CONTAINER" ]; then
echo "$CONTAINER does not exist"
exit 1
fi

sbatch -J $algo-$ds -o log-$ds-$algo.log --cpus-per-task=1 --time=01:59:59 --partition=red --wrap="singularity run $CONTAINER python3 -u run.py --dataset $ds --algorithm $algo --local --runs 1 --definition $definition $additional"
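Unlike the Singularity template above, this submits each (algorithm, dataset) pair as its own sbatch job with a per-pair log file. The partition name (`red`) and the time limit are site-specific defaults and will need adjusting for other clusters.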