Batch evaluation intervals into a single request and a single evaluation process #554

Merged · 25 commits · Jul 3, 2024
Changes from 1 commit
fix & add tests
XianzheMa committed Jun 30, 2024
commit 49d456ecb4fab83ea99781bdd051e537ce98e229
7 changes: 3 additions & 4 deletions modyn/evaluator/internal/grpc/evaluator_grpc_servicer.py
@@ -126,13 +126,12 @@ def evaluate_model(self, request: EvaluateModelRequest, context: grpc.ServicerCo
                 evaluation_started=False, eval_aborted_reason=EvaluationAbortedReason.MODEL_NOT_EXIST_IN_STORAGE
             )
 
-        dataset_info = request.dataset_info
         data_sizes: list[int] = []
         for interval in request.dataset_info.evaluation_intervals:
             dataset_size_req = GetDatasetSizeRequest(
                 dataset_id=request.dataset_info.dataset_id,
-                start_timestamp=interval.start_timestamp if dataset_info.HasField("start_timestamp") else None,
-                end_timestamp=interval.end_timestamp if dataset_info.HasField("end_timestamp") else None,
+                start_timestamp=interval.start_timestamp if interval.HasField("start_timestamp") else None,
+                end_timestamp=interval.end_timestamp if interval.HasField("end_timestamp") else None,
             )
             dataset_size_response: GetDatasetSizeResponse = self._storage_stub.GetDatasetSize(dataset_size_req)
 
@@ -191,7 +190,7 @@ def evaluate_model(self, request: EvaluateModelRequest, context: grpc.ServicerCo
         self._run_evaluation(evaluation_id)
 
         logger.info(f"Started evaluation {evaluation_id}.")
-        return EvaluateModelResponse(evaluation_started=True, evaluation_id=evaluation_id, dataset_size=data_sizes)
+        return EvaluateModelResponse(evaluation_started=True, evaluation_id=evaluation_id, dataset_sizes=data_sizes)
 
     @staticmethod
     def _setup_metrics(metric_configurations: list[str]) -> list[AbstractEvaluationMetric]:
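Context on the fix above: each `EvaluationInterval` carries its own optional `start_timestamp`/`end_timestamp`, so field presence must be checked on the interval itself rather than on the enclosing `dataset_info`. A minimal sketch of the corrected presence check (illustrative only; the import path is assumed from the generated module location):

```python
# Illustrative sketch, not part of this commit.
from modyn.evaluator.internal.grpc.generated.evaluator_pb2 import EvaluationInterval

intervals = [
    EvaluationInterval(start_timestamp=100, end_timestamp=200),
    EvaluationInterval(end_timestamp=300),  # open-ended start
]

for interval in intervals:
    # HasField() reports presence per interval; checking dataset_info here
    # would query the wrong message and miss per-interval open bounds.
    start = interval.start_timestamp if interval.HasField("start_timestamp") else None
    end = interval.end_timestamp if interval.HasField("end_timestamp") else None
    print(start, end)  # -> 100 200, then None 300
```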
44 changes: 22 additions & 22 deletions modyn/evaluator/internal/grpc/generated/evaluator_pb2.py

Some generated files are not rendered by default.

8 changes: 4 additions & 4 deletions modyn/evaluator/internal/grpc/generated/evaluator_pb2.pyi
@@ -172,22 +172,22 @@ class EvaluateModelResponse(google.protobuf.message.Message):
 
     EVALUATION_STARTED_FIELD_NUMBER: builtins.int
     EVALUATION_ID_FIELD_NUMBER: builtins.int
-    DATASET_SIZE_FIELD_NUMBER: builtins.int
+    DATASET_SIZES_FIELD_NUMBER: builtins.int
     EVAL_ABORTED_REASON_FIELD_NUMBER: builtins.int
     evaluation_started: builtins.bool
     evaluation_id: builtins.int
     eval_aborted_reason: global___EvaluationAbortedReason.ValueType
     @property
-    def dataset_size(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.int]: ...
+    def dataset_sizes(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.int]: ...
     def __init__(
         self,
         *,
         evaluation_started: builtins.bool = ...,
         evaluation_id: builtins.int = ...,
-        dataset_size: collections.abc.Iterable[builtins.int] | None = ...,
+        dataset_sizes: collections.abc.Iterable[builtins.int] | None = ...,
         eval_aborted_reason: global___EvaluationAbortedReason.ValueType = ...,
     ) -> None: ...
-    def ClearField(self, field_name: typing.Literal["dataset_size", b"dataset_size", "eval_aborted_reason", b"eval_aborted_reason", "evaluation_id", b"evaluation_id", "evaluation_started", b"evaluation_started"]) -> None: ...
+    def ClearField(self, field_name: typing.Literal["dataset_sizes", b"dataset_sizes", "eval_aborted_reason", b"eval_aborted_reason", "evaluation_id", b"evaluation_id", "evaluation_started", b"evaluation_started"]) -> None: ...
 
 global___EvaluateModelResponse = EvaluateModelResponse
7 changes: 2 additions & 5 deletions modyn/evaluator/internal/pytorch_evaluator.py
@@ -41,10 +41,10 @@ def __init__(self, evaluation_info: EvaluationInfo, logger: logging.Logger) -> N
 
         self._info("Initialized PyTorch evaluator.")
 
+    @staticmethod
     def _prepare_dataloader(
-        self, evaluation_info: EvaluationInfo, start_timestamp: Optional[int], end_timestamp: Optional[int]
+        evaluation_info: EvaluationInfo, start_timestamp: Optional[int], end_timestamp: Optional[int]
     ) -> torch.utils.data.DataLoader:
-        self._debug("Creating EvaluationDataset.")
         dataset = EvaluationDataset(
             evaluation_info.dataset_id,
             evaluation_info.bytes_parser,
@@ -55,7 +55,6 @@ def _prepare_dataloader(
             start_timestamp,
             end_timestamp,
         )
-        self._debug("Creating DataLoader.")
         dataloader = torch.utils.data.DataLoader(
             dataset,
             batch_size=evaluation_info.batch_size,
@@ -142,8 +141,6 @@ def evaluate(self, metric_result_queue: mp.Queue) -> None:
             dataloader = self._prepare_dataloader(self._eval_info, interval[0], interval[1])
             self._single_interval_evaluate(dataloader)
             metric_result = []
-
-            # We do this since we might also have just non-holistic metrics, in which case len(y_true) always is 0
             for metric in self._metrics:
                 metric_result.append((metric.get_name(), metric.get_evaluation_result()))
             metric_result_queue.put(metric_result)
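A note on the `@staticmethod` conversion above: the method no longer touches instance state (the `self._debug` calls are removed), and a static method can still be invoked through an instance, which is why the call site `self._prepare_dataloader(self._eval_info, ...)` keeps working unchanged. A generic sketch (names are illustrative, not from the codebase):

```python
class Demo:
    @staticmethod
    def prepare(value: int) -> int:
        # No implicit self: the first argument is ordinary data.
        return value + 1


demo = Demo()
# Calling through the instance and through the class hits the same function.
assert demo.prepare(1) == Demo.prepare(1) == 2
```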
2 changes: 1 addition & 1 deletion modyn/protos/evaluator.proto
@@ -49,7 +49,7 @@ message EvaluateModelRequest {
 message EvaluateModelResponse {
   bool evaluation_started = 1;
   int32 evaluation_id = 2;
-  repeated int64 dataset_size = 3;
+  repeated int64 dataset_sizes = 3;
   EvaluationAbortedReason eval_aborted_reason = 4;
 }
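With the proto field renamed to the repeated `dataset_sizes`, the response carries one size per requested interval, in the same order as `dataset_info.evaluation_intervals` (the servicer above appends one entry per interval). A hedged client-side sketch; the `EvaluateModel` stub method name is assumed from the usual gRPC naming and does not appear in this diff:

```python
# Sketch only: pairs each requested interval with its reported size.
response = evaluator_stub.EvaluateModel(request)
if response.evaluation_started:
    for interval, size in zip(request.dataset_info.evaluation_intervals, response.dataset_sizes):
        print(f"interval [{interval.start_timestamp}, {interval.end_timestamp}]: {size} samples")
```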
36 changes: 20 additions & 16 deletions modyn/supervisor/internal/grpc_handler.py
@@ -250,24 +250,28 @@ def prepare_evaluation_request(
         eval_intervals: list[EvaluationInterval] = []
         for start_timestamp, end_timestamp in intervals:
             eval_intervals.append(EvaluationInterval(start_timestamp=start_timestamp, end_timestamp=end_timestamp))
-        start_evaluation_kwargs = {
-            "model_id": model_id,
-            "dataset_info": DatasetInfo(
-                dataset_id=dataset_id, num_dataloaders=dataloader_workers, evaluation_intervals=eval_intervals
-            ),
-            "device": device,
-            "batch_size": batch_size,
-            "metrics": metrics,
-            "transform_list": transform_list,
-            "bytes_parser": EvaluatorPythonString(value=bytes_parser_function),
-            "label_transformer": EvaluatorPythonString(value=label_transformer),
-        }
 
         if dataset_config.get("tokenizer"):
             tokenizer = dataset_config["tokenizer"]
-            start_evaluation_kwargs["tokenizer"] = EvaluatorPythonString(value=tokenizer)
-
-        return EvaluateModelRequest(**start_evaluation_kwargs)
+            tokenizer_arg = EvaluatorPythonString(value=tokenizer)
+        else:
+            tokenizer_arg = None
+
+        return EvaluateModelRequest(
+            model_id=model_id,
+            dataset_info=DatasetInfo(
+                dataset_id=dataset_id,
+                num_dataloaders=dataloader_workers,
+                evaluation_intervals=eval_intervals,
+            ),
+            device=device,
+            batch_size=batch_size,
+            metrics=metrics,
+            transform_list=transform_list,
+            bytes_parser=EvaluatorPythonString(value=bytes_parser_function),
+            label_transformer=EvaluatorPythonString(value=label_transformer),
+            tokenizer=tokenizer_arg,
+        )
 
     # pylint: disable=too-many-branches
     def wait_for_evaluation_completion(self, evaluation_id: int) -> None:
@@ -313,7 +317,7 @@ def get_evaluation_results(self, evaluation_id: int) -> list[SingleEvaluationDat
         if not res.valid:
             logger.error(f"Cannot get the evaluation result for evaluation {evaluation_id}")
             raise RuntimeError(f"Cannot get the evaluation result for evaluation {evaluation_id}")
-        return res.evaluation_data
+        return res.evaluation_results
 
     def cleanup_evaluations(self, evaluation_ids: list[int]) -> None:
         assert self.evaluator is not None
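The rewritten `prepare_evaluation_request` above passes `tokenizer=tokenizer_arg` unconditionally instead of adding a kwargs entry only when a tokenizer is configured. This relies on the Python protobuf convention that passing `None` for a singular message field in the constructor leaves the field unset, so both code paths should produce the same request as before. A small sketch under that assumption:

```python
# Sketch: tokenizer=None should behave like omitting the field entirely.
req = EvaluateModelRequest(tokenizer=None)
assert not req.HasField("tokenizer")

req = EvaluateModelRequest(tokenizer=EvaluatorPythonString(value="my_tokenizer"))
assert req.HasField("tokenizer")
```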
@@ -238,13 +238,13 @@ def worker_func(eval_req: EvalRequest) -> StageLog:
             )
             epoch_micros_start = current_time_micros()
             single_log.info = SingleEvaluationInfo(eval_request=eval_req)
-            optional_failure_reason, result = self._single_batched_evaluation(
+            optional_failure_reason, results = self._single_batched_evaluation(
                 eval_req.interval_start, eval_req.interval_end, eval_req.id_model, eval_req.dataset_id
             )
             if optional_failure_reason:
                 single_log.info.failure_reason = optional_failure_reason
             else:
-                single_log.info.result = result
+                single_log.info.results = results
             single_log.end = datetime.datetime.now()
             single_log.duration = datetime.timedelta(microseconds=current_time_micros() - epoch_micros_start)
             return single_log
@@ -307,7 +307,7 @@ def _single_batched_evaluation(
         eval_data = self.grpc.get_evaluation_results(response.evaluation_id)
         self.grpc.cleanup_evaluations([response.evaluation_id])
         # here we assume only one interval is evaluated
-        eval_results: dict = {"dataset_size": response.dataset_size[0], "metrics": []}
+        eval_results: dict = {"dataset_size": response.dataset_sizes[0], "metrics": []}
         for metric in eval_data[0].evaluation_data:
            eval_results["metrics"].append({"name": metric.metric, "result": metric.result})
         return None, eval_results
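For reference, the per-interval dictionary assembled by `_single_batched_evaluation` above has the following shape (the concrete values are made up for illustration):

```python
# Illustrative only; mirrors the dict built from response.dataset_sizes[0]
# and the metric entries in eval_data[0].evaluation_data.
eval_results = {
    "dataset_size": 1234,
    "metrics": [
        {"name": "Accuracy", "result": 0.91},
    ],
}
```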
@@ -10,7 +10,6 @@
 from typing import Callable, Generator, TypeVar, cast
 
 import pandas as pd
-from modyn.config.schema.pipeline import ResultWriterType
 from modyn.supervisor.internal.grpc.enums import CounterAction, IdType, MsgType, PipelineStage
 from modyn.supervisor.internal.grpc.template_msg import counter_submsg, dataset_submsg, id_submsg, pipeline_stage_msg
 from modyn.supervisor.internal.grpc_handler import GRPCHandler