Skip to content

Commit

Permalink
token_metadata: shared_token_metadata: add mutate_token_metadata
Browse files Browse the repository at this point in the history
mutate_token_metadata acquires the shared_token_metadata lock,
clones the token_metadata (using clone_async)
and calls an asynchronous functor on
the cloned copy of the token_metadata to mutate it.

If the functor is successful, the mutated clone
is set back to to the shared_token_metadata,
otherwise, the clone is destroyed.

With that, get rid of shared_token_metadata::clone

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
  • Loading branch information
bhalevy committed Dec 22, 2020
1 parent e089c22 commit 56aa49c
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 17 deletions.
7 changes: 7 additions & 0 deletions locator/token_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2101,4 +2101,11 @@ future<token_metadata_lock> shared_token_metadata::get_lock() noexcept {
return get_units(_sem, 1);
}

future<> shared_token_metadata::mutate_token_metadata(seastar::noncopyable_function<future<> (token_metadata&)> func) {
auto lk = co_await get_lock();
auto tm = co_await _shared->clone_async();
co_await func(tm);
set(make_token_metadata_ptr(std::move(tm)));
}

} // namespace locator
14 changes: 10 additions & 4 deletions locator/token_metadata.hh
Original file line number Diff line number Diff line change
Expand Up @@ -369,15 +369,21 @@ public:
return _shared;
}

mutable_token_metadata_ptr clone() const {
return make_token_metadata_ptr(*_shared);
}

void set(mutable_token_metadata_ptr tmptr) noexcept {
_shared = std::move(tmptr);
}

future<token_metadata_lock> get_lock() noexcept;

// mutate_token_metadata acquires the shared_token_metadata lock,
// clones the token_metadata (using clone_async)
// and calls an asynchronous functor on
// the cloned copy of the token_metadata to mutate it.
//
// If the functor is successful, the mutated clone
// is set back to to the shared_token_metadata,
// otherwise, the clone is destroyed.
future<> mutate_token_metadata(seastar::noncopyable_function<future<> (token_metadata&)> func);
};

}
28 changes: 15 additions & 13 deletions test/boost/network_topology_strategy_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <boost/algorithm/cxx11/iota.hpp>
#include "test/lib/log.hh"
#include "test/lib/cql_test_env.hh"
#include <seastar/core/coroutine.hh>

using namespace locator;

Expand Down Expand Up @@ -194,7 +195,6 @@ void simple_test() {

locator::shared_token_metadata stm;

auto tmptr = stm.clone();
std::vector<ring_point> ring_points = {
{ 1.0, inet_address("192.100.10.1") },
{ 2.0, inet_address("192.101.10.1") },
Expand All @@ -209,12 +209,13 @@ void simple_test() {
{ 11.0, inet_address("192.102.40.2") }
};
// Initialize the token_metadata
stm.mutate_token_metadata([&ring_points] (token_metadata& tm) -> future<> {
for (unsigned i = 0; i < ring_points.size(); i++) {
tmptr->update_normal_token(
co_await tm.update_normal_token(
{dht::token::kind::key, d2t(ring_points[i].point / ring_points.size())},
ring_points[i].host).get();
ring_points[i].host);
}
stm.set(tmptr);
}).get();

/////////////////////////////////////
// Create the replication strategy
Expand Down Expand Up @@ -252,9 +253,10 @@ void simple_test() {
// points will be taken from the cache when it shouldn't and the
// corresponding check will fail.
//
tmptr = stm.clone();
tmptr->invalidate_cached_rings();
stm.set(std::move(tmptr));
stm.mutate_token_metadata([] (token_metadata& tm) {
tm.invalidate_cached_rings();
return make_ready_future<>();
}).get();
full_ring_check(ring_points, options320, ars_ptr);
}

Expand All @@ -275,7 +277,6 @@ void heavy_origin_test() {
std::vector<int> dc_endpoints = {128, 256, 512};
std::vector<int> dc_replication = {2, 6, 6};

auto tmptr = stm.clone();
std::map<sstring, sstring> config_options;
std::unordered_map<inet_address, std::unordered_set<token>> tokens;
std::vector<ring_point> ring_points;
Expand Down Expand Up @@ -316,8 +317,9 @@ void heavy_origin_test() {
}
}

tmptr->update_normal_tokens(tokens).get();
stm.set(std::move(tmptr));
stm.mutate_token_metadata([&tokens] (token_metadata& tm) {
return tm.update_normal_tokens(tokens);
}).get();

auto ars_uptr = abstract_replication_strategy::create_replication_strategy(
"test keyspace", "NetworkTopologyStrategy", stm, config_options);
Expand Down Expand Up @@ -607,17 +609,17 @@ SEASTAR_THREAD_TEST_CASE(testCalculateEndpoints) {

for (size_t run = 0; run < RUNS; ++run) {
shared_token_metadata stm;
auto tmptr = stm.clone();
// not doing anything sharded. We can just play fast and loose with the snitch.
(void)snitch.stop();
snitch = generate_snitch(datacenters, nodes);

stm.mutate_token_metadata([&nodes] (token_metadata& tm) -> future<> {
for (auto& node : nodes) {
for (size_t i = 0; i < VNODES; ++i) {
tmptr->update_normal_token(dht::token::get_random_token(), node).get();
co_await tm.update_normal_token(dht::token::get_random_token(), node);
}
}
stm.set(std::move(tmptr));
}).get();
test_equivalence(stm, snitch, datacenters);
}
}
Expand Down

0 comments on commit 56aa49c

Please sign in to comment.