From b6284f8e13b96b9cb41f488895b492edbc8d7fa4 Mon Sep 17 00:00:00 2001 From: Xianzhe Ma Date: Wed, 3 Jul 2024 14:09:38 +0200 Subject: [PATCH] Batch evaluation intervals into a single request and a single evaluation process (#554) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per the title, this PR enables batching evaluations over many intervals into a single evaluation request. This resolves https://github.com/eth-easl/modyn/issues/536. The integration test is adjusted to cover this new functionality. ## How to review The best way to review is to first take a look at [modyn/protos/evaluator.proto](https://github.com/eth-easl/modyn/pull/554/files#diff-5017e582f2ffeabee6d3cfa42b2ca86f3f7035419df31cc3cff68be568279f04) to see what is changed at the API level. A hypothetical usage sketch of the new request/response messages follows the list of changed files below. ## Miscellaneous After this PR, batching is only enabled on the server side; on the client side, one interval is still passed at a time, as I am confused about the current way to generate `EvalRequest`. After this PR, @robinholzi, could you make a PR to collect the intervals associated with the same `id_model` and `dataset_id`, and pack them into one evaluation round? I think just looking at the function `_single_batched_evaluation` in [modyn/supervisor/internal/pipeline_executor/evaluation_executor.py](https://github.com/eth-easl/modyn/pull/554/files#diff-f111ee1d407da6a1b5d46b118dd5221da51801ead85ea2c9271b008299c6a11f) should be enough to understand the change and how to make that PR. It should be a very easy and straightforward PR; I am just confused by the data models there. Thank you so much! --------- Co-authored-by: Maximilian Böther <2116466+MaxiBoether@users.noreply.github.com> --- .../evaluator/integrationtest_evaluator.py | 95 +++-- .../internal/grpc/evaluator_grpc_servicer.py | 175 ++++---- .../internal/grpc/generated/evaluator_pb2.py | 64 +-- .../internal/grpc/generated/evaluator_pb2.pyi | 125 ++++-- modyn/evaluator/internal/pytorch_evaluator.py | 114 +++--- .../internal/utils/evaluation_info.py | 21 +- .../internal/utils/evaluation_process_info.py | 4 - modyn/protos/README.md | 4 +- modyn/protos/evaluator.proto | 46 ++- .../internal/eval/result_writer/__init__.py | 13 - .../abstract_evaluation_result_writer.py | 38 -- .../eval/result_writer/json_result_writer.py | 36 -- .../tensorboard_result_writer.py | 23 -- modyn/supervisor/internal/grpc_handler.py | 145 +++---- .../pipeline_executor/evaluation_executor.py | 105 +++-- .../pipeline_executor/pipeline_executor.py | 22 +- .../grpc/test_evaluator_grpc_servicer.py | 373 ++++++++---------- .../internal/test_pytorch_evaluator.py | 298 +++++++------- .../test_abstract_evaluation_result_writer.py | 10 - .../result_writer/test_json_result_writer.py | 38 -- .../test_tensorboard_result_writer.py | 29 -- .../test_evaluation_executor.py | 56 ++- .../test_pipeline_executor.py | 30 +- .../supervisor/internal/test_grpc_handler.py | 205 ++++------ .../supervisor/internal/test_supervisor.py | 5 - scripts/run_integrationtests.sh | 2 + 26 files changed, 894 insertions(+), 1182 deletions(-) delete mode 100644 modyn/supervisor/internal/eval/result_writer/__init__.py delete mode 100644 modyn/supervisor/internal/eval/result_writer/abstract_evaluation_result_writer.py delete mode 100644 modyn/supervisor/internal/eval/result_writer/json_result_writer.py delete mode 100644 modyn/supervisor/internal/eval/result_writer/tensorboard_result_writer.py delete mode 100644 modyn/tests/supervisor/internal/eval/result_writer/test_abstract_evaluation_result_writer.py delete 
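For reviewers who want a concrete picture of the new API shape, here is a minimal, hypothetical client-side sketch (not part of this patch) that batches several intervals into one `EvaluateModelRequest` and reads the per-interval responses. The model id, dataset id, timestamps, and the `evaluator_stub`/`request_batched_evaluation` names are made-up placeholders, and fields such as `metrics` and `bytes_parser` are omitted for brevity:

```python
# Hypothetical usage sketch of the batched evaluator API introduced in this PR.
# All concrete values (model id, dataset id, timestamps) are placeholders.
from modyn.evaluator.internal.grpc.generated.evaluator_pb2 import (
    DatasetInfo,
    EvaluateModelRequest,
    EvaluationAbortedReason,
    EvaluationInterval,
)

# (start_timestamp, end_timestamp) pairs; None means "unbounded" on that side.
intervals = [(None, 1000), (1000, 2000), (2000, None)]

request = EvaluateModelRequest(
    model_id=42,
    dataset_info=DatasetInfo(
        dataset_id="my_dataset",
        num_dataloaders=1,
        evaluation_intervals=[
            EvaluationInterval(start_timestamp=start, end_timestamp=end)
            for start, end in intervals
        ],
    ),
    device="cpu",
    batch_size=2,
    # metrics, bytes_parser, etc. omitted for brevity
)


def request_batched_evaluation(evaluator_stub, request):
    """Send one batched request and report the outcome per interval (sketch)."""
    response = evaluator_stub.evaluate_model(request)
    # interval_responses has one entry per requested interval, in request order.
    for idx, interval_resp in enumerate(response.interval_responses):
        if interval_resp.eval_aborted_reason == EvaluationAbortedReason.NOT_ABORTED:
            print(f"interval {idx}: dataset_size={interval_resp.dataset_size}")
        else:
            print(f"interval {idx}: aborted ({interval_resp.eval_aborted_reason})")
    return response
```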
mode 100644 modyn/tests/supervisor/internal/eval/result_writer/test_json_result_writer.py delete mode 100644 modyn/tests/supervisor/internal/eval/result_writer/test_tensorboard_result_writer.py diff --git a/integrationtests/evaluator/integrationtest_evaluator.py b/integrationtests/evaluator/integrationtest_evaluator.py index fde0442e2..8e062ac9b 100644 --- a/integrationtests/evaluator/integrationtest_evaluator.py +++ b/integrationtests/evaluator/integrationtest_evaluator.py @@ -13,6 +13,7 @@ EvaluateModelRequest, EvaluateModelResponse, EvaluationAbortedReason, + EvaluationInterval, EvaluationResultRequest, EvaluationResultResponse, EvaluationStatusRequest, @@ -27,6 +28,8 @@ from modyn.model_storage.internal.grpc.generated.model_storage_pb2 import RegisterModelRequest, RegisterModelResponse from modyn.model_storage.internal.grpc.generated.model_storage_pb2_grpc import ModelStorageStub from modyn.models import ResNet18 +from modyn.storage.internal.grpc.generated.storage_pb2 import GetDatasetSizeRequest, GetDatasetSizeResponse +from modyn.storage.internal.grpc.generated.storage_pb2_grpc import StorageStub from modyn.utils import calculate_checksum TEST_MODELS_PATH = MODYN_MODELS_PATH / "test_models" @@ -63,6 +66,21 @@ def prepare_dataset(dataset_helper: ImageDatasetHelper) -> Tuple[int, int, int, split_ts2 = int(time.time()) + 1 time.sleep(2) dataset_helper.add_images_to_dataset(start_number=12, end_number=22) + # we need to wait a bit for the server to process the images + + storage_channel = connect_to_server("storage") + storage = StorageStub(storage_channel) + timeout = 60 + start_time = time.time() + request = GetDatasetSizeRequest(dataset_id=DATASET_ID) + resp: GetDatasetSizeResponse = storage.GetDatasetSize(request) + assert resp.success + while resp.num_keys != 22: + time.sleep(2) + if time.time() - start_time > timeout: + raise TimeoutError("Could not get the dataset size in time") + resp = storage.GetDatasetSize(request) + assert resp.success return split_ts1, split_ts2, 5, 7, 10 @@ -114,9 +132,9 @@ def prepare_model() -> int: return register_response.model_id -def evaluate_model( - model_id: int, start_timestamp: Optional[int], end_timestamp: Optional[int], evaluator: EvaluatorStub -) -> EvaluateModelResponse: +def evaluate_model(model_id: int, evaluator: EvaluatorStub, intervals=None) -> EvaluateModelResponse: + if intervals is None: + intervals = [(None, None)] eval_transform_function = ( "import torch\n" "def evaluation_transformer_function(model_output: torch.Tensor) -> torch.Tensor:\n\t" @@ -135,8 +153,10 @@ def evaluate_model( dataset_info=DatasetInfo( dataset_id=DATASET_ID, num_dataloaders=1, - start_timestamp=start_timestamp, - end_timestamp=end_timestamp, + evaluation_intervals=[ + EvaluationInterval(start_timestamp=start_timestamp, end_timestamp=end_timestamp) + for start_timestamp, end_timestamp in intervals + ], ), device="cpu", batch_size=2, @@ -153,38 +173,49 @@ def evaluate_model( def test_evaluator(dataset_helper: ImageDatasetHelper) -> None: - def validate_eval_result(eval_result_resp: EvaluationResultResponse): - assert eval_result_resp.valid - assert len(eval_result_resp.evaluation_data) == 1 - assert eval_result_resp.evaluation_data[0].metric == "Accuracy" - evaluator_channel = connect_to_server("evaluator") evaluator = EvaluatorStub(evaluator_channel) split_ts1, split_ts2, split1_size, split2_size, split3_size = prepare_dataset(dataset_helper) model_id = prepare_model() - eval_model_resp = evaluate_model(model_id, split_ts2, split_ts1, evaluator) - assert not 
eval_model_resp.evaluation_started, "Evaluation should not start if start_timestamp > end_timestamp" - assert eval_model_resp.dataset_size == 0 - assert eval_model_resp.eval_aborted_reason == EvaluationAbortedReason.EMPTY_DATASET - - # (start_timestamp, end_timestamp, expected_dataset_size) - test_cases = [ - (None, split_ts1, split1_size), - (None, split_ts2, split1_size + split2_size), - (split_ts1, split_ts2, split2_size), - (split_ts1, None, split2_size + split3_size), - (split_ts2, None, split3_size), - (None, None, split1_size + split2_size + split3_size), - (0, split_ts1, split1_size), # verify that 0 has the same effect as None for start_timestamp + intervals = [ + (None, split_ts1), + (None, split_ts2), + (split_ts2, split_ts1), + (split_ts1, split_ts2), + (split_ts1, None), + (split_ts2, None), + (None, None), + (0, split_ts1), # verify that 0 has the same effect as None for start_timestamp ] - for start_ts, end_ts, expected_size in test_cases: - print(f"Testing model with start_timestamp={start_ts}, end_timestamp={end_ts}") - eval_model_resp = evaluate_model(model_id, start_ts, end_ts, evaluator) - assert eval_model_resp.evaluation_started - assert eval_model_resp.dataset_size == expected_size - - eval_result_resp = wait_for_evaluation(eval_model_resp.evaluation_id, evaluator) - validate_eval_result(eval_result_resp) + expected_data_sizes = [ + split1_size, + split1_size + split2_size, + None, + split2_size, + split2_size + split3_size, + split3_size, + split1_size + split2_size + split3_size, + split1_size, + ] + + eval_model_resp = evaluate_model(model_id, evaluator, intervals) + assert eval_model_resp.evaluation_started + assert len(eval_model_resp.interval_responses) == len(intervals) + for interval_resp, expected_size in zip(eval_model_resp.interval_responses, expected_data_sizes): + if expected_size is None: + assert interval_resp.eval_aborted_reason == EvaluationAbortedReason.EMPTY_DATASET + else: + assert interval_resp.dataset_size == expected_size + assert interval_resp.eval_aborted_reason == EvaluationAbortedReason.NOT_ABORTED + + eval_result_resp = wait_for_evaluation(eval_model_resp.evaluation_id, evaluator) + assert eval_result_resp.valid + expected_interval_ids = [idx for idx, data_size in enumerate(expected_data_sizes) if data_size is not None] + assert len(eval_result_resp.evaluation_results) == len(expected_interval_ids) + for interval_data, expected_interval_id in zip(eval_result_resp.evaluation_results, expected_interval_ids): + assert interval_data.interval_index == expected_interval_id + assert len(interval_data.evaluation_data) == 1 + assert interval_data.evaluation_data[0].metric == "Accuracy" if __name__ == "__main__": diff --git a/modyn/evaluator/internal/grpc/evaluator_grpc_servicer.py b/modyn/evaluator/internal/grpc/evaluator_grpc_servicer.py index f39170fc4..8cdbdddf4 100644 --- a/modyn/evaluator/internal/grpc/evaluator_grpc_servicer.py +++ b/modyn/evaluator/internal/grpc/evaluator_grpc_servicer.py @@ -6,29 +6,29 @@ import pathlib import queue from threading import Lock -from typing import Any, Optional +from typing import Optional import grpc from modyn.common.ftp import download_trained_model # pylint: disable-next=no-name-in-module from modyn.evaluator.internal.grpc.generated.evaluator_pb2 import ( + EvaluateModelIntervalResponse, EvaluateModelRequest, EvaluateModelResponse, EvaluationAbortedReason, EvaluationCleanupRequest, EvaluationCleanupResponse, - EvaluationData, + EvaluationIntervalData, EvaluationResultRequest, EvaluationResultResponse, 
EvaluationStatusRequest, EvaluationStatusResponse, + SingleMetricResult, ) from modyn.evaluator.internal.grpc.generated.evaluator_pb2_grpc import EvaluatorServicer -from modyn.evaluator.internal.metric_factory import MetricFactory -from modyn.evaluator.internal.metrics import AbstractEvaluationMetric from modyn.evaluator.internal.pytorch_evaluator import evaluate -from modyn.evaluator.internal.utils import EvaluationInfo, EvaluationProcessInfo, EvaluatorMessages +from modyn.evaluator.internal.utils import EvaluationInfo, EvaluationProcessInfo from modyn.metadata_database.metadata_database_connection import MetadataDatabaseConnection from modyn.metadata_database.models import TrainedModel @@ -96,21 +96,31 @@ def connect_to_storage(storage_address: str) -> StorageStub: def evaluate_model(self, request: EvaluateModelRequest, context: grpc.ServicerContext) -> EvaluateModelResponse: logger.info("Received evaluate model request.") - + num_intervals = len(request.dataset_info.evaluation_intervals) with MetadataDatabaseConnection(self._config) as database: trained_model: Optional[TrainedModel] = database.session.get(TrainedModel, request.model_id) if not trained_model: logger.error(f"Trained model {request.model_id} does not exist!") return EvaluateModelResponse( - evaluation_started=False, eval_aborted_reason=EvaluationAbortedReason.MODEL_NOT_EXIST_IN_METADATA + evaluation_started=False, + interval_responses=[ + EvaluateModelIntervalResponse( + eval_aborted_reason=EvaluationAbortedReason.MODEL_NOT_EXIST_IN_METADATA + ) + ] + * num_intervals, ) model_class_name, model_config, amp = database.get_model_configuration(trained_model.pipeline_id) if not hasattr(dynamic_module_import("modyn.models"), model_class_name): logger.error(f"Model {model_class_name} not available!") return EvaluateModelResponse( - evaluation_started=False, eval_aborted_reason=EvaluationAbortedReason.MODEL_IMPORT_FAILURE + evaluation_started=False, + interval_responses=[ + EvaluateModelIntervalResponse(eval_aborted_reason=EvaluationAbortedReason.MODEL_IMPORT_FAILURE) + ] + * num_intervals, ) fetch_request = FetchModelRequest(model_id=request.model_id, load_metadata=False) @@ -122,34 +132,45 @@ def evaluate_model(self, request: EvaluateModelRequest, context: grpc.ServicerCo f"Evaluation cannot be started." ) return EvaluateModelResponse( - evaluation_started=False, eval_aborted_reason=EvaluationAbortedReason.MODEL_NOT_EXIST_IN_STORAGE + evaluation_started=False, + interval_responses=[ + EvaluateModelIntervalResponse( + eval_aborted_reason=EvaluationAbortedReason.MODEL_NOT_EXIST_IN_STORAGE + ) + ] + * num_intervals, ) - dataset_info = request.dataset_info - dataset_size_req = GetDatasetSizeRequest( - dataset_id=request.dataset_info.dataset_id, - start_timestamp=dataset_info.start_timestamp if dataset_info.HasField("start_timestamp") else None, - end_timestamp=dataset_info.end_timestamp if dataset_info.HasField("end_timestamp") else None, - ) - dataset_size_response: GetDatasetSizeResponse = self._storage_stub.GetDatasetSize(dataset_size_req) - - dataset_size = dataset_size_response.num_keys - if not dataset_size_response.success: - logger.error( - f"Total number of keys for dataset {dataset_size_req.dataset_id} cannot be fetched. " - f"Evaluation cannot be started." 
- ) - return EvaluateModelResponse( - evaluation_started=False, eval_aborted_reason=EvaluationAbortedReason.DATASET_NOT_FOUND + interval_responses = [] + not_failed_interval_ids: list[int] = [] + for idx, interval in enumerate(request.dataset_info.evaluation_intervals): + dataset_size_req = GetDatasetSizeRequest( + dataset_id=request.dataset_info.dataset_id, + start_timestamp=interval.start_timestamp if interval.HasField("start_timestamp") else None, + end_timestamp=interval.end_timestamp if interval.HasField("end_timestamp") else None, ) + dataset_size_response: GetDatasetSizeResponse = self._storage_stub.GetDatasetSize(dataset_size_req) - if dataset_size == 0: - logger.info( - f"Dataset {dataset_size_req.dataset_id} is empty in given time interval. Evaluation cannot be started." - ) - return EvaluateModelResponse( - evaluation_started=False, eval_aborted_reason=EvaluationAbortedReason.EMPTY_DATASET - ) + dataset_size = dataset_size_response.num_keys + if not dataset_size_response.success: + interval_responses.append( + EvaluateModelIntervalResponse(eval_aborted_reason=EvaluationAbortedReason.DATASET_NOT_FOUND) + ) + elif dataset_size == 0: + interval_responses.append( + EvaluateModelIntervalResponse(eval_aborted_reason=EvaluationAbortedReason.EMPTY_DATASET) + ) + else: + interval_responses.append( + EvaluateModelIntervalResponse( + eval_aborted_reason=EvaluationAbortedReason.NOT_ABORTED, dataset_size=dataset_size + ) + ) + not_failed_interval_ids.append(idx) + + if len(not_failed_interval_ids) == 0: + logger.error("All evaluations failed. Evaluation cannot be started.") + return EvaluateModelResponse(evaluation_started=False, interval_responses=interval_responses) with self._lock: evaluation_id = self._next_evaluation_id @@ -167,10 +188,12 @@ def evaluate_model(self, request: EvaluateModelRequest, context: grpc.ServicerCo if not trained_model_path: logger.error("Trained model could not be downloaded. Evaluation cannot be started.") return EvaluateModelResponse( - evaluation_started=False, eval_aborted_reason=EvaluationAbortedReason.DOWNLOAD_MODEL_FAILURE + evaluation_started=False, + interval_responses=[ + EvaluateModelIntervalResponse(eval_aborted_reason=EvaluationAbortedReason.DOWNLOAD_MODEL_FAILURE) + ] + * num_intervals, ) - - metrics = self._setup_metrics([metric.value for metric in request.metrics]) evaluation_info = EvaluationInfo( request, evaluation_id, @@ -178,34 +201,22 @@ def evaluate_model(self, request: EvaluateModelRequest, context: grpc.ServicerCo model_config, amp, self._storage_address, - metrics, trained_model_path, + not_failed_interval_ids, ) self._evaluation_dict[evaluation_id] = evaluation_info self._run_evaluation(evaluation_id) logger.info(f"Started evaluation {evaluation_id}.") - return EvaluateModelResponse(evaluation_started=True, evaluation_id=evaluation_id, dataset_size=dataset_size) - - @staticmethod - def _setup_metrics(metric_configurations: list[str]) -> list[AbstractEvaluationMetric]: - metrics = [] - # need to make sure that the metric names are unique as they are used for identification. 
- metric_names = set() - for configuration_json in metric_configurations: - metric = MetricFactory.get_evaluation_metric(configuration_json) - if metric.get_name() not in metric_names: - metrics.append(metric) - metric_names.add(metric.get_name()) - else: - logger.warning(f"Metric {metric.get_name()} is already registered.") - return metrics + return EvaluateModelResponse( + evaluation_started=True, + evaluation_id=evaluation_id, + interval_responses=interval_responses, + ) def _run_evaluation(self, evaluation_id: int) -> None: exception_queue: mp.Queue[str] = mp.Queue() # pylint: disable=unsubscriptable-object - status_query_queue: mp.Queue[str] = mp.Queue() # pylint: disable=unsubscriptable-object - status_response_queue: mp.Queue[dict[str, Any]] = mp.Queue() # pylint: disable=unsubscriptable-object metric_result_queue: mp.Queue[tuple[str, float]] = mp.Queue() # pylint: disable=unsubscriptable-object process = mp.Process( @@ -214,14 +225,12 @@ def _run_evaluation(self, evaluation_id: int) -> None: self._evaluation_dict[evaluation_id], self._base_dir / f"log-{evaluation_id}.txt", exception_queue, - status_query_queue, - status_response_queue, metric_result_queue, ), ) process.start() self._evaluation_process_dict[evaluation_id] = EvaluationProcessInfo( - process, exception_queue, status_query_queue, status_response_queue, metric_result_queue + process, exception_queue, metric_result_queue ) def get_evaluation_status( @@ -237,42 +246,14 @@ def get_evaluation_status( process_handler = self._evaluation_process_dict[evaluation_id].process_handler if process_handler.is_alive(): logger.info(f"Evaluation {evaluation_id} is still running, obtaining info from running process.") - num_batches, num_samples = self._get_status(evaluation_id) - response_kwargs_running: dict[str, Any] = { - "valid": True, - "is_running": True, - "blocked": num_batches is None, - "state_available": num_batches is not None and num_samples is not None, - "batches_seen": num_batches, - "samples_seen": num_samples, - } - cleaned_kwargs = {k: v for k, v in response_kwargs_running.items() if v is not None} - return EvaluationStatusResponse(**cleaned_kwargs) # type: ignore[arg-type] + return EvaluationStatusResponse(valid=True, is_running=True) exception = self._check_for_evaluation_exception(evaluation_id) logger.info( f"Evaluation {evaluation_id} is no longer running. " f"Process finished {'successfully' if not exception else 'with errors'}." 
) - response_kwargs_finished: dict[str, Any] = { - "valid": True, - "is_running": False, - "blocked": False, - "state_available": False, - "exception": exception, - } - cleaned_kwargs = {k: v for k, v in response_kwargs_finished.items() if v is not None} - return EvaluationStatusResponse(**cleaned_kwargs) # type: ignore[arg-type] - - def _get_status(self, evaluation_id: int) -> tuple[Optional[int], Optional[int]]: - status_query_queue = self._evaluation_process_dict[evaluation_id].status_query_queue - status_query_queue.put(EvaluatorMessages.STATUS_QUERY_MESSAGE) - try: - # blocks for 30 seconds - response = self._evaluation_process_dict[evaluation_id].status_response_queue.get(timeout=30) - return response["num_batches"], response["num_samples"] - except queue.Empty: - return None, None + return EvaluationStatusResponse(valid=True, is_running=False, exception=exception) def _check_for_evaluation_exception(self, evaluation_id: int) -> Optional[str]: exception_queue = self._evaluation_process_dict[evaluation_id].exception_queue @@ -301,20 +282,26 @@ def get_evaluation_result( return EvaluationResultResponse(valid=False) logger.info("Returning results of all metrics.") - evaluation_data: list[EvaluationData] = [] + evaluation_data: list[EvaluationIntervalData] = [] metric_result_queue = self._evaluation_process_dict[evaluation_id].metric_result_queue - metric_results: list[tuple[str, float]] = [] - for _ in range(len(self._evaluation_dict[evaluation_id].metrics)): + + while True: try: - metric_results.append(metric_result_queue.get(timeout=0.1)) + interval_idx, metric_result = metric_result_queue.get(timeout=0.1) except queue.Empty: - logger.error(f"Evaluation with id {evaluation_id} did not return all metric results.") break + metric_result = [SingleMetricResult(metric=name, result=result) for name, result in metric_result] + single_eval_data = EvaluationIntervalData(interval_index=interval_idx, evaluation_data=metric_result) - for name, result in metric_results: - evaluation_data.append(EvaluationData(metric=name, result=result)) - return EvaluationResultResponse(valid=True, evaluation_data=evaluation_data) + evaluation_data.append(single_eval_data) + if len(evaluation_data) < len(self._evaluation_dict[evaluation_id].not_failed_interval_ids): + logger.error( + f"Could not retrieve results for all intervals of evaluation {evaluation_id}. " + f"Expected {len(self._evaluation_dict[evaluation_id].not_failed_interval_ids)}, " + f"but got {len(evaluation_data)}. Maybe an exception happened during evaluation." 
+ ) + return EvaluationResultResponse(valid=True, evaluation_results=evaluation_data) def cleanup_evaluations( self, request: EvaluationCleanupRequest, context: grpc.ServicerContext diff --git a/modyn/evaluator/internal/grpc/generated/evaluator_pb2.py b/modyn/evaluator/internal/grpc/generated/evaluator_pb2.py index 26df2fecf..f1152d740 100644 --- a/modyn/evaluator/internal/grpc/generated/evaluator_pb2.py +++ b/modyn/evaluator/internal/grpc/generated/evaluator_pb2.py @@ -15,39 +15,45 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0f\x65valuator.proto\x12\x0fmodyn.evaluator\"\x9a\x01\n\x0b\x44\x61tasetInfo\x12\x12\n\ndataset_id\x18\x01 \x01(\t\x12\x17\n\x0fnum_dataloaders\x18\x02 \x01(\x05\x12\x1c\n\x0fstart_timestamp\x18\x03 \x01(\x03H\x00\x88\x01\x01\x12\x1a\n\rend_timestamp\x18\x04 \x01(\x03H\x01\x88\x01\x01\x42\x12\n\x10_start_timestampB\x10\n\x0e_end_timestamp\"\x1d\n\x0cPythonString\x12\r\n\x05value\x18\x01 \x01(\t\"\x1b\n\nJsonString\x12\r\n\x05value\x18\x01 \x01(\t\"\xfa\x02\n\x14\x45valuateModelRequest\x12\x10\n\x08model_id\x18\x01 \x01(\x05\x12\x32\n\x0c\x64\x61taset_info\x18\x02 \x01(\x0b\x32\x1c.modyn.evaluator.DatasetInfo\x12\x0e\n\x06\x64\x65vice\x18\x03 \x01(\t\x12\x12\n\nbatch_size\x18\x04 \x01(\x05\x12,\n\x07metrics\x18\x05 \x03(\x0b\x32\x1b.modyn.evaluator.JsonString\x12\x16\n\x0etransform_list\x18\x06 \x03(\t\x12\x33\n\x0c\x62ytes_parser\x18\x07 \x01(\x0b\x32\x1d.modyn.evaluator.PythonString\x12\x38\n\x11label_transformer\x18\x08 \x01(\x0b\x32\x1d.modyn.evaluator.PythonString\x12\x35\n\ttokenizer\x18\t \x01(\x0b\x32\x1d.modyn.evaluator.PythonStringH\x00\x88\x01\x01\x42\x0c\n\n_tokenizer\"\xa7\x01\n\x15\x45valuateModelResponse\x12\x1a\n\x12\x65valuation_started\x18\x01 \x01(\x08\x12\x15\n\revaluation_id\x18\x02 \x01(\x05\x12\x14\n\x0c\x64\x61taset_size\x18\x03 \x01(\x03\x12\x45\n\x13\x65val_aborted_reason\x18\x04 \x01(\x0e\x32(.modyn.evaluator.EvaluationAbortedReason\"0\n\x17\x45valuationStatusRequest\x12\x15\n\revaluation_id\x18\x01 \x01(\x05\"\xe5\x01\n\x18\x45valuationStatusResponse\x12\r\n\x05valid\x18\x01 \x01(\x08\x12\x12\n\nis_running\x18\x02 \x01(\x08\x12\x17\n\x0fstate_available\x18\x03 \x01(\x08\x12\x0f\n\x07\x62locked\x18\x04 \x01(\x08\x12\x16\n\texception\x18\x05 \x01(\tH\x00\x88\x01\x01\x12\x19\n\x0c\x62\x61tches_seen\x18\x06 \x01(\x03H\x01\x88\x01\x01\x12\x19\n\x0csamples_seen\x18\x07 \x01(\x03H\x02\x88\x01\x01\x42\x0c\n\n_exceptionB\x0f\n\r_batches_seenB\x0f\n\r_samples_seen\"0\n\x0e\x45valuationData\x12\x0e\n\x06metric\x18\x01 \x01(\t\x12\x0e\n\x06result\x18\x02 \x01(\x02\"0\n\x17\x45valuationResultRequest\x12\x15\n\revaluation_id\x18\x01 \x01(\x05\"2\n\x18\x45valuationCleanupRequest\x12\x16\n\x0e\x65valuation_ids\x18\x01 \x03(\x05\"c\n\x18\x45valuationResultResponse\x12\r\n\x05valid\x18\x01 \x01(\x08\x12\x38\n\x0f\x65valuation_data\x18\x02 \x03(\x0b\x32\x1f.modyn.evaluator.EvaluationData\".\n\x19\x45valuationCleanupResponse\x12\x11\n\tsucceeded\x18\x01 
\x03(\x05*\xc7\x01\n\x17\x45valuationAbortedReason\x12\x0b\n\x07UNKNOWN\x10\x00\x12\x1f\n\x1bMODEL_NOT_EXIST_IN_METADATA\x10\x01\x12\x18\n\x14MODEL_IMPORT_FAILURE\x10\x02\x12\x1e\n\x1aMODEL_NOT_EXIST_IN_STORAGE\x10\x03\x12\x15\n\x11\x44\x41TASET_NOT_FOUND\x10\x04\x12\x11\n\rEMPTY_DATASET\x10\x05\x12\x1a\n\x16\x44OWNLOAD_MODEL_FAILURE\x10\x06\x32\xbe\x03\n\tEvaluator\x12\x61\n\x0e\x65valuate_model\x12%.modyn.evaluator.EvaluateModelRequest\x1a&.modyn.evaluator.EvaluateModelResponse\"\x00\x12n\n\x15get_evaluation_status\x12(.modyn.evaluator.EvaluationStatusRequest\x1a).modyn.evaluator.EvaluationStatusResponse\"\x00\x12n\n\x15get_evaluation_result\x12(.modyn.evaluator.EvaluationResultRequest\x1a).modyn.evaluator.EvaluationResultResponse\"\x00\x12n\n\x13\x63leanup_evaluations\x12).modyn.evaluator.EvaluationCleanupRequest\x1a*.modyn.evaluator.EvaluationCleanupResponse\"\x00\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0f\x65valuator.proto\x12\x0fmodyn.evaluator\"t\n\x12\x45valuationInterval\x12\x1c\n\x0fstart_timestamp\x18\x01 \x01(\x03H\x00\x88\x01\x01\x12\x1a\n\rend_timestamp\x18\x02 \x01(\x03H\x01\x88\x01\x01\x42\x12\n\x10_start_timestampB\x10\n\x0e_end_timestamp\"}\n\x0b\x44\x61tasetInfo\x12\x12\n\ndataset_id\x18\x01 \x01(\t\x12\x17\n\x0fnum_dataloaders\x18\x02 \x01(\x05\x12\x41\n\x14\x65valuation_intervals\x18\x03 \x03(\x0b\x32#.modyn.evaluator.EvaluationInterval\"\x1d\n\x0cPythonString\x12\r\n\x05value\x18\x01 \x01(\t\"\x1b\n\nJsonString\x12\r\n\x05value\x18\x01 \x01(\t\"\xfa\x02\n\x14\x45valuateModelRequest\x12\x10\n\x08model_id\x18\x01 \x01(\x05\x12\x32\n\x0c\x64\x61taset_info\x18\x02 \x01(\x0b\x32\x1c.modyn.evaluator.DatasetInfo\x12\x0e\n\x06\x64\x65vice\x18\x03 \x01(\t\x12\x12\n\nbatch_size\x18\x04 \x01(\x05\x12,\n\x07metrics\x18\x05 \x03(\x0b\x32\x1b.modyn.evaluator.JsonString\x12\x16\n\x0etransform_list\x18\x06 \x03(\t\x12\x33\n\x0c\x62ytes_parser\x18\x07 \x01(\x0b\x32\x1d.modyn.evaluator.PythonString\x12\x38\n\x11label_transformer\x18\x08 \x01(\x0b\x32\x1d.modyn.evaluator.PythonString\x12\x35\n\ttokenizer\x18\t \x01(\x0b\x32\x1d.modyn.evaluator.PythonStringH\x00\x88\x01\x01\x42\x0c\n\n_tokenizer\"|\n\x1d\x45valuateModelIntervalResponse\x12\x14\n\x0c\x64\x61taset_size\x18\x01 \x01(\x03\x12\x45\n\x13\x65val_aborted_reason\x18\x02 \x01(\x0e\x32(.modyn.evaluator.EvaluationAbortedReason\"\x96\x01\n\x15\x45valuateModelResponse\x12\x1a\n\x12\x65valuation_started\x18\x01 \x01(\x08\x12\x15\n\revaluation_id\x18\x02 \x01(\x05\x12J\n\x12interval_responses\x18\x03 \x03(\x0b\x32..modyn.evaluator.EvaluateModelIntervalResponse\"0\n\x17\x45valuationStatusRequest\x12\x15\n\revaluation_id\x18\x01 \x01(\x05\"c\n\x18\x45valuationStatusResponse\x12\r\n\x05valid\x18\x01 \x01(\x08\x12\x12\n\nis_running\x18\x02 \x01(\x08\x12\x16\n\texception\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x0c\n\n_exception\"4\n\x12SingleMetricResult\x12\x0e\n\x06metric\x18\x01 \x01(\t\x12\x0e\n\x06result\x18\x02 \x01(\x02\"n\n\x16\x45valuationIntervalData\x12\x16\n\x0einterval_index\x18\x01 \x01(\x05\x12<\n\x0f\x65valuation_data\x18\x02 \x03(\x0b\x32#.modyn.evaluator.SingleMetricResult\"0\n\x17\x45valuationResultRequest\x12\x15\n\revaluation_id\x18\x01 \x01(\x05\"2\n\x18\x45valuationCleanupRequest\x12\x16\n\x0e\x65valuation_ids\x18\x01 \x03(\x05\"n\n\x18\x45valuationResultResponse\x12\r\n\x05valid\x18\x01 \x01(\x08\x12\x43\n\x12\x65valuation_results\x18\x02 \x03(\x0b\x32\'.modyn.evaluator.EvaluationIntervalData\".\n\x19\x45valuationCleanupResponse\x12\x11\n\tsucceeded\x18\x01 
\x03(\x05*\xcb\x01\n\x17\x45valuationAbortedReason\x12\x0f\n\x0bNOT_ABORTED\x10\x00\x12\x1f\n\x1bMODEL_NOT_EXIST_IN_METADATA\x10\x01\x12\x18\n\x14MODEL_IMPORT_FAILURE\x10\x02\x12\x1e\n\x1aMODEL_NOT_EXIST_IN_STORAGE\x10\x03\x12\x15\n\x11\x44\x41TASET_NOT_FOUND\x10\x04\x12\x11\n\rEMPTY_DATASET\x10\x05\x12\x1a\n\x16\x44OWNLOAD_MODEL_FAILURE\x10\x06\x32\xbe\x03\n\tEvaluator\x12\x61\n\x0e\x65valuate_model\x12%.modyn.evaluator.EvaluateModelRequest\x1a&.modyn.evaluator.EvaluateModelResponse\"\x00\x12n\n\x15get_evaluation_status\x12(.modyn.evaluator.EvaluationStatusRequest\x1a).modyn.evaluator.EvaluationStatusResponse\"\x00\x12n\n\x15get_evaluation_result\x12(.modyn.evaluator.EvaluationResultRequest\x1a).modyn.evaluator.EvaluationResultResponse\"\x00\x12n\n\x13\x63leanup_evaluations\x12).modyn.evaluator.EvaluationCleanupRequest\x1a*.modyn.evaluator.EvaluationCleanupResponse\"\x00\x62\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'evaluator_pb2', _globals) if not _descriptor._USE_C_DESCRIPTORS: DESCRIPTOR._loaded_options = None - _globals['_EVALUATIONABORTEDREASON']._serialized_start=1388 - _globals['_EVALUATIONABORTEDREASON']._serialized_end=1587 - _globals['_DATASETINFO']._serialized_start=37 - _globals['_DATASETINFO']._serialized_end=191 - _globals['_PYTHONSTRING']._serialized_start=193 - _globals['_PYTHONSTRING']._serialized_end=222 - _globals['_JSONSTRING']._serialized_start=224 - _globals['_JSONSTRING']._serialized_end=251 - _globals['_EVALUATEMODELREQUEST']._serialized_start=254 - _globals['_EVALUATEMODELREQUEST']._serialized_end=632 - _globals['_EVALUATEMODELRESPONSE']._serialized_start=635 - _globals['_EVALUATEMODELRESPONSE']._serialized_end=802 - _globals['_EVALUATIONSTATUSREQUEST']._serialized_start=804 - _globals['_EVALUATIONSTATUSREQUEST']._serialized_end=852 - _globals['_EVALUATIONSTATUSRESPONSE']._serialized_start=855 - _globals['_EVALUATIONSTATUSRESPONSE']._serialized_end=1084 - _globals['_EVALUATIONDATA']._serialized_start=1086 - _globals['_EVALUATIONDATA']._serialized_end=1134 - _globals['_EVALUATIONRESULTREQUEST']._serialized_start=1136 - _globals['_EVALUATIONRESULTREQUEST']._serialized_end=1184 - _globals['_EVALUATIONCLEANUPREQUEST']._serialized_start=1186 - _globals['_EVALUATIONCLEANUPREQUEST']._serialized_end=1236 - _globals['_EVALUATIONRESULTRESPONSE']._serialized_start=1238 - _globals['_EVALUATIONRESULTRESPONSE']._serialized_end=1337 - _globals['_EVALUATIONCLEANUPRESPONSE']._serialized_start=1339 - _globals['_EVALUATIONCLEANUPRESPONSE']._serialized_end=1385 - _globals['_EVALUATOR']._serialized_start=1590 - _globals['_EVALUATOR']._serialized_end=2036 + _globals['_EVALUATIONABORTEDREASON']._serialized_start=1581 + _globals['_EVALUATIONABORTEDREASON']._serialized_end=1784 + _globals['_EVALUATIONINTERVAL']._serialized_start=36 + _globals['_EVALUATIONINTERVAL']._serialized_end=152 + _globals['_DATASETINFO']._serialized_start=154 + _globals['_DATASETINFO']._serialized_end=279 + _globals['_PYTHONSTRING']._serialized_start=281 + _globals['_PYTHONSTRING']._serialized_end=310 + _globals['_JSONSTRING']._serialized_start=312 + _globals['_JSONSTRING']._serialized_end=339 + _globals['_EVALUATEMODELREQUEST']._serialized_start=342 + _globals['_EVALUATEMODELREQUEST']._serialized_end=720 + _globals['_EVALUATEMODELINTERVALRESPONSE']._serialized_start=722 + _globals['_EVALUATEMODELINTERVALRESPONSE']._serialized_end=846 + _globals['_EVALUATEMODELRESPONSE']._serialized_start=849 + 
_globals['_EVALUATEMODELRESPONSE']._serialized_end=999 + _globals['_EVALUATIONSTATUSREQUEST']._serialized_start=1001 + _globals['_EVALUATIONSTATUSREQUEST']._serialized_end=1049 + _globals['_EVALUATIONSTATUSRESPONSE']._serialized_start=1051 + _globals['_EVALUATIONSTATUSRESPONSE']._serialized_end=1150 + _globals['_SINGLEMETRICRESULT']._serialized_start=1152 + _globals['_SINGLEMETRICRESULT']._serialized_end=1204 + _globals['_EVALUATIONINTERVALDATA']._serialized_start=1206 + _globals['_EVALUATIONINTERVALDATA']._serialized_end=1316 + _globals['_EVALUATIONRESULTREQUEST']._serialized_start=1318 + _globals['_EVALUATIONRESULTREQUEST']._serialized_end=1366 + _globals['_EVALUATIONCLEANUPREQUEST']._serialized_start=1368 + _globals['_EVALUATIONCLEANUPREQUEST']._serialized_end=1418 + _globals['_EVALUATIONRESULTRESPONSE']._serialized_start=1420 + _globals['_EVALUATIONRESULTRESPONSE']._serialized_end=1530 + _globals['_EVALUATIONCLEANUPRESPONSE']._serialized_start=1532 + _globals['_EVALUATIONCLEANUPRESPONSE']._serialized_end=1578 + _globals['_EVALUATOR']._serialized_start=1787 + _globals['_EVALUATOR']._serialized_end=2233 # @@protoc_insertion_point(module_scope) diff --git a/modyn/evaluator/internal/grpc/generated/evaluator_pb2.pyi b/modyn/evaluator/internal/grpc/generated/evaluator_pb2.pyi index 41f06124c..3f5fabb86 100644 --- a/modyn/evaluator/internal/grpc/generated/evaluator_pb2.pyi +++ b/modyn/evaluator/internal/grpc/generated/evaluator_pb2.pyi @@ -25,7 +25,7 @@ class _EvaluationAbortedReason: class _EvaluationAbortedReasonEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[_EvaluationAbortedReason.ValueType], builtins.type): DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor - UNKNOWN: _EvaluationAbortedReason.ValueType # 0 + NOT_ABORTED: _EvaluationAbortedReason.ValueType # 0 MODEL_NOT_EXIST_IN_METADATA: _EvaluationAbortedReason.ValueType # 1 MODEL_IMPORT_FAILURE: _EvaluationAbortedReason.ValueType # 2 MODEL_NOT_EXIST_IN_STORAGE: _EvaluationAbortedReason.ValueType # 3 @@ -35,7 +35,7 @@ class _EvaluationAbortedReasonEnumTypeWrapper(google.protobuf.internal.enum_type class EvaluationAbortedReason(_EvaluationAbortedReason, metaclass=_EvaluationAbortedReasonEnumTypeWrapper): ... -UNKNOWN: EvaluationAbortedReason.ValueType # 0 +NOT_ABORTED: EvaluationAbortedReason.ValueType # 0 MODEL_NOT_EXIST_IN_METADATA: EvaluationAbortedReason.ValueType # 1 MODEL_IMPORT_FAILURE: EvaluationAbortedReason.ValueType # 2 MODEL_NOT_EXIST_IN_STORAGE: EvaluationAbortedReason.ValueType # 3 @@ -45,32 +45,48 @@ DOWNLOAD_MODEL_FAILURE: EvaluationAbortedReason.ValueType # 6 global___EvaluationAbortedReason = EvaluationAbortedReason @typing.final -class DatasetInfo(google.protobuf.message.Message): +class EvaluationInterval(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor - DATASET_ID_FIELD_NUMBER: builtins.int - NUM_DATALOADERS_FIELD_NUMBER: builtins.int START_TIMESTAMP_FIELD_NUMBER: builtins.int END_TIMESTAMP_FIELD_NUMBER: builtins.int - dataset_id: builtins.str - num_dataloaders: builtins.int start_timestamp: builtins.int end_timestamp: builtins.int def __init__( self, *, - dataset_id: builtins.str = ..., - num_dataloaders: builtins.int = ..., start_timestamp: builtins.int | None = ..., end_timestamp: builtins.int | None = ..., ) -> None: ... def HasField(self, field_name: typing.Literal["_end_timestamp", b"_end_timestamp", "_start_timestamp", b"_start_timestamp", "end_timestamp", b"end_timestamp", "start_timestamp", b"start_timestamp"]) -> builtins.bool: ... 
- def ClearField(self, field_name: typing.Literal["_end_timestamp", b"_end_timestamp", "_start_timestamp", b"_start_timestamp", "dataset_id", b"dataset_id", "end_timestamp", b"end_timestamp", "num_dataloaders", b"num_dataloaders", "start_timestamp", b"start_timestamp"]) -> None: ... + def ClearField(self, field_name: typing.Literal["_end_timestamp", b"_end_timestamp", "_start_timestamp", b"_start_timestamp", "end_timestamp", b"end_timestamp", "start_timestamp", b"start_timestamp"]) -> None: ... @typing.overload def WhichOneof(self, oneof_group: typing.Literal["_end_timestamp", b"_end_timestamp"]) -> typing.Literal["end_timestamp"] | None: ... @typing.overload def WhichOneof(self, oneof_group: typing.Literal["_start_timestamp", b"_start_timestamp"]) -> typing.Literal["start_timestamp"] | None: ... +global___EvaluationInterval = EvaluationInterval + +@typing.final +class DatasetInfo(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + DATASET_ID_FIELD_NUMBER: builtins.int + NUM_DATALOADERS_FIELD_NUMBER: builtins.int + EVALUATION_INTERVALS_FIELD_NUMBER: builtins.int + dataset_id: builtins.str + num_dataloaders: builtins.int + @property + def evaluation_intervals(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___EvaluationInterval]: ... + def __init__( + self, + *, + dataset_id: builtins.str = ..., + num_dataloaders: builtins.int = ..., + evaluation_intervals: collections.abc.Iterable[global___EvaluationInterval] | None = ..., + ) -> None: ... + def ClearField(self, field_name: typing.Literal["dataset_id", b"dataset_id", "evaluation_intervals", b"evaluation_intervals", "num_dataloaders", b"num_dataloaders"]) -> None: ... + global___DatasetInfo = DatasetInfo @typing.final @@ -150,27 +166,47 @@ class EvaluateModelRequest(google.protobuf.message.Message): global___EvaluateModelRequest = EvaluateModelRequest +@typing.final +class EvaluateModelIntervalResponse(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + DATASET_SIZE_FIELD_NUMBER: builtins.int + EVAL_ABORTED_REASON_FIELD_NUMBER: builtins.int + dataset_size: builtins.int + """this value is only meaningful when eval_aborted_reason is NOT_ABORTED""" + eval_aborted_reason: global___EvaluationAbortedReason.ValueType + def __init__( + self, + *, + dataset_size: builtins.int = ..., + eval_aborted_reason: global___EvaluationAbortedReason.ValueType = ..., + ) -> None: ... + def ClearField(self, field_name: typing.Literal["dataset_size", b"dataset_size", "eval_aborted_reason", b"eval_aborted_reason"]) -> None: ... 
+ +global___EvaluateModelIntervalResponse = EvaluateModelIntervalResponse + @typing.final class EvaluateModelResponse(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor EVALUATION_STARTED_FIELD_NUMBER: builtins.int EVALUATION_ID_FIELD_NUMBER: builtins.int - DATASET_SIZE_FIELD_NUMBER: builtins.int - EVAL_ABORTED_REASON_FIELD_NUMBER: builtins.int + INTERVAL_RESPONSES_FIELD_NUMBER: builtins.int evaluation_started: builtins.bool + """only when all interval evaluations failed, this field will be set to false""" evaluation_id: builtins.int - dataset_size: builtins.int - eval_aborted_reason: global___EvaluationAbortedReason.ValueType + @property + def interval_responses(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___EvaluateModelIntervalResponse]: + """always has the same size as the number of intervals""" + def __init__( self, *, evaluation_started: builtins.bool = ..., evaluation_id: builtins.int = ..., - dataset_size: builtins.int = ..., - eval_aborted_reason: global___EvaluationAbortedReason.ValueType = ..., + interval_responses: collections.abc.Iterable[global___EvaluateModelIntervalResponse] | None = ..., ) -> None: ... - def ClearField(self, field_name: typing.Literal["dataset_size", b"dataset_size", "eval_aborted_reason", b"eval_aborted_reason", "evaluation_id", b"evaluation_id", "evaluation_started", b"evaluation_started"]) -> None: ... + def ClearField(self, field_name: typing.Literal["evaluation_id", b"evaluation_id", "evaluation_started", b"evaluation_started", "interval_responses", b"interval_responses"]) -> None: ... global___EvaluateModelResponse = EvaluateModelResponse @@ -195,42 +231,25 @@ class EvaluationStatusResponse(google.protobuf.message.Message): VALID_FIELD_NUMBER: builtins.int IS_RUNNING_FIELD_NUMBER: builtins.int - STATE_AVAILABLE_FIELD_NUMBER: builtins.int - BLOCKED_FIELD_NUMBER: builtins.int EXCEPTION_FIELD_NUMBER: builtins.int - BATCHES_SEEN_FIELD_NUMBER: builtins.int - SAMPLES_SEEN_FIELD_NUMBER: builtins.int valid: builtins.bool is_running: builtins.bool - state_available: builtins.bool - blocked: builtins.bool exception: builtins.str - batches_seen: builtins.int - samples_seen: builtins.int def __init__( self, *, valid: builtins.bool = ..., is_running: builtins.bool = ..., - state_available: builtins.bool = ..., - blocked: builtins.bool = ..., exception: builtins.str | None = ..., - batches_seen: builtins.int | None = ..., - samples_seen: builtins.int | None = ..., ) -> None: ... - def HasField(self, field_name: typing.Literal["_batches_seen", b"_batches_seen", "_exception", b"_exception", "_samples_seen", b"_samples_seen", "batches_seen", b"batches_seen", "exception", b"exception", "samples_seen", b"samples_seen"]) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal["_batches_seen", b"_batches_seen", "_exception", b"_exception", "_samples_seen", b"_samples_seen", "batches_seen", b"batches_seen", "blocked", b"blocked", "exception", b"exception", "is_running", b"is_running", "samples_seen", b"samples_seen", "state_available", b"state_available", "valid", b"valid"]) -> None: ... - @typing.overload - def WhichOneof(self, oneof_group: typing.Literal["_batches_seen", b"_batches_seen"]) -> typing.Literal["batches_seen"] | None: ... - @typing.overload + def HasField(self, field_name: typing.Literal["_exception", b"_exception", "exception", b"exception"]) -> builtins.bool: ... 
+ def ClearField(self, field_name: typing.Literal["_exception", b"_exception", "exception", b"exception", "is_running", b"is_running", "valid", b"valid"]) -> None: ... def WhichOneof(self, oneof_group: typing.Literal["_exception", b"_exception"]) -> typing.Literal["exception"] | None: ... - @typing.overload - def WhichOneof(self, oneof_group: typing.Literal["_samples_seen", b"_samples_seen"]) -> typing.Literal["samples_seen"] | None: ... global___EvaluationStatusResponse = EvaluationStatusResponse @typing.final -class EvaluationData(google.protobuf.message.Message): +class SingleMetricResult(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor METRIC_FIELD_NUMBER: builtins.int @@ -245,7 +264,27 @@ class EvaluationData(google.protobuf.message.Message): ) -> None: ... def ClearField(self, field_name: typing.Literal["metric", b"metric", "result", b"result"]) -> None: ... -global___EvaluationData = EvaluationData +global___SingleMetricResult = SingleMetricResult + +@typing.final +class EvaluationIntervalData(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + INTERVAL_INDEX_FIELD_NUMBER: builtins.int + EVALUATION_DATA_FIELD_NUMBER: builtins.int + interval_index: builtins.int + """multiple metrics are required on on evaluation on one interval""" + @property + def evaluation_data(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___SingleMetricResult]: ... + def __init__( + self, + *, + interval_index: builtins.int = ..., + evaluation_data: collections.abc.Iterable[global___SingleMetricResult] | None = ..., + ) -> None: ... + def ClearField(self, field_name: typing.Literal["evaluation_data", b"evaluation_data", "interval_index", b"interval_index"]) -> None: ... + +global___EvaluationIntervalData = EvaluationIntervalData @typing.final class EvaluationResultRequest(google.protobuf.message.Message): @@ -283,17 +322,19 @@ class EvaluationResultResponse(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor VALID_FIELD_NUMBER: builtins.int - EVALUATION_DATA_FIELD_NUMBER: builtins.int + EVALUATION_RESULTS_FIELD_NUMBER: builtins.int valid: builtins.bool @property - def evaluation_data(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___EvaluationData]: ... + def evaluation_results(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___EvaluationIntervalData]: + """each element in the list corresponds to the evaluation results on a single interval""" + def __init__( self, *, valid: builtins.bool = ..., - evaluation_data: collections.abc.Iterable[global___EvaluationData] | None = ..., + evaluation_results: collections.abc.Iterable[global___EvaluationIntervalData] | None = ..., ) -> None: ... - def ClearField(self, field_name: typing.Literal["evaluation_data", b"evaluation_data", "valid", b"valid"]) -> None: ... + def ClearField(self, field_name: typing.Literal["evaluation_results", b"evaluation_results", "valid", b"valid"]) -> None: ... 
global___EvaluationResultResponse = EvaluationResultResponse diff --git a/modyn/evaluator/internal/pytorch_evaluator.py b/modyn/evaluator/internal/pytorch_evaluator.py index f6b21649c..a1575a673 100644 --- a/modyn/evaluator/internal/pytorch_evaluator.py +++ b/modyn/evaluator/internal/pytorch_evaluator.py @@ -3,42 +3,35 @@ import multiprocessing as mp import os import pathlib -import queue import traceback -from typing import Union +from typing import Optional, Union import torch from modyn.evaluator.internal.dataset.evaluation_dataset import EvaluationDataset from modyn.evaluator.internal.metric_factory import MetricFactory -from modyn.evaluator.internal.metrics import AbstractDecomposableMetric, AbstractHolisticMetric -from modyn.evaluator.internal.utils import EvaluationInfo, EvaluatorMessages +from modyn.evaluator.internal.metrics import ( + AbstractDecomposableMetric, + AbstractEvaluationMetric, + AbstractHolisticMetric, +) +from modyn.evaluator.internal.utils import EvaluationInfo from modyn.utils import LABEL_TRANSFORMER_FUNC_NAME, deserialize_function class PytorchEvaluator: # pylint: disable=too-many-branches - def __init__( - self, - evaluation_info: EvaluationInfo, - status_query_queue: mp.Queue, - status_response_queue: mp.Queue, - metric_result_queue: mp.Queue, - logger: logging.Logger, - ) -> None: + def __init__(self, evaluation_info: EvaluationInfo, logger: logging.Logger, metric_result_queue: mp.Queue) -> None: self.logger = logger self._evaluation_id = evaluation_info.evaluation_id - + self._metric_result_queue = metric_result_queue self._model = evaluation_info.model_handler( evaluation_info.model_configuration_dict, evaluation_info.device, evaluation_info.amp ) self._load_state(evaluation_info.model_path) - # setup dataloaders - self._info("Setting up data loaders.") - self._dataloader = self._prepare_dataloader(evaluation_info) + self._eval_info = evaluation_info - self._metrics = evaluation_info.metrics self._label_transformer_function = deserialize_function( evaluation_info.label_transformer, LABEL_TRANSFORMER_FUNC_NAME ) @@ -47,17 +40,12 @@ def __init__( self._device_type = "cuda" if "cuda" in self._device else "cpu" self._amp = evaluation_info.amp - self._status_query_queue = status_query_queue - self._status_response_queue = status_response_queue - self._metric_result_queue = metric_result_queue - - self._num_samples = 0 - self._contains_holistic_metric = MetricFactory.prepare_metrics(self._metrics) - self._info("Initialized PyTorch evaluator.") - def _prepare_dataloader(self, evaluation_info: EvaluationInfo) -> torch.utils.data.DataLoader: - self._debug("Creating EvaluationDataset.") + @staticmethod + def _prepare_dataloader( + evaluation_info: EvaluationInfo, start_timestamp: Optional[int], end_timestamp: Optional[int] + ) -> torch.utils.data.DataLoader: dataset = EvaluationDataset( evaluation_info.dataset_id, evaluation_info.bytes_parser, @@ -65,10 +53,9 @@ def _prepare_dataloader(self, evaluation_info: EvaluationInfo) -> torch.utils.da evaluation_info.storage_address, evaluation_info.evaluation_id, evaluation_info.tokenizer, - evaluation_info.start_timestamp, - evaluation_info.end_timestamp, + start_timestamp, + end_timestamp, ) - self._debug("Creating DataLoader.") dataloader = torch.utils.data.DataLoader( dataset, batch_size=evaluation_info.batch_size, @@ -78,6 +65,18 @@ def _prepare_dataloader(self, evaluation_info: EvaluationInfo) -> torch.utils.da return dataloader + @staticmethod + def _setup_metrics(metric_configurations: list[str]) -> 
list[AbstractEvaluationMetric]: + metrics = [] + # need to make sure that the metric names are unique as they are used for identification. + metric_names = set() + for configuration_json in metric_configurations: + metric = MetricFactory.get_evaluation_metric(configuration_json) + if metric.get_name() not in metric_names: + metrics.append(metric) + metric_names.add(metric.get_name()) + return metrics + def _info(self, msg: str) -> None: self.logger.info(f"[Evaluation {self._evaluation_id}] {msg}") @@ -97,31 +96,18 @@ def _load_state(self, path: pathlib.Path) -> None: # delete trained model from disk path.unlink() - def send_status_to_server(self, batch_number: int) -> None: - self._status_response_queue.put({"num_batches": batch_number, "num_samples": self._num_samples}) - - def evaluate(self) -> None: + # pylint: disable-next=too-many-locals + def _single_interval_evaluate(self, dataloader: torch.utils.data.DataLoader, interval_idx: int) -> None: self._info(f"Process {os.getpid()} starts evaluation.") - + metrics = self._setup_metrics(self._eval_info.raw_metrics) + contains_holistic_metric = MetricFactory.prepare_metrics(metrics) y_true = [] y_score = [] self._model.model.eval() + num_samples = 0 with torch.inference_mode(): - batch_number = -1 - for batch_number, batch in enumerate(self._dataloader): - # As empty() is unreliable - # we try to fetch an element within 10ms. If there is no - # element within that timeframe returned, we continue. - try: - req = self._status_query_queue.get(timeout=0.01) - if req == EvaluatorMessages.STATUS_QUERY_MESSAGE: - self.send_status_to_server(batch_number) - else: - raise ValueError("Unknown message in the status query queue") - except queue.Empty: - pass - + for batch in dataloader: data: Union[torch.Tensor, dict] if isinstance(batch[1], torch.Tensor): data = batch[1].to(self._device) @@ -142,38 +128,42 @@ def evaluate(self) -> None: with torch.autocast(self._device_type, enabled=self._amp): output = self._model.model(data) - for metric in self._metrics: + for metric in metrics: if isinstance(metric, AbstractDecomposableMetric): metric.evaluate_batch(target, output, batch_size) - if self._contains_holistic_metric: + if contains_holistic_metric: y_true.append(target.detach().cpu()) y_score.append(output.detach().cpu()) - self._num_samples += batch_size + num_samples += batch_size if len(y_true) > 0: - assert self._contains_holistic_metric # We only track y_true in case of holistic metrics + assert contains_holistic_metric # We only track y_true in case of holistic metrics y_true = torch.cat(y_true) y_score = torch.cat(y_score) - for metric in self._metrics: + for metric in metrics: if isinstance(metric, AbstractHolisticMetric): - metric.evaluate_dataset(y_true, y_score, self._num_samples) + metric.evaluate_dataset(y_true, y_score, num_samples) - # We do this since we might also have just non-holistic metrics, in which case len(y_true) always is 0 - for metric in self._metrics: - self._metric_result_queue.put((metric.get_name(), metric.get_evaluation_result())) + metric_result = [] + for metric in metrics: + metric_result.append((metric.get_name(), metric.get_evaluation_result())) + self._metric_result_queue.put((interval_idx, metric_result)) + self._info(f"Finished evaluation: {num_samples} samples") - self._info(f"Finished evaluation: {self._num_samples} samples, {batch_number + 1} batches.") + def evaluate(self) -> None: + for interval_idx in self._eval_info.not_failed_interval_ids: + interval = self._eval_info.all_evaluation_intervals[interval_idx] 
+ dataloader = self._prepare_dataloader(self._eval_info, interval[0], interval[1]) + self._single_interval_evaluate(dataloader, interval_idx) def evaluate( evaluation_info: EvaluationInfo, log_path: pathlib.Path, exception_queue: mp.Queue, - status_query_queue: mp.Queue, - status_response_queue: mp.Queue, metric_result_queue: mp.Queue, ) -> None: logging.basicConfig( @@ -186,9 +176,7 @@ def evaluate( logger.addHandler(file_handler) try: - evaluator = PytorchEvaluator( - evaluation_info, status_query_queue, status_response_queue, metric_result_queue, logger - ) + evaluator = PytorchEvaluator(evaluation_info, logger, metric_result_queue) evaluator.evaluate() except Exception: # pylint: disable=broad-except exception_msg = traceback.format_exc() diff --git a/modyn/evaluator/internal/utils/evaluation_info.py b/modyn/evaluator/internal/utils/evaluation_info.py index f36c35bc3..2afedb1a2 100644 --- a/modyn/evaluator/internal/utils/evaluation_info.py +++ b/modyn/evaluator/internal/utils/evaluation_info.py @@ -1,10 +1,10 @@ import json import logging import pathlib +from typing import Optional # pylint: disable=no-name-in-module from modyn.evaluator.internal.grpc.generated.evaluator_pb2 import EvaluateModelRequest -from modyn.evaluator.internal.metrics import AbstractEvaluationMetric from modyn.utils import dynamic_module_import logger = logging.getLogger(__name__) @@ -21,22 +21,25 @@ def __init__( model_config: str, amp: bool, storage_address: str, - metrics: list[AbstractEvaluationMetric], model_path: pathlib.Path, + not_failed_interval_ids: list[int], ) -> None: # pragma: no cover self.model_id = request.model_id self.dataset_id = request.dataset_info.dataset_id self.num_dataloaders = request.dataset_info.num_dataloaders - self.start_timestamp = ( - request.dataset_info.start_timestamp if request.dataset_info.HasField("start_timestamp") else None - ) - self.end_timestamp = ( - request.dataset_info.end_timestamp if request.dataset_info.HasField("end_timestamp") else None - ) + self.all_evaluation_intervals: list[tuple[Optional[int], Optional[int]]] = [] + for interval in request.dataset_info.evaluation_intervals: + self.all_evaluation_intervals.append( + ( + interval.start_timestamp if interval.HasField("start_timestamp") else None, + interval.end_timestamp if interval.HasField("end_timestamp") else None, + ) + ) + self.not_failed_interval_ids = not_failed_interval_ids self.device = request.device self.amp = amp self.batch_size = request.batch_size - self.metrics = metrics + self.raw_metrics = [metric.value for metric in request.metrics] self.model_class_name = model_class_name model_module = dynamic_module_import("modyn.models") diff --git a/modyn/evaluator/internal/utils/evaluation_process_info.py b/modyn/evaluator/internal/utils/evaluation_process_info.py index 769ed281f..7720d0f7a 100644 --- a/modyn/evaluator/internal/utils/evaluation_process_info.py +++ b/modyn/evaluator/internal/utils/evaluation_process_info.py @@ -6,12 +6,8 @@ def __init__( self, process_handler: mp.Process, exception_queue: mp.Queue, - status_query_queue: mp.Queue, - status_response_queue: mp.Queue, metric_result_queue: mp.Queue, ): self.process_handler = process_handler self.exception_queue = exception_queue - self.status_query_queue = status_query_queue - self.status_response_queue = status_response_queue self.metric_result_queue = metric_result_queue diff --git a/modyn/protos/README.md b/modyn/protos/README.md index b60c2acfd..7cb3c53bb 100644 --- a/modyn/protos/README.md +++ b/modyn/protos/README.md @@ -1,8 +1,8 @@ 
-# How to generate the python files from the proto files +# How to generate the python files from the proto files This assumes python 3.6+ is installed. -## Generate the python files +## Generate the python files First move to the directory where you want to generate the python files. diff --git a/modyn/protos/evaluator.proto b/modyn/protos/evaluator.proto index b870f6414..426d78de5 100644 --- a/modyn/protos/evaluator.proto +++ b/modyn/protos/evaluator.proto @@ -9,15 +9,19 @@ service Evaluator { rpc cleanup_evaluations(EvaluationCleanupRequest) returns (EvaluationCleanupResponse) {} } +message EvaluationInterval { + optional int64 start_timestamp = 1; + optional int64 end_timestamp = 2; +} + message DatasetInfo { string dataset_id = 1; int32 num_dataloaders = 2; - optional int64 start_timestamp = 3; - optional int64 end_timestamp = 4; + repeated EvaluationInterval evaluation_intervals = 3; } enum EvaluationAbortedReason { - UNKNOWN = 0; + NOT_ABORTED = 0; MODEL_NOT_EXIST_IN_METADATA = 1; MODEL_IMPORT_FAILURE = 2; MODEL_NOT_EXIST_IN_STORAGE = 3; @@ -42,11 +46,20 @@ message EvaluateModelRequest { optional PythonString tokenizer = 9; } +message EvaluateModelIntervalResponse { + // this value is only meaningful when eval_aborted_reason is NOT_ABORTED + int64 dataset_size = 1; + EvaluationAbortedReason eval_aborted_reason = 2; +} + message EvaluateModelResponse { + // only when all interval evaluations failed, this field will be set to false + // it is a field of convenience for the client to decide whether to wait for the evaluation completion. + // the client can always check the interval_responses bool evaluation_started = 1; int32 evaluation_id = 2; - int64 dataset_size = 3; - EvaluationAbortedReason eval_aborted_reason = 4; + // always has the same size as the number of intervals + repeated EvaluateModelIntervalResponse interval_responses = 3; } message EvaluationStatusRequest { int32 evaluation_id = 1; } @@ -54,25 +67,34 @@ message EvaluationStatusRequest { int32 evaluation_id = 1; } message EvaluationStatusResponse { bool valid = 1; bool is_running = 2; - bool state_available = 3; - bool blocked = 4; - optional string exception = 5; - optional int64 batches_seen = 6; - optional int64 samples_seen = 7; + optional string exception = 3; } -message EvaluationData { +message SingleMetricResult { string metric = 1; float result = 2; } +message EvaluationIntervalData { + // Since not every interval evaluation from EvaluateModelRequest may be successful, + // the EvaluationIntervalData contained in the EvaluationResultResponse must explicitly specify what interval this + // evaluation data corresponds to. The interval_index is the index of the interval in the list + // DatasetInfo.evaluation_intervals in the EvaluateModelRequest. + // For example, if DatasetInfo.evaluation_intervals has 3 intervals, [interval1, interval2, interval3], + // and interval2 fails, then the EvaluationResultResponse will have 2 EvaluationIntervalData, one with interval_index 
+ int32 interval_index = 1; + repeated SingleMetricResult evaluation_data = 2; +} + message EvaluationResultRequest { int32 evaluation_id = 1; } message EvaluationCleanupRequest { repeated int32 evaluation_ids = 1; } message EvaluationResultResponse { bool valid = 1; - repeated EvaluationData evaluation_data = 2; + // each element in the list corresponds to the evaluation results on a single interval + repeated EvaluationIntervalData evaluation_results = 2; } message EvaluationCleanupResponse { repeated int32 succeeded = 1; } diff --git a/modyn/supervisor/internal/eval/result_writer/__init__.py b/modyn/supervisor/internal/eval/result_writer/__init__.py deleted file mode 100644 index c760827f8..000000000 --- a/modyn/supervisor/internal/eval/result_writer/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -"""Supervisor module. The supervisor initiates a pipeline and coordinates all components. - -""" - -import os - -from .abstract_evaluation_result_writer import AbstractEvaluationResultWriter # noqa: F401 -from .json_result_writer import DedicatedJsonResultWriter, JsonResultWriter # noqa: F401 -from .tensorboard_result_writer import TensorboardResultWriter # noqa: F401 - -files = os.listdir(os.path.dirname(__file__)) -files.remove("__init__.py") -__all__ = [f[:-3] for f in files if f.endswith(".py")] diff --git a/modyn/supervisor/internal/eval/result_writer/abstract_evaluation_result_writer.py b/modyn/supervisor/internal/eval/result_writer/abstract_evaluation_result_writer.py deleted file mode 100644 index 619a6573d..000000000 --- a/modyn/supervisor/internal/eval/result_writer/abstract_evaluation_result_writer.py +++ /dev/null @@ -1,38 +0,0 @@ -from __future__ import annotations - -import pathlib -from abc import ABC, abstractmethod -from typing import Any - -# pylint: disable=no-name-in-module -from modyn.evaluator.internal.grpc.generated.evaluator_pb2 import EvaluationData - - -class AbstractEvaluationResultWriter(ABC): - """ - Abstract class used to write evaluation results to the evaluation directory - """ - - def __init__(self, pipeline_id: int, trigger_id: int, eval_directory: pathlib.Path): - self.pipeline_id = pipeline_id - self.trigger_id = trigger_id - self.eval_directory = eval_directory - - @abstractmethod - def add_evaluation_data(self, dataset_id: str, dataset_size: int, evaluation_data: list[EvaluationData]) -> None: - """ - Called whenever a metric results are available for a particular dataset. - - Args: - dataset_id: the involved dataset. - dataset_size: the size (amount of samples) of the dataset. - evaluation_data: contains the metric results. - """ - raise NotImplementedError() - - @abstractmethod - def store_results(self) -> Any: - """ - Called in the end to store the results. 
- """ - raise NotImplementedError() diff --git a/modyn/supervisor/internal/eval/result_writer/json_result_writer.py b/modyn/supervisor/internal/eval/result_writer/json_result_writer.py deleted file mode 100644 index 3642f8a8f..000000000 --- a/modyn/supervisor/internal/eval/result_writer/json_result_writer.py +++ /dev/null @@ -1,36 +0,0 @@ -import json -import pathlib -from typing import Any - -# pylint: disable=no-name-in-module -from modyn.evaluator.internal.grpc.generated.evaluator_pb2 import EvaluationData -from modyn.supervisor.internal.eval.result_writer import AbstractEvaluationResultWriter - - -class _JsonResultWriter(AbstractEvaluationResultWriter): - def __init__(self, pipeline_id: int, trigger_id: int, eval_directory: pathlib.Path): - super().__init__(pipeline_id, trigger_id, eval_directory) - self.results: dict = {"datasets": []} - - def add_evaluation_data(self, dataset_id: str, dataset_size: int, evaluation_data: list[EvaluationData]) -> None: - dataset_results: dict = {"dataset_size": dataset_size, "metrics": []} - for metric in evaluation_data: - dataset_results["metrics"].append({"name": metric.metric, "result": metric.result}) - self.results["datasets"].append({dataset_id: dataset_results}) - - -class JsonResultWriter(_JsonResultWriter): - """Does not dump into files itself but only returns the result.""" - - def store_results(self) -> dict[str, Any]: - return self.results - - -class DedicatedJsonResultWriter(_JsonResultWriter): - """Dumps every log result into a dedicated file.""" - - def store_results(self) -> pathlib.Path: - file_name = f"{self.pipeline_id}_{self.trigger_id}.eval" - with open(self.eval_directory / file_name, "w+", encoding="utf-8") as output_file: - json.dump(self.results, output_file) - return self.eval_directory / file_name diff --git a/modyn/supervisor/internal/eval/result_writer/tensorboard_result_writer.py b/modyn/supervisor/internal/eval/result_writer/tensorboard_result_writer.py deleted file mode 100644 index 8a7854e3f..000000000 --- a/modyn/supervisor/internal/eval/result_writer/tensorboard_result_writer.py +++ /dev/null @@ -1,23 +0,0 @@ -import pathlib - -# pylint: disable=no-name-in-module -from modyn.evaluator.internal.grpc.generated.evaluator_pb2 import EvaluationData -from modyn.supervisor.internal.eval.result_writer.abstract_evaluation_result_writer import ( - AbstractEvaluationResultWriter, -) -from torch.utils.tensorboard import SummaryWriter - - -class TensorboardResultWriter(AbstractEvaluationResultWriter): - def __init__(self, pipeline_id: int, trigger_id: int, eval_directory: pathlib.Path): - super().__init__(pipeline_id, trigger_id, eval_directory) - self.writer = SummaryWriter(log_dir=str(eval_directory)) - - def add_evaluation_data(self, dataset_id: str, dataset_size: int, evaluation_data: list[EvaluationData]) -> None: - for metric in evaluation_data: - self.writer.add_scalar( - f"pipeline_{self.pipeline_id}/{dataset_id}/{metric.metric}", metric.result, self.trigger_id - ) - - def store_results(self) -> None: - self.writer.flush() diff --git a/modyn/supervisor/internal/grpc_handler.py b/modyn/supervisor/internal/grpc_handler.py index d22743a35..d83f9432e 100644 --- a/modyn/supervisor/internal/grpc_handler.py +++ b/modyn/supervisor/internal/grpc_handler.py @@ -3,9 +3,8 @@ import json import logging -from collections import deque from time import sleep -from typing import Any, Iterable, Optional, Sequence +from typing import Any, Iterable, Optional import grpc from modyn.common.benchmark import Stopwatch @@ -15,6 +14,8 @@ 
EvaluateModelRequest, EvaluationCleanupRequest, EvaluationCleanupResponse, + EvaluationInterval, + EvaluationIntervalData, EvaluationResultRequest, EvaluationResultResponse, EvaluationStatusRequest, @@ -44,8 +45,6 @@ GetNewDataSinceResponse, ) from modyn.storage.internal.grpc.generated.storage_pb2_grpc import StorageStub -from modyn.supervisor.internal.eval.result_writer import AbstractEvaluationResultWriter -from modyn.supervisor.internal.utils import EvaluationStatusReporter from modyn.utils import grpc_common_config, grpc_connection_established from tenacity import Retrying, stop_after_attempt, wait_random_exponential @@ -237,8 +236,7 @@ def prepare_evaluation_request( dataset_config: dict, model_id: int, device: str, - start_timestamp: Optional[int] = None, - end_timestamp: Optional[int] = None, + intervals: list[tuple[Optional[int], Optional[int]]], ) -> EvaluateModelRequest: dataset_id = dataset_config["dataset_id"] transform_list = dataset_config.get("transformations") or [] @@ -249,52 +247,41 @@ def prepare_evaluation_request( dataloader_workers = dataset_config["dataloader_workers"] metrics = [EvaluatorJsonString(value=json.dumps(metric)) for metric in dataset_config["metrics"]] - start_evaluation_kwargs = { - "model_id": model_id, - "dataset_info": DatasetInfo( - dataset_id=dataset_id, - num_dataloaders=dataloader_workers, - start_timestamp=start_timestamp, - end_timestamp=end_timestamp, - ), - "device": device, - "batch_size": batch_size, - "metrics": metrics, - "transform_list": transform_list, - "bytes_parser": EvaluatorPythonString(value=bytes_parser_function), - "label_transformer": EvaluatorPythonString(value=label_transformer), - } + eval_intervals: list[EvaluationInterval] = [] + for start_timestamp, end_timestamp in intervals: + eval_intervals.append(EvaluationInterval(start_timestamp=start_timestamp, end_timestamp=end_timestamp)) if dataset_config.get("tokenizer"): tokenizer = dataset_config["tokenizer"] - start_evaluation_kwargs["tokenizer"] = EvaluatorPythonString(value=tokenizer) + tokenizer_arg = EvaluatorPythonString(value=tokenizer) + else: + tokenizer_arg = None - return EvaluateModelRequest(**start_evaluation_kwargs) + return EvaluateModelRequest( + model_id=model_id, + dataset_info=DatasetInfo( + dataset_id=dataset_id, + num_dataloaders=dataloader_workers, + evaluation_intervals=eval_intervals, + ), + device=device, + batch_size=batch_size, + metrics=metrics, + transform_list=transform_list, + bytes_parser=EvaluatorPythonString(value=bytes_parser_function), + label_transformer=EvaluatorPythonString(value=label_transformer), + tokenizer=tokenizer_arg, + ) # pylint: disable=too-many-branches - def wait_for_evaluation_completion( - self, training_id: int, evaluations: dict[int, EvaluationStatusReporter] - ) -> None: + def wait_for_evaluation_completion(self, evaluation_id: int) -> bool: assert self.evaluator is not None if not self.connected_to_evaluator: raise ConnectionError("Tried to wait for evaluation to finish, but not there is no gRPC connection.") - logger.debug("wait for evaluation completion") - - # We are using a deque here in order to fetch the status of each evaluation - # sequentially in a round-robin manner. 
- working_queue: deque[int] = deque() - blocked_in_a_row: dict[int, int] = {} - for evaluation_id, evaluation_reporter in evaluations.items(): - evaluation_reporter.create_counter(training_id) - working_queue.append(evaluation_id) - blocked_in_a_row[evaluation_id] = 0 - - while working_queue: - current_evaluation_id = working_queue.popleft() - current_evaluation_reporter = evaluations[current_evaluation_id] - req = EvaluationStatusRequest(evaluation_id=current_evaluation_id) - + req = EvaluationStatusRequest(evaluation_id=evaluation_id) + has_exception = False + while True: for attempt in Retrying( stop=stop_after_attempt(5), wait=wait_random_exponential(multiplier=1, min=2, max=60), reraise=True ): @@ -303,71 +290,37 @@ def wait_for_evaluation_completion( res: EvaluationStatusResponse = self.evaluator.get_evaluation_status(req) except grpc.RpcError as e: # We catch and reraise to easily reconnect logger.error(e) - logger.error(f"[Training {training_id}]: gRPC connection error, trying to reconnect.") + logger.error(f"[Evaluation {evaluation_id}]: gRPC connection error, trying to reconnect.") self.init_evaluator() raise e if not res.valid: - exception_msg = f"Evaluation {current_evaluation_id} is invalid at server:\n{res}\n" - logger.warning(exception_msg) - current_evaluation_reporter.end_counter(error=True, exception_msg=exception_msg) - continue - - if res.blocked: - blocked_in_a_row[current_evaluation_id] += 1 - if blocked_in_a_row[current_evaluation_id] >= 3: - logger.warning( - f"Evaluator returned {blocked_in_a_row} blocked responses in a row, cannot update status." - ) - else: - blocked_in_a_row[current_evaluation_id] = 0 - - if res.HasField("exception") and res.exception is not None: - exception_msg = f"Exception at evaluator occurred:\n{res.exception}\n\n" - logger.warning(exception_msg) - current_evaluation_reporter.end_counter(error=True, exception_msg=exception_msg) - continue - if not res.is_running: - current_evaluation_reporter.end_counter(error=False) - continue - if res.state_available: - assert res.HasField("samples_seen") and res.HasField( - "batches_seen" - ), f"Inconsistent server response:\n{res}" - - current_evaluation_reporter.progress_counter(res.samples_seen) - elif res.is_running: - logger.warning("Evaluator is not blocked and is running, but no state is available.") - - working_queue.append(current_evaluation_id) + exception_msg = f"Evaluation {evaluation_id} is invalid at server:\n{res}\n" + logger.error(exception_msg) + raise RuntimeError(exception_msg) + + if res.HasField("exception"): + exception_msg = f"Exception at evaluator occurred:\n{res.exception}\n\n" + logger.error(exception_msg) + has_exception = True + break + if not res.is_running: + break sleep(1) + return not has_exception - logger.debug("Evaluation completed") - - def store_evaluation_results( - self, - evaluation_result_writers: Sequence[AbstractEvaluationResultWriter], - evaluations: dict[int, EvaluationStatusReporter], - ) -> None: + def get_evaluation_results(self, evaluation_id: int) -> list[EvaluationIntervalData]: assert self.evaluator is not None if not self.connected_to_evaluator: raise ConnectionError("Tried to wait for evaluation to finish, but not there is no gRPC connection.") - for evaluation_id in evaluations: - req = EvaluationResultRequest(evaluation_id=evaluation_id) - res: EvaluationResultResponse = self.evaluator.get_evaluation_result(req) - - if not res.valid: - logger.warning(f"Cannot get the evaluation result for evaluation {evaluation_id}") - continue - dataset_id = 
evaluations[evaluation_id].dataset_id - dataset_size = evaluations[evaluation_id].dataset_size - - for result_writer in evaluation_result_writers: - result_writer.add_evaluation_data(dataset_id, dataset_size, res.evaluation_data) + req = EvaluationResultRequest(evaluation_id=evaluation_id) + res: EvaluationResultResponse = self.evaluator.get_evaluation_result(req) - for result_writer in evaluation_result_writers: - result_writer.store_results() + if not res.valid: + logger.error(f"Cannot get the evaluation result for evaluation {evaluation_id}") + raise RuntimeError(f"Cannot get the evaluation result for evaluation {evaluation_id}") + return res.evaluation_results def cleanup_evaluations(self, evaluation_ids: list[int]) -> None: assert self.evaluator is not None diff --git a/modyn/supervisor/internal/pipeline_executor/evaluation_executor.py b/modyn/supervisor/internal/pipeline_executor/evaluation_executor.py index 4e5fefd64..9495309e8 100644 --- a/modyn/supervisor/internal/pipeline_executor/evaluation_executor.py +++ b/modyn/supervisor/internal/pipeline_executor/evaluation_executor.py @@ -7,8 +7,8 @@ from concurrent.futures import Future, ThreadPoolExecutor from dataclasses import dataclass from functools import partial -from multiprocessing import Queue from pathlib import Path +from typing import Optional import grpc import pandas as pd @@ -18,9 +18,7 @@ # pylint: disable-next=no-name-in-module from modyn.evaluator.internal.grpc.generated.evaluator_pb2 import EvaluateModelResponse, EvaluationAbortedReason from modyn.supervisor.internal.eval.handler import EvalHandler, EvalRequest -from modyn.supervisor.internal.eval.result_writer.json_result_writer import JsonResultWriter -from modyn.supervisor.internal.grpc.enums import IdType, MsgType, PipelineStage -from modyn.supervisor.internal.grpc.template_msg import id_submsg, pipeline_stage_msg +from modyn.supervisor.internal.grpc.enums import PipelineStage from modyn.supervisor.internal.grpc_handler import GRPCHandler from modyn.supervisor.internal.pipeline_executor.models import ( ConfigLogs, @@ -29,7 +27,6 @@ StageLog, SupervisorLogs, ) -from modyn.supervisor.internal.utils.evaluation_status_reporter import EvaluationStatusReporter from modyn.utils.utils import current_time_micros, dynamic_module_import from pydantic import BaseModel from tenacity import Retrying, stop_after_attempt, wait_random_exponential @@ -123,8 +120,6 @@ def run_pipeline_evaluations( model_id: int, first_timestamp: int, last_timestamp: int, - pipeline_status_queue: Queue, - eval_status_queue: Queue, ) -> SupervisorLogs: """Run the evaluations as part of the core pipeline. @@ -134,17 +129,14 @@ def run_pipeline_evaluations( trigger_id: The trigger id to evaluate. training_id: The training id to evaluate. model_id: The model id to evaluate. - first_timestamp: Start of the interval to evaluate. - last_timestamp: End of the interval to evaluate. + first_timestamp: Start of the training interval. + last_timestamp: End of the training interval. Returns: The logs of the evaluations. """ assert self.grpc.evaluator is not None, "Evaluator not initialized." assert self.pipeline.evaluation is not None, "Evaluation config not set." 
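Tying the reworked GRPCHandler pieces above together, the supervisor-side flow after this PR looks roughly as follows. This is an illustrative sketch only: `grpc_handler` is assumed to be a connected `GRPCHandler`, `dataset_config` the dataset configuration dict expected by `prepare_evaluation_request`, and the interval boundaries are made up for the example.

```python
# Illustrative sketch, not code from this patch. Assumes `grpc_handler` is a connected
# GRPCHandler and `dataset_config` is the dataset configuration dict used by
# prepare_evaluation_request.
from modyn.supervisor.internal.grpc_handler import GRPCHandler

intervals = [(None, 1000), (1000, 2000), (2000, None)]
request = GRPCHandler.prepare_evaluation_request(
    dataset_config, model_id=42, device="cuda:0", intervals=intervals
)
response = grpc_handler.evaluator.evaluate_model(request)

if response.evaluation_started:
    # interval_responses always has one entry per requested interval, so aborted
    # intervals (e.g. EMPTY_DATASET) can still be inspected here.
    if grpc_handler.wait_for_evaluation_completion(response.evaluation_id):
        per_interval_results = grpc_handler.get_evaluation_results(response.evaluation_id)
    grpc_handler.cleanup_evaluations([response.evaluation_id])
```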
- pipeline_status_queue.put( - pipeline_stage_msg(PipelineStage.EVALUATE, MsgType.ID, id_submsg(IdType.TRIGGER, trigger_id)) - ) eval_requests: list[EvalRequest] = [] @@ -163,10 +155,10 @@ def run_pipeline_evaluations( if len(eval_requests) == 0: return SupervisorLogs() num_workers = self.pipeline.evaluation.after_training_evaluation_workers - logs = self._launch_evaluations_async(eval_requests, log, eval_status_queue, num_workers) + logs = self._launch_evaluations_async(eval_requests, log, num_workers) return logs - def run_post_pipeline_evaluations(self, eval_status_queue: Queue, manual_run: bool = False) -> SupervisorLogs: + def run_post_pipeline_evaluations(self, manual_run: bool = False) -> SupervisorLogs: """Evaluate the trained models after the core pipeline and store the results.""" if not self.pipeline.evaluation: return SupervisorLogs(stage_runs=[]) @@ -210,7 +202,6 @@ def run_post_pipeline_evaluations(self, eval_status_queue: Queue, manual_run: bo sample_time=-1, trigger_idx=-1, ), - eval_status_queue=eval_status_queue, num_workers=self.pipeline.evaluation.after_pipeline_evaluation_workers, ) return logs @@ -223,7 +214,6 @@ def _launch_evaluations_async( self, eval_requests: list[EvalRequest], parent_log: StageLog, - eval_status_queue: Queue, num_workers: int = 1, ) -> SupervisorLogs: """Creates a thread pool to launch evaluations asynchronously. @@ -247,7 +237,14 @@ def worker_func(eval_req: EvalRequest) -> StageLog: trigger_idx=parent_log.trigger_idx, ) epoch_micros_start = current_time_micros() - self._single_evaluation(single_log, eval_status_queue, eval_req) + single_log.info = SingleEvaluationInfo(eval_request=eval_req) + optional_failure_reason, results = self._single_batched_evaluation( + eval_req.interval_start, eval_req.interval_end, eval_req.id_model, eval_req.dataset_id + ) + if optional_failure_reason: + single_log.info.failure_reason = optional_failure_reason + else: + single_log.info.results = results single_log.end = datetime.datetime.now() single_log.duration = datetime.timedelta(microseconds=current_time_micros() - epoch_micros_start) return single_log @@ -264,21 +261,27 @@ def worker_func(eval_req: EvalRequest) -> StageLog: return logs - def _single_evaluation(self, log: StageLog, eval_status_queue: Queue, eval_req: EvalRequest) -> None: + # pylint: disable-next=too-many-locals + def _single_batched_evaluation( + self, + interval_start: int, + interval_end: Optional[int], + model_id_to_eval: int, + dataset_id: str, + ) -> tuple[Optional[str], dict]: assert self.grpc.evaluator is not None, "Evaluator not initialized." assert self.pipeline.evaluation logger.info( - f"Evaluation Starts for model {eval_req.id_model} on split {eval_req.interval_start} " - f"to {eval_req.interval_end} of dataset {eval_req.dataset_id}." + f"Evaluation Starts for model {model_id_to_eval} on split {interval_start} " + f"to {interval_end} of dataset {dataset_id}." 
) - dataset_config = next((d for d in self.pipeline.evaluation.datasets if d.dataset_id == eval_req.dataset_id)) - log.info = SingleEvaluationInfo(eval_request=eval_req) + dataset_config = next((d for d in self.pipeline.evaluation.datasets if d.dataset_id == dataset_id)) + request = GRPCHandler.prepare_evaluation_request( dataset_config.model_dump(by_alias=True), - eval_req.id_model, + model_id_to_eval, self.pipeline.evaluation.device, - eval_req.interval_start, - eval_req.interval_end, + intervals=[(interval_start, interval_end)], ) for attempt in Retrying( stop=stop_after_attempt(5), wait=wait_random_exponential(multiplier=1, min=2, max=60), reraise=True @@ -292,40 +295,30 @@ def _single_evaluation(self, log: StageLog, eval_status_queue: Queue, eval_req: self.grpc.init_evaluator() raise e + def get_failure_reason(eval_aborted_reason: EvaluationAbortedReason) -> str: + return EvaluationAbortedReason.DESCRIPTOR.values_by_number[eval_aborted_reason].name + if not response.evaluation_started: - log.info.failure_reason = EvaluationAbortedReason.DESCRIPTOR.values_by_number[ - response.eval_aborted_reason - ].name + failure_reason = get_failure_reason(response.interval_responses[0].eval_aborted_reason) logger.error( - f"Evaluation for model {eval_req.id_model} on split {eval_req.interval_start} to " - f"{eval_req.interval_end} not started with reason: {log.info.failure_reason}." + f"Evaluation for model {model_id_to_eval} on split {interval_start} to " + f"{interval_end} not started with reason: {failure_reason}." ) - return + return failure_reason, {} - logger.info( - f"Evaluation started for model {eval_req.id_model} on split {eval_req.interval_start} " - f"to {eval_req.interval_end}." - ) - reporter = EvaluationStatusReporter( - eval_status_queue, - response.evaluation_id, - eval_req.dataset_id, - response.dataset_size, - ) - evaluation = {response.evaluation_id: reporter} - reporter.create_tracker() - self.grpc.wait_for_evaluation_completion(eval_req.training_id, evaluation) - - eval_result_writer = JsonResultWriter(self.pipeline_id, eval_req.trigger_id, self.pipeline_logdir) - self.grpc.store_evaluation_results([eval_result_writer], evaluation) - self.grpc.cleanup_evaluations([int(i) for i in evaluation]) - assert isinstance(eval_result_writer, JsonResultWriter) - - log.info.results = ( - eval_result_writer.results["datasets"][0][dataset_config.dataset_id] - if eval_result_writer.results["datasets"] - else {} - ) + logger.info(f"Evaluation started for model {model_id_to_eval} on split {interval_start} " f"to {interval_end}.") + self.grpc.wait_for_evaluation_completion(response.evaluation_id) + eval_data = self.grpc.get_evaluation_results(response.evaluation_id) + self.grpc.cleanup_evaluations([response.evaluation_id]) + # here we assume only one interval is evaluated + eval_results: dict = {"dataset_size": response.interval_responses[0].dataset_size, "metrics": []} + + assert len(eval_data) == 1, "The evaluation request only contains one interval so we expect only one result." 
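The unpacking above assumes exactly one interval per request; batching several intervals into one request on the client side is deferred to a follow-up. A hedged sketch of how the same unpacking could generalize once multiple intervals are sent at once (hypothetical, not part of this PR) is shown below, reusing the `response` and `eval_data` objects from the surrounding code.

```python
# Hypothetical generalization, deferred to a follow-up PR: unpacking results when a
# single EvaluateModelRequest carries several intervals. Failed intervals have an
# EvaluateModelIntervalResponse but no EvaluationIntervalData entry in eval_data.
results_per_interval: dict[int, dict] = {}
for interval_data in eval_data:
    interval_response = response.interval_responses[interval_data.interval_index]
    results_per_interval[interval_data.interval_index] = {
        "dataset_size": interval_response.dataset_size,
        "metrics": [{"name": m.metric, "result": m.result} for m in interval_data.evaluation_data],
    }
```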
+ assert eval_data[0].interval_index == 0 + for metric in eval_data[0].evaluation_data: + eval_results["metrics"].append({"name": metric.metric, "result": metric.result}) + + return None, eval_results # ------------------------------------------------------------------------------------ # @@ -349,5 +342,5 @@ def _single_evaluation(self, log: StageLog, eval_status_queue: Queue, eval_req: supervisor_logs=SupervisorLogs(), ) - logs_.supervisor_logs = ex.run_post_pipeline_evaluations(eval_status_queue=Queue()) + logs_.supervisor_logs = ex.run_post_pipeline_evaluations() logs_.materialize(snapshot_path, mode="final") diff --git a/modyn/supervisor/internal/pipeline_executor/pipeline_executor.py b/modyn/supervisor/internal/pipeline_executor/pipeline_executor.py index fdb93a4b1..36852a28a 100644 --- a/modyn/supervisor/internal/pipeline_executor/pipeline_executor.py +++ b/modyn/supervisor/internal/pipeline_executor/pipeline_executor.py @@ -10,13 +10,6 @@ from typing import Callable, Generator, TypeVar, cast import pandas as pd -from modyn.config.schema.pipeline import ResultWriterType -from modyn.supervisor.internal.eval.result_writer import JsonResultWriter -from modyn.supervisor.internal.eval.result_writer.abstract_evaluation_result_writer import ( - AbstractEvaluationResultWriter, -) -from modyn.supervisor.internal.eval.result_writer.json_result_writer import DedicatedJsonResultWriter -from modyn.supervisor.internal.eval.result_writer.tensorboard_result_writer import TensorboardResultWriter from modyn.supervisor.internal.grpc.enums import CounterAction, IdType, MsgType, PipelineStage from modyn.supervisor.internal.grpc.template_msg import counter_submsg, dataset_submsg, id_submsg, pipeline_stage_msg from modyn.supervisor.internal.grpc_handler import GRPCHandler @@ -47,11 +40,6 @@ logger = logging.getLogger(__name__) EXCEPTION_EXITCODE = 8 -EVAL_RESULT_WRITER_CLASSES: dict[ResultWriterType, type[AbstractEvaluationResultWriter]] = { - "json": JsonResultWriter, - "json_dedicated": DedicatedJsonResultWriter, - "tensorboard": TensorboardResultWriter, -} P = ParamSpec("P") # parameters of pipeline stage R = TypeVar("R") # result of pipeline stage @@ -720,6 +708,9 @@ def _evaluate_and_store_results( last_timestamp: int, ) -> None: """Evaluate the trained model and store the results.""" + s.pipeline_status_queue.put( + pipeline_stage_msg(PipelineStage.EVALUATE, MsgType.ID, id_submsg(IdType.TRIGGER, trigger_id)) + ) logs = self.eval_executor.run_pipeline_evaluations( log, trigger_id, @@ -727,8 +718,6 @@ def _evaluate_and_store_results( model_id, first_timestamp, last_timestamp, - s.pipeline_status_queue, - s.eval_status_queue, ) self.logs.supervisor_logs.merge(logs.stage_runs) @@ -756,7 +745,7 @@ def _post_pipeline_evaluation(self, s: ExecutionState, log: StageLog) -> None: if not s.pipeline_config.evaluation: return - eval_logs = self.eval_executor.run_post_pipeline_evaluations(s.eval_status_queue) + eval_logs = self.eval_executor.run_post_pipeline_evaluations() self.logs.supervisor_logs.merge(eval_logs.stage_runs) @pipeline_stage(PipelineStage.EXIT, parent=PipelineStage.MAIN) @@ -814,9 +803,6 @@ def _get_trigger_timespan( return first_timestamp, last_timestamp - def _init_evaluation_writer(self, pipeline_id: int, name: str, trigger_id: int) -> AbstractEvaluationResultWriter: - return EVAL_RESULT_WRITER_CLASSES[name](pipeline_id, trigger_id, self.state.eval_directory) # type: ignore - def _shutdown_trainer(self) -> None: if self.state.current_training_id is not None: 
self.grpc.stop_training_at_trainer_server(self.state.current_training_id) diff --git a/modyn/tests/evaluator/internal/grpc/test_evaluator_grpc_servicer.py b/modyn/tests/evaluator/internal/grpc/test_evaluator_grpc_servicer.py index c202760e2..c54fc0629 100644 --- a/modyn/tests/evaluator/internal/grpc/test_evaluator_grpc_servicer.py +++ b/modyn/tests/evaluator/internal/grpc/test_evaluator_grpc_servicer.py @@ -9,29 +9,29 @@ import tempfile from time import sleep from unittest import mock -from unittest.mock import MagicMock, patch +from unittest.mock import ANY, MagicMock, call, patch import pytest -from modyn.config.schema.pipeline import AccuracyMetricConfig, F1ScoreMetricConfig +from modyn.config.schema.pipeline import AccuracyMetricConfig from modyn.evaluator.internal.grpc.evaluator_grpc_servicer import EvaluatorGRPCServicer from modyn.evaluator.internal.grpc.generated.evaluator_pb2 import ( DatasetInfo, + EvaluateModelIntervalResponse, EvaluateModelRequest, EvaluateModelResponse, EvaluationAbortedReason, EvaluationCleanupRequest, + EvaluationInterval, EvaluationResultRequest, EvaluationStatusRequest, ) from modyn.evaluator.internal.grpc.generated.evaluator_pb2 import JsonString as EvaluatorJsonString from modyn.evaluator.internal.grpc.generated.evaluator_pb2 import PythonString -from modyn.evaluator.internal.metrics import Accuracy, F1Score -from modyn.evaluator.internal.utils import EvaluationInfo, EvaluationProcessInfo, EvaluatorMessages +from modyn.evaluator.internal.utils import EvaluationInfo, EvaluationProcessInfo from modyn.metadata_database.metadata_database_connection import MetadataDatabaseConnection from modyn.metadata_database.utils import ModelStorageStrategyConfig from modyn.model_storage.internal.grpc.generated.model_storage_pb2 import FetchModelRequest, FetchModelResponse from modyn.storage.internal.grpc.generated.storage_pb2 import GetDatasetSizeRequest, GetDatasetSizeResponse -from pydantic import ValidationError DATABASE = pathlib.Path(os.path.abspath(__file__)).parent / "test_evaluator.database" @@ -99,14 +99,10 @@ def GetDatasetSize(self, request: GetDatasetSizeRequest) -> GetDatasetSizeRespon def get_evaluation_process_info(): - status_query_queue = mp.Queue() - status_response_queue = mp.Queue() exception_queue = mp.Queue() metric_result_queue = mp.Queue() - evaluation_process_info = EvaluationProcessInfo( - mp.Process(), exception_queue, status_query_queue, status_response_queue, metric_result_queue - ) + evaluation_process_info = EvaluationProcessInfo(mp.Process(), exception_queue, metric_result_queue) return evaluation_process_info @@ -128,11 +124,21 @@ def get_mock_evaluation_transformer(): ) -def get_evaluate_model_request(start_timestamp=None, end_timestamp=None): +def get_evaluate_model_request(intervals=None): + if intervals is None: + intervals = [(None, None)] return EvaluateModelRequest( model_id=1, dataset_info=DatasetInfo( - dataset_id="MNIST", num_dataloaders=1, start_timestamp=start_timestamp, end_timestamp=end_timestamp + dataset_id="MNIST", + num_dataloaders=1, + evaluation_intervals=[ + EvaluationInterval( + start_timestamp=start_timestamp, + end_timestamp=end_timestamp, + ) + for start_timestamp, end_timestamp in intervals + ], ), device="cpu", batch_size=4, @@ -143,17 +149,19 @@ def get_evaluate_model_request(start_timestamp=None, end_timestamp=None): ) -def get_evaluation_info(evaluation_id, model_path: pathlib.Path, config: dict): +def get_evaluation_info(evaluation_id, model_path: pathlib.Path, config: dict, intervals=None): + if intervals is 
None: + intervals = [(None, None)] storage_address = f"{config['storage']['hostname']}:{config['storage']['port']}" return EvaluationInfo( - request=get_evaluate_model_request(), + request=get_evaluate_model_request(intervals), evaluation_id=evaluation_id, model_class_name="ResNet18", - amp=False, + amp=True, model_config="{}", storage_address=storage_address, - metrics=[Accuracy(AccuracyMetricConfig()), F1Score(F1ScoreMetricConfig(num_classes=2, average="macro"))], model_path=model_path, + not_failed_interval_ids=list(range(len(intervals))), ) @@ -181,7 +189,9 @@ def test_evaluate_model_dynamic_module_import( assert not response.evaluation_started assert not evaluator._evaluation_dict assert evaluator._next_evaluation_id == 0 - assert response.eval_aborted_reason == EvaluationAbortedReason.MODEL_IMPORT_FAILURE + assert response.interval_responses == [ + EvaluateModelIntervalResponse(eval_aborted_reason=EvaluationAbortedReason.MODEL_IMPORT_FAILURE) + ] @patch.object(EvaluatorGRPCServicer, "connect_to_storage", return_value=DummyStorageStub()) @@ -194,20 +204,26 @@ def test_evaluate_model_invalid(test_connect_to_model_storage, test_connect_to_s req.model_id = 15 resp = evaluator.evaluate_model(req, None) assert not resp.evaluation_started - assert resp.eval_aborted_reason == EvaluationAbortedReason.MODEL_NOT_EXIST_IN_METADATA + assert resp.interval_responses == [ + EvaluateModelIntervalResponse(eval_aborted_reason=EvaluationAbortedReason.MODEL_NOT_EXIST_IN_METADATA) + ] req = get_evaluate_model_request() req.dataset_info.dataset_id = "unknown" resp = evaluator.evaluate_model(req, None) assert not resp.evaluation_started assert evaluator._next_evaluation_id == 0 - assert resp.eval_aborted_reason == EvaluationAbortedReason.DATASET_NOT_FOUND + assert resp.interval_responses == [ + EvaluateModelIntervalResponse(eval_aborted_reason=EvaluationAbortedReason.DATASET_NOT_FOUND) + ] req = get_evaluate_model_request() req.model_id = 2 resp = evaluator.evaluate_model(req, None) assert not resp.evaluation_started - assert resp.eval_aborted_reason == EvaluationAbortedReason.MODEL_NOT_EXIST_IN_STORAGE + assert resp.interval_responses == [ + EvaluateModelIntervalResponse(eval_aborted_reason=EvaluationAbortedReason.MODEL_NOT_EXIST_IN_STORAGE) + ] @patch.object(EvaluatorGRPCServicer, "connect_to_model_storage", return_value=DummyModelStorageStub()) @@ -221,7 +237,9 @@ def test_evaluate_model_empty_dataset(test_connect_to_model_storage): resp = evaluator.evaluate_model(req, None) assert not resp.evaluation_started assert evaluator._next_evaluation_id == 0 - assert resp.eval_aborted_reason == EvaluationAbortedReason.EMPTY_DATASET + assert resp.interval_responses == [ + EvaluateModelIntervalResponse(eval_aborted_reason=EvaluationAbortedReason.EMPTY_DATASET) + ] @patch("modyn.evaluator.internal.grpc.evaluator_grpc_servicer.download_trained_model", return_value=None) @@ -234,7 +252,9 @@ def test_evaluate_model_download_trained_model( evaluator = EvaluatorGRPCServicer(get_modyn_config(), pathlib.Path(modyn_temp)) resp = evaluator.evaluate_model(get_evaluate_model_request(), None) assert not resp.evaluation_started - assert resp.eval_aborted_reason == EvaluationAbortedReason.DOWNLOAD_MODEL_FAILURE + assert resp.interval_responses == [ + EvaluateModelIntervalResponse(eval_aborted_reason=EvaluationAbortedReason.DOWNLOAD_MODEL_FAILURE) + ] @patch.object(EvaluatorGRPCServicer, "_run_evaluation") @@ -243,15 +263,28 @@ def test_evaluate_model_download_trained_model( return_value=pathlib.Path("downloaded_model.modyn"), ) 
@patch.object(EvaluatorGRPCServicer, "connect_to_model_storage", return_value=DummyModelStorageStub()) -def test_evaluate_model_correct_time_range_used( - test_connect_to_model_storage, test_download_trained_model, test__run_evaluation +@patch("modyn.evaluator.internal.grpc.evaluator_grpc_servicer.EvaluationInfo", wraps=EvaluationInfo) +def test_evaluate_model_mixed( + test_evaluation_info, test_connect_to_model_storage, test_download_trained_model, test__run_evaluation ) -> None: - # (start_timestamp, end_timestamp, expected_dataset_size) - test_cases = [(None, None, 400), (1000, None, 200), (None, 2000, 300), (1000, 2000, 100)] + intervals = [(None, None), (200, 300), (1000, None), (2000, 1000), (None, 2000), (1000, 2000)] + expected_interval_responses = [ + EvaluateModelIntervalResponse(eval_aborted_reason=EvaluationAbortedReason.NOT_ABORTED, dataset_size=400), + EvaluateModelIntervalResponse(eval_aborted_reason=EvaluationAbortedReason.DATASET_NOT_FOUND), + EvaluateModelIntervalResponse(eval_aborted_reason=EvaluationAbortedReason.NOT_ABORTED, dataset_size=200), + EvaluateModelIntervalResponse(eval_aborted_reason=EvaluationAbortedReason.EMPTY_DATASET), + EvaluateModelIntervalResponse(eval_aborted_reason=EvaluationAbortedReason.NOT_ABORTED, dataset_size=300), + EvaluateModelIntervalResponse(eval_aborted_reason=EvaluationAbortedReason.NOT_ABORTED, dataset_size=100), + ] def fake_get_dataset_size(request: GetDatasetSizeRequest): if request.HasField("start_timestamp") and request.HasField("end_timestamp"): - resp = GetDatasetSizeResponse(success=True, num_keys=100) + if request.start_timestamp == 1000 and request.end_timestamp == 2000: + resp = GetDatasetSizeResponse(success=True, num_keys=100) + elif request.start_timestamp == 2000 and request.end_timestamp == 1000: + resp = GetDatasetSizeResponse(success=True, num_keys=0) + else: + resp = GetDatasetSizeResponse(success=False) elif request.HasField("start_timestamp") and not request.HasField("end_timestamp"): resp = GetDatasetSizeResponse(success=True, num_keys=200) elif not request.HasField("start_timestamp") and request.HasField("end_timestamp"): @@ -263,21 +296,28 @@ def fake_get_dataset_size(request: GetDatasetSizeRequest): storage_stub_mock = mock.Mock(spec=["GetDatasetSize"]) storage_stub_mock.GetDatasetSize.side_effect = fake_get_dataset_size + req = get_evaluate_model_request(intervals) with patch.object(EvaluatorGRPCServicer, "connect_to_storage", return_value=storage_stub_mock): with tempfile.TemporaryDirectory() as modyn_temp: - for start_timestamp, end_timestamp, expected_dataset_size in test_cases: - evaluator = EvaluatorGRPCServicer(get_modyn_config(), pathlib.Path(modyn_temp)) - req = get_evaluate_model_request(start_timestamp, end_timestamp) - resp = evaluator.evaluate_model(req, None) - assert resp.evaluation_started - assert resp.dataset_size == expected_dataset_size - - storage_stub_mock.GetDatasetSize.assert_called_once_with( - GetDatasetSizeRequest( - dataset_id="MNIST", start_timestamp=start_timestamp, end_timestamp=end_timestamp - ) - ) - storage_stub_mock.GetDatasetSize.reset_mock() + + evaluator = EvaluatorGRPCServicer(get_modyn_config(), pathlib.Path(modyn_temp)) + + resp = evaluator.evaluate_model(req, None) + assert resp.evaluation_started + assert len(resp.interval_responses) == len(intervals) + assert resp.interval_responses == expected_interval_responses + + storage_stub_mock.GetDatasetSize.assert_has_calls( + [ + call(GetDatasetSizeRequest(dataset_id="MNIST", start_timestamp=None, end_timestamp=None)), + 
call(GetDatasetSizeRequest(dataset_id="MNIST", start_timestamp=200, end_timestamp=300)), + call(GetDatasetSizeRequest(dataset_id="MNIST", start_timestamp=1000, end_timestamp=None)), + call(GetDatasetSizeRequest(dataset_id="MNIST", start_timestamp=2000, end_timestamp=1000)), + call(GetDatasetSizeRequest(dataset_id="MNIST", start_timestamp=None, end_timestamp=2000)), + call(GetDatasetSizeRequest(dataset_id="MNIST", start_timestamp=1000, end_timestamp=2000)), + ] + ) + test_evaluation_info.assert_called_with(req, 0, "ResNet18", "{}", True, ANY, ANY, [0, 2, 4, 5]) @patch( @@ -306,57 +346,6 @@ def test_evaluate_model_valid(test_connect_to_model_storage, test_connect_to_sto assert str(evaluator._evaluation_dict[resp.evaluation_id].model_path) == "downloaded_model.modyn" -@patch.object(EvaluatorGRPCServicer, "connect_to_storage", return_value=DummyStorageStub()) -@patch.object(EvaluatorGRPCServicer, "connect_to_model_storage", return_value=DummyModelStorageStub()) -def test_setup_metrics(test_connect_to_model_storage, test_connect_to_storage) -> None: - with tempfile.TemporaryDirectory() as modyn_temp: - evaluator = EvaluatorGRPCServicer(get_modyn_config(), pathlib.Path(modyn_temp)) - - acc_metric_config = AccuracyMetricConfig().model_dump_json() - metrics = evaluator._setup_metrics([acc_metric_config]) - - assert len(metrics) == 1 - assert isinstance(metrics[0], Accuracy) - - unknown_metric_config = '{"name": "UnknownMetric", "config": "", "evaluation_transformer_function": ""}' - with pytest.raises(ValidationError): - evaluator._setup_metrics([unknown_metric_config]) - - metrics = evaluator._setup_metrics([acc_metric_config, acc_metric_config]) - assert len(metrics) == 1 - assert isinstance(metrics[0], Accuracy) - - -@patch.object(EvaluatorGRPCServicer, "connect_to_storage", return_value=DummyStorageStub()) -@patch.object(EvaluatorGRPCServicer, "connect_to_model_storage", return_value=DummyModelStorageStub()) -def test_setup_metrics_multiple_f1(test_connect_to_model_storage, test_connect_to_storage): - with tempfile.TemporaryDirectory() as modyn_temp: - evaluator = EvaluatorGRPCServicer(get_modyn_config(), pathlib.Path(modyn_temp)) - - macro_f1_config = F1ScoreMetricConfig( - evaluation_transformer_function="", - num_classes=2, - average="macro", - ).model_dump_json() - - micro_f1_config = F1ScoreMetricConfig( - evaluation_transformer_function="", - num_classes=2, - average="micro", - ).model_dump_json() - - # not double macro, but macro and micro work - metrics = evaluator._setup_metrics([macro_f1_config, micro_f1_config, macro_f1_config]) - - assert len(metrics) == 2 - assert isinstance(metrics[0], F1Score) - assert isinstance(metrics[1], F1Score) - assert metrics[0].config.average == "macro" - assert metrics[1].config.average == "micro" - assert metrics[0].get_name() == "F1-macro" - assert metrics[1].get_name() == "F1-micro" - - @patch.object(EvaluatorGRPCServicer, "connect_to_storage", return_value=DummyStorageStub()) @patch.object(EvaluatorGRPCServicer, "connect_to_model_storage", return_value=DummyModelStorageStub()) def test_get_evaluation_status_not_registered(test_connect_to_model_storage, test_connect_to_storage): @@ -369,11 +358,9 @@ def test_get_evaluation_status_not_registered(test_connect_to_model_storage, tes @patch.object(EvaluatorGRPCServicer, "connect_to_storage", return_value=DummyStorageStub()) @patch.object(EvaluatorGRPCServicer, "connect_to_model_storage", return_value=DummyModelStorageStub()) @patch.object(mp.Process, "is_alive", return_value=True) 
-@patch.object(EvaluatorGRPCServicer, "_get_status", return_value=(3, 50)) @patch.object(EvaluatorGRPCServicer, "_check_for_evaluation_exception") def test_get_evaluation_status_alive( test_check_for_evaluation_exception, - test_get_status, test_is_alive, test_connect_to_model_storage, test_connect_to_storage, @@ -386,54 +373,22 @@ def test_get_evaluation_status_alive( response = evaluator.get_evaluation_status(EvaluationStatusRequest(evaluation_id=0), None) assert response.valid assert response.is_running - assert not response.blocked - assert not response.HasField("exception") - assert response.state_available - assert response.HasField("batches_seen") and response.HasField("samples_seen") - assert response.batches_seen == 3 - assert response.samples_seen == 50 - test_check_for_evaluation_exception.assert_not_called() - - -@patch.object(EvaluatorGRPCServicer, "connect_to_storage", return_value=DummyStorageStub()) -@patch.object(EvaluatorGRPCServicer, "connect_to_model_storage", return_value=DummyModelStorageStub()) -@patch.object(mp.Process, "is_alive", return_value=True) -@patch.object(EvaluatorGRPCServicer, "_get_status", return_value=(None, None)) -@patch.object(EvaluatorGRPCServicer, "_check_for_evaluation_exception") -def test_get_evaluation_status_alive_blocked( - test_check_for_evaluation_exception, - test_get_status, - test_is_alive, - test_connect_to_model_storage, - test_connect_to_storage, -): - with tempfile.TemporaryDirectory() as modyn_temp: - evaluator = EvaluatorGRPCServicer(get_modyn_config(), pathlib.Path(modyn_temp)) - evaluator._evaluation_process_dict[1] = get_evaluation_process_info() - evaluator._evaluation_dict[1] = None - - response = evaluator.get_evaluation_status(EvaluationStatusRequest(evaluation_id=1), None) - assert response.valid - assert response.is_running - assert response.blocked - assert not response.state_available - assert not response.HasField("exception") - assert not (response.HasField("batches_seen") or response.HasField("samples_seen")) test_check_for_evaluation_exception.assert_not_called() @patch.object(EvaluatorGRPCServicer, "connect_to_storage", return_value=DummyStorageStub()) @patch.object(EvaluatorGRPCServicer, "connect_to_model_storage", return_value=DummyModelStorageStub()) @patch.object(mp.Process, "is_alive", return_value=False) -@patch.object(EvaluatorGRPCServicer, "_check_for_evaluation_exception", return_value="exception") -@patch.object(EvaluatorGRPCServicer, "_get_status") -def test_get_evaluation_status_finished_with_exception( - test_get_status, +@patch.object(EvaluatorGRPCServicer, "_check_for_evaluation_exception") +@pytest.mark.parametrize("exception", [None, "exception"]) +def test_get_evaluation_status_finished( test_check_for_evaluation_exception, test_is_alive, test_connect_to_model_storage, test_connect_to_storage, + exception: str, ): + test_check_for_evaluation_exception.return_value = exception with tempfile.TemporaryDirectory() as modyn_temp: evaluator = EvaluatorGRPCServicer(get_modyn_config(), pathlib.Path(modyn_temp)) evaluator._evaluation_process_dict[2] = get_evaluation_process_info() @@ -442,52 +397,11 @@ def test_get_evaluation_status_finished_with_exception( response = evaluator.get_evaluation_status(EvaluationStatusRequest(evaluation_id=2), None) assert response.valid assert not response.is_running - assert not response.blocked - assert not response.state_available - assert not (response.HasField("batches_seen") or response.HasField("samples_seen")) - assert response.HasField("exception") - assert 
response.exception == "exception" - test_get_status.assert_not_called() - - -@patch.object(EvaluatorGRPCServicer, "connect_to_storage", return_value=DummyStorageStub()) -@patch.object(EvaluatorGRPCServicer, "connect_to_model_storage", return_value=DummyModelStorageStub()) -def test_get_evaluation_status(test_connect_to_model_storage, test_connect_to_storage): - with tempfile.TemporaryDirectory() as modyn_temp: - evaluator = EvaluatorGRPCServicer(get_modyn_config(), pathlib.Path(modyn_temp)) - state_dict = {"num_batches": 10, "num_samples": 100} - - evaluation_process_info = get_evaluation_process_info() - evaluator._evaluation_process_dict[1] = evaluation_process_info - evaluation_process_info.status_response_queue.put(state_dict) - num_batches, num_samples = evaluator._get_status(1) - assert num_batches == state_dict["num_batches"] - assert num_samples == state_dict["num_samples"] - - timeout = 5 - elapsed = 0 - - while True: - if not platform.system() == "Darwin": - if evaluation_process_info.status_query_queue.qsize() == 1: - break - else: - if not evaluation_process_info.status_query_queue.empty(): - break - - sleep(0.1) - elapsed += 0.1 - - if elapsed >= timeout: - raise AssertionError("Did not reach desired queue state after 5 seconds.") - - if not platform.system() == "Darwin": - assert evaluation_process_info.status_response_queue.qsize() == 0 + if exception is None: + assert not response.HasField("exception") else: - assert evaluation_process_info.status_response_queue.empty() - - query = evaluation_process_info.status_query_queue.get() - assert query == EvaluatorMessages.STATUS_QUERY_MESSAGE + assert response.HasField("exception") + assert response.exception == exception @patch.object(EvaluatorGRPCServicer, "connect_to_storage", return_value=DummyStorageStub()) @@ -515,6 +429,58 @@ def test_check_for_evaluation_exception_found(test_connect_to_model_storage, tes assert child_exception == exception_msg +def fake_evaluate( + evaluation_info: EvaluationInfo, log_path: pathlib.Path, exception_queue: mp.Queue, metric_result_queue: mp.Queue +): + num_evals = len(evaluation_info.not_failed_interval_ids) + for idx_idx, interval_idx in enumerate(evaluation_info.not_failed_interval_ids): + if idx_idx == num_evals - 1: + exception_queue.put("A fake exception for dataloader!") + else: + metric_result_queue.put((interval_idx, [("Accuracy", 0.5), ("F1Score", 0.6)])) + + +def fake_process_start(self): + # let the target function execute directly instead of starting a new process + self._target(*self._args, **self._kwargs) + + +@patch.object(EvaluatorGRPCServicer, "connect_to_model_storage", return_value=DummyModelStorageStub()) +@patch.object(EvaluatorGRPCServicer, "connect_to_storage", return_value=DummyStorageStub()) +@patch.object(mp.Process, "start", fake_process_start) +@patch("modyn.evaluator.internal.grpc.evaluator_grpc_servicer.evaluate", fake_evaluate) +def test__run_evaluation_retain_metrics_before_real_exception(test_connect_to_storage, test_connect_to_model_storage): + evaluation_id = 0 + modyn_config = get_modyn_config() + intervals = [(None, 100), (100, 200)] + exception_msg = "A fake exception for dataloader!" 
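The `fake_evaluate` helper above exercises the queue protocol between the evaluation process and the servicer: each successfully evaluated interval puts one `(interval_index, [(metric_name, value), ...])` tuple on the metric result queue, while failures go to the exception queue. The sketch below only illustrates that protocol from the consuming side; it is not the servicer's actual implementation.

```python
# Illustrative sketch of the metric result queue protocol exercised by the fake above;
# not the servicer's actual implementation.
import queue


def drain_metric_results(metric_result_queue) -> dict[int, list[tuple[str, float]]]:
    results: dict[int, list[tuple[str, float]]] = {}
    while True:
        try:
            interval_index, metric_results = metric_result_queue.get_nowait()
        except queue.Empty:
            break
        results[interval_index] = metric_results
    return results
```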
+ evaluation_info = get_evaluation_info(evaluation_id, pathlib.Path("trained.model"), modyn_config, intervals) + with tempfile.TemporaryDirectory() as modyn_temp: + evaluator = EvaluatorGRPCServicer(get_modyn_config(), pathlib.Path(modyn_temp)) + evaluator._evaluation_dict[evaluation_id] = evaluation_info + evaluator._run_evaluation(evaluation_id) + + get_status_req = EvaluationStatusRequest(evaluation_id=evaluation_id) + get_status_resp = evaluator.get_evaluation_status(get_status_req, None) + # since now, it's single-process execution, the evaluation is finished + assert not get_status_resp.is_running + assert get_status_resp.valid + assert get_status_resp.HasField("exception") + assert get_status_resp.exception == exception_msg + + get_result_req = EvaluationResultRequest(evaluation_id=evaluation_id) + get_result_resp = evaluator.get_evaluation_result(get_result_req, None) + assert get_result_resp.valid + # evaluation on the last interval was not finished + assert len(get_result_resp.evaluation_results) == len(intervals) - 1 + assert get_result_resp.evaluation_results[0].interval_index == 0 + assert len(get_result_resp.evaluation_results[0].evaluation_data) == 2 + assert get_result_resp.evaluation_results[0].evaluation_data[0].metric == "Accuracy" + assert get_result_resp.evaluation_results[0].evaluation_data[0].result == pytest.approx(0.5) + assert get_result_resp.evaluation_results[0].evaluation_data[1].metric == "F1Score" + assert get_result_resp.evaluation_results[0].evaluation_data[1].result == pytest.approx(0.6) + + @patch.object(EvaluatorGRPCServicer, "connect_to_storage", return_value=DummyStorageStub()) @patch.object(EvaluatorGRPCServicer, "connect_to_model_storage", return_value=DummyModelStorageStub()) def test_get_evaluation_result_model_not_registered(test_connect_to_model_storage, test_connect_to_storage): @@ -538,20 +504,27 @@ def test_get_evaluation_result_still_running(test_is_alive, test_connect_to_mode @patch.object(EvaluatorGRPCServicer, "connect_to_storage", return_value=DummyStorageStub()) @patch.object(EvaluatorGRPCServicer, "connect_to_model_storage", return_value=DummyModelStorageStub()) @patch.object(mp.Process, "is_alive", return_value=False) -def test_get_evaluation_result_missing_metric(test_is_alive, test_connect_to_model_storage, test_connect_to_storage): +def test_get_evaluation_result_incomplete_metric(test_is_alive, test_connect_to_model_storage, test_connect_to_storage): with tempfile.TemporaryDirectory() as modyn_temp: evaluator = EvaluatorGRPCServicer(get_modyn_config(), pathlib.Path(modyn_temp)) evaluation_process_info = get_evaluation_process_info() evaluator._evaluation_process_dict[3] = evaluation_process_info config = get_modyn_config() - evaluator._evaluation_dict[3] = get_evaluation_info(3, pathlib.Path("trained.model"), config) + evaluator._evaluation_dict[3] = get_evaluation_info( + 3, pathlib.Path("trained.model"), config, intervals=((None, 100), (100, 200)) + ) + # though we have two intervals, one metric result is available because of exception + metric_result_queue = evaluation_process_info.metric_result_queue + metric_result_queue.put((1, [("Accuracy", 0.5)])) response = evaluator.get_evaluation_result(EvaluationResultRequest(evaluation_id=3), None) assert response.valid - assert len(response.evaluation_data) == 0 + assert len(response.evaluation_results) == 1 + assert response.evaluation_results[0].interval_index == 1 + assert len(response.evaluation_results[0].evaluation_data) == 1 + assert 
response.evaluation_results[0].evaluation_data[0].result == pytest.approx(0.5) + assert response.evaluation_results[0].evaluation_data[0].metric == "Accuracy" -@patch.object(Accuracy, "get_evaluation_result", return_value=0.5) -@patch.object(F1Score, "get_evaluation_result", return_value=0.75) @patch.object(EvaluatorGRPCServicer, "connect_to_storage", return_value=DummyStorageStub()) @patch.object(EvaluatorGRPCServicer, "connect_to_model_storage", return_value=DummyModelStorageStub()) @patch.object(mp.Process, "is_alive", return_value=False) @@ -559,29 +532,26 @@ def test_get_evaluation_result( test_is_alive, test_connect_to_model_storage, test_connect_to_storage, - test_f1: MagicMock, - test_acc: MagicMock, ): + intervals = [(4, None), (None, 8), (4, 8)] with tempfile.TemporaryDirectory() as temp: config = get_modyn_config() evaluator = EvaluatorGRPCServicer(config, pathlib.Path(temp)) - evaluator._evaluation_dict[1] = get_evaluation_info(1, pathlib.Path(temp) / "trained_model.modyn", config) - - assert len(evaluator._evaluation_dict[1].metrics) == 2 - assert isinstance(evaluator._evaluation_dict[1].metrics[0], Accuracy) - assert isinstance(evaluator._evaluation_dict[1].metrics[1], F1Score) + evaluator._evaluation_dict[1] = get_evaluation_info( + 1, pathlib.Path(temp) / "trained_model.modyn", config, intervals=intervals + ) evaluation_process_info = get_evaluation_process_info() evaluator._evaluation_process_dict[1] = evaluation_process_info - for metric in evaluator._evaluation_dict[1].metrics: - evaluation_process_info.metric_result_queue.put((metric.get_name(), metric.get_evaluation_result())) - + expected_metric_results = [(0.5, 0.6), (0.72, 0.75), (0.3, 0.4)] + for idx, (accuracy, f1score) in enumerate(expected_metric_results): + evaluation_process_info.metric_result_queue.put((idx, [("Accuracy", accuracy), ("F1Score", f1score)])) timeout = 5 elapsed = 0 while True: if not platform.system() == "Darwin": - if evaluation_process_info.metric_result_queue.qsize() == 2: + if evaluation_process_info.metric_result_queue.qsize() == len(intervals): break else: if not evaluation_process_info.metric_result_queue.empty(): @@ -595,16 +565,17 @@ def test_get_evaluation_result( response = evaluator.get_evaluation_result(EvaluationResultRequest(evaluation_id=1), None) assert response.valid - assert len(response.evaluation_data) == 2 - assert response.evaluation_data[0].metric == Accuracy(AccuracyMetricConfig()).get_name() - assert response.evaluation_data[0].result == 0.5 - test_acc.assert_called_once() - assert ( - response.evaluation_data[1].metric - == F1Score(F1ScoreMetricConfig(num_classes=2, average="macro")).get_name() - ) - assert response.evaluation_data[1].result == 0.75 - test_f1.assert_called_once() + assert len(response.evaluation_results) == len(intervals) + + for expected_interval_idx, (single_eval_data, expected_single_metric_results) in enumerate( + zip(response.evaluation_results, expected_metric_results) + ): + assert single_eval_data.interval_index == expected_interval_idx + assert len(single_eval_data.evaluation_data) == 2 + assert single_eval_data.evaluation_data[0].metric == "Accuracy" + assert single_eval_data.evaluation_data[1].metric == "F1Score" + assert single_eval_data.evaluation_data[0].result == pytest.approx(expected_single_metric_results[0]) + assert single_eval_data.evaluation_data[1].result == pytest.approx(expected_single_metric_results[1]) @patch.object(mp.Process, "is_alive", side_effect=[False, True, False, True, True]) diff --git 
a/modyn/tests/evaluator/internal/test_pytorch_evaluator.py b/modyn/tests/evaluator/internal/test_pytorch_evaluator.py index 3f492dfbf..c3b3e5922 100644 --- a/modyn/tests/evaluator/internal/test_pytorch_evaluator.py +++ b/modyn/tests/evaluator/internal/test_pytorch_evaluator.py @@ -2,22 +2,21 @@ import logging import multiprocessing as mp import pathlib -import platform import tempfile -from time import sleep from typing import Generator -from unittest.mock import MagicMock, patch +from unittest.mock import ANY, MagicMock, call, patch import pytest import torch +from modyn.config import F1ScoreMetricConfig from modyn.config.schema.pipeline import AccuracyMetricConfig -from modyn.evaluator.internal.grpc.generated.evaluator_pb2 import DatasetInfo, EvaluateModelRequest +from modyn.evaluator.internal.grpc.generated.evaluator_pb2 import DatasetInfo, EvaluateModelRequest, EvaluationInterval from modyn.evaluator.internal.grpc.generated.evaluator_pb2 import JsonString as EvaluatorJsonString from modyn.evaluator.internal.grpc.generated.evaluator_pb2 import PythonString -from modyn.evaluator.internal.metrics import AbstractEvaluationMetric, Accuracy +from modyn.evaluator.internal.metrics import Accuracy, F1Score from modyn.evaluator.internal.pytorch_evaluator import PytorchEvaluator -from modyn.evaluator.internal.utils import EvaluationInfo, EvaluatorMessages -from modyn.storage.internal.grpc.generated.storage_pb2_grpc import StorageStub +from modyn.evaluator.internal.utils import EvaluationInfo +from pydantic import ValidationError from torch.utils.data import IterableDataset @@ -30,10 +29,7 @@ def get_mock_bytes_parser(): def get_mock_label_transformer(): - return ( - "import torch\ndef label_transformer_function(x: torch.Tensor) -> " - "torch.Tensor:\n\treturn x.to(torch.float32)" - ) + return "import torch\ndef label_transformer_function(x: torch.Tensor) -> " "torch.Tensor:\n\treturn x" def get_mock_accuracy_transformer(): @@ -50,7 +46,7 @@ def __init__(self) -> None: self._weight = torch.nn.Parameter(torch.ones(1)) def forward(self, data): - return data * 2 + return torch.zeros_like(data) class MockModelWrapper: @@ -67,113 +63,104 @@ def train(self) -> None: class MockEvaluationDataset(IterableDataset): - # pylint: disable=abstract-method - - def __init__( - self, - dataset_id, - bytes_parser, - transform_list, - storage_address, - evaluation_id, - start_timestamp, - end_timestamp, - tokenizer, - ): - self.dataset = iter([(key, key, key * 2) for key in range(100)]) + + def __init__(self, input_to_output_func=lambda x: 0): + self.dataset = iter( + [ + (key, torch.tensor((key,)), torch.tensor((input_to_output_func(key),), dtype=torch.int)) + for key in range(100) + ] + ) def __iter__(self) -> Generator: yield from self.dataset +EVALUATION_INTERVALS = [ + EvaluationInterval(), + EvaluationInterval(end_timestamp=100), + EvaluationInterval(start_timestamp=100, end_timestamp=200), + EvaluationInterval(start_timestamp=150, end_timestamp=250), + EvaluationInterval(start_timestamp=200), +] + + @patch("modyn.evaluator.internal.utils.evaluation_info.dynamic_module_import") def get_evaluation_info( evaluation_id: int, storage_address: str, - metrics: [AbstractEvaluationMetric], + metrics: list[EvaluatorJsonString], trained_model_path: pathlib.Path, label_transformer: bool, + not_failed_interval_ids: list[int], model_dynamic_module_patch: MagicMock, -) -> None: +) -> EvaluationInfo: model_dynamic_module_patch.return_value = MockModule() request = EvaluateModelRequest( model_id=1, - 
dataset_info=DatasetInfo(dataset_id="MNIST", num_dataloaders=1), + dataset_info=DatasetInfo( + dataset_id="MNIST", + num_dataloaders=1, + evaluation_intervals=EVALUATION_INTERVALS, + ), device="cpu", batch_size=4, - metrics=[ - EvaluatorJsonString( - value=AccuracyMetricConfig( - evaluation_transformer_function=get_mock_accuracy_transformer() - ).model_dump_json() - ) - ], + metrics=metrics, transform_list=[], bytes_parser=PythonString(value=get_mock_bytes_parser()), label_transformer=PythonString(value=get_mock_label_transformer() if label_transformer else ""), ) - return EvaluationInfo(request, evaluation_id, "model", "{}", False, storage_address, metrics, trained_model_path) + return EvaluationInfo( + request, evaluation_id, "model", "{}", False, storage_address, trained_model_path, not_failed_interval_ids + ) -@patch.object(StorageStub, "__init__", noop_constructor_mock) def get_mock_evaluator( - query_queue: mp.Queue, - response_queue: mp.Queue, - metric_result_queue: mp.Queue, - trained_model_path: pathlib.Path, + trained_model_path: str, label_transformer: bool, -) -> None: + metric_queue: mp.Queue = mp.Queue(), + not_failed_interval_ids=None, + metric_jsons=None, +) -> PytorchEvaluator: + if not_failed_interval_ids is None: + not_failed_interval_ids = [0, 1, 2, 3] + if metric_jsons is None: + metric_jsons = [ + AccuracyMetricConfig(evaluation_transformer_function=get_mock_accuracy_transformer()).model_dump_json() + ] + proto_metrics = [EvaluatorJsonString(value=metric_json) for metric_json in metric_jsons] evaluation_info = get_evaluation_info( - 1, - "storage:5000", - [Accuracy(AccuracyMetricConfig(evaluation_transformer_function=get_mock_accuracy_transformer()))], - trained_model_path, - label_transformer, - ) - evaluator = PytorchEvaluator( - evaluation_info, query_queue, response_queue, metric_result_queue, logging.getLogger(__name__) + 1, "storage:5000", proto_metrics, pathlib.Path(trained_model_path), label_transformer, not_failed_interval_ids ) + evaluator = PytorchEvaluator(evaluation_info, logging.getLogger(__name__), metric_queue) return evaluator @patch.object(PytorchEvaluator, "_load_state") def test_evaluator_init(load_state_mock: MagicMock) -> None: - evaluator: PytorchEvaluator = get_mock_evaluator(mp.Queue(), mp.Queue(), mp.Queue(), "trained_model.modyn", True) + evaluator: PytorchEvaluator = get_mock_evaluator("trained_model.modyn", True) assert isinstance(evaluator._model, MockModelWrapper) assert isinstance(evaluator._model.model, MockModel) - assert len(evaluator._metrics) == 1 - assert isinstance(evaluator._metrics[0], Accuracy) - assert evaluator._dataloader is not None - assert torch.all( - torch.eq( - evaluator._metrics[0].evaluation_transformer_function(torch.ones(5)), - torch.ones(5), - ) - ) + assert evaluator._evaluation_id == 1 assert torch.all(torch.eq(evaluator._label_transformer_function(torch.ones(5) * 2) + 0.5, torch.ones(5) * 2 + 0.5)) assert evaluator._device == "cpu" assert evaluator._device_type == "cpu" assert not evaluator._amp - assert evaluator._num_samples == 0 - load_state_mock.assert_called_once_with("trained_model.modyn") + load_state_mock.assert_called_once_with(pathlib.Path("trained_model.modyn")) @patch.object(PytorchEvaluator, "_load_state") def test_no_transform_evaluator_init(load_state_mock: MagicMock): - evaluator: PytorchEvaluator = get_mock_evaluator(mp.Queue(), mp.Queue(), mp.Queue(), "trained_model.modyn", False) + evaluator: PytorchEvaluator = get_mock_evaluator("trained_model.modyn", False) assert 
isinstance(evaluator._model, MockModelWrapper) assert isinstance(evaluator._model.model, MockModel) - assert len(evaluator._metrics) == 1 - assert isinstance(evaluator._metrics[0], Accuracy) - assert evaluator._dataloader is not None assert not evaluator._label_transformer_function assert evaluator._device == "cpu" assert evaluator._device_type == "cpu" assert not evaluator._amp - assert evaluator._num_samples == 0 - load_state_mock.assert_called_once_with("trained_model.modyn") + load_state_mock.assert_called_once_with(pathlib.Path("trained_model.modyn")) def test_load_model(): @@ -184,108 +171,93 @@ def test_load_model(): torch.save(dict_to_save, model_path) - evaluator: PytorchEvaluator = get_mock_evaluator(mp.Queue(), mp.Queue(), mp.Queue(), model_path, False) + evaluator: PytorchEvaluator = get_mock_evaluator(str(model_path), False) assert evaluator._model.model.state_dict() == dict_to_save["model"] - assert torch.all(torch.eq(evaluator._model.model(torch.ones(4)), torch.ones(4) * 2)) assert not model_path.is_file() +@patch.object( + PytorchEvaluator, + "_prepare_dataloader", + side_effect=[ + # all samples are correctly labeled + MockEvaluationDataset(), + # only the first sample 0 is correctly labeled + MockEvaluationDataset(lambda x: 0 if x == 0 else 1), + # no samples are correctly labeled + MockEvaluationDataset(lambda x: 1), + # half of the samples are correctly labeled + MockEvaluationDataset(lambda x: x % 2), + ], +) @patch.object(PytorchEvaluator, "_load_state") -def test_send_status_to_server(load_state_mock: MagicMock): - response_queue = mp.Queue() - evaluator: PytorchEvaluator = get_mock_evaluator( - mp.Queue(), response_queue, mp.Queue(), "trained_model.modyn", True - ) - - evaluator.send_status_to_server(20) - response = response_queue.get() - assert response["num_batches"] == 20 - assert response["num_samples"] == 0 - - -@patch("modyn.evaluator.internal.pytorch_evaluator.EvaluationDataset", MockEvaluationDataset) -@patch.object(PytorchEvaluator, "_load_state") -def test_evaluate_invalid_query_message(load_state_mock: MagicMock): - query_status_queue = mp.Queue() - response_queue = mp.Queue() +def test_evaluate(_load_state_mock: MagicMock, prepare_dataloader_mock: MagicMock): + metric_queue = mp.Queue() + not_failed_interval_ids = [0, 1, 2, 4] + metric_jsons = [ + AccuracyMetricConfig(evaluation_transformer_function=get_mock_accuracy_transformer()).model_dump_json(), + F1ScoreMetricConfig(num_classes=2).model_dump_json(), + ] evaluator: PytorchEvaluator = get_mock_evaluator( - query_status_queue, response_queue, mp.Queue(), "trained_model.modyn", True + "trained_model.modyn", True, metric_queue, not_failed_interval_ids, metric_jsons ) + evaluator.evaluate() - query_status_queue.put("INVALID MESSAGE") - timeout = 5 - elapsed = 0 - while query_status_queue.empty(): - sleep(0.1) - elapsed += 0.1 - - if elapsed >= timeout: - raise TimeoutError("Did not reach desired queue state within time limit.") - - with pytest.raises(ValueError): - evaluator.evaluate() - - elapsed = 0 - while not (query_status_queue.empty() and response_queue.empty()): - sleep(0.1) - elapsed += 0.1 - - if elapsed >= timeout: - raise TimeoutError("Did not reach desired queue state within time limit.") - - -@patch("modyn.evaluator.internal.pytorch_evaluator.EvaluationDataset", MockEvaluationDataset) -@patch.object(PytorchEvaluator, "_load_state") -def test_evaluate(load_state_mock: MagicMock): - query_status_queue = mp.Queue() - response_queue = mp.Queue() - metric_result_queue = mp.Queue() - evaluator: 
PytorchEvaluator = get_mock_evaluator( - query_status_queue, response_queue, metric_result_queue, "trained_model.modyn", True + prepare_dataloader_mock.assert_has_calls( + [ + call(ANY, None, None), + call(ANY, None, 100), + call(ANY, 100, 200), + call(ANY, 200, None), + ] ) - query_status_queue.put(EvaluatorMessages.STATUS_QUERY_MESSAGE) - timeout = 2 - elapsed = 0 - - while query_status_queue.empty(): - sleep(0.1) - elapsed += 0.1 - - if elapsed >= timeout: - raise TimeoutError("Did not reach desired queue state within timelimit.") - - evaluator.evaluate() - assert evaluator._num_samples == 100 - elapsed = 0 - while not query_status_queue.empty(): - sleep(0.1) - elapsed += 0.1 - - if elapsed >= timeout: - raise TimeoutError("Did not reach desired queue state within timelimit.") - - elapsed = 0 - while True: - if not platform.system() == "Darwin": - if response_queue.qsize() == 1 and metric_result_queue.qsize() == 1: - break - else: - if not response_queue.empty() and not metric_result_queue.empty(): - break - - sleep(0.1) - elapsed += 0.1 - - if elapsed >= timeout: - raise AssertionError("Did not reach desired queue state after 5 seconds.") - - status = response_queue.get() - assert status["num_batches"] == 0 - assert status["num_samples"] == 0 - - # accuracy - metric_name, metric_result = metric_result_queue.get() - assert metric_name == Accuracy(AccuracyMetricConfig()).get_name() - assert metric_result == pytest.approx(1) + expected_accuracies = [1.0, 0.01, 0.0, 0.5] + expected_f1scores = [0.5, ANY, 0.0, 1 / 3] + for idx, accuracy, f1score in zip(not_failed_interval_ids, expected_accuracies, expected_f1scores): + # the accuracies are only correctly calculated if we correctly reset the + res = metric_queue.get() + assert res == (idx, [("Accuracy", pytest.approx(accuracy)), ("F1-macro", pytest.approx(f1score))]) + + +def test__setup_metrics(): + acc_metric_config = AccuracyMetricConfig().model_dump_json() + metrics = PytorchEvaluator._setup_metrics([acc_metric_config]) + + assert len(metrics) == 1 + assert isinstance(metrics[0], Accuracy) + unknown_metric_config = '{"name": "UnknownMetric", "config": "", "evaluation_transformer_function": ""}' + with pytest.raises(ValidationError): + PytorchEvaluator._setup_metrics([unknown_metric_config]) + + f1score_metric_config = F1ScoreMetricConfig(num_classes=2).model_dump_json() + metrics = PytorchEvaluator._setup_metrics([acc_metric_config, acc_metric_config, f1score_metric_config]) + assert len(metrics) == 2 + assert isinstance(metrics[0], Accuracy) + assert isinstance(metrics[1], F1Score) + + +def test__setup_metrics_multiple_f1(): + macro_f1_config = F1ScoreMetricConfig( + evaluation_transformer_function="", + num_classes=2, + average="macro", + ).model_dump_json() + + micro_f1_config = F1ScoreMetricConfig( + evaluation_transformer_function="", + num_classes=2, + average="micro", + ).model_dump_json() + + # not double macro, but macro and micro work + metrics = PytorchEvaluator._setup_metrics([macro_f1_config, micro_f1_config, macro_f1_config]) + + assert len(metrics) == 2 + assert isinstance(metrics[0], F1Score) + assert isinstance(metrics[1], F1Score) + assert metrics[0].config.average == "macro" + assert metrics[1].config.average == "micro" + assert metrics[0].get_name() == "F1-macro" + assert metrics[1].get_name() == "F1-micro" diff --git a/modyn/tests/supervisor/internal/eval/result_writer/test_abstract_evaluation_result_writer.py b/modyn/tests/supervisor/internal/eval/result_writer/test_abstract_evaluation_result_writer.py deleted file 
mode 100644 index 2672d98c6..000000000 --- a/modyn/tests/supervisor/internal/eval/result_writer/test_abstract_evaluation_result_writer.py +++ /dev/null @@ -1,10 +0,0 @@ -import pathlib - -from modyn.supervisor.internal.eval.result_writer import JsonResultWriter - - -def test_init(): - writer = JsonResultWriter(10, 15, pathlib.Path("")) - assert writer.pipeline_id == 10 - assert writer.trigger_id == 15 - assert str(writer.eval_directory) == "." diff --git a/modyn/tests/supervisor/internal/eval/result_writer/test_json_result_writer.py b/modyn/tests/supervisor/internal/eval/result_writer/test_json_result_writer.py deleted file mode 100644 index 737beff02..000000000 --- a/modyn/tests/supervisor/internal/eval/result_writer/test_json_result_writer.py +++ /dev/null @@ -1,38 +0,0 @@ -import json -import pathlib -import tempfile - -# pylint: disable=no-name-in-module -from modyn.evaluator.internal.grpc.generated.evaluator_pb2 import EvaluationData -from modyn.supervisor.internal.eval.result_writer import DedicatedJsonResultWriter - - -def test_json_writer(): - with tempfile.TemporaryDirectory() as path: - eval_dir = pathlib.Path(path) - writer = DedicatedJsonResultWriter(10, 15, eval_dir) - writer.add_evaluation_data("mnist", 1000, [EvaluationData(metric="Accuracy", result=0.5)]) - writer.store_results() - - file_path = eval_dir / f"{10}_{15}.eval" - assert file_path.exists() and file_path.is_file() - - with open(file_path, "r", encoding="utf-8") as eval_file: - evaluation_results = json.load(eval_file) - assert evaluation_results == json.loads( - """{ - "datasets": [ - { - "mnist": { - "dataset_size": 1000, - "metrics": [ - { - "name": "Accuracy", - "result": 0.5 - } - ] - } - } - ] - }""" - ) diff --git a/modyn/tests/supervisor/internal/eval/result_writer/test_tensorboard_result_writer.py b/modyn/tests/supervisor/internal/eval/result_writer/test_tensorboard_result_writer.py deleted file mode 100644 index f6b0ff183..000000000 --- a/modyn/tests/supervisor/internal/eval/result_writer/test_tensorboard_result_writer.py +++ /dev/null @@ -1,29 +0,0 @@ -import os -import pathlib -import tempfile -from unittest.mock import patch - -# pylint: disable=no-name-in-module -from modyn.evaluator.internal.grpc.generated.evaluator_pb2 import EvaluationData -from modyn.supervisor.internal.eval.result_writer import TensorboardResultWriter - - -def test_tensorboard_writer(): - with tempfile.TemporaryDirectory() as path: - eval_dir = pathlib.Path(path) - writer = TensorboardResultWriter(10, 15, eval_dir) - writer.add_evaluation_data("mnist", 1000, [EvaluationData(metric="Accuracy", result=0.5)]) - writer.store_results() - - assert len(os.listdir(eval_dir)) == 1 - - with tempfile.TemporaryDirectory() as path: - eval_dir = pathlib.Path(path) - result_writer = TensorboardResultWriter(10, 15, eval_dir) - - with patch.object(result_writer.writer, "add_scalar") as add_method: - result_writer.add_evaluation_data("mnist", 1000, [EvaluationData(metric="Accuracy", result=0.5)]) - - assert add_method.call_args[0][0] == "pipeline_10/mnist/Accuracy" - assert add_method.call_args[0][1] == 0.5 - assert add_method.call_args[0][2] == 15 diff --git a/modyn/tests/supervisor/internal/pipeline_executor/test_evaluation_executor.py b/modyn/tests/supervisor/internal/pipeline_executor/test_evaluation_executor.py index 190870bc8..0f32d5dc8 100644 --- a/modyn/tests/supervisor/internal/pipeline_executor/test_evaluation_executor.py +++ b/modyn/tests/supervisor/internal/pipeline_executor/test_evaluation_executor.py @@ -1,5 +1,4 @@ import datetime 
-from multiprocessing import Queue from pathlib import Path from tempfile import TemporaryDirectory from typing import Any, Iterator @@ -13,7 +12,12 @@ from modyn.config.schema.pipeline.evaluation.handler import EvalHandlerConfig from modyn.config.schema.pipeline.evaluation.strategy.slicing import SlicingEvalStrategyConfig from modyn.config.schema.system.config import ModynConfig -from modyn.evaluator.internal.grpc.generated.evaluator_pb2 import EvaluateModelResponse, EvaluationAbortedReason +from modyn.evaluator.internal.grpc.generated.evaluator_pb2 import ( + EvaluateModelIntervalResponse, + EvaluateModelResponse, + EvaluationAbortedReason, + EvaluationIntervalData, +) from modyn.supervisor.internal.eval.handler import EvalHandler, EvalRequest from modyn.supervisor.internal.grpc.enums import PipelineStage from modyn.supervisor.internal.grpc_handler import GRPCHandler @@ -179,8 +183,6 @@ def test_evaluation_handler_post_training( model_id=-1, first_timestamp=1, last_timestamp=3, - pipeline_status_queue=Queue(), - eval_status_queue=Queue(), ) test_get_eval_requests_after_training.assert_called_once() @@ -198,70 +200,66 @@ def test_evaluation_handler_post_training( PipelineStage.STORE_TRAINED_MODEL.name: tracking_df, } ) - evaluation_executor.run_post_pipeline_evaluations(Queue()) + evaluation_executor.run_post_pipeline_evaluations() test_get_eval_requests_after_training.assert_not_called() test_get_eval_requests_after_pipeline.assert_called_once() test_launch_evaluations_async.assert_called_once() -@patch.object(EvaluationExecutor, "_single_evaluation") +@patch.object(EvaluationExecutor, "_single_batched_evaluation", return_value=("failure", {})) def test_launch_evaluations_async( test_single_evaluation: Any, evaluation_executor: EvaluationExecutor, dummy_stage_log: StageLog ) -> None: evaluation_executor._launch_evaluations_async( eval_requests=[dummy_eval_request(), dummy_eval_request()], parent_log=dummy_stage_log, - eval_status_queue=Queue(), ) assert test_single_evaluation.call_count == 2 @pytest.mark.parametrize("test_failure", [False, True]) @patch.object(GRPCHandler, "cleanup_evaluations") -@patch.object(GRPCHandler, "store_evaluation_results") +@patch.object(GRPCHandler, "get_evaluation_results", return_value=[EvaluationIntervalData()]) @patch.object(GRPCHandler, "wait_for_evaluation_completion") @patch.object(GRPCHandler, "prepare_evaluation_request") -def test_single_evaluation( +def test_single_batched_evaluation( test_prepare_evaluation_request: Any, test_wait_for_evaluation_completion: Any, - test_store_evaluation_results: Any, + test_get_evaluation_results: Any, test_cleanup_evaluations: Any, evaluation_executor: EvaluationExecutor, test_failure: bool, ) -> None: evaluator_stub_mock = mock.Mock(spec=["evaluate_model"]) if test_failure: - evaluator_stub_mock.evaluate_model.side_effect = [ - EvaluateModelResponse(evaluation_started=False, eval_aborted_reason=EvaluationAbortedReason.EMPTY_DATASET) - ] + evaluator_stub_mock.evaluate_model.return_value = EvaluateModelResponse( + evaluation_started=False, + interval_responses=[ + EvaluateModelIntervalResponse(eval_aborted_reason=EvaluationAbortedReason.EMPTY_DATASET) + ], + ) else: evaluator_stub_mock.evaluate_model.return_value = EvaluateModelResponse( - evaluation_started=True, evaluation_id=42, dataset_size=10 + evaluation_started=True, + evaluation_id=42, + interval_responses=[ + EvaluateModelIntervalResponse(eval_aborted_reason=EvaluationAbortedReason.NOT_ABORTED, dataset_size=10) + ], ) - stage_log = StageLog( - id="log", - 
id_seq_num=-1, - start=datetime.datetime(2021, 1, 1), - batch_idx=-1, - sample_idx=-1, - sample_time=-1, - trigger_idx=-1, - ) evaluation_executor.grpc.evaluator = evaluator_stub_mock - evaluation_executor._single_evaluation( - log=stage_log, - eval_req=dummy_eval_request(), - eval_status_queue=Queue(), + eval_req = dummy_eval_request() + evaluation_executor._single_batched_evaluation( + eval_req.interval_start, eval_req.interval_end, eval_req.id_model, eval_req.dataset_id ) test_prepare_evaluation_request.assert_called_once() if test_failure: test_wait_for_evaluation_completion.assert_not_called() - test_store_evaluation_results.assert_not_called() + test_get_evaluation_results.assert_not_called() test_cleanup_evaluations.assert_not_called() else: test_wait_for_evaluation_completion.assert_called_once() - test_store_evaluation_results.assert_called_once() + test_get_evaluation_results.assert_called_once() test_cleanup_evaluations.assert_called_once() diff --git a/modyn/tests/supervisor/internal/pipeline_executor/test_pipeline_executor.py b/modyn/tests/supervisor/internal/pipeline_executor/test_pipeline_executor.py index c5280fcaf..a4ee2b76c 100644 --- a/modyn/tests/supervisor/internal/pipeline_executor/test_pipeline_executor.py +++ b/modyn/tests/supervisor/internal/pipeline_executor/test_pipeline_executor.py @@ -15,7 +15,12 @@ from modyn.config.schema.system import DatasetsConfig, ModynConfig, SupervisorConfig # pylint: disable=no-name-in-module -from modyn.evaluator.internal.grpc.generated.evaluator_pb2 import EvaluateModelResponse, EvaluationAbortedReason +from modyn.evaluator.internal.grpc.generated.evaluator_pb2 import ( + EvaluateModelIntervalResponse, + EvaluateModelResponse, + EvaluationAbortedReason, + EvaluationIntervalData, +) from modyn.supervisor.internal.eval.strategies.abstract import EvalInterval from modyn.supervisor.internal.eval.strategies.slicing import SlicingEvalStrategy from modyn.supervisor.internal.grpc.enums import PipelineStage @@ -700,9 +705,9 @@ def test_run_training_set_num_samples_to_pass( @pytest.mark.parametrize("test_failure", [False, True]) @patch.object(GRPCHandler, "wait_for_evaluation_completion", return_value={"num_batches": 0, "num_samples": 0}) @patch.object(GRPCHandler, "cleanup_evaluations") -@patch.object(GRPCHandler, "store_evaluation_results") +@patch.object(GRPCHandler, "get_evaluation_results", return_value=[EvaluationIntervalData()]) def test__start_evaluations( - test_store_evaluation_results: MagicMock, + test_get_evaluation_results: MagicMock, test_cleanup_evaluations: MagicMock, test_wait_for_evaluation_completion: MagicMock, test_failure: bool, @@ -713,17 +718,22 @@ def test__start_evaluations( dummy_pipeline_args.pipeline_config.evaluation = pipeline_evaluation_config evaluator_stub_mock = mock.Mock(spec=["evaluate_model"]) - success_response = EvaluateModelResponse(evaluation_started=True, evaluation_id=42, dataset_size=10) + success_response = EvaluateModelResponse( + evaluation_started=True, + evaluation_id=42, + interval_responses=[ + EvaluateModelIntervalResponse(eval_aborted_reason=EvaluationAbortedReason.NOT_ABORTED, dataset_size=10) + ], + ) failure_response = EvaluateModelResponse( - evaluation_started=False, eval_aborted_reason=EvaluationAbortedReason.EMPTY_DATASET + evaluation_started=False, + interval_responses=[EvaluateModelIntervalResponse(eval_aborted_reason=EvaluationAbortedReason.EMPTY_DATASET)], ) # we let the second evaluation fail; it shouldn't affect the third evaluation if test_failure: 
evaluator_stub_mock.evaluate_model.side_effect = [success_response, failure_response, success_response] else: - evaluator_stub_mock.evaluate_model.return_value = EvaluateModelResponse( - evaluation_started=True, evaluation_id=42, dataset_size=10 - ) + evaluator_stub_mock.evaluate_model.return_value = success_response pe = get_non_connecting_pipeline_executor(dummy_pipeline_args) pe.grpc.evaluator = evaluator_stub_mock @@ -751,7 +761,6 @@ def get_eval_intervals(training_intervals: Iterable[tuple[int, int]]) -> Iterabl if test_failure: assert evaluator_stub_mock.evaluate_model.call_count == 3 - assert test_store_evaluation_results.call_count == 2 assert test_cleanup_evaluations.call_count == 2 assert test_wait_for_evaluation_completion.call_count == 2 @@ -773,8 +782,7 @@ def get_eval_intervals(training_intervals: Iterable[tuple[int, int]]) -> Iterabl eval_dataset_config.model_dump(by_alias=True), model_id, pipeline_evaluation_config.device, - start_ts or 0, - end_ts, + [(start_ts, end_ts)], ) ) for start_ts, end_ts in intervals diff --git a/modyn/tests/supervisor/internal/test_grpc_handler.py b/modyn/tests/supervisor/internal/test_grpc_handler.py index 301e57951..dbf6e43ed 100644 --- a/modyn/tests/supervisor/internal/test_grpc_handler.py +++ b/modyn/tests/supervisor/internal/test_grpc_handler.py @@ -1,18 +1,16 @@ # pylint: disable=unused-argument,no-value-for-parameter,no-name-in-module import json -import multiprocessing as mp -import pathlib -import tempfile from unittest.mock import patch import grpc import pytest from modyn.config.schema.pipeline import EvalDataConfig from modyn.evaluator.internal.grpc.generated.evaluator_pb2 import ( - EvaluationData, + EvaluationIntervalData, EvaluationResultRequest, EvaluationResultResponse, EvaluationStatusResponse, + SingleMetricResult, ) from modyn.evaluator.internal.grpc.generated.evaluator_pb2_grpc import EvaluatorStub from modyn.selector.internal.grpc.generated.selector_pb2 import ( @@ -32,9 +30,7 @@ GetNewDataSinceResponse, ) from modyn.storage.internal.grpc.generated.storage_pb2_grpc import StorageStub -from modyn.supervisor.internal.eval.result_writer import DedicatedJsonResultWriter from modyn.supervisor.internal.grpc_handler import GRPCHandler -from modyn.supervisor.internal.utils import EvaluationStatusReporter from modyn.trainer_server.internal.grpc.generated.trainer_server_pb2_grpc import TrainerServerStub @@ -325,12 +321,17 @@ def test_stop_training_at_trainer_server(): handler.stop_training_at_trainer_server(training_id) -def test_prepare_evaluation_request(): +@pytest.mark.parametrize("tokenizer", [None, "DistilBertTokenizerTransform"]) +def test_prepare_evaluation_request(tokenizer: str): dataset_config = get_minimal_dataset_config() - dataset_config.tokenizer = "DistilBertTokenizerTransform" - request = GRPCHandler.prepare_evaluation_request( - dataset_config.model_dump(by_alias=True), 23, "cpu", start_timestamp=42, end_timestamp=43 - ) + dataset_config.tokenizer = tokenizer + intervals = [ + (None, None), + (None, 42), + (42, None), + (42, 43), + ] + request = GRPCHandler.prepare_evaluation_request(dataset_config.model_dump(by_alias=True), 23, "cpu", intervals) assert request.model_id == 23 assert request.device == "cpu" @@ -338,9 +339,24 @@ def test_prepare_evaluation_request(): assert request.dataset_info.dataset_id == "MNIST_eval" assert request.dataset_info.num_dataloaders == 2 assert json.loads(str(request.metrics[0].value))["name"] == "Accuracy" - assert request.tokenizer.value == "DistilBertTokenizerTransform" - assert 
request.dataset_info.start_timestamp == 42 - assert request.dataset_info.end_timestamp == 43 + if tokenizer: + assert request.HasField("tokenizer") + assert request.tokenizer.value == "DistilBertTokenizerTransform" + else: + assert not request.HasField("tokenizer") + assert len(request.dataset_info.evaluation_intervals) == len(intervals) + for expected_interval, interval in zip(intervals, request.dataset_info.evaluation_intervals): + expected_start_ts = expected_interval[0] + expected_end_ts = expected_interval[1] + if expected_start_ts: + assert interval.start_timestamp == expected_start_ts + else: + assert not interval.HasField("start_timestamp") + + if expected_end_ts: + assert interval.end_timestamp == expected_end_ts + else: + assert not interval.HasField("end_timestamp") @patch("modyn.supervisor.internal.grpc_handler.grpc_connection_established", return_value=True) @@ -349,135 +365,66 @@ def test_wait_for_evaluation_completion(*args): handler = GRPCHandler(get_simple_config()) handler.init_cluster_connection() assert handler.evaluator is not None - eval_status_queue = mp.Queue() - evaluations = { - 1: EvaluationStatusReporter( - dataset_id="MNIST_small", dataset_size=1000, evaluation_id=1, eval_status_queue=eval_status_queue - ), - 2: EvaluationStatusReporter( - dataset_id="MNIST_big", dataset_size=5000, evaluation_id=2, eval_status_queue=eval_status_queue - ), - 3: EvaluationStatusReporter( - dataset_id="MNIST_large", dataset_size=10000, evaluation_id=3, eval_status_queue=eval_status_queue - ), - } with patch.object(handler.evaluator, "get_evaluation_status") as status_method: status_method.side_effect = [ + EvaluationStatusResponse(valid=True, is_running=True), EvaluationStatusResponse(valid=False), - EvaluationStatusResponse(valid=True, blocked=True), - EvaluationStatusResponse( - valid=True, blocked=False, is_running=True, state_available=True, batches_seen=10, samples_seen=5000 - ), - EvaluationStatusResponse(valid=True, blocked=False, exception="Error"), - EvaluationStatusResponse(valid=True, blocked=False, is_running=False, state_available=False), + EvaluationStatusResponse(valid=True, is_running=False), + ] + with pytest.raises(RuntimeError): + handler.wait_for_evaluation_completion(10) + assert status_method.call_count == 2 + + status_method.reset_mock() + status_method.side_effect = [ + EvaluationStatusResponse(valid=True, exception="Some error"), + EvaluationStatusResponse(valid=True, is_running=False), ] - handler.wait_for_evaluation_completion(10, evaluations) - assert status_method.call_count == 5 + assert not handler.wait_for_evaluation_completion(10) + assert status_method.call_count == 1 - # from call get args (call[0]) then get first argument - called_ids = [call[0][0].evaluation_id for call in status_method.call_args_list] - assert called_ids == [1, 2, 3, 2, 3] + status_method.reset_mock() + status_method.side_effect = [ + EvaluationStatusResponse(valid=True, is_running=True), + EvaluationStatusResponse(valid=True, is_running=False), + ] + assert handler.wait_for_evaluation_completion(10) + assert status_method.call_count == 2 @patch("modyn.supervisor.internal.grpc_handler.grpc_connection_established", return_value=True) @patch("modyn.common.grpc.grpc_helpers.grpc_connection_established", return_value=True) -def test_store_evaluation_results(*args): +def test_get_evaluation_results(*args): handler = GRPCHandler(get_simple_config()) handler.init_cluster_connection() assert handler.evaluator is not None - res = EvaluationResultResponse( - valid=True, - 
evaluation_data=[EvaluationData(metric="Accuracy", result=0.5), EvaluationData(metric="F1-score", result=0.75)], - ) - eval_status_queue = mp.Queue() - evaluations = { - 10: EvaluationStatusReporter( - dataset_id="MNIST_small", dataset_size=1000, evaluation_id=10, eval_status_queue=eval_status_queue + evaluation_data = [ + EvaluationIntervalData( + evaluation_data=[ + SingleMetricResult(metric="Accuracy", result=0.42), + SingleMetricResult(metric="Loss", result=0.13), + ] ), - 15: EvaluationStatusReporter( - dataset_id="MNIST_large", dataset_size=5000, evaluation_id=15, eval_status_queue=eval_status_queue + EvaluationIntervalData( + evaluation_data=[ + SingleMetricResult(metric="Accuracy", result=0.43), + SingleMetricResult(metric="Loss", result=0.14), + ] ), - } - - with tempfile.TemporaryDirectory() as path: - with patch.object(handler.evaluator, "get_evaluation_result", return_value=res) as get_method: - eval_dir = pathlib.Path(path) - handler.store_evaluation_results([DedicatedJsonResultWriter(5, 3, eval_dir)], evaluations) - assert get_method.call_count == 2 - - called_ids = [call[0][0].evaluation_id for call in get_method.call_args_list] - assert called_ids == [10, 15] - - file_path = eval_dir / f"{5}_{3}.eval" - assert file_path.exists() and file_path.is_file() - - with open(file_path, "r", encoding="utf-8") as eval_file: - evaluation_results = json.load(eval_file) - assert evaluation_results == json.loads( - """{ - "datasets": [ - { - "MNIST_small": { - "dataset_size": 1000, - "metrics": [ - { - "name": "Accuracy", - "result": 0.5 - }, - { - "name": "F1-score", - "result": 0.75 - } - ] - } - }, - { - "MNIST_large": { - "dataset_size": 5000, - "metrics": [ - { - "name": "Accuracy", - "result": 0.5 - }, - { - "name": "F1-score", - "result": 0.75 - } - ] - } - } - ] - }""" - ) - - -@patch("modyn.supervisor.internal.grpc_handler.grpc_connection_established", return_value=True) -@patch("modyn.common.grpc.grpc_helpers.grpc_connection_established", return_value=True) -def test_store_evaluation_results_invalid(*args): - handler = GRPCHandler(get_simple_config()) - handler.init_cluster_connection() - assert handler.evaluator is not None - - res = EvaluationResultResponse(valid=False) - - evaluations = { - 10: EvaluationStatusReporter( - dataset_id="MNIST_small", dataset_size=1000, evaluation_id=10, eval_status_queue=mp.Queue() + ] + with patch.object(handler.evaluator, "get_evaluation_result") as result_method: + result_method.return_value = EvaluationResultResponse( + evaluation_results=evaluation_data, + valid=True, ) - } - - with tempfile.TemporaryDirectory() as path: - with patch.object(handler.evaluator, "get_evaluation_result", return_value=res) as get_method: - eval_dir = pathlib.Path(path) - - handler.store_evaluation_results([DedicatedJsonResultWriter(5, 3, eval_dir)], evaluations) - get_method.assert_called_with(EvaluationResultRequest(evaluation_id=10)) - - file_path = eval_dir / f"{5}_{3}.eval" - assert file_path.exists() and file_path.is_file() - - with open(file_path, "r", encoding="utf-8") as eval_file: - evaluation_results = json.load(eval_file) - assert evaluation_results["datasets"] == [] + res = handler.get_evaluation_results(10) + result_method.assert_called_once_with(EvaluationResultRequest(evaluation_id=10)) + assert res == evaluation_data + result_method.reset_mock() + result_method.return_value = EvaluationResultResponse( + valid=False, + ) + with pytest.raises(RuntimeError): + handler.get_evaluation_results(15) diff --git 
a/modyn/tests/supervisor/internal/test_supervisor.py b/modyn/tests/supervisor/internal/test_supervisor.py index 6f0575e36..bda282c23 100644 --- a/modyn/tests/supervisor/internal/test_supervisor.py +++ b/modyn/tests/supervisor/internal/test_supervisor.py @@ -12,17 +12,12 @@ from modyn.config.schema.pipeline import ModynPipelineConfig from modyn.config.schema.system import ModynConfig, SupervisorConfig from modyn.metadata_database.utils import ModelStorageStrategyConfig -from modyn.supervisor.internal.eval.result_writer import JsonResultWriter, TensorboardResultWriter from modyn.supervisor.internal.grpc.enums import PipelineStatus from modyn.supervisor.internal.grpc_handler import GRPCHandler from modyn.supervisor.internal.supervisor import Supervisor from modyn.supervisor.internal.utils import PipelineInfo EVALUATION_DIRECTORY: pathlib.Path = pathlib.Path(os.path.realpath(__file__)).parent / "test_eval_dir" -SUPPORTED_EVAL_RESULT_WRITERS: dict = { - "json": JsonResultWriter, - "tensorboard": TensorboardResultWriter, -} START_TIMESTAMP = 21 PIPELINE_ID = 42 diff --git a/scripts/run_integrationtests.sh b/scripts/run_integrationtests.sh index f850fc320..d9fb60535 100755 --- a/scripts/run_integrationtests.sh +++ b/scripts/run_integrationtests.sh @@ -43,6 +43,8 @@ echo "SELECTOR" docker logs $(docker compose ps -q selector) echo "TRAINERSERVER" docker logs $(docker compose ps -q trainer_server) +echo "EVALUATOR" +docker logs $(docker compose ps -q evaluator) echo "LOGS END" # Cleanup
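
For reviewers who want to see the batched API end to end, below is a minimal client-side sketch assembled from the generated `evaluator_pb2` messages exercised in the tests above. It is not code from this patch: the evaluator address, dataset id, and bytes parser string are placeholders, and grouping intervals per `id_model`/`dataset_id` is still left to the follow-up PR mentioned in the description. Only the message and field names (`EvaluationInterval`, `interval_responses`, `evaluation_results`, `interval_index`, `SingleMetricResult`) mirror what the diff introduces.

```python
# Sketch only: shows one EvaluateModelRequest carrying several intervals and the
# per-interval results coming back. Address, dataset id, and parser are assumptions.
import grpc

from modyn.config.schema.pipeline import AccuracyMetricConfig
from modyn.evaluator.internal.grpc.generated.evaluator_pb2 import (
    DatasetInfo,
    EvaluateModelRequest,
    EvaluationInterval,
    EvaluationResultRequest,
    JsonString,
    PythonString,
)
from modyn.evaluator.internal.grpc.generated.evaluator_pb2_grpc import EvaluatorStub

# Trivial placeholder parser; real pipelines supply their own bytes_parser_function.
BYTES_PARSER = (
    "import torch\n"
    "def bytes_parser_function(data: bytes) -> torch.Tensor:\n"
    "    return torch.frombuffer(bytearray(data), dtype=torch.uint8)\n"
)


def evaluate_batched(evaluator_address: str, model_id: int) -> None:
    # One request now carries several (start, end) intervals; leaving a bound
    # unset means an open interval, matching the optional timestamps in the proto.
    intervals = [
        EvaluationInterval(),                                         # whole dataset
        EvaluationInterval(end_timestamp=100),                        # up to 100
        EvaluationInterval(start_timestamp=100, end_timestamp=200),   # [100, 200)
    ]
    request = EvaluateModelRequest(
        model_id=model_id,
        dataset_info=DatasetInfo(
            dataset_id="MNIST_eval",  # placeholder dataset id
            num_dataloaders=1,
            evaluation_intervals=intervals,
        ),
        device="cpu",
        batch_size=4,
        metrics=[JsonString(value=AccuracyMetricConfig().model_dump_json())],
        transform_list=[],
        bytes_parser=PythonString(value=BYTES_PARSER),
        label_transformer=PythonString(value=""),
    )

    stub = EvaluatorStub(grpc.insecure_channel(evaluator_address))
    response = stub.evaluate_model(request)
    if not response.evaluation_started:
        # Abort reasons (e.g. EMPTY_DATASET) are now reported per interval in
        # response.interval_responses rather than in a single top-level field.
        return

    # After polling get_evaluation_status until the process finishes (see
    # wait_for_evaluation_completion in the GRPCHandler), results come back
    # grouped per interval.
    result = stub.get_evaluation_result(
        EvaluationResultRequest(evaluation_id=response.evaluation_id)
    )
    for interval_data in result.evaluation_results:
        for metric_result in interval_data.evaluation_data:
            print(interval_data.interval_index, metric_result.metric, metric_result.result)
```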