
Commit 2aaee90

[CINFRA] Fix propagation of writeConcern to collections and groups in Replication2 (arangodb#20434)

1 parent 7892528 commit 2aaee90

6 files changed, +116 -52 lines

arangod/Cluster/AgencyCache.cpp
Lines changed: 1 addition & 1 deletion

@@ -799,7 +799,7 @@ AgencyCache::change_set_t AgencyCache::changedSince(
   if (get_rest) {  // All the rest, i.e. All keys excluding the usual suspects
     static std::vector<std::string> const exc{
         "Analyzers", "Collections", "Databases",
-        "Views", "ReplicatedLogs", "ReplicatedStates"};
+        "Views", "ReplicatedLogs", "CollectionGroups"};
     auto keys = _readDB.nodePtr(AgencyCommHelper::path(what))->keys();
     keys.erase(std::remove_if(std::begin(keys), std::end(keys),
                               [&](auto const& x) {
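Context for the one-line change above: changedSince builds dedicated change sets for the keys listed in exc and funnels everything else into a catch-all "rest" set via the erase/remove_if filter visible in the context lines. Swapping "ReplicatedStates" for "CollectionGroups" means collection-group updates are no longer lumped into the rest but get the dedicated handling that writeConcern propagation relies on. A minimal, self-contained sketch of the filtering idiom, with an illustrative function name rather than the real AgencyCache member:

#include <algorithm>
#include <string>
#include <vector>

// Illustrative sketch (not the real AgencyCache code): keep only the keys
// that do NOT have a dedicated change set, i.e. drop the "usual suspects".
std::vector<std::string> restKeys(std::vector<std::string> keys) {
  static std::vector<std::string> const exc{
      "Analyzers", "Collections", "Databases",
      "Views", "ReplicatedLogs", "CollectionGroups"};
  keys.erase(std::remove_if(std::begin(keys), std::end(keys),
                            [&](std::string const& k) {
                              return std::find(std::begin(exc), std::end(exc),
                                               k) != std::end(exc);
                            }),
             std::end(keys));
  return keys;
}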

arangod/Replication2/Supervision/CollectionGroupSupervision.cpp
Lines changed: 9 additions & 7 deletions

@@ -933,13 +933,15 @@ struct TransactionBuilder {
   }

   void operator()(UpdateCollectionGroupInPlan const& action) {
-    auto write = env.write().emplace_object(
-        basics::StringUtils::concatT("/arango/Plan/CollectionGroups/", database,
-                                     "/", action.id.id(),
-                                     "/attributes/mutable"),
-        [&](VPackBuilder& builder) {
-          velocypack::serialize(builder, action.spec);
-        });
+    auto write =
+        env.write()
+            .emplace_object(basics::StringUtils::concatT(
+                                "/arango/Plan/CollectionGroups/", database, "/",
+                                action.id.id(), "/attributes/mutable"),
+                            [&](VPackBuilder& builder) {
+                              velocypack::serialize(builder, action.spec);
+                            })
+            .inc("/arango/Plan/Version");
     env = write.precs()
               .isNotEmpty(basics::StringUtils::concatT(
                   "/arango/Target/CollectionGroups/", database, "/", gid.id()))

arangod/Sharding/ShardingInfo.cpp
Lines changed: 16 additions & 11 deletions

@@ -322,7 +322,9 @@ LogicalCollection* ShardingInfo::collection() const noexcept {
   return _collection;
 }

-void ShardingInfo::toVelocyPack(VPackBuilder& result, bool translateCids,
+void ShardingInfo::toVelocyPack(VPackBuilder& result,
+                                bool ignoreCollectionGroupAttributes,
+                                bool translateCids,
                                 bool includeShardsEntry) const {
   result.add(StaticStrings::NumberOfShards, VPackValue(_numberOfShards));

@@ -345,18 +347,21 @@ void ShardingInfo::toVelocyPack(VPackBuilder& result, bool translateCids,
     result.close();  // shards
   }

-  if (isSatellite()) {
-    result.add(StaticStrings::ReplicationFactor,
-               VPackValue(StaticStrings::Satellite));
-  } else {
-    result.add(StaticStrings::ReplicationFactor,
-               VPackValue(_replicationFactor));
+  if (!ignoreCollectionGroupAttributes) {
+    // For replication Two this class is not responsible for the following
+    // attributes.
+    if (isSatellite()) {
+      result.add(StaticStrings::ReplicationFactor,
+                 VPackValue(StaticStrings::Satellite));
+    } else {
+      result.add(StaticStrings::ReplicationFactor,
+                 VPackValue(_replicationFactor));
+    }
+    // minReplicationFactor deprecated in 3.6
+    result.add(StaticStrings::WriteConcern, VPackValue(_writeConcern));
+    result.add(StaticStrings::MinReplicationFactor, VPackValue(_writeConcern));
   }

-  // minReplicationFactor deprecated in 3.6
-  result.add(StaticStrings::WriteConcern, VPackValue(_writeConcern));
-  result.add(StaticStrings::MinReplicationFactor, VPackValue(_writeConcern));
-
   if (!_distributeShardsLike.empty()) {
     if (ServerState::instance()->isCoordinator()) {
       // We either want to expose _distributeShardsLike if we're either on a

arangod/Sharding/ShardingInfo.h
Lines changed: 3 additions & 1 deletion

@@ -60,7 +60,9 @@ class ShardingInfo {
   std::string shardingStrategyName() const;

   LogicalCollection* collection() const noexcept;
-  void toVelocyPack(arangodb::velocypack::Builder& result, bool translateCids,
+
+  void toVelocyPack(arangodb::velocypack::Builder& result,
+                    bool ignoreCollectionGroupAttributes, bool translateCids,
                     bool includeShardsEntry = true) const;

   std::string const& distributeShardsLike() const noexcept;
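The new leading flag changes every call site: replication 2 collections pass true so that ShardingInfo skips replicationFactor, writeConcern, and minReplicationFactor, and the collection group's values (emitted by LogicalCollection, next file) are the only ones in the output. A call-site sketch mirroring the updated callers below:

// isReplicationTWO is true exactly when the collection belongs to a
// collection group; the group then owns the replication attributes and
// ShardingInfo must not serialize them.
bool isReplicationTWO = group != nullptr;
_sharding->toVelocyPack(build,
                        /*ignoreCollectionGroupAttributes*/ isReplicationTWO,
                        /*translateCids*/ ctx != Serialization::List);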

arangod/VocBase/LogicalCollection.cpp
Lines changed: 65 additions & 14 deletions

@@ -27,14 +27,13 @@
 #include "ApplicationFeatures/ApplicationServer.h"
 #include "Aql/QueryCache.h"
 #include "Basics/DownCast.h"
-#include "Basics/ReadLocker.h"
 #include "Basics/StaticStrings.h"
 #include "Basics/VelocyPackHelper.h"
 #include "Basics/WriteLocker.h"
+#include "Cluster/ClusterCollectionMethods.h"
 #include "Cluster/ClusterFeature.h"
 #include "Cluster/ClusterInfo.h"
 #include "Cluster/ClusterMethods.h"
-#include "Cluster/ClusterCollectionMethods.h"
 #include "Cluster/FollowerInfo.h"
 #include "Cluster/ServerState.h"
 #include "Logger/LogMacros.h"
@@ -57,7 +56,6 @@
 #include "VocBase/Properties/UserInputCollectionProperties.h"
 #include "VocBase/Validators.h"
 #include "velocypack/Builder.h"
-#include "VocBase/Properties/UserInputCollectionProperties.h"

 #ifdef USE_ENTERPRISE
 #include "Enterprise/Sharding/ShardingStrategyEE.h"
@@ -366,6 +364,20 @@ size_t LogicalCollection::replicationFactor() const noexcept {
 }

 size_t LogicalCollection::writeConcern() const noexcept {
+  if (_groupId.has_value() && ServerState::instance()->isDBServer()) {
+    TRI_ASSERT(replicationVersion() == replication::Version::TWO)
+        << "Set a groupId although we are not in Replication Two";
+    auto& ci = vocbase().server().getFeature<ClusterFeature>().clusterInfo();
+
+    auto const& group = ci.getCollectionGroupById(
+        replication2::agency::CollectionGroupId{_groupId.value()});
+    if (group) {
+      return group->attributes.mutableAttributes.writeConcern;
+    }
+    // If we cannot find a group this means the shard is dropped
+    // while we are still in this method. Let's just take the
+    // value at creation then.
+  }
   TRI_ASSERT(_sharding != nullptr);
   return _sharding->writeConcern();
 }
@@ -804,6 +816,16 @@ Result LogicalCollection::appendVPack(velocypack::Builder& build,
              VPackValue(std::to_string(planId().id())));
   }

+  std::shared_ptr<replication2::agency::CollectionGroupPlanSpecification const>
+      group;
+  if (_groupId.has_value()) {
+    TRI_ASSERT(replicationVersion() == replication::Version::TWO)
+        << "Set a groupId although we are not in Replication Two";
+    auto& ci = vocbase().server().getFeature<ClusterFeature>().clusterInfo();
+    group = ci.getCollectionGroupById(
+        replication2::agency::CollectionGroupId{_groupId.value()});
+  }
+  bool isReplicationTWO = group != nullptr;
 #ifdef USE_ENTERPRISE
   if (isSmart() && type() == TRI_COL_TYPE_EDGE &&
       ServerState::instance()->isRunningInCluster()) {
@@ -816,25 +838,54 @@ Result LogicalCollection::appendVPack(velocypack::Builder& build,
     }
     edgeCollection->shardMapToVelocyPack(build);
     bool includeShardsEntry = false;
-    _sharding->toVelocyPack(build, ctx != Serialization::List,
+    _sharding->toVelocyPack(build, isReplicationTWO, ctx != Serialization::List,
                             includeShardsEntry);
   } else {
-    _sharding->toVelocyPack(build, ctx != Serialization::List);
+    _sharding->toVelocyPack(build, isReplicationTWO,
+                            ctx != Serialization::List);
   }
 #else
-  _sharding->toVelocyPack(build, ctx != Serialization::List);
+  _sharding->toVelocyPack(build, isReplicationTWO, ctx != Serialization::List);
 #endif
-
-  includeVelocyPackEnterprise(build);
-  TRI_ASSERT(build.isOpenObject());
-
-  if (replicationVersion() == replication::Version::TWO &&
-      _groupId.has_value()) {
+  if (group) {
     build.add("groupId", VPackValue(_groupId.value()));
     if (_replicatedStateId) {
       build.add("replicatedStateId", VPackValue(*_replicatedStateId));
     }
+    // For replication1 the _sharding is responsible.
+    // For TWO the group contains those attributes
+
+    // Future TODO: It would be nice if we could use a "serializeInPlace"
+    // method Which does serialization but not the open/close object.
+    TRI_ASSERT(!build.hasKey(StaticStrings::ReplicationFactor))
+        << "replicationFactor already serialized from _sharding";
+    TRI_ASSERT(!build.hasKey(StaticStrings::WriteConcern))
+        << "writeConcern already serialized from _sharding";
+    TRI_ASSERT(!build.hasKey(StaticStrings::MinReplicationFactor))
+        << "minReplicationFactor already serialized from _sharding";
+    if (group->attributes.mutableAttributes.replicationFactor == 0) {
+      build.add(StaticStrings::ReplicationFactor,
+                VPackValue(StaticStrings::Satellite));
+      // For Backwards Compatibility, WriteConcern should return 0 here for
+      // satellites
+      build.add(StaticStrings::WriteConcern, VPackValue(0));
+      // minReplicationFactor deprecated in 3.6
+      build.add(StaticStrings::MinReplicationFactor, VPackValue(0));
+
+    } else {
+      build.add(
+          StaticStrings::ReplicationFactor,
+          VPackValue(group->attributes.mutableAttributes.replicationFactor));
+      build.add(StaticStrings::WriteConcern,
+                VPackValue(group->attributes.mutableAttributes.writeConcern));
+      // minReplicationFactor deprecated in 3.6
+      build.add(StaticStrings::MinReplicationFactor,
+                VPackValue(group->attributes.mutableAttributes.writeConcern));
+    }
   }
+
+  includeVelocyPackEnterprise(build);
+  TRI_ASSERT(build.isOpenObject());
   // We leave the object open
   return {};
 }
@@ -904,7 +955,7 @@ Result LogicalCollection::properties(velocypack::Slice slice) {
   }

   size_t replicationFactor = _sharding->replicationFactor();
-  size_t writeConcern = _sharding->writeConcern();
+  size_t writeConcern = this->writeConcern();
   VPackSlice replicationFactorSlice =
       slice.get(StaticStrings::ReplicationFactor);

@@ -1004,7 +1055,7 @@ Result LogicalCollection::properties(velocypack::Slice slice) {
   if ((ServerState::instance()->isCoordinator() ||
        (ServerState::instance()->isSingleServer() &&
         (isSatellite() || isSmart()))) &&
-      writeConcern != _sharding->writeConcern()) {  // check if changed
+      writeConcern != this->writeConcern()) {  // check if changed
     if (!_sharding->distributeShardsLike().empty()) {
       CollectionNameResolver resolver(vocbase());
       std::string name = resolver.getCollectionNameCluster(DataSourceId{
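One detail worth calling out from the appendVPack hunk: when a collection group is present, the user-facing replication attributes are derived entirely from the group's mutable attributes, with replicationFactor == 0 encoding a SatelliteCollection. A condensed, illustrative restatement of that mapping (the struct names are invented for the sketch; the real code writes directly into the VPackBuilder):

#include <cstddef>
#include <string>
#include <variant>

// Invented stand-ins for group->attributes.mutableAttributes and for the
// serialized properties; illustrative only.
struct MutableAttributes {
  std::size_t replicationFactor;  // 0 encodes a SatelliteCollection
  std::size_t writeConcern;
};

struct ReplicationProps {
  std::variant<std::size_t, std::string> replicationFactor;
  std::size_t writeConcern;
  std::size_t minReplicationFactor;  // deprecated in 3.6, mirrors writeConcern
};

ReplicationProps propsFromGroup(MutableAttributes const& a) {
  if (a.replicationFactor == 0) {
    // Backwards compatibility: satellites report "satellite" and a
    // writeConcern of 0.
    return {std::string("satellite"), 0, 0};
  }
  return {a.replicationFactor, a.writeConcern, a.writeConcern};
}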

tests/js/client/shell/shell-collection-properties-propagation-cluster.js
Lines changed: 22 additions & 18 deletions

@@ -156,6 +156,12 @@ function CollectionPropertiesPropagationSuite() {
         internal.sleep(1);
       }
       assertEqual(2, servers.length);
+
+      // For ReplicationVersion 2 shards and followerCollections
+      // all report the change in ReplicationFactor.
+      // For ReplicationVersion 1 only the Leading collection
+      // knows about the change.
+      const expectedRF = db._properties().replicationVersion === "2" ? 2 : 3;

       // shards do not see the replicationFactor update
       let keys;
@@ -168,7 +174,7 @@ function CollectionPropertiesPropagationSuite() {
       assertTrue(keys.length > 0);
       let found = 0;
       keys.forEach((s) => {
-        if (p[s].replicationFactor === 3) {
+        if (p[s].replicationFactor === expectedRF) {
           ++found;
         }
       });
@@ -179,28 +185,18 @@ function CollectionPropertiesPropagationSuite() {
       }

       keys.forEach((s) => {
-        assertEqual(3, p[s].replicationFactor, {s, p});
+        assertEqual(expectedRF, p[s].replicationFactor, {s, p});
       });

       p = c.properties();
       assertEqual(proto, p.distributeShardsLike);
-      let expectedRF;
-      if (db._properties().replicationVersion === "2") {
-        // with replication 2, the collection's replicationFactor is tied to the replicationFactor
-        // of the collection group, so it will report the correct value#
-        assertEqual(2, p.replicationFactor, p);
-      } else {
-        // with replication1, dependent collection won't see the replicationFactor update
-        assertEqual(3, p.replicationFactor, p);
-      }
+      assertEqual(expectedRF, p.replicationFactor, p);

-      // the individual shards currently do not report the updated replicationFactor,
-      // neither for replication 1 nor 2
       p = propertiesOnDBServers(c);
       keys = Object.keys(p);
       assertTrue(keys.length > 0);
       keys.forEach((s) => {
-        assertEqual(3, p[s].replicationFactor, {s, p});
+        assertEqual(expectedRF, p[s].replicationFactor, {s, p});
       });
     },

@@ -210,7 +206,7 @@ function CollectionPropertiesPropagationSuite() {
       db[proto].properties({ writeConcern: 1 });

       let p = db[proto].properties();
-      assertEqual(1, p.writeConcern);
+      assertEqual(1, p.writeConcern, p);

       // shards will see the writeConcern update
       let keys;
@@ -238,16 +234,24 @@ function CollectionPropertiesPropagationSuite() {
       });

       // dependent collection won't see the writeConcern update
+      const isReplicationTwo = db._properties().replicationVersion === "2";
+      // In Replication1 the followers won't see the writeConcern update
+      // as they are not changed
+      // In Replication2 the followers share the replicated log with the
+      // leader, and the log holds the writeConcern, hence all are updated.
+      // The above holds true for the collection as well as the corresponding
+      // shards.
+      const expectedFollowerWriteConcern = isReplicationTwo ? 1 : 2;
       p = c.properties();
       assertEqual(proto, p.distributeShardsLike);
-      assertEqual(2, p.writeConcern);

-      // nor do its shards
+      assertEqual(expectedFollowerWriteConcern, p.writeConcern);
+
       p = propertiesOnDBServers(c);
       keys = Object.keys(p);
       assertTrue(keys.length > 0);
       keys.forEach((s) => {
-        assertEqual(2, p[s].writeConcern, {s, p});
+        assertEqual(expectedFollowerWriteConcern, p[s].writeConcern, {s, p});
       });
     },