From 7e493597202e3dc4847054b5cb081a5f3dfb4c45 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 23 Sep 2021 11:31:19 +0300 Subject: [PATCH] storage_service, api: Keep sstables loading API handlers separate Right now the handlers sit in one boat with the rest of the storage service APIs. Next patches will switch this particular endpoint to use previously introduced sstables_loader, before doing so here's the respective API set/unset stubs. Signed-off-by: Pavel Emelyanov --- api/api.cc | 8 ++++++ api/api_init.hh | 4 +++ api/storage_service.cc | 56 +++++++++++++++++++++++------------------- api/storage_service.hh | 3 +++ main.cc | 4 +++ 5 files changed, 50 insertions(+), 25 deletions(-) diff --git a/api/api.cc b/api/api.cc index 62909edc1b66..ef8b0115aebb 100644 --- a/api/api.cc +++ b/api/api.cc @@ -115,6 +115,14 @@ future<> set_server_storage_service(http_context& ctx, sharded set_server_sstables_loader(http_context& ctx, sharded& sst_loader, sharded& ss) { + return ctx.http_server.set_routes([&ctx, &sst_loader, &ss] (routes& r) { set_sstables_loader(ctx, r, sst_loader, ss); }); +} + +future<> unset_server_sstables_loader(http_context& ctx) { + return ctx.http_server.set_routes([&ctx] (routes& r) { unset_sstables_loader(ctx, r); }); +} + future<> set_server_repair(http_context& ctx, sharded& repair) { return ctx.http_server.set_routes([&ctx, &repair] (routes& r) { set_repair(ctx, r, repair); }); } diff --git a/api/api_init.hh b/api/api_init.hh index 7a00616af95d..fa0d98650dc6 100644 --- a/api/api_init.hh +++ b/api/api_init.hh @@ -34,6 +34,8 @@ class storage_service; } // namespace service +class sstables_loader; + namespace locator { class token_metadata; @@ -81,6 +83,8 @@ future<> set_server_init(http_context& ctx); future<> set_server_config(http_context& ctx, const db::config& cfg); future<> set_server_snitch(http_context& ctx); future<> set_server_storage_service(http_context& ctx, sharded& ss, sharded& g, sharded& cdc_gs); +future<> set_server_sstables_loader(http_context& ctx, sharded& sst_loader, sharded& ss); +future<> unset_server_sstables_loader(http_context& ctx); future<> set_server_repair(http_context& ctx, sharded& repair); future<> unset_server_repair(http_context& ctx); future<> set_transport_controller(http_context& ctx, cql_transport::controller& ctl); diff --git a/api/storage_service.cc b/api/storage_service.cc index 0a734b0bc476..75e7d7b72370 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -296,6 +296,37 @@ void unset_repair(http_context& ctx, routes& r) { ss::force_terminate_all_repair_sessions_new.unset(r); } +void set_sstables_loader(http_context& ctx, routes& r, sharded& sst_loader, sharded& ss) { + ss::load_new_ss_tables.set(r, [&ctx, &ss](std::unique_ptr req) { + auto ks = validate_keyspace(ctx, req->param); + auto cf = req->get_query_param("cf"); + auto stream = req->get_query_param("load_and_stream"); + auto primary_replica = req->get_query_param("primary_replica_only"); + boost::algorithm::to_lower(stream); + boost::algorithm::to_lower(primary_replica); + bool load_and_stream = stream == "true" || stream == "1"; + bool primary_replica_only = primary_replica == "true" || primary_replica == "1"; + // No need to add the keyspace, since all we want is to avoid always sending this to the same + // CPU. Even then I am being overzealous here. This is not something that happens all the time. + auto coordinator = std::hash()(cf) % smp::count; + return ss.invoke_on(coordinator, + [ks = std::move(ks), cf = std::move(cf), + load_and_stream, primary_replica_only] (service::storage_service& s) { + return s.load_new_sstables(ks, cf, load_and_stream, primary_replica_only); + }).then_wrapped([] (auto&& f) { + if (f.failed()) { + auto msg = fmt::format("Failed to load new sstables: {}", f.get_exception()); + return make_exception_future(httpd::server_error_exception(msg)); + } + return make_ready_future(json_void()); + }); + }); +} + +void unset_sstables_loader(http_context& ctx, routes& r) { + ss::load_new_ss_tables.unset(r); +} + void set_storage_service(http_context& ctx, routes& r, sharded& ss, gms::gossiper& g, sharded& cdc_gs) { ss::local_hostid.set(r, [](std::unique_ptr req) { return db::system_keyspace::load_local_host_id().then([](const utils::UUID& id) { @@ -828,31 +859,6 @@ void set_storage_service(http_context& ctx, routes& r, sharded(json_void()); }); - ss::load_new_ss_tables.set(r, [&ctx, &ss](std::unique_ptr req) { - auto ks = validate_keyspace(ctx, req->param); - auto cf = req->get_query_param("cf"); - auto stream = req->get_query_param("load_and_stream"); - auto primary_replica = req->get_query_param("primary_replica_only"); - boost::algorithm::to_lower(stream); - boost::algorithm::to_lower(primary_replica); - bool load_and_stream = stream == "true" || stream == "1"; - bool primary_replica_only = primary_replica == "true" || primary_replica == "1"; - // No need to add the keyspace, since all we want is to avoid always sending this to the same - // CPU. Even then I am being overzealous here. This is not something that happens all the time. - auto coordinator = std::hash()(cf) % smp::count; - return ss.invoke_on(coordinator, - [ks = std::move(ks), cf = std::move(cf), - load_and_stream, primary_replica_only] (service::storage_service& s) { - return s.load_new_sstables(ks, cf, load_and_stream, primary_replica_only); - }).then_wrapped([] (auto&& f) { - if (f.failed()) { - auto msg = fmt::format("Failed to load new sstables: {}", f.get_exception()); - return make_exception_future(httpd::server_error_exception(msg)); - } - return make_ready_future(json_void()); - }); - }); - ss::sample_key_range.set(r, [](std::unique_ptr req) { //TBD unimplemented(); diff --git a/api/storage_service.hh b/api/storage_service.hh index 52aedef415f7..f1cbea97ad1e 100644 --- a/api/storage_service.hh +++ b/api/storage_service.hh @@ -31,6 +31,7 @@ namespace db { class snapshot_ctl; } namespace netw { class messaging_service; } class repair_service; namespace cdc { class generation_service; } +class sstables_loader; namespace gms { @@ -41,6 +42,8 @@ class gossiper; namespace api { void set_storage_service(http_context& ctx, routes& r, sharded& ss, gms::gossiper& g, sharded& cdc_gs); +void set_sstables_loader(http_context& ctx, routes& r, sharded& sst_loader, sharded& ss); +void unset_sstables_loader(http_context& ctx, routes& r); void set_repair(http_context& ctx, routes& r, sharded& repair); void unset_repair(http_context& ctx, routes& r); void set_transport_controller(http_context& ctx, routes& r, cql_transport::controller& ctl); diff --git a/main.cc b/main.cc index c30a2b90e4c0..bf28a17d8863 100644 --- a/main.cc +++ b/main.cc @@ -1141,6 +1141,10 @@ int main(int ac, char** av) { auto stop_sst_loader = defer_verbose_shutdown("sstables loader", [&sst_loader] { sst_loader.stop().get(); }); + api::set_server_sstables_loader(ctx, sst_loader, ss).get(); + auto stop_sstl_api = defer_verbose_shutdown("sstables loader API", [&ctx] { + api::unset_server_sstables_loader(ctx).get(); + }); gossiper.local().register_(ss.local().shared_from_this());