/*
* Copyright (C) 2015 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "utils/UUID.hh"
#include "token_metadata.hh"
#include <optional>
#include "locator/snitch_base.hh"
#include "locator/abstract_replication_strategy.hh"
#include "log.hh"
#include "partition_range_compat.hh"
#include <unordered_map>
#include <algorithm>
#include <boost/icl/interval.hpp>
#include <boost/icl/interval_map.hpp>
#include <seastar/core/coroutine.hh>
#include <boost/range/adaptors.hpp>
namespace locator {
static logging::logger tlogger("token_metadata");
template <typename C, typename V>
static void remove_by_value(C& container, V value) {
for (auto it = container.begin(); it != container.end();) {
if (it->second == value) {
it = container.erase(it);
} else {
it++;
}
}
}
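// Illustrative use: remove_by_value(_token_to_endpoint_map, endpoint)
// erases every token currently mapped to `endpoint`.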
class token_metadata_impl final {
public:
using UUID = utils::UUID;
using inet_address = gms::inet_address;
private:
/**
* Maintains token to endpoint map of every node in the cluster.
* Each Token is associated with exactly one Address, but each Address may have
* multiple tokens. Hence, the BiMultiValMap collection.
*/
// FIXME: should be a BiMultiValMap
std::unordered_map<token, inet_address> _token_to_endpoint_map;
/** Maintains endpoint to host ID map of every node in the cluster */
std::unordered_map<inet_address, utils::UUID> _endpoint_to_host_id_map;
std::unordered_map<token, inet_address> _bootstrap_tokens;
std::unordered_set<inet_address> _leaving_endpoints;
// Maps each existing node being replaced to the node replacing it
std::unordered_map<inet_address, inet_address> _replacing_endpoints;
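// For each keyspace, maps token intervals to the set of endpoints with
// pending (not yet normal) ownership of those intervals.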
std::unordered_map<sstring, boost::icl::interval_map<token, std::unordered_set<inet_address>>> _pending_ranges_interval_map;
std::vector<token> _sorted_tokens;
topology _topology;
long _ring_version = 0;
// Note: if any member is added to this class
// clone_async() must be updated to copy that member.
void sort_tokens();
public:
token_metadata_impl(std::unordered_map<token, inet_address> token_to_endpoint_map, std::unordered_map<inet_address, utils::UUID> endpoints_map, topology topology);
token_metadata_impl() noexcept {}
token_metadata_impl(const token_metadata_impl&) = default;
token_metadata_impl(token_metadata_impl&&) noexcept = default;
const std::vector<token>& sorted_tokens() const;
future<> update_normal_token(token token, inet_address endpoint);
future<> update_normal_tokens(std::unordered_set<token> tokens, inet_address endpoint);
future<> update_normal_tokens(const std::unordered_map<inet_address, std::unordered_set<token>>& endpoint_tokens);
void update_normal_tokens_sync(std::unordered_set<token> tokens, inet_address endpoint);
void update_normal_tokens_sync(const std::unordered_map<inet_address, std::unordered_set<token>>& endpoint_tokens);
const token& first_token(const token& start) const;
size_t first_token_index(const token& start) const;
std::optional<inet_address> get_endpoint(const token& token) const;
std::vector<token> get_tokens(const inet_address& addr) const;
const std::unordered_map<token, inet_address>& get_token_to_endpoint() const {
return _token_to_endpoint_map;
}
const std::unordered_set<inet_address>& get_leaving_endpoints() const {
return _leaving_endpoints;
}
const std::unordered_map<token, inet_address>& get_bootstrap_tokens() const {
return _bootstrap_tokens;
}
void update_topology(inet_address ep) {
_topology.update_endpoint(ep);
}
/**
* Creates an iterable range over the sorted tokens starting at the first
* token greater than or equal to the given one, wrapping around the ring.
*
* @param start A token that will define the beginning of the range
*
* @return The requested range (see the description above)
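*
* Usage sketch (illustrative; tm and start are hypothetical names):
*
*   for (const token& t : tm.ring_range(start)) {
*       // visits each sorted token exactly once, wrapping around the ring
*   }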
*/
boost::iterator_range<token_metadata::tokens_iterator> ring_range(const token& start) const;
boost::iterator_range<token_metadata::tokens_iterator> ring_range(
const std::optional<dht::partition_range::bound>& start) const;
topology& get_topology() {
return _topology;
}
const topology& get_topology() const {
return _topology;
}
void debug_show() const;
#if 0
private static final Logger logger = LoggerFactory.getLogger(TokenMetadata.class);
/**
* Maintains token to endpoint map of every node in the cluster.
* Each Token is associated with exactly one Address, but each Address may have
* multiple tokens. Hence, the BiMultiValMap collection.
*/
private final BiMultiValMap<Token, InetAddress> tokenToEndpointMap;
/** Maintains endpoint to host ID map of every node in the cluster */
private final BiMap<InetAddress, UUID> _endpoint_to_host_id_map;
// Prior to CASSANDRA-603, we just had <tt>Map<Range, InetAddress> pendingRanges</tt>,
// which was added to when a node began bootstrap and removed from when it finished.
//
// This is inadequate when multiple changes are allowed simultaneously. For example,
// suppose that there is a ring of nodes A, C and E, with replication factor 3.
// Node D bootstraps between C and E, so its pending ranges will be E-A, A-C and C-D.
// Now suppose node B bootstraps between A and C at the same time. Its pending ranges
// would be C-E, E-A and A-B. Now both nodes need to be assigned pending range E-A,
// which we would be unable to represent with the old Map. The same thing happens
// even more obviously for any nodes that boot simultaneously between same two nodes.
//
// So, we made two changes:
//
// First, we changed pendingRanges to a <tt>Multimap<Range, InetAddress></tt> (now
// <tt>Map<String, Multimap<Range, InetAddress>></tt>, because replication strategy
// and options are per-KeySpace).
//
// Second, we added the bootstrapTokens and leavingEndpoints collections, so we can
// rebuild pendingRanges from the complete information of what is going on, when
// additional changes are made mid-operation.
//
// Finally, note that recording the tokens of joining nodes in bootstrapTokens also
// means we can detect and reject the addition of multiple nodes at the same token
// before one becomes part of the ring.
private final BiMultiValMap<Token, InetAddress> bootstrapTokens = new BiMultiValMap<Token, InetAddress>();
// (don't need to record Token here since it's still part of tokenToEndpointMap until it's done leaving)
private final Set<InetAddress> leavingEndpoints = new HashSet<InetAddress>();
// this is a cache of the calculation from {tokenToEndpointMap, bootstrapTokens, leavingEndpoints}
// nodes which are migrating to the new tokens in the ring
private final Set<Pair<Token, InetAddress>> _moving_endpoints = new HashSet<Pair<Token, InetAddress>>();
/* Use this lock for manipulating the token map */
private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
private volatile ArrayList<Token> sortedTokens;
private final Topology topology;
private static final Comparator<InetAddress> inetaddressCmp = new Comparator<InetAddress>()
{
public int compare(InetAddress o1, InetAddress o2)
{
return ByteBuffer.wrap(o1.getAddress()).compareTo(ByteBuffer.wrap(o2.getAddress()));
}
};
// signals replication strategies that nodes have joined or left the ring and they need to recompute ownership
private volatile long ringVersion = 0;
public TokenMetadata()
{
this(SortedBiMultiValMap.<Token, InetAddress>create(null, inetaddressCmp),
HashBiMap.<InetAddress, UUID>create(),
new Topology());
}
private TokenMetadata(BiMultiValMap<Token, InetAddress> tokenToEndpointMap, BiMap<InetAddress, UUID> endpointsMap, Topology topology)
{
this.tokenToEndpointMap = tokenToEndpointMap;
this.topology = topology;
_endpoint_to_host_id_map = endpointsMap;
sortedTokens = sortTokens();
}
private ArrayList<Token> sortTokens()
{
return new ArrayList<Token>(tokenToEndpointMap.keySet());
}
/** @return the number of nodes bootstrapping into source's primary range */
public int pendingRangeChanges(InetAddress source)
{
int n = 0;
Collection<Range<Token>> sourceRanges = getPrimaryRangesFor(getTokens(source));
lock.readLock().lock();
try
{
for (Token token : _bootstrap_tokens.keySet())
for (Range<Token> range : sourceRanges)
if (range.contains(token))
n++;
}
finally
{
lock.readLock().unlock();
}
return n;
}
/**
* Update token map with a single token/endpoint pair in normal state.
*/
public void updateNormalToken(Token token, InetAddress endpoint)
{
updateNormalTokens(Collections.singleton(token), endpoint);
}
public void updateNormalTokens(Collection<Token> tokens, InetAddress endpoint)
{
Multimap<InetAddress, Token> endpointTokens = HashMultimap.create();
for (Token token : tokens)
endpointTokens.put(endpoint, token);
updateNormalTokens(endpointTokens);
}
/**
* Update token map with a set of token/endpoint pairs in normal state.
*
* Prefer this whenever there are multiple pairs to update, as each update (whether a single or multiple)
* is expensive (CASSANDRA-3831).
*
* @param endpointTokens
*/
public void updateNormalTokens(Multimap<InetAddress, Token> endpointTokens)
{
if (endpointTokens.isEmpty())
return;
lock.writeLock().lock();
try
{
boolean shouldSortTokens = false;
for (InetAddress endpoint : endpointTokens.keySet())
{
Collection<Token> tokens = endpointTokens.get(endpoint);
assert tokens != null && !tokens.isEmpty();
_bootstrap_tokens.removeValue(endpoint);
tokenToEndpointMap.removeValue(endpoint);
topology.addEndpoint(endpoint);
_leaving_endpoints.remove(endpoint);
removeFromMoving(endpoint); // also removing this endpoint from moving
for (Token token : tokens)
{
InetAddress prev = tokenToEndpointMap.put(token, endpoint);
if (!endpoint.equals(prev))
{
if (prev != null)
logger.warn("Token {} changing ownership from {} to {}", token, prev, endpoint);
shouldSortTokens = true;
}
}
}
if (shouldSortTokens)
sortedTokens = sortTokens();
}
finally
{
lock.writeLock().unlock();
}
}
#endif
/**
* Store an end-point to host ID mapping. Each ID must be unique, and
* cannot be changed after the fact.
*
* @param host_id
* @param endpoint
*/
void update_host_id(const UUID& host_id, inet_address endpoint);
/** Return the unique host ID for an end-point. */
UUID get_host_id(inet_address endpoint) const;
/// Return the unique host ID for an end-point or nullopt if not found.
std::optional<UUID> get_host_id_if_known(inet_address endpoint) const;
/** Return the end-point for a unique host ID */
std::optional<inet_address> get_endpoint_for_host_id(UUID host_id) const;
/** @return a copy of the endpoint-to-id map for read-only operations */
const std::unordered_map<inet_address, utils::UUID>& get_endpoint_to_host_id_map_for_reading() const;
void add_bootstrap_token(token t, inet_address endpoint);
void add_bootstrap_tokens(std::unordered_set<token> tokens, inet_address endpoint);
void remove_bootstrap_tokens(std::unordered_set<token> tokens);
void add_leaving_endpoint(inet_address endpoint);
void del_leaving_endpoint(inet_address endpoint);
public:
void remove_endpoint(inet_address endpoint);
#if 0
public Collection<Token> getTokens(InetAddress endpoint)
{
assert endpoint != null;
assert isMember(endpoint); // don't want to return nulls
lock.readLock().lock();
try
{
return new ArrayList<Token>(tokenToEndpointMap.inverse().get(endpoint));
}
finally
{
lock.readLock().unlock();
}
}
@Deprecated
public Token getToken(InetAddress endpoint)
{
return getTokens(endpoint).iterator().next();
}
#endif
bool is_member(inet_address endpoint) const;
bool is_leaving(inet_address endpoint) const;
// Is this node being replaced by another node
bool is_being_replaced(inet_address endpoint) const;
// Is any node being replaced by another node
bool is_any_node_being_replaced() const;
void add_replacing_endpoint(inet_address existing_node, inet_address replacing_node);
void del_replacing_endpoint(inet_address existing_node);
#if 0
private final AtomicReference<TokenMetadata> cachedTokenMap = new AtomicReference<TokenMetadata>();
#endif
public:
/**
* Create a full copy of token_metadata_impl using asynchronous continuations.
* The caller must ensure that the cloned object will not change if
* the function yields.
*/
future<token_metadata_impl> clone_async() const noexcept;
/**
* Create a copy of TokenMetadata with only tokenToEndpointMap. That is, pending ranges,
* bootstrap tokens and leaving endpoints are not included in the copy.
*/
token_metadata_impl clone_only_token_map_sync() const {
return token_metadata_impl(this->_token_to_endpoint_map, this->_endpoint_to_host_id_map, this->_topology);
}
#if 0
/**
* Return a cached TokenMetadata with only tokenToEndpointMap, i.e., the same as cloneOnlyTokenMap but
* uses a cached copy that is invalidated when the ring changes, so in the common case
* no extra locking is required.
*
* Callers must *NOT* mutate the returned metadata object.
*/
public TokenMetadata cachedOnlyTokenMap()
{
TokenMetadata tm = cachedTokenMap.get();
if (tm != null)
return tm;
// synchronize to prevent thundering herd (CASSANDRA-6345)
synchronized (this)
{
if ((tm = cachedTokenMap.get()) != null)
return tm;
tm = cloneOnlyTokenMap();
cachedTokenMap.set(tm);
return tm;
}
}
#endif
/**
* Create a copy of TokenMetadata with only tokenToEndpointMap. That is, pending ranges,
* bootstrap tokens and leaving endpoints are not included in the copy.
* The caller must ensure that the cloned object will not change if
* the function yields.
*/
future<token_metadata_impl> clone_only_token_map(bool clone_sorted_tokens = true) const noexcept;
/**
* Create a copy of TokenMetadata with tokenToEndpointMap reflecting situation after all
* current leave operations have finished.
*
* @return new token metadata
*/
future<token_metadata_impl> clone_after_all_left() const noexcept {
return clone_only_token_map(false).then([this] (token_metadata_impl all_left_metadata) {
for (auto endpoint : _leaving_endpoints) {
all_left_metadata.remove_endpoint(endpoint);
}
all_left_metadata.sort_tokens();
return std::move(all_left_metadata);
});
}
/**
* Destroy the token_metadata members using continuations
* to prevent reactor stalls.
*/
future<> clear_gently() noexcept;
public:
dht::token_range_vector get_primary_ranges_for(std::unordered_set<token> tokens) const;
dht::token_range_vector get_primary_ranges_for(token right) const;
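// Helpers converting between dht token ranges and the boost::icl intervals
// stored in _pending_ranges_interval_map.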
static boost::icl::interval<token>::interval_type range_to_interval(range<dht::token> r);
static range<dht::token> interval_to_range(boost::icl::interval<token>::interval_type i);
private:
void set_pending_ranges(const sstring& keyspace_name, std::unordered_multimap<range<token>, inet_address> new_pending_ranges, can_yield);
public:
bool has_pending_ranges(sstring keyspace_name, inet_address endpoint) const;
/**
* Calculate pending ranges according to bootstrapping and leaving nodes. The reasoning is:
*
* (1) When in doubt, it is better to write too much to a node than too little. That is, if
* there are multiple nodes moving, calculate the biggest ranges a node could have. Cleaning
* up unneeded data afterwards is better than missing writes during movement.
* (2) When a node leaves, ranges for other nodes can only grow (a node might get additional
* ranges, but it will not lose any of its current ranges as a result of a leave). Therefore
* we will first remove _all_ leaving tokens for the sake of calculation and then check what
* ranges would go where if all nodes are to leave. This way we get the biggest possible
* ranges with regard to current leave operations, covering all subsets of possible final range
* values.
* (3) When a node bootstraps, ranges of other nodes can only get smaller. Without doing
* complex calculations to see if multiple bootstraps overlap, we simply base calculations
* on the same token ring used before (reflecting the situation after all leave operations
* have completed). Bootstrapping nodes are added to and removed from that metadata one by
* one, and we check what their ranges would be. This gives us the biggest possible ranges the
* node could have. It might be that other bootstraps make our actual final ranges smaller,
* but it does not matter as we can clean up the data afterwards.
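*
* Illustrative example of the above: with nodes A, C and E at RF=3, if E is
* leaving while B is bootstrapping, we first compute ranges as if E had
* already left (step 2), then add B alone to that ring and record the ranges
* it would own (step 3); together these form the pending ranges.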
*
* NOTE: This is a heavy and inefficient operation. It is done only once whenever a node
* changes state in the cluster, so it should be manageable.
*/
future<> update_pending_ranges(
const token_metadata& unpimplified_this,
const abstract_replication_strategy& strategy, const sstring& keyspace_name);
void calculate_pending_ranges_for_leaving(
const token_metadata& unpimplified_this,
const abstract_replication_strategy& strategy,
std::unordered_multimap<range<token>, inet_address>& new_pending_ranges,
mutable_token_metadata_ptr all_left_metadata) const;
void calculate_pending_ranges_for_bootstrap(
const abstract_replication_strategy& strategy,
std::unordered_multimap<range<token>, inet_address>& new_pending_ranges,
mutable_token_metadata_ptr all_left_metadata) const;
void calculate_pending_ranges_for_replacing(
const token_metadata& unpimplified_this,
const abstract_replication_strategy& strategy,
std::unordered_multimap<range<token>, inet_address>& new_pending_ranges) const;
public:
token get_predecessor(token t) const;
#if 0
public Token getSuccessor(Token token)
{
List tokens = sortedTokens();
int index = Collections.binarySearch(tokens, token);
assert index >= 0 : token + " not found in " + StringUtils.join(tokenToEndpointMap.keySet(), ", ");
return (Token) ((index == (tokens.size() - 1)) ? tokens.get(0) : tokens.get(index + 1));
}
/** @return a copy of the bootstrapping tokens map */
public BiMultiValMap<Token, InetAddress> getBootstrapTokens()
{
lock.readLock().lock();
try
{
return new BiMultiValMap<Token, InetAddress>(_bootstrap_tokens);
}
finally
{
lock.readLock().unlock();
}
}
#endif
// Returns the nodes that are officially part of the ring. It does not
// include nodes that are still joining the cluster, e.g., a node that is
// still streaming data before it finishes the bootstrap process and
// reaches NORMAL status.
std::vector<inet_address> get_all_endpoints() const {
auto tmp = boost::copy_range<std::unordered_set<gms::inet_address>>(_token_to_endpoint_map | boost::adaptors::map_values);
return std::vector<inet_address>(tmp.begin(), tmp.end());
}
/* Returns the number of different endpoints that own tokens in the ring.
* Bootstrapping tokens are not taken into account. */
size_t count_normal_token_owners() const;
#if 0
public Set<InetAddress> getAllEndpoints()
{
lock.readLock().lock();
try
{
return ImmutableSet.copyOf(_endpoint_to_host_id_map.keySet());
}
finally
{
lock.readLock().unlock();
}
}
/** caller should not modify _leaving_endpoints */
public Set<InetAddress> getLeavingEndpoints()
{
lock.readLock().lock();
try
{
return ImmutableSet.copyOf(_leaving_endpoints);
}
finally
{
lock.readLock().unlock();
}
}
/**
* Endpoints which are migrating to the new tokens
* @return set of addresses of moving endpoints
*/
public Set<Pair<Token, InetAddress>> getMovingEndpoints()
{
lock.readLock().lock();
try
{
return ImmutableSet.copyOf(_moving_endpoints);
}
finally
{
lock.readLock().unlock();
}
}
public static int firstTokenIndex(final ArrayList ring, Token start, boolean insertMin)
{
assert ring.size() > 0;
// insert the minimum token (at index == -1) if we were asked to include it and it isn't a member of the ring
int i = Collections.binarySearch(ring, start);
if (i < 0)
{
i = (i + 1) * (-1);
if (i >= ring.size())
i = insertMin ? -1 : 0;
}
return i;
}
public static Token firstToken(final ArrayList<Token> ring, Token start)
{
return ring.get(firstTokenIndex(ring, start, false));
}
/**
* iterator over the Tokens in the given ring, starting with the token for the node owning start
* (which does not have to be a Token in the ring)
* @param includeMin True if the minimum token should be returned in the ring even if it has no owner.
*/
public static Iterator<Token> ringIterator(final ArrayList<Token> ring, Token start, boolean includeMin)
{
if (ring.isEmpty())
return includeMin ? Iterators.singletonIterator(StorageService.getPartitioner().getMinimumToken())
: Iterators.<Token>emptyIterator();
final boolean insertMin = includeMin && !ring.get(0).isMinimum();
final int startIndex = firstTokenIndex(ring, start, insertMin);
return new AbstractIterator<Token>()
{
int j = startIndex;
protected Token computeNext()
{
if (j < -1)
return endOfData();
try
{
// return minimum for index == -1
if (j == -1)
return StorageService.getPartitioner().getMinimumToken();
// return ring token for other indexes
return ring.get(j);
}
finally
{
j++;
if (j == ring.size())
j = insertMin ? -1 : 0;
if (j == startIndex)
// end iteration
j = -2;
}
}
};
}
/** used by tests */
public void clearUnsafe()
{
lock.writeLock().lock();
try
{
tokenToEndpointMap.clear();
_endpoint_to_host_id_map.clear();
_bootstrap_tokens.clear();
_leaving_endpoints.clear();
sortedTokens.clear();
topology.clear();
invalidateCachedRings();
}
finally
{
lock.writeLock().unlock();
}
}
public String toString()
{
StringBuilder sb = new StringBuilder();
lock.readLock().lock();
try
{
Set<InetAddress> eps = tokenToEndpointMap.inverse().keySet();
if (!eps.isEmpty())
{
sb.append("Normal Tokens:");
sb.append(System.getProperty("line.separator"));
for (InetAddress ep : eps)
{
sb.append(ep);
sb.append(":");
sb.append(tokenToEndpointMap.inverse().get(ep));
sb.append(System.getProperty("line.separator"));
}
}
if (!_bootstrap_tokens.isEmpty())
{
sb.append("Bootstrapping Tokens:" );
sb.append(System.getProperty("line.separator"));
for (Map.Entry<Token, InetAddress> entry : _bootstrap_tokens.entrySet())
{
sb.append(entry.getValue()).append(":").append(entry.getKey());
sb.append(System.getProperty("line.separator"));
}
}
if (!_leaving_endpoints.isEmpty())
{
sb.append("Leaving Endpoints:");
sb.append(System.getProperty("line.separator"));
for (InetAddress ep : _leaving_endpoints)
{
sb.append(ep);
sb.append(System.getProperty("line.separator"));
}
}
if (!_pending_ranges.isEmpty())
{
sb.append("Pending Ranges:");
sb.append(System.getProperty("line.separator"));
sb.append(printPendingRanges());
}
}
finally
{
lock.readLock().unlock();
}
return sb.toString();
}
#endif
public:
// returns empty vector if keyspace_name not found.
inet_address_vector_topology_change pending_endpoints_for(const token& token, const sstring& keyspace_name) const;
#if 0
/**
* @deprecated retained for benefit of old tests
*/
public Collection<InetAddress> getWriteEndpoints(Token token, String keyspaceName, Collection<InetAddress> naturalEndpoints)
{
return ImmutableList.copyOf(Iterables.concat(naturalEndpoints, pendingEndpointsFor(token, keyspaceName)));
}
#endif
public:
/** @return an endpoint to token multimap representation of tokenToEndpointMap (a copy) */
std::multimap<inet_address, token> get_endpoint_to_token_map_for_reading() const;
/**
* @return a (stable copy, won't be modified) Token to Endpoint map for all the normal and bootstrapping nodes
* in the cluster.
*/
std::map<token, inet_address> get_normal_and_bootstrapping_token_to_endpoint_map() const;
#if 0
/**
* @return the Topology map of nodes to DCs + Racks
*
* This is only allowed when a copy has been made of TokenMetadata, to avoid concurrent modifications
* when Topology methods are subsequently used by the caller.
*/
public Topology getTopology()
{
assert this != StorageService.instance.getTokenMetadata();
return topology;
}
public long getRingVersion()
{
return ringVersion;
}
public void invalidateCachedRings()
{
ringVersion++;
cachedTokenMap.set(null);
}
/**
* Tracks the assignment of racks and endpoints in each datacenter for all the "normal" endpoints
* in this TokenMetadata. This allows faster calculation of endpoints in NetworkTopologyStrategy.
*/
public static class Topology
{
/** multi-map of DC to endpoints in that DC */
private final Multimap<String, InetAddress> dcEndpoints;
/** map of DC to multi-map of rack to endpoints in that rack */
private final Map<String, Multimap<String, InetAddress>> dcRacks;
/** reverse-lookup map for endpoint to current known dc/rack assignment */
private final Map<InetAddress, Pair<String, String>> currentLocations;
protected Topology()
{
dcEndpoints = HashMultimap.create();
dcRacks = new HashMap<String, Multimap<String, InetAddress>>();
currentLocations = new HashMap<InetAddress, Pair<String, String>>();
}
protected void clear()
{
dcEndpoints.clear();
dcRacks.clear();
currentLocations.clear();
}
/**
* construct deep-copy of other
*/
protected Topology(Topology other)
{
dcEndpoints = HashMultimap.create(other.dcEndpoints);
dcRacks = new HashMap<String, Multimap<String, InetAddress>>();
for (String dc : other.dcRacks.keySet())
dcRacks.put(dc, HashMultimap.create(other.dcRacks.get(dc)));
currentLocations = new HashMap<InetAddress, Pair<String, String>>(other.currentLocations);
}
/**
* Stores current DC/rack assignment for ep
*/
protected void addEndpoint(InetAddress ep)
{
IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
String dc = snitch.getDatacenter(ep);
String rack = snitch.getRack(ep);
Pair<String, String> current = currentLocations.get(ep);
if (current != null)
{
if (current.left.equals(dc) && current.right.equals(rack))
return;
dcRacks.get(current.left).remove(current.right, ep);
dcEndpoints.remove(current.left, ep);
}
dcEndpoints.put(dc, ep);
if (!dcRacks.containsKey(dc))
dcRacks.put(dc, HashMultimap.<String, InetAddress>create());
dcRacks.get(dc).put(rack, ep);
currentLocations.put(ep, Pair.create(dc, rack));
}
/**
* Removes current DC/rack assignment for ep
*/
protected void removeEndpoint(InetAddress ep)
{
if (!currentLocations.containsKey(ep))
return;
Pair<String, String> current = currentLocations.remove(ep);
dcEndpoints.remove(current.left, ep);
dcRacks.get(current.left).remove(current.right, ep);
}
/**
* @return multi-map of DC to endpoints in that DC
*/
public Multimap<String, InetAddress> getDatacenterEndpoints()
{
return dcEndpoints;
}
/**
* @return map of DC to multi-map of rack to endpoints in that rack
*/
public Map<String, Multimap<String, InetAddress>> getDatacenterRacks()
{
return dcRacks;
}
}
#endif
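// Bumped on every ring membership change; replication strategies observe it
// to detect that cached ownership information is stale (see the ringVersion
// comment in the Java code above).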
long get_ring_version() const {
return _ring_version;
}
void invalidate_cached_rings() {
++_ring_version;
//cachedTokenMap.set(null);
}
friend class token_metadata;
};
token_metadata::tokens_iterator::tokens_iterator(const token& start, const token_metadata_impl* token_metadata)
: _token_metadata(token_metadata) {
_cur_it = _token_metadata->sorted_tokens().begin() + _token_metadata->first_token_index(start);
_remaining = _token_metadata->sorted_tokens().size();
}
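// Iterators compare equal by the number of tokens left to visit: a
// default-constructed iterator (zero remaining) acts as the end sentinel,
// which is how the circular traversal below terminates after one full pass.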
bool token_metadata::tokens_iterator::operator==(const tokens_iterator& it) const {
return _remaining == it._remaining;
}
const token& token_metadata::tokens_iterator::operator*() const {
return *_cur_it;
}
token_metadata::tokens_iterator& token_metadata::tokens_iterator::operator++() {
++_cur_it;
if (_cur_it == _token_metadata->sorted_tokens().end()) {
_cur_it = _token_metadata->sorted_tokens().begin();
}
--_remaining;
return *this;
}
inline
boost::iterator_range<token_metadata::tokens_iterator>
token_metadata_impl::ring_range(const token& start) const {
auto begin = token_metadata::tokens_iterator(start, this);
auto end = token_metadata::tokens_iterator();
return boost::make_iterator_range(begin, end);
}
token_metadata_impl::token_metadata_impl(std::unordered_map<token, inet_address> token_to_endpoint_map, std::unordered_map<inet_address, utils::UUID> endpoints_map, topology topology) :
_token_to_endpoint_map(std::move(token_to_endpoint_map)), _endpoint_to_host_id_map(std::move(endpoints_map)), _topology(std::move(topology)) {
sort_tokens();
}
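// The element-wise copies below go through seastar's do_for_each, which lets
// the reactor preempt between elements when the maps are large.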
future<token_metadata_impl> token_metadata_impl::clone_async() const noexcept {
return clone_only_token_map().then([this] (token_metadata_impl ret) {
return do_with(std::move(ret), [this] (token_metadata_impl& ret) {
ret._bootstrap_tokens.reserve(_bootstrap_tokens.size());
return do_for_each(_bootstrap_tokens, [&ret] (const auto& p) {
ret._bootstrap_tokens.emplace(p);
}).then([this, &ret] {
ret._leaving_endpoints = _leaving_endpoints;
ret._replacing_endpoints = _replacing_endpoints;
}).then([this, &ret] {
return do_for_each(_pending_ranges_interval_map,
[this, &ret] (const auto& p) {
ret._pending_ranges_interval_map.emplace(p);
});
}).then([this, &ret] {
ret._ring_version = _ring_version;
return make_ready_future<token_metadata_impl>(std::move(ret));
});
});
});
}
future<token_metadata_impl> token_metadata_impl::clone_only_token_map(bool clone_sorted_tokens) const noexcept {
return do_with(token_metadata_impl(), [this, clone_sorted_tokens] (token_metadata_impl& ret) {
ret._token_to_endpoint_map.reserve(_token_to_endpoint_map.size());
return do_for_each(_token_to_endpoint_map, [&ret] (const auto& p) {
ret._token_to_endpoint_map.emplace(p);
}).then([this, &ret] {
ret._endpoint_to_host_id_map = _endpoint_to_host_id_map;
}).then([this, &ret] {
ret._topology = _topology;
}).then([this, &ret, clone_sorted_tokens] {
if (clone_sorted_tokens) {
ret._sorted_tokens = _sorted_tokens;
}
return make_ready_future<token_metadata_impl>(std::move(ret));
});
});
}
template <typename Container>
static future<> clear_container_gently(Container& c) noexcept;
// The vector elements we use here (token / inet_address) have trivial destructors
// so they can be safely cleared in bulk
template <typename T>
static future<> clear_container_gently(std::vector<T>& vect) noexcept {
vect.clear();
return make_ready_future<>();
}
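// Generic fallback for node-based containers: erase one element at a time;
// awaiting even a ready future is a seastar preemption point, so large maps
// can be cleared without stalling the reactor.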
template <typename Container>
static future<> clear_container_gently(Container& c) noexcept {
for (auto b = c.begin(); b != c.end(); b = c.erase(b)) {
co_await make_ready_future<>(); // maybe yield
}
}
template <typename Container>
static future<> clear_nested_container_gently(Container& c) noexcept {
for (auto b = c.begin(); b != c.end(); b = c.erase(b)) {
co_await clear_container_gently(b->second);
}
}
future<> token_metadata_impl::clear_gently() noexcept {
co_await clear_container_gently(_token_to_endpoint_map);
co_await clear_container_gently(_endpoint_to_host_id_map);
co_await clear_container_gently(_bootstrap_tokens);
co_await clear_container_gently(_leaving_endpoints);
co_await clear_container_gently(_replacing_endpoints);
co_await clear_container_gently(_pending_ranges_interval_map);
co_await clear_container_gently(_sorted_tokens);
co_await _topology.clear_gently();
co_return;
}
void token_metadata_impl::sort_tokens() {
std::vector<token> sorted;
sorted.reserve(_token_to_endpoint_map.size());
for (auto&& i : _token_to_endpoint_map) {
sorted.push_back(i.first);