Skip to content

Commit

Permalink
storage_proxy: add functions for syncing with hints queue
Browse files Browse the repository at this point in the history
Adds two methods to `storage_proxy`:

- `create_hint_queue_sync_point` - creates a "hint sync point" which
  is kept present in storage_proxy until all hint queues on the local
  node reach their curent end. It will also disappear if given deadline
  is reached first.
- `check_hint_queue_sync_point` - checks if given hint sync point still
  exists.

The created sync point waits for hint queues in all hint managers, on
all shards.
  • Loading branch information
piodul committed Apr 27, 2021
1 parent 427bbf6 commit 244738b
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 0 deletions.
1 change: 1 addition & 0 deletions db/hints/manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "database.hh"
#include "service_permit.hh"
#include "utils/directories.hh"
#include "utils/UUID_gen.hh"

using namespace std::literals::chrono_literals;

Expand Down
40 changes: 40 additions & 0 deletions service/storage_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5260,6 +5260,46 @@ const db::hints::host_filter& storage_proxy::get_hints_host_filter() const {
return _hints_manager.get_host_filter();
}

future<> storage_proxy::create_hint_queue_sync_point(utils::UUID sync_point_id, std::vector<gms::inet_address> endpoints, clock_type::time_point deadline) {
return container().invoke_on(0, [sync_point_id, endpoints = std::move(endpoints), deadline] (storage_proxy& sp) mutable {
auto [it, was_inserted] = sp._hint_queue_checkpoints.emplace(sync_point_id);
if (!was_inserted) {
return make_exception_future<>(std::runtime_error(format("Hint sync point {} already exists", sync_point_id)));
}

// Waited indirectly by keeping a pointer to the storage_proxy.
// When drain_on_shutdown is triggered, hints manager will report an error
// and this future will resolve.
(void)sp.container().invoke_on_all([endpoints = std::move(endpoints), sync_point_id, deadline] (storage_proxy& sp) {
auto wait_for_hints_manager = [&endpoints, sync_point_id, deadline] (db::hints::manager& mgr, const char* mgr_name) {
return mgr.wait_until_hints_are_replayed(endpoints, deadline).then_wrapped([mgr_name, sync_point_id] (future<>&& f) {
if (!f.failed()) {
slogger.debug("Hint sync point {} for {} reached", sync_point_id, mgr_name);
f.get();
} else {
slogger.debug("An error occured when waiting for hint sync point {} for {} to resolve: {}", sync_point_id, mgr_name, f.get_exception());
}
});
};

return when_all(
wait_for_hints_manager(sp._hints_manager, "hints manager"),
wait_for_hints_manager(sp._hints_for_views_manager, "hints for view manager")
).discard_result();
}).then([&sp, sync_point_id, guard = sp.shared_from_this()] {
sp._hint_queue_checkpoints.erase(sync_point_id);
});

return make_ready_future<>();
});
}

future<bool> storage_proxy::check_hint_queue_sync_point(utils::UUID sync_point) {
return container().invoke_on(0, [sync_point] (storage_proxy& sp) {
return !sp._hint_queue_checkpoints.contains(sync_point);
});
}

void storage_proxy::on_join_cluster(const gms::inet_address& endpoint) {};

void storage_proxy::on_leave_cluster(const gms::inet_address& endpoint) {};
Expand Down
5 changes: 5 additions & 0 deletions service/storage_proxy.hh
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,8 @@ private:
cdc::cdc_service* _cdc = nullptr;

cdc_stats _cdc_stats;

std::unordered_set<utils::UUID> _hint_queue_checkpoints;
private:
future<coordinator_query_result> query_singular(lw_shared_ptr<query::read_command> cmd,
dht::partition_range_vector&& partition_ranges,
Expand Down Expand Up @@ -445,6 +447,9 @@ private:
future<> mutate_counters(Range&& mutations, db::consistency_level cl, tracing::trace_state_ptr tr_state, service_permit permit, clock_type::time_point timeout);

void retire_view_response_handlers(noncopyable_function<bool(const abstract_write_response_handler&)> filter_fun);

future<> create_hint_queue_sync_point(utils::UUID sync_point_id, std::vector<gms::inet_address> endpoints, clock_type::time_point deadline);
future<bool> check_hint_queue_sync_point(utils::UUID sync_point);
public:
storage_proxy(distributed<database>& db, config cfg, db::view::node_update_backlog& max_view_update_backlog,
scheduling_group_key stats_key, gms::feature_service& feat, const locator::shared_token_metadata& stm, netw::messaging_service& ms);
Expand Down

0 comments on commit 244738b

Please sign in to comment.