From 0c1c2020a6533bc47dc9605a51e866fbefe7d205 Mon Sep 17 00:00:00 2001 From: David Goodwin Date: Thu, 15 Nov 2018 17:39:12 -0800 Subject: [PATCH] (Re)load a model from the repository in response to any change. Rework the ModelRepositoryManager to be more general and ready for future non-eager loading. --- qa/L0_infer/infer_test.py | 12 +- qa/L0_lifecycle/lifecycle_test.py | 95 +++++++- qa/L0_lifecycle/test.sh | 31 ++- src/core/model_repository_manager.cc | 351 +++++++++++++++++---------- src/core/model_repository_manager.h | 88 ++++--- src/core/server.cc | 147 ++++++++--- src/core/server_status.cc | 44 +--- src/core/server_status.h | 11 +- 8 files changed, 535 insertions(+), 244 deletions(-) diff --git a/qa/L0_infer/infer_test.py b/qa/L0_infer/infer_test.py index e4ff28890a..bcdfd0e694 100644 --- a/qa/L0_infer/infer_test.py +++ b/qa/L0_infer/infer_test.py @@ -125,7 +125,7 @@ def test_raw_version_latest_1(self): self.assertEqual("inference:0", ex.server_id()) self.assertGreater(ex.request_id(), 0) self.assertTrue( - ex.message().startswith("Servable not found for request")) + ex.message().startswith("Inference request for unknown model")) try: iu.infer_exact(self, platform, tensor_shape, 1, True, @@ -135,7 +135,7 @@ def test_raw_version_latest_1(self): self.assertEqual("inference:0", ex.server_id()) self.assertGreater(ex.request_id(), 0) self.assertTrue( - ex.message().startswith("Servable not found for request")) + ex.message().startswith("Inference request for unknown model")) iu.infer_exact(self, platform, tensor_shape, 1, True, np.int8, np.int8, np.int8, @@ -156,7 +156,7 @@ def test_raw_version_latest_2(self): self.assertEqual("inference:0", ex.server_id()) self.assertGreater(ex.request_id(), 0) self.assertTrue( - ex.message().startswith("Servable not found for request")) + ex.message().startswith("Inference request for unknown model")) iu.infer_exact(self, platform, tensor_shape, 1, True, np.int16, np.int16, np.int16, @@ -201,7 +201,7 @@ def test_raw_version_specific_1(self): self.assertEqual("inference:0", ex.server_id()) self.assertGreater(ex.request_id(), 0) self.assertTrue( - ex.message().startswith("Servable not found for request")) + ex.message().startswith("Inference request for unknown model")) try: iu.infer_exact(self, platform, tensor_shape, 1, True, @@ -211,7 +211,7 @@ def test_raw_version_specific_1(self): self.assertEqual("inference:0", ex.server_id()) self.assertGreater(ex.request_id(), 0) self.assertTrue( - ex.message().startswith("Servable not found for request")) + ex.message().startswith("Inference request for unknown model")) def test_raw_version_specific_1_3(self): input_size = 16 @@ -232,7 +232,7 @@ def test_raw_version_specific_1_3(self): self.assertEqual("inference:0", ex.server_id()) self.assertGreater(ex.request_id(), 0) self.assertTrue( - ex.message().startswith("Servable not found for request")) + ex.message().startswith("Inference request for unknown model")) iu.infer_exact(self, platform, tensor_shape, 1, True, np.float32, np.float32, np.float32, diff --git a/qa/L0_lifecycle/lifecycle_test.py b/qa/L0_lifecycle/lifecycle_test.py index caed40080c..68f576aaf7 100644 --- a/qa/L0_lifecycle/lifecycle_test.py +++ b/qa/L0_lifecycle/lifecycle_test.py @@ -125,7 +125,8 @@ def test_parse_error_modelfail(self): self.assertEqual("inference:0", ex.server_id()) self.assertGreater(ex.request_id(), 0) self.assertTrue( - ex.message().startswith("Servable not found for request")) + ex.message().startswith( + "Inference request for unknown model 
'graphdef_float32_float32_float32'")) def test_dynamic_model_load_unload(self): input_size = 16 @@ -231,7 +232,7 @@ def test_dynamic_model_load_unload(self): self.assertGreater(ex.request_id(), 0) self.assertTrue( ex.message().startswith( - "no configuration for model 'savedmodel_float32_float32_float32'")) + "Inference request for unknown model 'savedmodel_float32_float32_float32'")) # Add back the same model. The status/stats should be reset. try: @@ -287,7 +288,7 @@ def test_dynamic_model_load_unload(self): self.assertGreater(ex.request_id(), 0) self.assertTrue( ex.message().startswith( - "no configuration for model 'netdef_float32_float32_float32'")) + "Inference request for unknown model 'netdef_float32_float32_float32'")) def test_dynamic_model_load_unload_disabled(self): input_size = 16 @@ -457,7 +458,7 @@ def test_dynamic_version_load_unload(self): self.assertGreater(ex.request_id(), 0) self.assertTrue( ex.message().startswith( - "Servable not found for request: Specific(graphdef_int32_int32_int32, 1)")) + "Inference request for unknown model 'graphdef_int32_int32_int32'")) # Add back the same version. The status/stats should be # retained for versions (note that this is different behavior @@ -572,6 +573,92 @@ def test_dynamic_version_load_unload_disabled(self): except InferenceServerException as ex: self.assertTrue(False, "unexpected error {}".format(ex)) + def test_dynamic_model_modify(self): + input_size = 16 + models_base = ('savedmodel', 'plan') + models_shape = ((input_size,), (input_size, 1, 1)) + models = list() + for m in models_base: + models.append(tu.get_model_name(m, np.float32, np.float32, np.float32)) + + # Make sure savedmodel and plan are in the status + for model_name in models: + try: + for pair in [("localhost:8000", ProtocolType.HTTP), ("localhost:8001", ProtocolType.GRPC)]: + ctx = ServerStatusContext(pair[0], pair[1], model_name, True) + ss = ctx.get_server_status() + self.assertEqual(os.environ["TENSORRT_SERVER_VERSION"], ss.version) + self.assertEqual("inference:0", ss.id) + self.assertEqual(server_status.SERVER_READY, ss.ready_state) + + self.assertEqual(len(ss.model_status), 1) + self.assertTrue(model_name in ss.model_status, + "expected status for model " + model_name) + for (k, v) in iteritems(ss.model_status[model_name].version_status): + self.assertEqual(v.ready_state, server_status.MODEL_READY) + except InferenceServerException as ex: + self.assertTrue(False, "unexpected error {}".format(ex)) + + # Run inference on the model, both versions 1 and 3 + for version in (1, 3): + for model_name, model_shape in zip(models_base, models_shape): + try: + iu.infer_exact(self, model_name, model_shape, 1, True, + np.float32, np.float32, np.float32, swap=(version == 3), + model_version=version) + except InferenceServerException as ex: + self.assertTrue(False, "unexpected error {}".format(ex)) + + # Change the model configuration to have the default version + # policy (so that only version 3) if available. + for base_name, model_name in zip(models_base, models): + shutil.copyfile("config.pbtxt." 
+ base_name, "models/" + model_name + "/config.pbtxt") + + time.sleep(5) # wait for models to reload + for model_name in models: + try: + for pair in [("localhost:8000", ProtocolType.HTTP), ("localhost:8001", ProtocolType.GRPC)]: + ctx = ServerStatusContext(pair[0], pair[1], model_name, True) + ss = ctx.get_server_status() + self.assertEqual(os.environ["TENSORRT_SERVER_VERSION"], ss.version) + self.assertEqual("inference:0", ss.id) + self.assertEqual(server_status.SERVER_READY, ss.ready_state) + self.assertEqual(len(ss.model_status), 1) + self.assertTrue(model_name in ss.model_status, + "expected status for model " + model_name) + self.assertTrue(1 in ss.model_status[model_name].version_status, + "expected status for version 1 of model " + model_name) + self.assertTrue(3 in ss.model_status[model_name].version_status, + "expected status for version 3 of model " + model_name) + self.assertEqual(ss.model_status[model_name].version_status[1].ready_state, + server_status.MODEL_UNAVAILABLE) + self.assertEqual(ss.model_status[model_name].version_status[3].ready_state, + server_status.MODEL_READY) + except InferenceServerException as ex: + self.assertTrue(False, "unexpected error {}".format(ex)) + + # Attempt inferencing using version 1, should fail since + # change in model policy makes that no longer available. + for model_name, model_shape in zip(models_base, models_shape): + try: + iu.infer_exact(self, model_name, model_shape, 1, True, + np.float32, np.float32, np.float32, swap=False, + model_version=1) + self.assertTrue(False, "expected error for unavailable model " + model_name) + except InferenceServerException as ex: + self.assertEqual("inference:0", ex.server_id()) + self.assertGreater(ex.request_id(), 0) + self.assertTrue( + ex.message().startswith("Inference request for unknown model")) + + # Version 3 should continue to work... + for model_name, model_shape in zip(models_base, models_shape): + try: + iu.infer_exact(self, model_name, model_shape, 1, True, + np.float32, np.float32, np.float32, swap=True, + model_version=3) + except InferenceServerException as ex: + self.assertTrue(False, "unexpected error {}".format(ex)) if __name__ == '__main__': unittest.main() diff --git a/qa/L0_lifecycle/test.sh b/qa/L0_lifecycle/test.sh index c19c782e6d..edc0d418ee 100755 --- a/qa/L0_lifecycle/test.sh +++ b/qa/L0_lifecycle/test.sh @@ -206,7 +206,7 @@ done SERVER_ARGS="--model-store=`pwd`/models --repository-poll-secs=1 \ --allow-poll-model-repository=false --exit-timeout-secs=5" -SERVER_LOG="./inference_server_5.log" +SERVER_LOG="./inference_server_6.log" run_server if [ "$SERVER_PID" == "0" ]; then echo -e "\n***\n*** Failed to start $SERVER\n***" @@ -225,6 +225,35 @@ set -e kill $SERVER_PID wait $SERVER_PID +# LifeCycleTest.test_dynamic_model_modify +rm -fr models config.pbtxt.* +mkdir models +for i in savedmodel plan ; do + cp -r $DATADIR/qa_model_repository/${i}_float32_float32_float32 models/. + sed '/^version_policy/d' \ + $DATADIR/qa_model_repository/${i}_float32_float32_float32/config.pbtxt > config.pbtxt.${i} +done + +SERVER_ARGS="--model-store=`pwd`/models --repository-poll-secs=1 --exit-timeout-secs=5" +SERVER_LOG="./inference_server_7.log" +run_server +if [ "$SERVER_PID" == "0" ]; then + echo -e "\n***\n*** Failed to start $SERVER\n***" + cat $SERVER_LOG + exit 1 +fi + +set +e +python $LC_TEST LifeCycleTest.test_dynamic_model_modify >>$CLIENT_LOG 2>&1 +if [ $? 
-ne 0 ]; then
+    echo -e "\n***\n*** Test Failed\n***"
+    RET=1
+fi
+set -e
+
+kill $SERVER_PID
+wait $SERVER_PID
+
 # python unittest seems to swallow ImportError and still return 0 exit
 # code. So need to explicitly check CLIENT_LOG to make sure we see
diff --git a/src/core/model_repository_manager.cc b/src/core/model_repository_manager.cc
index 46a5686a5e..426de6e71c 100644
--- a/src/core/model_repository_manager.cc
+++ b/src/core/model_repository_manager.cc
@@ -33,41 +33,138 @@
 #include "tensorflow/core/lib/core/errors.h"
 #include "tensorflow/core/lib/io/path.h"
 #include "tensorflow/core/platform/env.h"
+#include "tensorflow/core/platform/file_statistics.h"
 
 namespace nvidia { namespace inferenceserver {
 
-ModelRepositoryManager*
-ModelRepositoryManager::GetSingleton()
+namespace {
+
+int64_t
+GetModifiedTime(const std::string& path)
 {
-  static ModelRepositoryManager singleton;
-  return &singleton;
+  // If there is an error in any step the fall-back default
+  // modification time is 0. This means that in error cases 'path'
+  // will show as not modified. This is the safe fall-back to avoid
+  // assuming a model is constantly being modified.
+
+  // If 'path' is a file return its mtime.
+  if (!tensorflow::Env::Default()->IsDirectory(path).ok()) {
+    tensorflow::FileStatistics stat;
+    if (!tensorflow::Env::Default()->Stat(path, &stat).ok()) {
+      LOG_ERROR << "Failed to determine modification time for '" << path
+                << "', assuming 0";
+      return 0;
+    }
+
+    return stat.mtime_nsec;
+  }
+
+  // 'path' is a directory. Return the most recent mtime of the
+  // contents of the directory.
+  //
+  // GetChildren() returns all descendants instead for cloud storage
+  // like GCS. In such case we should filter out all non-direct
+  // descendants.
+  std::vector<std::string> children;
+  if (!tensorflow::Env::Default()->GetChildren(path, &children).ok()) {
+    LOG_ERROR << "Failed to determine modification time for '" << path
+              << "', assuming 0";
+  }
+
+  std::set<std::string> real_children;
+  for (size_t i = 0; i < children.size(); ++i) {
+    const std::string& child = children[i];
+    real_children.insert(child.substr(0, child.find_first_of('/')));
+  }
+
+  int64_t mtime = 0;
+  for (const auto& child : real_children) {
+    const auto full_path = tensorflow::io::JoinPath(path, child);
+    mtime = std::max(mtime, GetModifiedTime(full_path));
+  }
+
+  return mtime;
+}
+
+// Return true if any file in the directory tree rooted at 'path' has
+// been modified more recently than 'last_ns'. Return the most-recent
+// modification time in 'last_ns'.
+bool
+IsModified(const std::string& path, int64_t* last_ns)
+{
+  const int64_t repo_ns = GetModifiedTime(path);
+  bool modified = repo_ns > *last_ns;
+  *last_ns = repo_ns;
+  return modified;
+}
+
+}  // namespace
+
+ModelRepositoryManager* ModelRepositoryManager::singleton = nullptr;
+
+ModelRepositoryManager::ModelRepositoryManager(
+    const std::string& repository_path, const bool autofill)
+    : repository_path_(repository_path), autofill_(autofill)
+{
+}
+
+tensorflow::Status
+ModelRepositoryManager::Create(
+    const std::string& repository_path, const bool autofill)
+{
+  if (singleton != nullptr) {
+    return tensorflow::errors::AlreadyExists(
+        "ModelRepositoryManager singleton already created");
+  }
+
+  singleton = new ModelRepositoryManager(repository_path, autofill);
+
+  return tensorflow::Status::OK();
 }
 
 tensorflow::Status
 ModelRepositoryManager::GetModelConfig(
     const std::string& name, ModelConfig* model_config)
 {
-  ModelRepositoryManager* singleton = GetSingleton();
-  std::lock_guard<std::mutex> lock(singleton->mu_);
-  return singleton->GetModelConfigInternal(name, model_config);
+  std::lock_guard<std::mutex> lock(singleton->infos_mu_);
+
+  const auto itr = singleton->infos_.find(name);
+  if (itr == singleton->infos_.end()) {
+    return tensorflow::errors::NotFound(
+        "no configuration for model '", name, "'");
+  }
+
+  *model_config = itr->second.model_config_;
+  return tensorflow::Status::OK();
+}
+
+tensorflow::Status
+ModelRepositoryManager::GetTFSModelConfig(
+    const std::string& name, tfs::ModelConfig* tfs_model_config)
+{
+  std::lock_guard<std::mutex> lock(singleton->infos_mu_);
+
+  const auto itr = singleton->infos_.find(name);
+  if (itr == singleton->infos_.end()) {
+    return tensorflow::errors::NotFound(
+        "no TFS configuration for model '", name, "'");
+  }
+
+  *tfs_model_config = itr->second.tfs_model_config_;
+  return tensorflow::Status::OK();
 }
 
 tensorflow::Status
 ModelRepositoryManager::GetModelPlatform(
     const std::string& name, Platform* platform)
 {
-  ModelRepositoryManager* singleton = GetSingleton();
-  std::lock_guard<std::mutex> lock(singleton->mu_);
-
-  // Lazily initialize the platform map...
-  const auto itr = singleton->platforms_.find(name);
-  if (itr == singleton->platforms_.end()) {
-    ModelConfig mc;
-    TF_RETURN_IF_ERROR(singleton->GetModelConfigInternal(name, &mc));
-    *platform = GetPlatform(mc.platform());
-    singleton->platforms_.emplace(name, *platform);
+  std::lock_guard<std::mutex> lock(singleton->infos_mu_);
+
+  const auto itr = singleton->infos_.find(name);
+  if (itr == singleton->infos_.end()) {
+    *platform = Platform::PLATFORM_UNKNOWN;
   } else {
-    *platform = itr->second;
+    *platform = itr->second.platform_;
   }
 
   if (*platform == Platform::PLATFORM_UNKNOWN) {
@@ -79,25 +176,28 @@ ModelRepositoryManager::GetModelPlatform(
 }
 
 tensorflow::Status
-ModelRepositoryManager::SetModelConfigs(const ModelConfigMap& model_configs)
+ModelRepositoryManager::Poll(
+    std::set<std::string>* added, std::set<std::string>* deleted,
+    std::set<std::string>* modified, std::set<std::string>* unmodified)
 {
-  ModelRepositoryManager* singleton = GetSingleton();
-  std::lock_guard<std::mutex> lock(singleton->mu_);
-  singleton->configs_ = model_configs;
-  singleton->platforms_.clear();
-  return tensorflow::Status::OK();
-}
+  // Serialize all polling operations...
+  std::lock_guard<std::mutex> lock(singleton->poll_mu_);
 
-tensorflow::Status
-ModelRepositoryManager::ReadModelConfigs(
-    const std::string& model_store_path, const bool autofill,
-    ModelConfigMap* model_configs, tfs::ModelServerConfig* tfs_model_configs)
-{
-  // Each subdirectory of model_store_path is a model directory from
+  added->clear();
+  deleted->clear();
+  modified->clear();
+  unmodified->clear();
+
+  // We don't modify 'infos_' in place to minimize how long we need to
+  // hold the lock and also to prevent any partial changes due to an
+  // error during processing.
+  ModelInfoMap new_infos;
+
+  // Each subdirectory of the repository path is a model directory from
   // which we read the model configuration.
   std::vector<std::string> children;
-  TF_RETURN_IF_ERROR(
-      tensorflow::Env::Default()->GetChildren(model_store_path, &children));
+  TF_RETURN_IF_ERROR(tensorflow::Env::Default()->GetChildren(
+      singleton->repository_path_, &children));
 
   // GetChildren() returns all descendants instead for cloud storage
   // like GCS. In such case we should filter out all non-direct
@@ -109,115 +209,118 @@ ModelRepositoryManager::ReadModelConfigs(
   }
 
   for (const auto& child : real_children) {
-    const auto full_path = tensorflow::io::JoinPath(model_store_path, child);
+    const auto full_path =
+        tensorflow::io::JoinPath(singleton->repository_path_, child);
     if (!tensorflow::Env::Default()->IsDirectory(full_path).ok()) {
       continue;
     }
 
-    const auto& ret = model_configs->emplace(child, ModelConfig{});
-    if (!ret.second) {
-      return tensorflow::errors::InvalidArgument(
-          "repeated model name '", child, "'");
+    // If 'child' is a new model or an existing model that has been
+    // modified since the last time it was polled, then we need to
+    // (re)load, normalize and validate the configuration.
+    bool need_load = false;
+    int64_t mtime_ns;
+    const auto iitr = singleton->infos_.find(child);
+    if (iitr == singleton->infos_.end()) {
+      added->insert(child);
+      mtime_ns = GetModifiedTime(std::string(full_path));
+      need_load = true;
+    } else {
+      mtime_ns = iitr->second.mtime_nsec_;
+      if (IsModified(std::string(full_path), &mtime_ns)) {
+        modified->insert(child);
+        need_load = true;
+      } else {
+        unmodified->insert(child);
+        const auto& ret = new_infos.emplace(child, iitr->second);
+        if (!ret.second) {
+          return tensorflow::errors::AlreadyExists(
+              "unexpected model info for model '", child, "'");
+        }
+      }
     }
 
-    ModelConfig* model_config = &(ret.first->second);
-
-    // If enabled, try to automatically generate missing parts of the
-    // model configuration from the model definition. In all cases
-    // normalize and validate the config.
-    TF_RETURN_IF_ERROR(
-        GetNormalizedModelConfig(full_path, autofill, model_config));
-    TF_RETURN_IF_ERROR(ValidateModelConfig(*model_config, std::string()));
-
-    // Make sure the name of the model matches the name of the
-    // directory. This is a somewhat arbitrary requirement but seems
-    // like good practice to require it of the user. It also acts as a
-    // check to make sure we don't have two different models with the
-    // same name.
- if (model_config->name() != child) { - return tensorflow::errors::InvalidArgument( - "unexpected directory name '", child, "' for model '", - model_config->name(), "', directory name must equal model name"); - } + if (need_load) { + const auto& ret = new_infos.emplace(child, ModelInfo{}); + if (!ret.second) { + return tensorflow::errors::AlreadyExists( + "unexpected model info for model '", child, "'"); + } - tfs::ModelConfig* tfs_config = - tfs_model_configs->mutable_model_config_list()->add_config(); - tfs_config->set_name(model_config->name()); - tfs_config->set_base_path(full_path); - tfs_config->set_model_platform(model_config->platform()); - - // Create the appropriate TFS version policy from the model - // configuration policy. - if (model_config->version_policy().has_latest()) { - tfs::FileSystemStoragePathSourceConfig::ServableVersionPolicy::Latest - latest; - latest.set_num_versions( - model_config->version_policy().latest().num_versions()); - tfs_config->mutable_model_version_policy()->mutable_latest()->CopyFrom( - latest); - } else if (model_config->version_policy().has_all()) { - tfs::FileSystemStoragePathSourceConfig::ServableVersionPolicy::All all; - tfs_config->mutable_model_version_policy()->mutable_all()->CopyFrom(all); - } else if (model_config->version_policy().has_specific()) { - tfs::FileSystemStoragePathSourceConfig::ServableVersionPolicy::Specific - specific; - specific.mutable_versions()->CopyFrom( - model_config->version_policy().specific().versions()); - tfs_config->mutable_model_version_policy()->mutable_specific()->CopyFrom( - specific); - } else { - return tensorflow::errors::Internal( - "expected version policy for model '", model_config->name()); - } - } + ModelInfo& model_info = ret.first->second; + ModelConfig& model_config = model_info.model_config_; + tfs::ModelConfig& tfs_config = model_info.tfs_model_config_; + model_info.mtime_nsec_ = mtime_ns; - return tensorflow::Status::OK(); -} + // If enabled, try to automatically generate missing parts of + // the model configuration (autofill) from the model + // definition. In all cases normalize and validate the config. + TF_RETURN_IF_ERROR(GetNormalizedModelConfig( + full_path, singleton->autofill_, &model_config)); + TF_RETURN_IF_ERROR(ValidateModelConfig(model_config, std::string())); -bool -ModelRepositoryManager::CompareModelConfigs( - const ModelConfigMap& next, std::set* added, - std::set* removed) -{ - ModelRepositoryManager* singleton = GetSingleton(); - std::lock_guard lock(singleton->mu_); + model_info.platform_ = GetPlatform(model_config.platform()); - std::set current_names, next_names; - for (const auto& p : singleton->configs_) { - current_names.insert(p.first); - } - for (const auto& p : next) { - next_names.insert(p.first); - } + // Make sure the name of the model matches the name of the + // directory. This is a somewhat arbitrary requirement but seems + // like good practice to require it of the user. It also acts as a + // check to make sure we don't have two different models with the + // same name. 
+ if (model_config.name() != child) { + return tensorflow::errors::InvalidArgument( + "unexpected directory name '", child, "' for model '", + model_config.name(), "', directory name must equal model name"); + } - if (added != nullptr) { - std::set_difference( - next_names.begin(), next_names.end(), current_names.begin(), - current_names.end(), std::inserter(*added, added->end())); - } + tfs_config.set_name(model_config.name()); + tfs_config.set_base_path(full_path); + tfs_config.set_model_platform(model_config.platform()); - if (removed != nullptr) { - std::set_difference( - current_names.begin(), current_names.end(), next_names.begin(), - next_names.end(), std::inserter(*removed, removed->end())); + // Create the appropriate TFS version policy from the model + // configuration policy. + if (model_config.version_policy().has_latest()) { + tfs::FileSystemStoragePathSourceConfig::ServableVersionPolicy::Latest + latest; + latest.set_num_versions( + model_config.version_policy().latest().num_versions()); + tfs_config.mutable_model_version_policy()->mutable_latest()->CopyFrom( + latest); + } else if (model_config.version_policy().has_all()) { + tfs::FileSystemStoragePathSourceConfig::ServableVersionPolicy::All all; + tfs_config.mutable_model_version_policy()->mutable_all()->CopyFrom(all); + } else if (model_config.version_policy().has_specific()) { + tfs::FileSystemStoragePathSourceConfig::ServableVersionPolicy::Specific + specific; + specific.mutable_versions()->CopyFrom( + model_config.version_policy().specific().versions()); + tfs_config.mutable_model_version_policy()->mutable_specific()->CopyFrom( + specific); + } else { + return tensorflow::errors::Internal( + "expected version policy for model '", model_config.name()); + } + } } - return current_names != next_names; -} + // Anything in 'infos_' that is not in "added", "modified", or + // "unmodified" is deleted. + for (const auto& pr : singleton->infos_) { + if ( + (added->find(pr.first) == added->end()) && + (modified->find(pr.first) == modified->end()) && + (unmodified->find(pr.first) == unmodified->end())) { + deleted->insert(pr.first); + } + } -tensorflow::Status -ModelRepositoryManager::GetModelConfigInternal( - const std::string& name, ModelConfig* model_config) -{ - const auto itr = configs_.find(name); - if (itr == configs_.end()) { - return tensorflow::errors::NotFound( - "no configuration for model '", name, "'"); + // Swap the new infos in place under a short-lived lock and only if + // there were no errors encountered during polling. + { + std::lock_guard lock(singleton->infos_mu_); + singleton->infos_.swap(new_infos); } - *model_config = itr->second; return tensorflow::Status::OK(); } - }} // namespace nvidia::inferenceserver diff --git a/src/core/model_repository_manager.h b/src/core/model_repository_manager.h index 688b73d609..27ef50bb96 100644 --- a/src/core/model_repository_manager.h +++ b/src/core/model_repository_manager.h @@ -36,49 +36,77 @@ namespace tfs = tensorflow::serving; namespace nvidia { namespace inferenceserver { -// A singleton to manage the model repository active in the server. A -// singleton is used because the servables have no connection to the -// server itself but they need to have access to the configuration. +/// A singleton to manage the model repository active in the server. A +/// singleton is used because the servables have no connection to the +/// server itself but they need to have access to the configuration. 
 class ModelRepositoryManager {
  public:
-  // Map from model name to a model configuration.
-  using ModelConfigMap = std::unordered_map<std::string, ModelConfig>;
+  /// Create a manager for a repository.
+  /// \param repository_path The file-system path of the repository.
+  /// \param autofill If true attempt to autofill missing required
+  /// information in each model configuration.
+  /// \return The error status.
+  static tensorflow::Status Create(
+      const std::string& repository_path, const bool autofill);
 
-  // Get the configuration for a named model. Return OK if found,
-  // NOT_FOUND otherwise.
+  /// Poll the model repository to determine the new set of models and
+  /// compare with the current set. Return the additions, deletions,
+  /// and modifications that have occurred since the last Poll().
+  /// \param added The names of the models added to the repository.
+  /// \param deleted The names of the models removed from the repository.
+  /// \param modified The names of the models remaining in the
+  /// repository that have been changed.
+  /// \param unmodified The names of the models remaining in the
+  /// repository that have not changed.
+  /// \return The error status.
+  static tensorflow::Status Poll(
+      std::set<std::string>* added, std::set<std::string>* deleted,
+      std::set<std::string>* modified, std::set<std::string>* unmodified);
+
+  /// Get the configuration for a named model.
+  /// \param name The model name.
+  /// \param model_config Returns the model configuration.
+  /// \return OK if found, NOT_FOUND otherwise.
   static tensorflow::Status GetModelConfig(
       const std::string& name, ModelConfig* model_config);
 
-  // Get the platform for a named model. Return OK if found, NO_FOUND
-  // otherwise.
+  /// Get TFS-style configuration for a named model.
+  /// \param name The model name.
+  /// \param tfs_model_config Returns the TFS-style model configuration.
+  /// \return OK if found, NOT_FOUND otherwise.
+  static tensorflow::Status GetTFSModelConfig(
      const std::string& name, tfs::ModelConfig* tfs_model_config);
+
+  /// Get the platform for a named model.
+  /// \param name The model name.
+  /// \param platform Returns the Platform.
+  /// \return OK if found, NOT_FOUND otherwise.
   static tensorflow::Status GetModelPlatform(
       const std::string& name, Platform* platform);
 
-  // Set the model configurations, removing any existing model
-  // configurations.
-  static tensorflow::Status SetModelConfigs(
-      const ModelConfigMap& model_configs);
-
-  // Read the model configurations from all models in a model
-  // repository.
-  static tensorflow::Status ReadModelConfigs(
-      const std::string& model_store_path, const bool autofill,
-      ModelConfigMap* model_configs, tfs::ModelServerConfig* tfs_model_configs);
+ private:
+  struct ModelInfo {
+    int64_t mtime_nsec_;
+    ModelConfig model_config_;
+    tfs::ModelConfig tfs_model_config_;
+    Platform platform_;
+  };
 
-  static bool CompareModelConfigs(
-      const ModelConfigMap& next, std::set<std::string>* added,
-      std::set<std::string>* removed);
+  // Map from model name to information about the model.
+  using ModelInfoMap = std::unordered_map<std::string, ModelInfo>;
 
- private:
-  ModelRepositoryManager() = default;
+  ModelRepositoryManager(
+      const std::string& repository_path, const bool autofill);
   ~ModelRepositoryManager() = default;
 
-  static ModelRepositoryManager* GetSingleton();
-  tensorflow::Status GetModelConfigInternal(
-      const std::string& name, ModelConfig* model_config);
-
-  std::mutex mu_;
-  ModelConfigMap configs_;
-  std::map<std::string, Platform> platforms_;
+  static ModelRepositoryManager* singleton;
+
+  const std::string repository_path_;
+  const bool autofill_;
+
+  std::mutex poll_mu_;
+  std::mutex infos_mu_;
+  ModelInfoMap infos_;
 };
 
 }}  // namespace nvidia::inferenceserver
diff --git a/src/core/server.cc b/src/core/server.cc
index 4b73fcef1c..408a4b1012 100644
--- a/src/core/server.cc
+++ b/src/core/server.cc
@@ -793,32 +793,47 @@ InferenceServer::Init(int argc, char** argv)
   }
   LOG_VERBOSE(1) << options.platform_config_map.DebugString();
 
-  // Create the model configuration for all the models in the model
-  // store.
-  ModelRepositoryManager::ModelConfigMap model_configs;
-  status = ModelRepositoryManager::ReadModelConfigs(
-      model_store_path_, !strict_model_config_, &model_configs,
-      &options.model_server_config);
+  // Create the global manager for the repository. Add all models into
+  // the server core 'options' so that they are eagerly loaded below
+  // when ServerCore is created.
+  status =
+      ModelRepositoryManager::Create(model_store_path_, !strict_model_config_);
   if (!status.ok()) {
     LogInitError(status.error_message());
     return !exit_on_error;
   }
 
-  LOG_VERBOSE(1) << options.model_server_config.DebugString();
-
-  // Register the configurations with the manager.
-  status = ModelRepositoryManager::SetModelConfigs(model_configs);
+  std::set<std::string> added, deleted, modified, unmodified;
+  status =
+      ModelRepositoryManager::Poll(&added, &deleted, &modified, &unmodified);
   if (!status.ok()) {
     LogInitError(status.error_message());
     return !exit_on_error;
   }
 
-  // Create the server status object.
-  status = status_manager_->InitModelConfigs(model_configs);
-  if (!status.ok()) {
-    LogInitError(status.error_message());
+  if (!deleted.empty() || !modified.empty() || !unmodified.empty()) {
+    LogInitError("Unexpected initial state for model repository");
     return !exit_on_error;
   }
 
+  for (const auto& name : added) {
+    tfs::ModelConfig* tfs_config =
+        options.model_server_config.mutable_model_config_list()->add_config();
+    status = ModelRepositoryManager::GetTFSModelConfig(name, tfs_config);
+    if (!status.ok()) {
+      LogInitError("Internal: model repository manager inconsistency");
+      return !exit_on_error;
+    }
+
+    status = status_manager_->InitForModel(name);
+    if (!status.ok()) {
+      LogInitError(status.error_message());
+      return !exit_on_error;
+    }
+  }
+
+  LOG_VERBOSE(1) << options.model_server_config.DebugString();
+
   // Create the server core. We assume that any failure is due to a
   // model not loading correctly so we just continue if not exiting on
   // error.
@@ -892,41 +907,100 @@ InferenceServer::Close()
 void
 InferenceServer::Wait()
 {
+  tensorflow::Status status;
+
   // If model load/unload is enabled for the model store, then
   // periodically look for changes and update the loaded model
   // configurations appropriately.
if (poll_model_repository_enabled_) { while (ready_state_ != ServerReadyState::SERVER_EXITING) { if (ready_state_ == ServerReadyState::SERVER_READY) { - ModelRepositoryManager::ModelConfigMap mc; + std::set added, deleted, modified, unmodified; + status = ModelRepositoryManager::Poll( + &added, &deleted, &modified, &unmodified); + if (!status.ok()) { + LOG_ERROR << "Failed to poll model repository: " + << status.error_message(); + goto next; + } + + // Nothing to do if no model adds, deletes or modifies. + if (added.empty() && deleted.empty() && modified.empty()) { + goto next; + } + + // There was a change in the model repository so need to + // create a new TFS model configuration and reload it into the + // server to cause the appropriate models to be loaded and + // unloaded. tfs::ModelServerConfig msc; - tensorflow::Status status = ModelRepositoryManager::ReadModelConfigs( - model_store_path_, !strict_model_config_, &mc, &msc); + msc.mutable_model_config_list(); + + // Added models should be loaded and be initialized for status + // reporting. + for (const auto& name : added) { + tfs::ModelConfig* tfs_config = + msc.mutable_model_config_list()->add_config(); + status = ModelRepositoryManager::GetTFSModelConfig(name, tfs_config); + if (!status.ok()) { + LOG_ERROR << "Failed to create server config for '" << name + << "': " << status.error_message(); + goto next; + } + + status = status_manager_->InitForModel(name); + if (!status.ok()) { + LOG_ERROR << "Failed to initialize status for '" << name + << "': " << status.error_message(); + goto next; + } + } + + // Keep unmodified models... + for (const auto& name : unmodified) { + tfs::ModelConfig* tfs_config = + msc.mutable_model_config_list()->add_config(); + status = ModelRepositoryManager::GetTFSModelConfig(name, tfs_config); + if (!status.ok()) { + LOG_ERROR << "Failed to create server config for '" << name + << "': " << status.error_message(); + goto next; + } + } + + status = core_->ReloadConfig(msc); if (!status.ok()) { - LOG_ERROR << "Failed to build new model configurations: " + LOG_ERROR << "Failed to reload model configurations: " << status.error_message(); - } else { - // Determine if there is any change in the models in the - // model store. We simply compare the names of the active - // models against those currently in the model store. If - // there is any change then need to reload the model - // configs. - std::set added, removed; - if (ModelRepositoryManager::CompareModelConfigs( - mc, &added, &removed)) { - ModelRepositoryManager::SetModelConfigs(mc); - status = core_->ReloadConfig(msc); + goto next; + } + + // If there are any modified model, (re)load them to pick up + // the changes. We want to keep the current status information + // so don't re-init it. + if (!modified.empty()) { + for (const auto& name : modified) { + tfs::ModelConfig* tfs_config = + msc.mutable_model_config_list()->add_config(); + status = + ModelRepositoryManager::GetTFSModelConfig(name, tfs_config); if (!status.ok()) { - LOG_ERROR << "Failed to reload new model configurations: " - << status.error_message(); + LOG_ERROR << "Failed to create server config for '" << name + << "': " << status.error_message(); + goto next; } + } - // Update status to match new model configuration. 
- status_manager_->UpdateModelConfigs(mc, added, removed); + status = core_->ReloadConfig(msc); + if (!status.ok()) { + LOG_ERROR << "Failed to reload modified model configurations: " + << status.error_message(); + goto next; } } } + next: tensorflow::Env::Default()->SleepForMicroseconds( repository_poll_secs_ * 1000 * 1000); } @@ -1250,13 +1324,12 @@ InferenceServer::HandleInfer( infer_stats->SetModelServable(state->is); infer_stats->SetBatchSize(request_provider->RequestHeader().batch_size()); - if (status.ok() && (state->is == nullptr)) { - status = tensorflow::errors::InvalidArgument( - "unable to find platform for requested model '", - request_provider->ModelName(), "'"); + if (!status.ok() || (state->is == nullptr)) { + status = tensorflow::errors::Unavailable( + "Inference request for unknown model '", request_provider->ModelName(), + "'"); } - auto OnCompleteHandleInfer = [this, OnCompleteInferRPC, state, response_provider, request_status, request_id, infer_stats](tensorflow::Status status) mutable { diff --git a/src/core/server_status.cc b/src/core/server_status.cc index 33cd32cb14..3e000cdc5f 100644 --- a/src/core/server_status.cc +++ b/src/core/server_status.cc @@ -92,45 +92,23 @@ ServerStatusManager::ServerStatusManager(const std::string& server_version) } tensorflow::Status -ServerStatusManager::InitModelConfigs( - const ModelRepositoryManager::ModelConfigMap& model_configs) +ServerStatusManager::InitForModel(const std::string& model_name) { - for (const auto& p : model_configs) { - const ModelConfig& model = p.second; - auto& ms = *server_status_.mutable_model_status(); - ms[model.name()].mutable_config()->CopyFrom(model); - } - - return tensorflow::Status::OK(); -} + ModelConfig model_config; + TF_RETURN_IF_ERROR( + ModelRepositoryManager::GetModelConfig(model_name, &model_config)); -tensorflow::Status -ServerStatusManager::UpdateModelConfigs( - const ModelRepositoryManager::ModelConfigMap& model_configs, - const std::set& added, const std::set& removed) -{ std::lock_guard lock(mu_); - // Add status for all 'added' models... - for (const auto& p : model_configs) { - const ModelConfig& model = p.second; - auto& ms = *server_status_.mutable_model_status(); - - if (added.find(model.name()) != added.end()) { - if (ms.find(model.name()) == ms.end()) { - LOG_INFO << "New status tracking for model '" << model.name() << "'"; - } else { - LOG_INFO << "New status tracking for re-added model '" << model.name() - << "'"; - ms[model.name()].Clear(); - } - - ms[model.name()].mutable_config()->CopyFrom(model); - } + auto& ms = *server_status_.mutable_model_status(); + if (ms.find(model_name) == ms.end()) { + LOG_INFO << "New status tracking for model '" << model_name << "'"; + } else { + LOG_INFO << "New status tracking for re-added model '" << model_name << "'"; + ms[model_name].Clear(); } - // We do not remove status for models that are no longer - // loaded. These will show up as unavailable in the status. 
+ ms[model_name].mutable_config()->CopyFrom(model_config); return tensorflow::Status::OK(); } diff --git a/src/core/server_status.h b/src/core/server_status.h index d5609cebc7..29404c6684 100644 --- a/src/core/server_status.h +++ b/src/core/server_status.h @@ -164,15 +164,8 @@ class ServerStatusManager { // Create a manager for server status explicit ServerStatusManager(const std::string& server_version); - // Initialize status for a set of model configurations - tensorflow::Status InitModelConfigs( - const ModelRepositoryManager::ModelConfigMap& model_configs); - - // Update status as appropriate for a set of model configurations - // with a given set of added and removed configurations. - tensorflow::Status UpdateModelConfigs( - const ModelRepositoryManager::ModelConfigMap& model_configs, - const std::set& added, const std::set& removed); + // Initialize status for a model. + tensorflow::Status InitForModel(const std::string& model_name); // Get the entire server status, including status for all models. tensorflow::Status Get(