Skip to content

Commit

Permalink
messaging_service: add verbs for hint sync points
Browse files Browse the repository at this point in the history
Adds two verbs: HINT_SYNC_POINT_CREATE and HINT_SYNC_POINT_CHECK.
Those will make it possible to create a sync point and regularly poll
to check its existence.
  • Loading branch information
piodul committed Apr 27, 2021
1 parent 244738b commit 82c4198
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 1 deletion.
1 change: 1 addition & 0 deletions configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -1016,6 +1016,7 @@ def find_headers(repodir, excluded_dirs):
'idl/messaging_service.idl.hh',
'idl/paxos.idl.hh',
'idl/raft.idl.hh',
'idl/hinted_handoff.idl.hh',
]

headers = find_headers('.', excluded_dirs=['idl', 'build', 'seastar', '.git'])
Expand Down
58 changes: 58 additions & 0 deletions db/hints/messages.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (C) 2021 ScyllaDB
*/

/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/

#pragma once

#include <vector>
#include <seastar/core/lowres_clock.hh>

#include "utils/UUID.hh"

namespace db {

namespace hints {

struct sync_point_create_request {
// The ID of the sync point to create.
utils::UUID sync_point_id;
// Towards which nodes hints should be replayed for this sync point?
std::vector<gms::inet_address> target_endpoints;
// The sync point will be deleted at this point of time if hints won't be
// sent by that time.
lowres_clock::time_point mark_deadline;
};

struct sync_point_create_response {};

struct sync_point_check_request {
// The ID of the sync point whose status we want to check
utils::UUID sync_point_id;
};

struct sync_point_check_response {
// Returns true if the sync point has expired: either hints were replayed,
// or the deadline was reached.
bool expired;
};

}

}
44 changes: 44 additions & 0 deletions idl/hinted_handoff.idl.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2021 ScyllaDB
*/

/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/

namespace db {

namespace hints {

struct sync_point_create_request {
utils::UUID sync_point_id;
std::vector<gms::inet_address> target_endpoints;
lowres_clock::time_point mark_deadline;
};

struct sync_point_create_response {};

struct sync_point_check_request {
utils::UUID sync_point_id;
};

struct sync_point_check_response {
bool expired;
};

}

}
24 changes: 24 additions & 0 deletions message/messaging_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
#include "idl/messaging_service.dist.hh"
#include "idl/paxos.dist.hh"
#include "idl/raft.dist.hh"
#include "idl/hinted_handoff.dist.hh"
#include "serializer_impl.hh"
#include "serialization_visitors.hh"
#include "idl/consistency_level.dist.impl.hh"
Expand All @@ -89,6 +90,7 @@
#include "idl/messaging_service.dist.impl.hh"
#include "idl/paxos.dist.impl.hh"
#include "idl/raft.dist.impl.hh"
#include "idl/hinted_handoff.dist.impl.hh"
#include <seastar/rpc/lz4_compressor.hh>
#include <seastar/rpc/lz4_fragmented_compressor.hh>
#include <seastar/rpc/multi_algo_compressor_factory.hh>
Expand Down Expand Up @@ -554,6 +556,8 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM:
case messaging_verb::NODE_OPS_CMD:
case messaging_verb::HINT_MUTATION:
case messaging_verb::HINT_SYNC_POINT_CREATE:
case messaging_verb::HINT_SYNC_POINT_CHECK:
return 1;
case messaging_verb::CLIENT_ID:
case messaging_verb::MUTATION:
Expand Down Expand Up @@ -1498,6 +1502,26 @@ future<> messaging_service::send_hint_mutation(msg_addr id, clock_type::time_poi
std::move(reply_to), shard, std::move(response_id), std::move(trace_info));
}

void messaging_service::register_hint_sync_point_create(std::function<future<db::hints::sync_point_create_response> (db::hints::sync_point_create_request request)>&& func) {
register_handler(this, netw::messaging_verb::HINT_SYNC_POINT_CREATE, std::move(func));
}
future<> messaging_service::unregister_hint_sync_point_create() {
return unregister_handler(netw::messaging_verb::HINT_SYNC_POINT_CREATE);
}
future<db::hints::sync_point_create_response> messaging_service::send_hint_sync_point_create(msg_addr id, clock_type::time_point timeout, db::hints::sync_point_create_request request) {
return send_message_timeout<future<db::hints::sync_point_create_response>>(this, messaging_verb::HINT_SYNC_POINT_CREATE, std::move(id), timeout, std::move(request));
}

void messaging_service::register_hint_sync_point_check(std::function<future<db::hints::sync_point_check_response> (db::hints::sync_point_check_request request)>&& func) {
register_handler(this, netw::messaging_verb::HINT_SYNC_POINT_CHECK, std::move(func));
}
future<> messaging_service::unregister_hint_sync_point_check() {
return unregister_handler(netw::messaging_verb::HINT_SYNC_POINT_CHECK);
}
future<db::hints::sync_point_check_response> messaging_service::send_hint_sync_point_check(msg_addr id, clock_type::time_point timeout, db::hints::sync_point_check_request request) {
return send_message_timeout<future<db::hints::sync_point_check_response>>(this, messaging_verb::HINT_SYNC_POINT_CHECK, std::move(id), timeout, std::move(request));
}

void messaging_service::register_raft_send_snapshot(std::function<future<raft::snapshot_reply> (const rpc::client_info&, rpc::opt_time_point, uint64_t group_id, raft::server_id from_id, raft::server_id dst_id, raft::install_snapshot)>&& func) {
register_handler(this, netw::messaging_verb::RAFT_SEND_SNAPSHOT, std::move(func));
}
Expand Down
20 changes: 19 additions & 1 deletion message/messaging_service.hh
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "cache_temperature.hh"
#include "service/paxos/prepare_response.hh"
#include "raft/raft.hh"
#include "db/hints/messages.hh"

#include <list>
#include <vector>
Expand Down Expand Up @@ -71,6 +72,13 @@ namespace db::view {
class update_backlog;
}

namespace db::hints {
struct sync_point_create_request;
struct sync_point_create_response;
struct sync_point_check_request;
struct sync_point_check_response;
}

class frozen_mutation;
class frozen_schema;
class partition_checksum;
Expand Down Expand Up @@ -152,7 +160,9 @@ enum class messaging_verb : int32_t {
RAFT_VOTE_REQUEST = 49,
RAFT_VOTE_REPLY = 50,
RAFT_TIMEOUT_NOW = 51,
LAST = 52,
HINT_SYNC_POINT_CREATE = 52,
HINT_SYNC_POINT_CHECK = 53,
LAST = 54,
};

} // namespace netw
Expand Down Expand Up @@ -555,6 +565,14 @@ public:
future<> send_hint_mutation(msg_addr id, clock_type::time_point timeout, const frozen_mutation& fm, std::vector<inet_address> forward,
inet_address reply_to, unsigned shard, response_id_type response_id, std::optional<tracing::trace_info> trace_info = std::nullopt);

void register_hint_sync_point_create(std::function<future<db::hints::sync_point_create_response> (db::hints::sync_point_create_request request)>&& func);
future<> unregister_hint_sync_point_create();
future<db::hints::sync_point_create_response> send_hint_sync_point_create(msg_addr id, clock_type::time_point timeout, db::hints::sync_point_create_request request);

void register_hint_sync_point_check(std::function<future<db::hints::sync_point_check_response> (db::hints::sync_point_check_request request)>&& func);
future<> unregister_hint_sync_point_check();
future<db::hints::sync_point_check_response> send_hint_sync_point_check(msg_addr id, clock_type::time_point timeout, db::hints::sync_point_check_request request);

// RAFT verbs
void register_raft_send_snapshot(std::function<future<raft::snapshot_reply> (const rpc::client_info&, rpc::opt_time_point, uint64_t group_id, raft::server_id from_id, raft::server_id dst_id, raft::install_snapshot)>&& func);
future<> unregister_raft_send_snapshot();
Expand Down

0 comments on commit 82c4198

Please sign in to comment.