diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/search.py b/components/clp-package-utils/clp_package_utils/scripts/native/search.py index b7bb77f8d..bca76974c 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/native/search.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/search.py @@ -2,18 +2,15 @@ import argparse import asyncio -import datetime import logging import multiprocessing import pathlib -import socket import sys import time from contextlib import closing import msgpack import pymongo -import zstandard from clp_package_utils.general import ( CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, @@ -21,13 +18,13 @@ get_clp_home ) from clp_py_utils.clp_config import ( - CLP_METADATA_TABLE_PREFIX, + SEARCH_JOBS_TABLE_NAME, Database, ResultsCache ) from clp_py_utils.sql_adapter import SQL_Adapter from job_orchestration.job_config import SearchConfig -from job_orchestration.scheduler.constants import JobStatus +from job_orchestration.search_scheduler.common import JobStatus # Setup logging # Create logger @@ -75,81 +72,31 @@ def process_error_callback(err): def create_and_monitor_job_in_db(db_config: Database, results_cache: ResultsCache, wildcard_query: str, begin_timestamp: int | None, - end_timestamp: int | None, path_filter: str, - search_controller_host: str, search_controller_port: int): + end_timestamp: int | None, path_filter: str): search_config = SearchConfig( - search_controller_host=search_controller_host, - search_controller_port=search_controller_port, - wildcard_query=wildcard_query, + query_string=wildcard_query, begin_timestamp=begin_timestamp, end_timestamp=end_timestamp, path_filter=path_filter ) sql_adapter = SQL_Adapter(db_config) - zstd_cctx = zstandard.ZstdCompressor(level=3) with closing(sql_adapter.create_connection(True)) as \ db_conn, closing(db_conn.cursor(dictionary=True)) as db_cursor: # Create job - db_cursor.execute(f"INSERT INTO `search_jobs` (`search_config`) VALUES (%s)", - (zstd_cctx.compress(msgpack.packb(search_config.dict())),)) + db_cursor.execute(f"INSERT INTO `{SEARCH_JOBS_TABLE_NAME}` (`search_config`) VALUES (%s)", + (msgpack.packb(search_config.dict()),)) db_conn.commit() job_id = db_cursor.lastrowid - next_pagination_id = 0 - pagination_limit = 64 - num_tasks_added = 0 - query_base_conditions = [] - if begin_timestamp is not None: - query_base_conditions.append(f"`end_timestamp` >= {begin_timestamp}") - if end_timestamp is not None: - query_base_conditions.append(f"`begin_timestamp` <= {end_timestamp}") - while True: - # Get next `limit` rows - query_conditions = query_base_conditions + [f"`pagination_id` >= {next_pagination_id}"] - query = f""" - SELECT `id` FROM {CLP_METADATA_TABLE_PREFIX}archives - WHERE {" AND ".join(query_conditions)} - LIMIT {pagination_limit} - """ - db_cursor.execute(query) - rows = db_cursor.fetchall() - if len(rows) == 0: - break - - # Insert tasks - db_cursor.execute(f""" - INSERT INTO `search_tasks` (`job_id`, `archive_id`, `scheduled_time`) - VALUES ({"), (".join(f"{job_id}, '{row['id']}', '{datetime.datetime.utcnow()}'" for row in rows)}) - """) - db_conn.commit() - num_tasks_added += len(rows) - - if len(rows) < pagination_limit: - # Less than limit rows returned, so there are no more rows - break - next_pagination_id += pagination_limit - - # Mark job as scheduled - db_cursor.execute(f""" - UPDATE `search_jobs` - SET num_tasks={num_tasks_added}, status = '{JobStatus.SCHEDULED}' - WHERE id = {job_id} - """) - db_conn.commit() - # Wait for the job to be marked complete - job_complete = False - while not job_complete: - db_cursor.execute(f"SELECT `status`, `status_msg` FROM `search_jobs` WHERE `id` = {job_id}") + while True: + db_cursor.execute(f"SELECT `status` FROM `{SEARCH_JOBS_TABLE_NAME}` WHERE `id` = {job_id}") # There will only ever be one row since it's impossible to have more than one job with the same ID - row = db_cursor.fetchall()[0] - if JobStatus.SUCCEEDED == row['status']: - job_complete = True - elif JobStatus.FAILED == row['status']: - logger.error(row['status_msg']) - job_complete = True + new_status = db_cursor.fetchall()[0]['status'] db_conn.commit() + if new_status in (JobStatus.SUCCESS, JobStatus.FAILED, JobStatus.CANCELLED): + break time.sleep(0.5) @@ -159,50 +106,17 @@ def create_and_monitor_job_in_db(db_config: Database, results_cache: ResultsCach print(f"{document['original_path']}: {document['message']}", end='') -async def worker_connection_handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter): - try: - buf = await reader.read(1024) - if b'' == buf: - # Worker closed - return - except asyncio.CancelledError: - return - finally: - writer.close() - - async def do_search(db_config: Database, results_cache: ResultsCache, wildcard_query: str, - begin_timestamp: int | None, end_timestamp: int | None, path_filter: str, host: str): - # Start a server - try: - server = await asyncio.start_server(client_connected_cb=worker_connection_handler, host=host, port=0, - family=socket.AF_INET) - except asyncio.CancelledError: - # Search cancelled - return - port = server.sockets[0].getsockname()[1] - - server_task = asyncio.ensure_future(server.serve_forever()) - + begin_timestamp: int | None, end_timestamp: int | None, path_filter: str): db_monitor_task = asyncio.ensure_future( run_function_in_process(create_and_monitor_job_in_db, db_config, results_cache, wildcard_query, - begin_timestamp, end_timestamp, path_filter, host, port)) + begin_timestamp, end_timestamp, path_filter)) # Wait for the job to complete or an error to occur - pending = [server_task, db_monitor_task] try: - done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED) - if db_monitor_task in done: - server.close() - await server.wait_closed() - else: - logger.error("server task unexpectedly returned") - db_monitor_task.cancel() - await db_monitor_task - except asyncio.CancelledError: - server.close() - await server.wait_closed() await db_monitor_task + except asyncio.CancelledError: + pass def main(argv): @@ -237,18 +151,8 @@ def main(argv): logger.exception("Failed to load config.") return -1 - # Get IP of local machine - host_ip = None - for ip in set(socket.gethostbyname_ex(socket.gethostname())[2]): - host_ip = ip - break - if host_ip is None: - logger.error("Could not determine IP of local machine.") - return -1 - asyncio.run(do_search(clp_config.database, clp_config.results_cache, parsed_args.wildcard_query, - parsed_args.begin_time, parsed_args.end_time, parsed_args.file_path, - host_ip)) + parsed_args.begin_time, parsed_args.end_time, parsed_args.file_path)) return 0 diff --git a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py index fda5e4889..e2c41b06d 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py +++ b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py @@ -11,6 +11,8 @@ import yaml +from pydantic import BaseModel + from clp_package_utils.general import ( CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, CONTAINER_CLP_HOME, @@ -35,6 +37,8 @@ QUEUE_COMPONENT_NAME, RESULTS_CACHE_COMPONENT_NAME, SCHEDULER_COMPONENT_NAME, + SEARCH_SCHEDULER_COMPONENT_NAME, + SEARCH_WORKER_COMPONENT_NAME, WORKER_COMPONENT_NAME, ) from job_orchestration.scheduler.constants import QueueName @@ -330,6 +334,153 @@ def start_scheduler(instance_id: str, clp_config: CLPConfig, container_clp_confi logger.info(f"Started {SCHEDULER_COMPONENT_NAME}.") +def start_search_scheduler(instance_id: str, clp_config: CLPConfig, container_clp_config: CLPConfig, + mounts: CLPDockerMounts): + component_name = SEARCH_SCHEDULER_COMPONENT_NAME + logger.info(f"Starting {component_name}...") + + container_name = f'clp-{component_name}-{instance_id}' + if container_exists(container_name): + logger.info(f"{SEARCH_SCHEDULER_COMPONENT_NAME} already running.") + return + + container_config_filename = f'{container_name}.yml' + container_config_file_path = clp_config.logs_directory / container_config_filename + with open(container_config_file_path, 'w') as f: + yaml.safe_dump(container_clp_config.dump_to_primitive_dict(), f) + + logs_dir = clp_config.logs_directory / component_name + logs_dir.mkdir(parents=True, exist_ok=True) + container_logs_dir = container_clp_config.logs_directory / component_name + + clp_site_packages_dir = CONTAINER_CLP_HOME / 'lib' / 'python3' / 'site-packages' + container_start_cmd = [ + 'docker', 'run', + '-di', + '--network', 'host', + '-w', str(CONTAINER_CLP_HOME), + '--rm', + '--name', container_name, + '-e', f'PYTHONPATH={clp_site_packages_dir}', + '-e', f'BROKER_URL=amqp://' + f'{container_clp_config.queue.username}:{container_clp_config.queue.password}@' + f'{container_clp_config.queue.host}:{container_clp_config.queue.port}', + '-e', f'RESULT_BACKEND=rpc://' + f'{container_clp_config.queue.username}:{container_clp_config.queue.password}@' + f'{container_clp_config.queue.host}:{container_clp_config.queue.port}', + '-e', f'CLP_LOGS_DIR={container_logs_dir}', + '-e', f'CLP_LOGGING_LEVEL={clp_config.search_scheduler.logging_level}', + '-u', f'{os.getuid()}:{os.getgid()}', + '--mount', str(mounts.clp_home), + ] + necessary_mounts = [ + mounts.logs_dir, + ] + for mount in necessary_mounts: + if mount: + container_start_cmd.append('--mount') + container_start_cmd.append(str(mount)) + container_start_cmd.append(clp_config.execution_container) + + scheduler_cmd = [ + 'python3', '-u', '-m', + 'job_orchestration.search_scheduler.search_scheduler', + '--config', str(container_clp_config.logs_directory / container_config_filename), + ] + cmd = container_start_cmd + scheduler_cmd + subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True) + + logger.info(f"Started {component_name}.") + + +def start_search_worker(instance_id: str, clp_config: CLPConfig, container_clp_config: CLPConfig, + num_cpus: int, mounts: CLPDockerMounts): + celery_method = 'job_orchestration.executor.search' + celery_route = f"{QueueName.SEARCH}" + generic_start_worker( + SEARCH_WORKER_COMPONENT_NAME, + instance_id, + clp_config, + clp_config.search_worker, + container_clp_config, + celery_method, + celery_route, + num_cpus, + mounts + ) + + +def generic_start_worker(component_name: str, instance_id: str, clp_config: CLPConfig, worker_config: BaseModel, + container_clp_config: CLPConfig, celery_method: str, celery_route: str, + num_cpus: int, mounts: CLPDockerMounts): + logger.info(f"Starting {component_name}...") + + container_name = f'clp-{component_name}-{instance_id}' + if container_exists(container_name): + logger.info(f"{component_name} already running.") + return + + validate_worker_config(clp_config) + + logs_dir = clp_config.logs_directory / component_name + logs_dir.mkdir(parents=True, exist_ok=True) + container_logs_dir = container_clp_config.logs_directory / component_name + + # Create necessary directories + clp_config.archive_output.directory.mkdir(parents=True, exist_ok=True) + + clp_site_packages_dir = CONTAINER_CLP_HOME / 'lib' / 'python3' / 'site-packages' + container_start_cmd = [ + 'docker', 'run', + '-di', + '--network', 'host', + '-w', str(CONTAINER_CLP_HOME), + '--rm', + '--name', container_name, + '-e', f'PYTHONPATH={clp_site_packages_dir}', + '-e', f'BROKER_URL=amqp://' + f'{container_clp_config.queue.username}:{container_clp_config.queue.password}@' + f'{container_clp_config.queue.host}:{container_clp_config.queue.port}', + '-e', f'RESULT_BACKEND=rpc://' + f'{container_clp_config.queue.username}:{container_clp_config.queue.password}@' + f'{container_clp_config.queue.host}:{container_clp_config.queue.port}', + '-e', f'CLP_HOME={CONTAINER_CLP_HOME}', + '-e', f'CLP_DATA_DIR={container_clp_config.data_directory}', + '-e', f'CLP_ARCHIVE_OUTPUT_DIR={container_clp_config.archive_output.directory}', + '-e', f'CLP_LOGS_DIR={container_logs_dir}', + '-e', f'CLP_LOGGING_LEVEL={worker_config.logging_level}', + '-u', f'{os.getuid()}:{os.getgid()}', + '--mount', str(mounts.clp_home), + ] + necessary_mounts = [ + mounts.data_dir, + mounts.logs_dir, + mounts.archives_output_dir, + mounts.input_logs_dir, + ] + for mount in necessary_mounts: + if mount: + container_start_cmd.append('--mount') + container_start_cmd.append(str(mount)) + container_start_cmd.append(clp_config.execution_container) + + worker_cmd = [ + 'python3', str(clp_site_packages_dir / 'bin' / 'celery'), + '-A', + celery_method, + 'worker', + '--concurrency', str(num_cpus), + '--loglevel', 'WARNING', + '-f', str(container_logs_dir / "worker.log"), + '-Q', celery_route, + '-n', component_name, + ] + cmd = container_start_cmd + worker_cmd + subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True) + + logger.info(f"Started {component_name}.") + + def start_worker(instance_id: str, clp_config: CLPConfig, container_clp_config: CLPConfig, num_cpus: int, mounts: CLPDockerMounts): logger.info(f"Starting {WORKER_COMPONENT_NAME}...") @@ -385,7 +536,7 @@ def start_worker(instance_id: str, clp_config: CLPConfig, container_clp_config: 'worker', '--concurrency', str(num_cpus), '--loglevel', 'WARNING', - '-Q', f"{QueueName.COMPRESSION},{QueueName.SEARCH}", + '-Q', f"{QueueName.COMPRESSION}", ] cmd = container_start_cmd + worker_cmd subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True) @@ -429,9 +580,12 @@ def main(argv): clp_config = validate_and_load_config_file(config_file_path, default_config_file_path, clp_home) # Validate and load necessary credentials - if component_name in ['', DB_COMPONENT_NAME, SCHEDULER_COMPONENT_NAME]: + if component_name in ['', DB_COMPONENT_NAME, SCHEDULER_COMPONENT_NAME, + SEARCH_SCHEDULER_COMPONENT_NAME]: validate_and_load_db_credentials_file(clp_config, clp_home, True) - if component_name in ['', QUEUE_COMPONENT_NAME, SCHEDULER_COMPONENT_NAME, WORKER_COMPONENT_NAME]: + if component_name in ['', QUEUE_COMPONENT_NAME, SCHEDULER_COMPONENT_NAME, + WORKER_COMPONENT_NAME, SEARCH_SCHEDULER_COMPONENT_NAME, + SEARCH_WORKER_COMPONENT_NAME]: validate_and_load_queue_credentials_file(clp_config, clp_home, True) clp_config.validate_data_dir() @@ -475,6 +629,10 @@ def main(argv): start_results_cache(instance_id, clp_config, conf_dir) if '' == component_name or SCHEDULER_COMPONENT_NAME == component_name: start_scheduler(instance_id, clp_config, container_clp_config, mounts) + if '' == component_name or SEARCH_SCHEDULER_COMPONENT_NAME == component_name: + start_search_scheduler(instance_id, clp_config, container_clp_config, mounts) + if '' == component_name or SEARCH_WORKER_COMPONENT_NAME == component_name: + start_search_worker(instance_id, clp_config, container_clp_config, num_cpus, mounts) if '' == component_name or WORKER_COMPONENT_NAME == component_name: start_worker(instance_id, clp_config, container_clp_config, num_cpus, mounts) except Exception as ex: diff --git a/components/clp-package-utils/clp_package_utils/scripts/stop_clp.py b/components/clp-package-utils/clp_package_utils/scripts/stop_clp.py index d1c8c6397..44186bc0e 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/stop_clp.py +++ b/components/clp-package-utils/clp_package_utils/scripts/stop_clp.py @@ -16,6 +16,8 @@ DB_COMPONENT_NAME, QUEUE_COMPONENT_NAME, RESULTS_CACHE_COMPONENT_NAME, + SEARCH_SCHEDULER_COMPONENT_NAME, + SEARCH_WORKER_COMPONENT_NAME, SCHEDULER_COMPONENT_NAME, WORKER_COMPONENT_NAME ) @@ -89,10 +91,19 @@ def main(argv): if '' == component_name or WORKER_COMPONENT_NAME == component_name: stop_container(f'clp-{WORKER_COMPONENT_NAME}-{instance_id}') + if '' == component_name or SEARCH_WORKER_COMPONENT_NAME == component_name: + stop_container(f'clp-{SEARCH_WORKER_COMPONENT_NAME}-{instance_id}') if '' == component_name or SCHEDULER_COMPONENT_NAME == component_name: container_name = f'clp-{SCHEDULER_COMPONENT_NAME}-{instance_id}' stop_container(container_name) + container_config_file_path = logs_dir / f'{container_name}.yml' + if container_config_file_path.exists(): + container_config_file_path.unlink() + if '' == component_name or SEARCH_SCHEDULER_COMPONENT_NAME == component_name: + container_name = f'clp-{SEARCH_SCHEDULER_COMPONENT_NAME}-{instance_id}' + stop_container(container_name) + container_config_file_path = logs_dir / f'{container_name}.yml' if container_config_file_path.exists(): container_config_file_path.unlink() diff --git a/components/clp-py-utils/clp_py_utils/clp_config.py b/components/clp-py-utils/clp_py_utils/clp_config.py index e25f15819..7ccb56da7 100644 --- a/components/clp-py-utils/clp_py_utils/clp_config.py +++ b/components/clp-py-utils/clp_py_utils/clp_config.py @@ -4,6 +4,7 @@ from pydantic import BaseModel, validator from .core import get_config_value, make_config_path_absolute, read_yaml_config_file, validate_path_could_be_dir +from .clp_logging import is_valid_logging_level, get_valid_logging_level # Constants # Component names @@ -11,10 +12,13 @@ QUEUE_COMPONENT_NAME = 'queue' RESULTS_CACHE_COMPONENT_NAME = 'results_cache' SCHEDULER_COMPONENT_NAME = 'scheduler' +SEARCH_SCHEDULER_COMPONENT_NAME = 'search_scheduler' +SEARCH_WORKER_COMPONENT_NAME = 'search_worker' WORKER_COMPONENT_NAME = 'worker' CLP_DEFAULT_CREDENTIALS_FILE_PATH = pathlib.Path('etc') / 'credentials.yml' CLP_METADATA_TABLE_PREFIX = 'clp_' +SEARCH_JOBS_TABLE_NAME = 'distributed_search_jobs' class Database(BaseModel): @@ -96,11 +100,37 @@ def get_clp_connection_params_and_type(self, disable_localhost_socket_connection connection_params_and_type['ssl_cert'] = self.ssl_cert return connection_params_and_type +def _validate_logging_level(cls, field): + if not is_valid_logging_level(field): + raise ValueError( + f"{cls.__name__}: '{field}' is not a valid logging level. Use one of" + f" {get_valid_logging_level()}" + ) + class Scheduler(BaseModel): jobs_poll_delay: int = 1 # seconds +class SearchScheduler(BaseModel): + jobs_poll_delay: float = 0.1 # seconds + logging_level: str = 'INFO' + + @validator('logging_level') + def validate_logging_level(cls, field): + _validate_logging_level(cls, field) + return field + + +class SearchWorker(BaseModel): + logging_level: str = 'INFO' + + @validator('logging_level') + def validate_logging_level(cls, field): + _validate_logging_level(cls, field) + return field + + class ResultsCache(BaseModel): host: str = 'localhost' port: int = 27017 @@ -174,6 +204,8 @@ class CLPConfig(BaseModel): queue: Queue = Queue() results_cache: ResultsCache = ResultsCache() scheduler: Scheduler = Scheduler() + search_scheduler: SearchScheduler = SearchScheduler() + search_worker: SearchWorker = SearchWorker() credentials_file_path: pathlib.Path = CLP_DEFAULT_CREDENTIALS_FILE_PATH archive_output: ArchiveOutput = ArchiveOutput() diff --git a/components/clp-py-utils/clp_py_utils/clp_logging.py b/components/clp-py-utils/clp_py_utils/clp_logging.py new file mode 100644 index 000000000..419432072 --- /dev/null +++ b/components/clp-py-utils/clp_py_utils/clp_logging.py @@ -0,0 +1,40 @@ +import logging + +LOGGING_LEVEL_MAPPING = { + 'INFO': logging.INFO, + 'DEBUG': logging.DEBUG, + 'WARN': logging.WARNING, + 'WARNING': logging.WARNING, + 'ERROR': logging.ERROR, + 'CRITICAL': logging.CRITICAL +} + +def get_logging_formatter(): + return logging.Formatter("%(asctime)s %(name)s [%(levelname)s] %(message)s") + +def get_logger(name: str): + logger = logging.getLogger(name) + # Setup console logging + logging_console_handler = logging.StreamHandler() + logging_console_handler.setFormatter(get_logging_formatter()) + logger.addHandler(logging_console_handler) + # Prevent double logging from sub loggers + logger.propagate = False + return logger + + +def get_valid_logging_level(): + return [i for i in LOGGING_LEVEL_MAPPING.keys()] + + +def is_valid_logging_level(level: str): + return level in LOGGING_LEVEL_MAPPING + + +def set_logging_level(logger: logging.Logger, level: str): + if not is_valid_logging_level(level): + logger.warning(f"Invalid logging level: {level}, using INFO as default") + logger.setLevel(logging.INFO) + return + logger.setLevel(LOGGING_LEVEL_MAPPING[level]) + diff --git a/components/clp-py-utils/clp_py_utils/create-db-tables.py b/components/clp-py-utils/clp_py_utils/create-db-tables.py index b591a0d0a..9bd0a5b89 100644 --- a/components/clp-py-utils/clp_py_utils/create-db-tables.py +++ b/components/clp-py-utils/clp_py_utils/create-db-tables.py @@ -36,6 +36,12 @@ def main(argv): ] subprocess.run(cmd, check=True) + cmd = [ + 'python3', str(script_dir / 'initialize-search-scheduler-db.py'), + '--config', str(config_file_path) + ] + subprocess.run(cmd, check=True) + return 0 diff --git a/components/clp-py-utils/clp_py_utils/initialize-search-scheduler-db.py b/components/clp-py-utils/clp_py_utils/initialize-search-scheduler-db.py new file mode 100644 index 000000000..b3aa12622 --- /dev/null +++ b/components/clp-py-utils/clp_py_utils/initialize-search-scheduler-db.py @@ -0,0 +1,46 @@ +#!/usr/bin/env python3 +import argparse +import sys +from contextlib import closing + +from clp_py_utils.clp_config import Database, SEARCH_JOBS_TABLE_NAME +from clp_py_utils.clp_logging import get_logger +from clp_py_utils.core import read_yaml_config_file +from job_orchestration.search_scheduler.common import JobStatus +from sql_adapter import SQL_Adapter + +logger = get_logger(__file__) + +def main(argv): + args_parser = argparse.ArgumentParser(description="Sets up tables for the search scheduler.") + args_parser.add_argument('--config', required=True, help="Database config file.") + parsed_args = args_parser.parse_args(argv[1:]) + + try: + database_config = Database.parse_obj(read_yaml_config_file(parsed_args.config)) + if database_config is None: + raise ValueError(f"Database configuration file '{parsed_args.config}' is empty.") + sql_adapter = SQL_Adapter(database_config) + with closing(sql_adapter.create_connection(True)) as scheduling_db, \ + closing(scheduling_db.cursor(dictionary=True)) as scheduling_db_cursor: + scheduling_db_cursor.execute(f""" + CREATE TABLE IF NOT EXISTS `{SEARCH_JOBS_TABLE_NAME}` ( + `id` INT NOT NULL AUTO_INCREMENT, + `status` INT NOT NULL DEFAULT '{JobStatus.PENDING}', + `submission_time` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), + `search_config` VARBINARY(60000) NOT NULL, + PRIMARY KEY (`id`) USING BTREE, + INDEX `JOB_STATUS` (`status`) USING BTREE + ) ROW_FORMAT=DYNAMIC + """) + + scheduling_db.commit() + except: + logger.exception("Failed to create search scheduler tables.") + return -1 + + return 0 + + +if '__main__' == __name__: + sys.exit(main(sys.argv)) diff --git a/components/core/src/clp/clo/CMakeLists.txt b/components/core/src/clp/clo/CMakeLists.txt index 7ac8a36ed..aab338047 100644 --- a/components/core/src/clp/clo/CMakeLists.txt +++ b/components/core/src/clp/clo/CMakeLists.txt @@ -108,8 +108,6 @@ set( clo.cpp CommandLineArguments.cpp CommandLineArguments.hpp - ControllerMonitoringThread.cpp - ControllerMonitoringThread.hpp ResultsCacheClient.cpp ResultsCacheClient.hpp ) diff --git a/components/core/src/clp/clo/CommandLineArguments.cpp b/components/core/src/clp/clo/CommandLineArguments.cpp index 7a5d98a29..87bfd3d0b 100644 --- a/components/core/src/clp/clo/CommandLineArguments.cpp +++ b/components/core/src/clp/clo/CommandLineArguments.cpp @@ -97,12 +97,6 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) { po::options_description hidden_positional_options; // clang-format off hidden_positional_options.add_options()( - "search-controller-host", - po::value(&m_search_controller_host) - )( - "search-controller-port", - po::value(&m_search_controller_port) - )( "mongodb-uri", po::value(&m_mongodb_uri) )( @@ -120,8 +114,6 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) { ); // clang-format on po::positional_options_description positional_options_description; - positional_options_description.add("search-controller-host", 1); - positional_options_description.add("search-controller-port", 1); positional_options_description.add("mongodb-uri", 1); positional_options_description.add("mongodb-collection", 1); positional_options_description.add("archive-path", 1); @@ -178,10 +170,9 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) { cerr << "Examples:" << endl; cerr << R"( # Search ARCHIVE_PATH for " ERROR " and send results to )" R"(mongodb://127.0.0.1:27017/test "result" collection )" - R"(and use localhost:5555 as the search controller)" << endl; cerr << " " << get_program_name() - << R"(localhost 5555 mongodb://127.0.0.1:27017/test result )" + << R"(mongodb://127.0.0.1:27017/test result )" R"(ARCHIVE_PATH " ERROR ")" << endl; cerr << endl; @@ -198,16 +189,6 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) { return ParsingResult::InfoCommand; } - // Validate search controller host was specified - if (m_search_controller_host.empty()) { - throw invalid_argument("SEARCH_CONTROLLER_HOST not specified or empty."); - } - - // Validate search controller port was specified - if (m_search_controller_port.empty()) { - throw invalid_argument("SEARCH_CONTROLLER_PORT not specified or empty."); - } - // Validate mongodb uri was specified if (m_mongodb_uri.empty()) { throw invalid_argument("MONGODB_URI not specified or empty."); @@ -291,9 +272,7 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) { } void CommandLineArguments::print_basic_usage() const { - cerr << "Usage: " << get_program_name() - << " [OPTIONS] SEARCH_CONTROLLER_HOST SEARCH_CONTROLLER_PORT " - "MONGODB_URI MONGODB_COLLECTION " + cerr << "Usage: " << get_program_name() << " [OPTIONS] MONGODB_URI MONGODB_COLLECTION " << R"(ARCHIVE_PATH "WILDCARD STRING" [FILE])" << endl; } } // namespace clp::clo diff --git a/components/core/src/clp/clo/CommandLineArguments.hpp b/components/core/src/clp/clo/CommandLineArguments.hpp index 82890f3e2..c2233e819 100644 --- a/components/core/src/clp/clo/CommandLineArguments.hpp +++ b/components/core/src/clp/clo/CommandLineArguments.hpp @@ -23,10 +23,6 @@ class CommandLineArguments : public CommandLineArgumentsBase { // Methods ParsingResult parse_arguments(int argc, char const* argv[]) override; - std::string const& get_search_controller_host() const { return m_search_controller_host; } - - std::string const& get_search_controller_port() const { return m_search_controller_port; } - std::string const& get_mongodb_uri() const { return m_mongodb_uri; } std::string const& get_mongodb_collection() const { return m_mongodb_collection; } @@ -50,8 +46,6 @@ class CommandLineArguments : public CommandLineArgumentsBase { void print_basic_usage() const override; // Variables - std::string m_search_controller_host; - std::string m_search_controller_port; std::string m_mongodb_uri; std::string m_mongodb_collection; uint64_t m_batch_size; diff --git a/components/core/src/clp/clo/ControllerMonitoringThread.cpp b/components/core/src/clp/clo/ControllerMonitoringThread.cpp deleted file mode 100644 index 0e5a4589a..000000000 --- a/components/core/src/clp/clo/ControllerMonitoringThread.cpp +++ /dev/null @@ -1,47 +0,0 @@ -#include "ControllerMonitoringThread.hpp" - -#include - -#include "../networking/socket_utils.hpp" -#include "../spdlog_with_specializations.hpp" - -namespace clp::clo { -void ControllerMonitoringThread::thread_method() { - // Wait for the controller socket to close - constexpr size_t cBufLen = 4096; - char buf[cBufLen]; - size_t num_bytes_received; - for (bool exit = false; false == exit;) { - auto error_code - = networking::try_receive(m_controller_socket_fd, buf, cBufLen, num_bytes_received); - switch (error_code) { - case ErrorCode_EndOfFile: - // Controller closed the connection - m_query_cancelled = true; - exit = true; - break; - case ErrorCode_Success: - // Unexpectedly received data - SPDLOG_ERROR( - "Unexpected received {} bytes of data from controller.", - num_bytes_received - ); - break; - case ErrorCode_BadParam: - SPDLOG_ERROR("Bad parameter sent to try_receive.", num_bytes_received); - exit = true; - break; - case ErrorCode_errno: - SPDLOG_ERROR("Failed to receive data from controller, errno={}.", errno); - exit = true; - break; - default: - SPDLOG_ERROR("Unexpected error from try_receive, error_code={}.", error_code); - exit = true; - break; - } - } - - close(m_controller_socket_fd); -} -} // namespace clp::clo diff --git a/components/core/src/clp/clo/ControllerMonitoringThread.hpp b/components/core/src/clp/clo/ControllerMonitoringThread.hpp deleted file mode 100644 index 5c273be5d..000000000 --- a/components/core/src/clp/clo/ControllerMonitoringThread.hpp +++ /dev/null @@ -1,31 +0,0 @@ -#ifndef CLP_CLO_CONTROLLERMONITORINGTHREAD_HPP -#define CLP_CLO_CONTROLLERMONITORINGTHREAD_HPP - -#include "../Thread.hpp" - -namespace clp::clo { -/** - * A thread that waits for the controller to close the connection at which time it will indicate the - * query has been cancelled. - */ -class ControllerMonitoringThread : public Thread { -public: - // Constructor - ControllerMonitoringThread(int controller_socket_fd) - : m_controller_socket_fd(controller_socket_fd), - m_query_cancelled(false) {} - - std::atomic_bool const& get_query_cancelled() const { return m_query_cancelled; } - -protected: - // Methods - void thread_method() override; - -private: - // Variables - int m_controller_socket_fd; - std::atomic_bool m_query_cancelled; -}; -} // namespace clp::clo - -#endif // CLP_CLO_CONTROLLERMONITORINGTHREAD_HPP diff --git a/components/core/src/clp/clo/clo.cpp b/components/core/src/clp/clo/clo.cpp index fdcfd2fce..0f83db713 100644 --- a/components/core/src/clp/clo/clo.cpp +++ b/components/core/src/clp/clo/clo.cpp @@ -1,5 +1,3 @@ -#include - #include #include @@ -9,13 +7,11 @@ #include "../Defs.h" #include "../Grep.hpp" -#include "../networking/socket_utils.hpp" #include "../Profiler.hpp" #include "../spdlog_with_specializations.hpp" #include "../streaming_archive/Constants.hpp" #include "../Utils.hpp" #include "CommandLineArguments.hpp" -#include "ControllerMonitoringThread.hpp" #include "ResultsCacheClient.hpp" using clp::clo::CommandLineArguments; @@ -48,21 +44,11 @@ enum class SearchFilesResult { Success }; -/** - * Connects to the search controller - * @param controller_host - * @param controller_port - * @return -1 on failure - * @return Search controller socket file descriptor otherwise - */ -static int -connect_to_search_controller(string const& controller_host, string const& controller_port); /** * Searches all files referenced by a given database cursor * @param query * @param archive * @param file_metadata_ix - * @param query_cancelled * @param results_cache_client * @return SearchFilesResult::OpenFailure on failure to open a compressed file * @return SearchFilesResult::ResultSendFailure on failure to send a result @@ -72,78 +58,25 @@ static SearchFilesResult search_files( Query& query, Archive& archive, MetadataDB::FileIterator& file_metadata_ix, - std::atomic_bool const& query_cancelled, ResultsCacheClient& results_cache_client ); /** * Searches an archive with the given path * @param command_line_args * @param archive_path - * @param query_cancelled * @param results_cache_client * @return true on success, false otherwise */ static bool search_archive( CommandLineArguments const& command_line_args, boost::filesystem::path const& archive_path, - std::atomic_bool const& query_cancelled, ResultsCacheClient& results_cache_client ); -static int -connect_to_search_controller(string const& controller_host, string const& controller_port) { - // Get address info for controller - struct addrinfo hints = {}; - // Address can be IPv4 or IPV6 - hints.ai_family = AF_UNSPEC; - // TCP socket - hints.ai_socktype = SOCK_STREAM; - hints.ai_flags = 0; - hints.ai_protocol = 0; - struct addrinfo* addresses_head = nullptr; - int error = getaddrinfo( - controller_host.c_str(), - controller_port.c_str(), - &hints, - &addresses_head - ); - if (0 != error) { - SPDLOG_ERROR("Failed to get address information for search controller, error={}", error); - return -1; - } - - // Try each address until a socket can be created and connected to - int controller_socket_fd = -1; - for (auto curr = addresses_head; nullptr != curr; curr = curr->ai_next) { - // Create socket - controller_socket_fd = socket(curr->ai_family, curr->ai_socktype, curr->ai_protocol); - if (-1 == controller_socket_fd) { - continue; - } - - // Connect to address - if (connect(controller_socket_fd, curr->ai_addr, curr->ai_addrlen) != -1) { - break; - } - - // Failed to connect, so close socket - close(controller_socket_fd); - controller_socket_fd = -1; - } - freeaddrinfo(addresses_head); - if (-1 == controller_socket_fd) { - SPDLOG_ERROR("Failed to connect to search controller, errno={}", errno); - return -1; - } - - return controller_socket_fd; -} - static SearchFilesResult search_files( Query& query, Archive& archive, MetadataDB::FileIterator& file_metadata_ix, - std::atomic_bool const& query_cancelled, ResultsCacheClient& results_cache_client ) { SearchFilesResult result = SearchFilesResult::Success; @@ -168,14 +101,13 @@ static SearchFilesResult search_files( } query.make_sub_queries_relevant_to_segment(compressed_file.get_segment_id()); - while (false == query_cancelled - && Grep::search_and_decompress( - query, - archive, - compressed_file, - compressed_message, - decompressed_message - )) + while (Grep::search_and_decompress( + query, + archive, + compressed_file, + compressed_message, + decompressed_message + )) { results_cache_client.add_result( compressed_file.get_orig_path(), @@ -193,7 +125,6 @@ static SearchFilesResult search_files( static bool search_archive( CommandLineArguments const& command_line_args, boost::filesystem::path const& archive_path, - std::atomic_bool const& query_cancelled, ResultsCacheClient& results_cache_client ) { if (false == boost::filesystem::exists(archive_path)) { @@ -267,13 +198,7 @@ static bool search_archive( auto& file_metadata_ix = *file_metadata_ix_ptr; for (auto segment_id : ids_of_segments_to_search) { file_metadata_ix.set_segment_id(segment_id); - auto result = search_files( - query, - archive_reader, - file_metadata_ix, - query_cancelled, - results_cache_client - ); + auto result = search_files(query, archive_reader, file_metadata_ix, results_cache_client); if (SearchFilesResult::ResultSendFailure == result) { // Stop search now since results aren't reaching the controller break; @@ -312,14 +237,6 @@ int main(int argc, char const* argv[]) { break; } - int controller_socket_fd = connect_to_search_controller( - command_line_args.get_search_controller_host(), - command_line_args.get_search_controller_port() - ); - if (-1 == controller_socket_fd) { - return -1; - } - mongocxx::instance mongocxx_instance{}; ResultsCacheClient results_cache_client( command_line_args.get_mongodb_uri(), @@ -329,19 +246,9 @@ int main(int argc, char const* argv[]) { auto const archive_path = boost::filesystem::path(command_line_args.get_archive_path()); - clp::clo::ControllerMonitoringThread controller_monitoring_thread(controller_socket_fd); - controller_monitoring_thread.start(); - int return_value = 0; try { - if (false - == search_archive( - command_line_args, - archive_path, - controller_monitoring_thread.get_query_cancelled(), - results_cache_client - )) - { + if (false == search_archive(command_line_args, archive_path, results_cache_client)) { return_value = -1; } } catch (TraceableException& e) { @@ -365,38 +272,5 @@ int main(int argc, char const* argv[]) { } return_value = -1; } - - // Unblock the controller monitoring thread if it's blocked - auto shutdown_result = shutdown(controller_socket_fd, SHUT_RDWR); - if (0 != shutdown_result) { - if (ENOTCONN != shutdown_result) { - SPDLOG_ERROR("Failed to shutdown socket, error={}", shutdown_result); - } // else connection already disconnected, so nothing to do - } - - try { - controller_monitoring_thread.join(); - } catch (TraceableException& e) { - auto error_code = e.get_error_code(); - if (ErrorCode_errno == error_code) { - SPDLOG_ERROR( - "Failed to join with controller monitoring thread: {}:{} {}, errno={}", - e.get_filename(), - e.get_line_number(), - e.what(), - errno - ); - } else { - SPDLOG_ERROR( - "Failed to join with controller monitoring thread: {}:{} {}, error_code={}", - e.get_filename(), - e.get_line_number(), - e.what(), - error_code - ); - } - return_value = -1; - } - return return_value; } diff --git a/components/job-orchestration/job_orchestration/executor/celeryconfig.py b/components/job-orchestration/job_orchestration/executor/celeryconfig.py index f0322863f..af9e55b0b 100644 --- a/components/job-orchestration/job_orchestration/executor/celeryconfig.py +++ b/components/job-orchestration/job_orchestration/executor/celeryconfig.py @@ -7,14 +7,12 @@ worker_prefetch_multiplier = 1 imports = [ 'job_orchestration.executor.compression_task', - 'job_orchestration.executor.search_task' ] # Queue settings task_queue_max_priority = TASK_QUEUE_HIGHEST_PRIORITY task_routes = { 'job_orchestration.executor.compression_task.compress': QueueName.COMPRESSION, - 'job_orchestration.executor.search_task.search': QueueName.SEARCH } task_create_missing_queues = True diff --git a/components/job-orchestration/job_orchestration/executor/search/__init__.py b/components/job-orchestration/job_orchestration/executor/search/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/components/job-orchestration/job_orchestration/executor/search/celery.py b/components/job-orchestration/job_orchestration/executor/search/celery.py new file mode 100644 index 000000000..7320426d8 --- /dev/null +++ b/components/job-orchestration/job_orchestration/executor/search/celery.py @@ -0,0 +1,5 @@ +from celery import Celery +from . import celeryconfig # type: ignore + +app = Celery("search") +app.config_from_object(celeryconfig) diff --git a/components/job-orchestration/job_orchestration/executor/search/celeryconfig.py b/components/job-orchestration/job_orchestration/executor/search/celeryconfig.py new file mode 100644 index 000000000..b283b70b8 --- /dev/null +++ b/components/job-orchestration/job_orchestration/executor/search/celeryconfig.py @@ -0,0 +1,32 @@ +import os + +from job_orchestration.scheduler.constants import QueueName + +imports = ("job_orchestration.executor.search.fs_search_task") + +task_routes = { + 'job_orchestration.executor.search.fs_search_task.search': QueueName.SEARCH, +} +task_create_missing_queues = True + +broker_url = os.getenv('BROKER_URL') +result_backend = os.getenv('RESULT_BACKEND') + +result_persistent = True + +# Differentiate between tasks that have started v.s. tasks still in queue +task_track_started = True + +accept_content = [ + "application/json", # json + "application/x-python-serialize", # pickle +] + +result_accept_content = [ + "application/json", # json + "application/x-python-serialize", # pickle +] + +# TODO: Choose a different serialization format for tasks and results. Sticking with json is +# probably not a good idea. +result_serializer = "json" diff --git a/components/job-orchestration/job_orchestration/executor/search/fs_search_task.py b/components/job-orchestration/job_orchestration/executor/search/fs_search_task.py new file mode 100644 index 000000000..86503c8b2 --- /dev/null +++ b/components/job-orchestration/job_orchestration/executor/search/fs_search_task.py @@ -0,0 +1,103 @@ +import logging +import os +import sys +import signal +import subprocess +from pathlib import Path +from typing import Any, Dict + +from celery.app.task import Task +from celery.utils.log import get_task_logger + +from clp_py_utils.clp_logging import set_logging_level + +from job_orchestration.executor.search.celery import app +from job_orchestration.job_config import SearchConfig, SearchTaskResult + +# Setup logging +logger = get_task_logger(__name__) + +@app.task(bind=True) +def search( + self: Task, + job_id: str, + search_config_obj: dict, + archive_id: str, + results_cache_uri: str, +) -> Dict[str, Any]: + task_id = str(self.request.id) + clp_home = Path(os.getenv("CLP_HOME")) + archive_directory = Path(os.getenv('CLP_ARCHIVE_OUTPUT_DIR')) + clp_logs_dir = Path(os.getenv("CLP_LOGS_DIR")) + clp_logging_level = str(os.getenv("CLP_LOGGING_LEVEL")) + + # Setup logging to file + worker_logs_dir = clp_logs_dir / job_id + worker_logs_dir.mkdir(exist_ok=True, parents=True) + set_logging_level(logger, clp_logging_level) + clo_log_path = worker_logs_dir / f"{task_id}-clo.log" + clo_log_file = open(clo_log_path, "w") + + logger.info(f"Started task for job {job_id}") + + search_config = SearchConfig.parse_obj(search_config_obj) + search_cmd = [ + str(clp_home / "bin" / "clo"), + results_cache_uri, + job_id, + str(archive_directory / archive_id), + search_config.query_string, + ] + + if search_config.begin_timestamp is not None: + search_cmd.append('--tge') + search_cmd.append(str(search_config.begin_timestamp)) + if search_config.end_timestamp is not None: + search_cmd.append('--tle') + search_cmd.append(str(search_config.end_timestamp)) + if search_config.path_filter is not None: + search_cmd.append(search_config.path_filter) + + logger.info(f'Running: {" ".join(search_cmd)}') + search_successful = False + search_proc = subprocess.Popen( + search_cmd, + preexec_fn=os.setpgrp, + close_fds=True, + stdout=clo_log_file, + stderr=clo_log_file, + ) + + def sigterm_handler(_signo, _stack_frame): + logger.debug("Entered sigterm handler") + if search_proc.poll() is None: + logger.debug("Trying to kill search process") + # Kill the process group in case the search process also forked + os.killpg(os.getpgid(search_proc.pid), signal.SIGTERM) + os.waitpid(search_proc.pid, 0) + logger.info(f"Cancelling search task.") + # Add 128 to follow convention for exit codes from signals + # https://tldp.org/LDP/abs/html/exitcodes.html#AEN23549 + sys.exit(_signo + 128) + + # Register the function to kill the child process at exit + signal.signal(signal.SIGTERM, sigterm_handler) + + logger.info("Waiting for search to finish") + # communicate is equivalent to wait in this case, but avoids deadlocks if we switch to piping + # stdout/stderr in the future. + search_proc.communicate() + return_code = search_proc.returncode + if 0 != return_code: + logger.error(f"Failed search task for job {job_id} - return_code={return_code}") + else: + search_successful = True + logger.info(f"Search task completed for job {job_id}") + + # Close log files + clo_log_file.close() + + return SearchTaskResult( + success=search_successful, + task_id=task_id, + ).dict() diff --git a/components/job-orchestration/job_orchestration/executor/search_task.py b/components/job-orchestration/job_orchestration/executor/search_task.py index cf3d77237..7fe573d94 100644 --- a/components/job-orchestration/job_orchestration/executor/search_task.py +++ b/components/job-orchestration/job_orchestration/executor/search_task.py @@ -38,7 +38,7 @@ def run_clo(job_id: int, task_id: int, clp_home: pathlib.Path, archive_output_di results_cache_uri, str(job_id), str(archive_output_dir / archive_id), - search_config.wildcard_query + search_config.query_string ] if search_config.begin_timestamp is not None: cmd.append('--tge') @@ -76,7 +76,7 @@ def run_clo(job_id: int, task_id: int, clp_home: pathlib.Path, archive_output_di return search_successful, f"See {stderr_filename} in logs directory." -@app.task() +#@app.task() def search(job_id: int, task_id: int, search_config_json: str, archive_id: str, results_cache_uri: str): clp_home = os.getenv('CLP_HOME') diff --git a/components/job-orchestration/job_orchestration/job_config.py b/components/job-orchestration/job_orchestration/job_config.py index cb0cd255c..20f9aa08e 100644 --- a/components/job-orchestration/job_orchestration/job_config.py +++ b/components/job-orchestration/job_orchestration/job_config.py @@ -30,9 +30,11 @@ class ClpIoConfig(BaseModel): class SearchConfig(BaseModel): - search_controller_host: str - search_controller_port: int - wildcard_query: str + query_string: str begin_timestamp: typing.Optional[int] = None end_timestamp: typing.Optional[int] = None path_filter: typing.Optional[str] = None + +class SearchTaskResult(BaseModel): + success: bool + task_id: str diff --git a/components/job-orchestration/job_orchestration/search_scheduler/__init__.py b/components/job-orchestration/job_orchestration/search_scheduler/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/components/job-orchestration/job_orchestration/search_scheduler/common.py b/components/job-orchestration/job_orchestration/search_scheduler/common.py new file mode 100644 index 000000000..094b7288c --- /dev/null +++ b/components/job-orchestration/job_orchestration/search_scheduler/common.py @@ -0,0 +1,24 @@ +from __future__ import annotations + +from enum import IntEnum, auto + +# When adding new states always add them to the end of this enum +# and make necessary changes in the UI, Search Scheduler, and Reducer +class JobStatus(IntEnum): + PENDING = 0 + RUNNING = auto() + SUCCESS = auto() + FAILED = auto() + CANCELLING = auto() + CANCELLED = auto() + + @staticmethod + def from_str(label: str) -> JobStatus: + return JobStatus[label.upper()] + + def __str__(self) -> str: + return str(self.value) + + def to_str(self) -> str: + return str(self.name) + diff --git a/components/job-orchestration/job_orchestration/search_scheduler/search_scheduler.py b/components/job-orchestration/job_orchestration/search_scheduler/search_scheduler.py new file mode 100644 index 000000000..8217b20b8 --- /dev/null +++ b/components/job-orchestration/job_orchestration/search_scheduler/search_scheduler.py @@ -0,0 +1,283 @@ +#!/usr/bin/env python3 + +import argparse +import contextlib +import logging +import os +import sys +import time +from pathlib import Path +from typing import Dict, List, Optional + +import msgpack +import pathlib + +import celery +from clp_py_utils.clp_config import SEARCH_JOBS_TABLE_NAME, CLP_METADATA_TABLE_PREFIX +from job_orchestration.executor.search.fs_search_task import search +from job_orchestration.job_config import SearchConfig, SearchTaskResult + +from pydantic import ValidationError + +from clp_py_utils.clp_config import CLPConfig +from clp_py_utils.clp_logging import get_logger, get_logging_formatter, set_logging_level +from clp_py_utils.core import read_yaml_config_file +from clp_py_utils.sql_adapter import SQL_Adapter + +from .common import JobStatus # type: ignore + +# Setup logging +logger = get_logger("search-job-handler") + +class SearchJob: + def __init__(self, async_task_result: any) -> None: + self.async_task_result: any = async_task_result + +# Dictionary of active jobs indexed by job id +active_jobs : Dict[str, SearchJob] = {} + +def cancel_job(job_id): + global active_jobs + active_jobs[job_id].async_task_result.revoke(terminate=True) + try: + active_jobs[job_id].async_task_result.get() + except Exception: + pass + del active_jobs[job_id] + + +def fetch_new_search_jobs(db_cursor) -> list: + db_cursor.execute(f""" + SELECT {SEARCH_JOBS_TABLE_NAME}.id as job_id, + {SEARCH_JOBS_TABLE_NAME}.status as job_status, + {SEARCH_JOBS_TABLE_NAME}.search_config, + {SEARCH_JOBS_TABLE_NAME}.submission_time + FROM {SEARCH_JOBS_TABLE_NAME} + WHERE {SEARCH_JOBS_TABLE_NAME}.status={JobStatus.PENDING} + """) + return db_cursor.fetchall() + + +def fetch_cancelling_search_jobs(db_cursor) -> list: + db_cursor.execute(f""" + SELECT {SEARCH_JOBS_TABLE_NAME}.id as job_id + FROM {SEARCH_JOBS_TABLE_NAME} + WHERE {SEARCH_JOBS_TABLE_NAME}.status={JobStatus.CANCELLING} + """) + return db_cursor.fetchall() + + +def set_job_status( + db_conn, job_id: str, status: JobStatus, prev_status: Optional[JobStatus] = None, **kwargs +) -> bool: + field_set_expressions = [f'{k}="{v}"' for k, v in kwargs.items()] + field_set_expressions.append(f"status={status}") + update = f'UPDATE {SEARCH_JOBS_TABLE_NAME} SET {", ".join(field_set_expressions)} WHERE id={job_id}' + + if prev_status is not None: + update += f' AND status={prev_status}' + + with contextlib.closing(db_conn.cursor()) as cursor: + cursor.execute(update) + db_conn.commit() + rval = cursor.rowcount != 0 + return rval + + +def handle_cancelling_search_jobs(db_conn) -> None: + global active_jobs + + with contextlib.closing(db_conn.cursor(dictionary=True)) as cursor: + cancelling_jobs = fetch_cancelling_search_jobs(cursor) + db_conn.commit() + + for job in cancelling_jobs: + job_id = job['job_id'] + if job_id in active_jobs: + cancel_job(job_id) + if set_job_status(db_conn, job_id, JobStatus.CANCELLED, prev_status=JobStatus.CANCELLING): + logger.info(f"Cancelled job {job_id}.") + else: + logger.error(f"Failed to cancel job {job_id}.") + + +def get_archives_for_search( + db_conn, + search_config: SearchConfig, +): + query = f"""SELECT id as archive_id + FROM {CLP_METADATA_TABLE_PREFIX}archives + """ + filter_clauses = [] + if search_config.end_timestamp is not None: + filter_clauses.append(f"begin_timestamp <= {search_config.end_timestamp}") + if search_config.begin_timestamp is not None: + filter_clauses.append(f"end_timestamp >= {search_config.begin_timestamp}") + if len(filter_clauses) > 0: + query += " WHERE " + " AND ".join(filter_clauses) + query += " ORDER BY end_timestamp DESC" + + with contextlib.closing(db_conn.cursor(dictionary=True)) as cursor: + cursor.execute( + query + ) + archives_for_search = [archive['archive_id'] for archive in cursor.fetchall()] + db_conn.commit() + return archives_for_search + + +def get_task_group_for_job( + archives_for_search: List[str], + job_id: str, + search_config: SearchConfig, + results_cache_uri: str, +): + search_config_obj = search_config.dict() + return celery.group( + search.s( + job_id=job_id, + archive_id=archive_id, + search_config_obj=search_config_obj, + results_cache_uri=results_cache_uri, + ) for archive_id in archives_for_search + ) + + +def dispatch_search_job( + archives_for_search: List[str], + job_id: str, + search_config: SearchConfig, + results_cache_uri: str +) -> None: + global active_jobs + task_group = get_task_group_for_job(archives_for_search, job_id, search_config, results_cache_uri) + active_jobs[job_id] = SearchJob(task_group.apply_async()) + + +def handle_pending_search_jobs(db_conn, results_cache_uri: str) -> None: + global active_jobs + + with contextlib.closing(db_conn.cursor(dictionary=True)) as cursor: + new_jobs = fetch_new_search_jobs(cursor) + db_conn.commit() + + for job in new_jobs: + logger.debug(f"Got job {job['job_id']} with status {job['job_status']}.") + search_config_obj = SearchConfig.parse_obj(msgpack.unpackb(job['search_config'])) + archives_for_search = get_archives_for_search(db_conn, search_config_obj) + if len(archives_for_search) == 0: + if set_job_status(db_conn, job['job_id'], JobStatus.SUCCESS, job['job_status']): + logger.info(f"No matching archives, skipping job {job['job_id']}.") + continue + + dispatch_search_job(archives_for_search, str(job['job_id']), search_config_obj, results_cache_uri) + if set_job_status(db_conn, job['job_id'], JobStatus.RUNNING, job['job_status']): + logger.info(f"Dispatched job {job['job_id']} with {len(archives_for_search)} archives to search.") + + +def try_getting_task_result(async_task_result): + """ + Ideally, we'd use this code: + + if not async_task_result.ready(): + return None + return async_task_result.get() + + But because of https://github.com/celery/celery/issues/4084, wew have to use the following + timeout based approach until we switch to the Redis result backend. + """ + try: + return async_task_result.get(timeout=0.1) + except celery.exceptions.TimeoutError: + return None + + +def check_job_status_and_update_db(db_conn): + global active_jobs + + for job_id in list(active_jobs.keys()): + try: + returned_results = try_getting_task_result(active_jobs[job_id].async_task_result) + except Exception as e: + logger.error(f"Job `{job_id}` failed: {e}.") + # clean up + del active_jobs[job_id] + set_job_status(db_conn, job_id, JobStatus.FAILED, JobStatus.RUNNING) + continue + + if returned_results is not None: + new_job_status = JobStatus.SUCCESS + for task_result_obj in returned_results: + task_result = SearchTaskResult.parse_obj(task_result_obj) + if not task_result.success: + task_id = task_result.task_id + new_job_status = JobStatus.FAILED + logger.debug(f"Task {task_id} failed - result {task_result}.") + + del active_jobs[job_id] + + if set_job_status(db_conn, job_id, new_job_status, JobStatus.RUNNING): + if new_job_status != JobStatus.FAILED: + logger.info(f"Completed job {job_id}.") + else: + logger.info(f"Completed job {job_id} with failing tasks.") + + +def handle_jobs( + db_conn, + results_cache_uri: str, + jobs_poll_delay: float, +) -> None: + while True: + handle_pending_search_jobs(db_conn, results_cache_uri) + handle_cancelling_search_jobs(db_conn) + check_job_status_and_update_db(db_conn) + time.sleep(jobs_poll_delay) + + +def main(argv: List[str]) -> int: + args_parser = argparse.ArgumentParser(description="Wait for and run search jobs.") + args_parser.add_argument('--config', '-c', required=True, help='CLP configuration file.') + + parsed_args = args_parser.parse_args(argv[1:]) + + # Setup logging to file + log_file = Path(os.getenv("CLP_LOGS_DIR")) / "search_scheduler.log" + logging_file_handler = logging.FileHandler(filename=log_file, encoding="utf-8") + logging_file_handler.setFormatter(get_logging_formatter()) + logger.addHandler(logging_file_handler) + + # Update logging level based on config + set_logging_level(logger, os.getenv("CLP_LOGGING_LEVEL")) + + # Load configuration + config_path = pathlib.Path(parsed_args.config) + try: + clp_config = CLPConfig.parse_obj(read_yaml_config_file(config_path)) + except ValidationError as err: + logger.error(err) + return -1 + except Exception as ex: + logger.error(ex) + return -1 + + sql_adapter = SQL_Adapter(clp_config.database) + + logger.debug(f"Job polling interval {clp_config.search_scheduler.jobs_poll_delay} seconds.") + try: + with contextlib.closing(sql_adapter.create_connection(True)) as db_conn: + logger.info(f"Connected to archive database {clp_config.database.host}:{clp_config.database.port}.") + logger.info("Search scheduler started.") + handle_jobs( + db_conn=db_conn, + results_cache_uri=clp_config.results_cache.get_uri(), + jobs_poll_delay=clp_config.search_scheduler.jobs_poll_delay, + ) + except Exception: + logger.exception(f"Uncaught exception in job handling loop.") + + return 0 + + +if "__main__" == __name__: + sys.exit(main(sys.argv)) diff --git a/components/job-orchestration/pyproject.toml b/components/job-orchestration/pyproject.toml index 487ee8538..b99bb0e22 100644 --- a/components/job-orchestration/pyproject.toml +++ b/components/job-orchestration/pyproject.toml @@ -8,7 +8,11 @@ readme = "README.md" [tool.poetry.dependencies] python = "^3.8 || ^3.10" celery = "^5.3.6" +# mariadb version must be compatible with libmariadev installed in runtime env. +# See https://mariadb.com/docs/server/connect/programming-languages/python/install/#Dependencies +mariadb = "~1.0.11" msgpack = "^1.0.7" +mysql-connector-python = "^8.2.0" pika = "^1.3.2" pydantic = "^1.10.13" PyYAML = "^6.0.1" diff --git a/components/package-template/src/etc/clp-config.yml b/components/package-template/src/etc/clp-config.yml index 1882f8270..291752f53 100644 --- a/components/package-template/src/etc/clp-config.yml +++ b/components/package-template/src/etc/clp-config.yml @@ -25,6 +25,12 @@ # port: 27017 # db_name: "clp-search" # +#search_scheduler: +# jobs_poll_delay: 0.1 # seconds +# logging_level: "INFO" +#search_worker: +# logging_level: "INFO" +# ## Where archives should be output to #archive_output: # directory: "var/data/archives"