Skip to content

Commit

Permalink
api: add HTTP API for hint sync points
Browse files Browse the repository at this point in the history
Adds HTTP endpoints for manipulating hint sync points:

- /hinted_handoff/sync_point (POST) - creates a new sync point for
  hints towards nodes listed in the `target_hosts` parameter
- /hinted_handoff/sync_point (GET) - checks the status of the sync
  point. If a non-zero `timeout` parameter is given, it waits until the
  sync point is reached or the timeout expires.
  • Loading branch information
piodul committed Aug 9, 2021
1 parent 9091ce5 commit 7e3966c
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 0 deletions.
55 changes: 55 additions & 0 deletions api/api-doc/hinted_handoff.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,61 @@
"application/json"
],
"apis":[
{
"path":"/hinted_handoff/sync_point",
"operations":[
{
"method":"POST",
"summary":"Creates a hints sync point. It can be used to wait until hints between given nodes are replayed. A sync point allows you to wait for hints accumulated at the moment of its creation - it won't wait for hints generated later. A sync point is described entirely by its ID - there is no state kept server-side, so there is no need to delete it.",
"type":"string",
"nickname":"create_hints_sync_point",
"produces":[
"application/json"
],
"parameters":[
{
"name":"target_hosts",
"description":"A list of nodes towards which hints should be replayed. Multiple hosts can be listed by separating them with commas. If not provided or empty, the point will resolve when current hints towards all nodes in the cluster are sent.",
"required":false,
"allowMultiple":false,
"type":"string",
"paramType":"query"
}
]
},
{
"method":"GET",
"summary":"Get the status of a hints sync point, possibly waiting for it to be reached.",
"type":"string",
"enum":[
"DONE",
"IN_PROGRESS"
],
"nickname":"get_hints_sync_point",
"produces":[
"application/json"
],
"parameters":[
{
"name":"id",
"description":"The ID of the hint sync point which should be checked or waited on",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"query"
},
{
"name":"timeout",
"description":"Timeout in seconds after which the query returns even if hints are still being replayed. No value or 0 will cause the query to return immediately. A negative value will cause the query to wait until the sync point is reached",
"required":false,
"allowMultiple":false,
"type":"long",
"paramType":"query"
}
]
}
]
},
{
"path":"/hinted_handoff/hints",
"operations":[
Expand Down
82 changes: 82 additions & 0 deletions api/hinted_handoff.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,94 @@
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/

#include <algorithm>
#include <vector>

#include "hinted_handoff.hh"
#include "api/api-doc/hinted_handoff.json.hh"

#include "gms/inet_address.hh"
#include "gms/gossiper.hh"
#include "service/storage_proxy.hh"

namespace api {

using namespace json;
namespace hh = httpd::hinted_handoff_json;

void set_hinted_handoff(http_context& ctx, routes& r) {
hh::create_hints_sync_point.set(r, [&ctx] (std::unique_ptr<request> req) -> future<json::json_return_type> {
auto parse_hosts_list = [] (sstring arg) {
std::vector<sstring> hosts_str = split(arg, ",");
std::vector<gms::inet_address> hosts;
hosts.reserve(hosts_str.size());

if (hosts_str.empty()) {
// No target_hosts specified means that we should wait for hints for all nodes to be sent
// TODO: Get rid of the `get_local_gossiper` invocation
const auto members_set = gms::get_local_gossiper().get_live_members();
std::copy(members_set.begin(), members_set.end(), std::back_inserter(hosts));
} else {
for (const auto& host_str : hosts_str) {
try {
gms::inet_address host;
host = gms::inet_address(host_str);
hosts.push_back(host);
} catch (std::exception& e) {
throw httpd::bad_param_exception(format("Failed to parse host address {}: {}", host_str, e.what()));
}
}
}
return hosts;
};

std::vector<gms::inet_address> target_hosts = parse_hosts_list(req->get_query_param("target_hosts"));
return ctx.sp.local().create_hint_sync_point(std::move(target_hosts)).then([] (db::hints::sync_point sync_point) {
return json::json_return_type(sync_point.encode());
});
});

hh::get_hints_sync_point.set(r, [&ctx] (std::unique_ptr<request> req) -> future<json::json_return_type> {
db::hints::sync_point sync_point;
const sstring encoded = req->get_query_param("id");
try {
sync_point = db::hints::sync_point::decode(encoded);
} catch (std::exception& e) {
throw httpd::bad_param_exception(format("Failed to parse the sync point description {}: {}", encoded, e.what()));
}

lowres_clock::time_point deadline;
const sstring timeout_str = req->get_query_param("timeout");
try {
deadline = [&] {
if (timeout_str.empty()) {
// Empty string - don't wait at all, just check the status
return lowres_clock::time_point::min();
} else {
const auto timeout = std::stoll(timeout_str);
if (timeout >= 0) {
// Wait until the point is reached, or until `timeout` seconds elapse
return lowres_clock::now() + std::chrono::seconds(timeout);
} else {
// Negative value indicates infinite timeout
return lowres_clock::time_point::max();
}
}
} ();
} catch (std::exception& e) {
throw httpd::bad_param_exception(format("Failed to parse the timeout parameter {}: {}", timeout_str, e.what()));
}

using return_type = hh::ns_get_hints_sync_point::get_hints_sync_point_return_type;
using return_type_wrapper = hh::ns_get_hints_sync_point::return_type_wrapper;

return ctx.sp.local().wait_for_hint_sync_point(std::move(sync_point), deadline).then([] {
return json::json_return_type(return_type_wrapper(return_type::DONE));
}).handle_exception_type([] (const timed_out_error&) {
return json::json_return_type(return_type_wrapper(return_type::IN_PROGRESS));
});
});

hh::list_endpoints_pending_hints.set(r, [] (std::unique_ptr<request> req) {
//TBD
unimplemented();
Expand Down Expand Up @@ -72,6 +151,9 @@ void set_hinted_handoff(http_context& ctx, routes& r) {
}

void unset_hinted_handoff(http_context& ctx, routes& r) {
hh::create_hints_sync_point.unset(r);
hh::get_hints_sync_point.unset(r);

hh::list_endpoints_pending_hints.unset(r);
hh::truncate_all_hints.unset(r);
hh::schedule_hint_delivery.unset(r);
Expand Down

0 comments on commit 7e3966c

Please sign in to comment.