Skip to content

Commit

Permalink
storage_service, api: Keep sstables loading API handlers separate
Browse files Browse the repository at this point in the history
Right now the handlers sit in one boat with the rest of the storage
service APIs. Next patches will switch this particular endpoint to
use previously introduced sstables_loader, before doing so here's
the respective API set/unset stubs.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
  • Loading branch information
xemul committed Oct 11, 2021
1 parent 13ab22d commit 7e49359
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 25 deletions.
8 changes: 8 additions & 0 deletions api/api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,14 @@ future<> set_server_storage_service(http_context& ctx, sharded<service::storage_
});
}

future<> set_server_sstables_loader(http_context& ctx, sharded<sstables_loader>& sst_loader, sharded<service::storage_service>& ss) {
return ctx.http_server.set_routes([&ctx, &sst_loader, &ss] (routes& r) { set_sstables_loader(ctx, r, sst_loader, ss); });
}

future<> unset_server_sstables_loader(http_context& ctx) {
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_sstables_loader(ctx, r); });
}

future<> set_server_repair(http_context& ctx, sharded<repair_service>& repair) {
return ctx.http_server.set_routes([&ctx, &repair] (routes& r) { set_repair(ctx, r, repair); });
}
Expand Down
4 changes: 4 additions & 0 deletions api/api_init.hh
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class storage_service;

} // namespace service

class sstables_loader;

namespace locator {

class token_metadata;
Expand Down Expand Up @@ -81,6 +83,8 @@ future<> set_server_init(http_context& ctx);
future<> set_server_config(http_context& ctx, const db::config& cfg);
future<> set_server_snitch(http_context& ctx);
future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss, sharded<gms::gossiper>& g, sharded<cdc::generation_service>& cdc_gs);
future<> set_server_sstables_loader(http_context& ctx, sharded<sstables_loader>& sst_loader, sharded<service::storage_service>& ss);
future<> unset_server_sstables_loader(http_context& ctx);
future<> set_server_repair(http_context& ctx, sharded<repair_service>& repair);
future<> unset_server_repair(http_context& ctx);
future<> set_transport_controller(http_context& ctx, cql_transport::controller& ctl);
Expand Down
56 changes: 31 additions & 25 deletions api/storage_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,37 @@ void unset_repair(http_context& ctx, routes& r) {
ss::force_terminate_all_repair_sessions_new.unset(r);
}

void set_sstables_loader(http_context& ctx, routes& r, sharded<sstables_loader>& sst_loader, sharded<service::storage_service>& ss) {
ss::load_new_ss_tables.set(r, [&ctx, &ss](std::unique_ptr<request> req) {
auto ks = validate_keyspace(ctx, req->param);
auto cf = req->get_query_param("cf");
auto stream = req->get_query_param("load_and_stream");
auto primary_replica = req->get_query_param("primary_replica_only");
boost::algorithm::to_lower(stream);
boost::algorithm::to_lower(primary_replica);
bool load_and_stream = stream == "true" || stream == "1";
bool primary_replica_only = primary_replica == "true" || primary_replica == "1";
// No need to add the keyspace, since all we want is to avoid always sending this to the same
// CPU. Even then I am being overzealous here. This is not something that happens all the time.
auto coordinator = std::hash<sstring>()(cf) % smp::count;
return ss.invoke_on(coordinator,
[ks = std::move(ks), cf = std::move(cf),
load_and_stream, primary_replica_only] (service::storage_service& s) {
return s.load_new_sstables(ks, cf, load_and_stream, primary_replica_only);
}).then_wrapped([] (auto&& f) {
if (f.failed()) {
auto msg = fmt::format("Failed to load new sstables: {}", f.get_exception());
return make_exception_future<json::json_return_type>(httpd::server_error_exception(msg));
}
return make_ready_future<json::json_return_type>(json_void());
});
});
}

void unset_sstables_loader(http_context& ctx, routes& r) {
ss::load_new_ss_tables.unset(r);
}

void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_service>& ss, gms::gossiper& g, sharded<cdc::generation_service>& cdc_gs) {
ss::local_hostid.set(r, [](std::unique_ptr<request> req) {
return db::system_keyspace::load_local_host_id().then([](const utils::UUID& id) {
Expand Down Expand Up @@ -828,31 +859,6 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
return make_ready_future<json::json_return_type>(json_void());
});

ss::load_new_ss_tables.set(r, [&ctx, &ss](std::unique_ptr<request> req) {
auto ks = validate_keyspace(ctx, req->param);
auto cf = req->get_query_param("cf");
auto stream = req->get_query_param("load_and_stream");
auto primary_replica = req->get_query_param("primary_replica_only");
boost::algorithm::to_lower(stream);
boost::algorithm::to_lower(primary_replica);
bool load_and_stream = stream == "true" || stream == "1";
bool primary_replica_only = primary_replica == "true" || primary_replica == "1";
// No need to add the keyspace, since all we want is to avoid always sending this to the same
// CPU. Even then I am being overzealous here. This is not something that happens all the time.
auto coordinator = std::hash<sstring>()(cf) % smp::count;
return ss.invoke_on(coordinator,
[ks = std::move(ks), cf = std::move(cf),
load_and_stream, primary_replica_only] (service::storage_service& s) {
return s.load_new_sstables(ks, cf, load_and_stream, primary_replica_only);
}).then_wrapped([] (auto&& f) {
if (f.failed()) {
auto msg = fmt::format("Failed to load new sstables: {}", f.get_exception());
return make_exception_future<json::json_return_type>(httpd::server_error_exception(msg));
}
return make_ready_future<json::json_return_type>(json_void());
});
});

ss::sample_key_range.set(r, [](std::unique_ptr<request> req) {
//TBD
unimplemented();
Expand Down
3 changes: 3 additions & 0 deletions api/storage_service.hh
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ namespace db { class snapshot_ctl; }
namespace netw { class messaging_service; }
class repair_service;
namespace cdc { class generation_service; }
class sstables_loader;

namespace gms {

Expand All @@ -41,6 +42,8 @@ class gossiper;
namespace api {

void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_service>& ss, gms::gossiper& g, sharded<cdc::generation_service>& cdc_gs);
void set_sstables_loader(http_context& ctx, routes& r, sharded<sstables_loader>& sst_loader, sharded<service::storage_service>& ss);
void unset_sstables_loader(http_context& ctx, routes& r);
void set_repair(http_context& ctx, routes& r, sharded<repair_service>& repair);
void unset_repair(http_context& ctx, routes& r);
void set_transport_controller(http_context& ctx, routes& r, cql_transport::controller& ctl);
Expand Down
4 changes: 4 additions & 0 deletions main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1141,6 +1141,10 @@ int main(int ac, char** av) {
auto stop_sst_loader = defer_verbose_shutdown("sstables loader", [&sst_loader] {
sst_loader.stop().get();
});
api::set_server_sstables_loader(ctx, sst_loader, ss).get();
auto stop_sstl_api = defer_verbose_shutdown("sstables loader API", [&ctx] {
api::unset_server_sstables_loader(ctx).get();
});


gossiper.local().register_(ss.local().shared_from_this());
Expand Down

0 comments on commit 7e49359

Please sign in to comment.