From 601c2e1e995a1950a0456dae1dce919b2bb969c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20B=C3=B6ther?= <2116466+MaxiBoether@users.noreply.github.com> Date: Wed, 19 Jun 2024 11:16:28 +0200 Subject: [PATCH] Multithreaded error handling in storage (#524) If a thread in the get implementation throws an exception, the storage currently dies, because an exception that escapes a std::thread is not propagated to the thread that spawned it and instead terminates the process. With this change, we catch the exception inside each thread and rethrow it in the calling thread, so that we're able to return a gRPC error instead. Also, we now throw if the file path is empty. --- modyn/evaluator/internal/pytorch_evaluator.py | 5 +- .../internal/grpc/storage_service_impl.hpp | 52 ++++++++++++++++--- .../internal/dataset/data_utils.py | 6 ++- 3 files changed, 54 insertions(+), 9 deletions(-) diff --git a/modyn/evaluator/internal/pytorch_evaluator.py b/modyn/evaluator/internal/pytorch_evaluator.py index aca6b3b2c..f6b21649c 100644 --- a/modyn/evaluator/internal/pytorch_evaluator.py +++ b/modyn/evaluator/internal/pytorch_evaluator.py @@ -70,7 +70,10 @@ def _prepare_dataloader(self, evaluation_info: EvaluationInfo) -> torch.utils.da ) self._debug("Creating DataLoader.") dataloader = torch.utils.data.DataLoader( - dataset, batch_size=evaluation_info.batch_size, num_workers=evaluation_info.num_dataloaders, timeout=60 + dataset, + batch_size=evaluation_info.batch_size, + num_workers=evaluation_info.num_dataloaders, + timeout=60 if evaluation_info.num_dataloaders > 0 else 0, ) return dataloader diff --git a/modyn/storage/include/internal/grpc/storage_service_impl.hpp b/modyn/storage/include/internal/grpc/storage_service_impl.hpp index 203474537..b4c680434 100644 --- a/modyn/storage/include/internal/grpc/storage_service_impl.hpp +++ b/modyn/storage/include/internal/grpc/storage_service_impl.hpp @@ -5,7 +5,11 @@ #include #include +#include +#include +#include #include +#include #include #include #include @@ -319,6 +323,8 @@ class StorageServiceImpl final : public modyn::storage::Storage::Service { get_samples_and_send(begin, end, writer, 
&writer_mutex, &dataset_data, &config_, sample_batch_size_); } else { + std::vector thread_exceptions(retrieval_threads_); + std::mutex exception_mutex; std::vector::const_iterator, std::vector::const_iterator>> its_per_thread = get_keys_per_thread(request_keys, retrieval_threads_); std::vector retrieval_threads_vector(retrieval_threads_); @@ -326,9 +332,18 @@ class StorageServiceImpl final : public modyn::storage::Storage::Service { const std::vector::const_iterator begin = its_per_thread[thread_id].first; const std::vector::const_iterator end = its_per_thread[thread_id].second; - retrieval_threads_vector[thread_id] = - std::thread(StorageServiceImpl::get_samples_and_send, begin, end, writer, &writer_mutex, - &dataset_data, &config_, sample_batch_size_); + retrieval_threads_vector[thread_id] = std::thread([thread_id, begin, end, writer, &writer_mutex, &dataset_data, + &thread_exceptions, &exception_mutex, this]() { + try { + get_samples_and_send(begin, end, writer, &writer_mutex, &dataset_data, &config_, + sample_batch_size_); + } catch (const std::exception& e) { + const std::lock_guard lock(exception_mutex); + spdlog::error( + fmt::format("Error in thread {} started by send_sample_data_from_keys: {}", thread_id, e.what())); + thread_exceptions[thread_id] = std::current_exception(); + } + }); } for (uint64_t thread_id = 0; thread_id < retrieval_threads_; ++thread_id) { @@ -337,6 +352,17 @@ class StorageServiceImpl final : public modyn::storage::Storage::Service { } } retrieval_threads_vector.clear(); + // In order for the gRPC call to return an error, we need to rethrow the threaded exceptions. 
+ for (auto& e_ptr : thread_exceptions) { + if (e_ptr) { + try { + std::rethrow_exception(e_ptr); + } catch (const std::exception& e) { + SPDLOG_ERROR("Error while unwinding thread: {}\nPropagating it up the call chain.", e.what()); + throw; + } + } + } } } @@ -529,6 +555,12 @@ class StorageServiceImpl final : public modyn::storage::Storage::Service { // keys than this try { const uint64_t num_keys = sample_keys.size(); + + if (num_keys == 0) { + SPDLOG_ERROR("num_keys is 0, this should not have happened. Exiting send_sample_data_for_keys_and_file"); + return; + } + std::vector sample_labels(num_keys); std::vector sample_indices(num_keys); std::vector sample_fileids(num_keys); @@ -539,15 +571,16 @@ class StorageServiceImpl final : public modyn::storage::Storage::Service { session << sample_query, soci::into(sample_labels), soci::into(sample_indices), soci::into(sample_fileids), soci::use(dataset_data.dataset_id); - int64_t current_file_id = sample_fileids[0]; + int64_t current_file_id = sample_fileids.at(0); uint64_t current_file_start_idx = 0; std::string current_file_path; session << "SELECT path FROM files WHERE file_id = :file_id AND dataset_id = :dataset_id", soci::into(current_file_path), soci::use(current_file_id), soci::use(dataset_data.dataset_id); - if (current_file_path.empty()) { - SPDLOG_ERROR(fmt::format("Could not obtain full path of file id {} in dataset {}", current_file_id, - dataset_data.dataset_id)); + if (current_file_path.empty() || current_file_path.find_first_not_of(' ') == std::string::npos) { + SPDLOG_ERROR(fmt::format("Sample query is {}", sample_query)); + throw modyn::utils::ModynException(fmt::format("Could not obtain full path of file id {} in dataset {}", + current_file_id, dataset_data.dataset_id)); } const YAML::Node file_wrapper_config_node = YAML::Load(dataset_data.file_wrapper_config); auto filesystem_wrapper = @@ -594,6 +627,11 @@ class StorageServiceImpl final : public modyn::storage::Storage::Service { current_file_path = "", 
session << "SELECT path FROM files WHERE file_id = :file_id AND dataset_id = :dataset_id", soci::into(current_file_path), soci::use(current_file_id), soci::use(dataset_data.dataset_id); + if (current_file_path.empty() || current_file_path.find_first_not_of(' ') == std::string::npos) { + SPDLOG_ERROR(fmt::format("Sample query is {}", sample_query)); + throw modyn::utils::ModynException(fmt::format("Could not obtain full path of file id {} in dataset {}", + current_file_id, dataset_data.dataset_id)); + } file_wrapper->set_file_path(current_file_path); current_file_start_idx = sample_idx; } diff --git a/modyn/trainer_server/internal/dataset/data_utils.py b/modyn/trainer_server/internal/dataset/data_utils.py index e2445ef5f..1b79f601c 100644 --- a/modyn/trainer_server/internal/dataset/data_utils.py +++ b/modyn/trainer_server/internal/dataset/data_utils.py @@ -67,7 +67,11 @@ def prepare_dataloaders( ) logger.debug("Creating DataLoader.") train_dataloader = torch.utils.data.DataLoader( - train_set, batch_size=batch_size, num_workers=num_dataloaders, drop_last=drop_last, timeout=60 + train_set, + batch_size=batch_size, + num_workers=num_dataloaders, + drop_last=drop_last, + timeout=60 if num_dataloaders > 0 else 0, ) # TODO(#50): what to do with the val set in the general case?