Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2117,6 +2117,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

absl::flat_hash_set<ui64> sendingShardsSet;
absl::flat_hash_set<ui64> receivingShardsSet;
ui64 arbiter = 0;

// Gather shards that need to send/receive readsets (shards with effects)
if (needCommit) {
Expand All @@ -2143,6 +2144,48 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
if (auto tabletIds = Request.TopicOperations.GetReceivingTabletIds()) {
receivingShardsSet.insert(tabletIds.begin(), tabletIds.end());
}

// The current value of 5 is arbitrary. Writing to 5 shards in
// a single transaction is unusual enough, and having latency
// regressions is unlikely. Full mesh readset count grows like
// 2n(n-1), and arbiter reduces it to 4(n-1). Here's a readset
// count table for various small `n`:
//
// n = 2: 4 -> 4
// n = 3: 12 -> 8
// n = 4: 24 -> 12
// n = 5: 40 -> 16
// n = 6: 60 -> 20
// n = 7: 84 -> 24
//
// The ideal crossover is at n = 4, since the readset count
// doesn't change when going from 3 to 4 shards, but the
// increase in latency may not really be worth it. With n = 5
// the readset count lowers from 24 to 16 readsets when going
// from 4 to 5 shards. This makes 5 shards potentially cheaper
// than 4 shards when readsets dominate the workload, but at
// the price of possible increase in latency. Too many readsets
// cause interconnect overload and reduce throughput however,
// so we don't want to use a crossover value that is too high.
const size_t minArbiterMeshSize = 5; // TODO: make configurable?
if (VolatileTx &&
receivingShardsSet.size() >= minArbiterMeshSize &&
AppData()->FeatureFlags.GetEnableVolatileTransactionArbiters())
{
std::vector<ui64> candidates;
candidates.reserve(receivingShardsSet.size());
for (ui64 candidate : receivingShardsSet) {
// Note: all receivers are also senders in volatile transactions
if (Y_LIKELY(sendingShardsSet.contains(candidate))) {
candidates.push_back(candidate);
}
}
if (candidates.size() >= minArbiterMeshSize) {
// Select a random arbiter
ui32 index = RandomNumber<ui32>(candidates.size());
arbiter = candidates.at(index);
}
}
}

// Encode sending/receiving shards in tx bodies
Expand All @@ -2157,12 +2200,16 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
shardTx->MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit);
*shardTx->MutableLocks()->MutableSendingShards() = sendingShards;
*shardTx->MutableLocks()->MutableReceivingShards() = receivingShards;
if (arbiter) {
shardTx->MutableLocks()->SetArbiterShard(arbiter);
}
}

for (auto& [_, tx] : topicTxs) {
tx.SetOp(NKikimrPQ::TDataTransaction::Commit);
*tx.MutableSendingShards() = sendingShards;
*tx.MutableReceivingShards() = receivingShards;
YQL_ENSURE(!arbiter);
}
}
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/feature_flags.proto
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,5 @@ message TFeatureFlags {
optional bool EnableReplaceIfExistsForExternalEntities = 116 [ default = false];
optional bool EnableCMSRequestPriorities = 117 [default = false];
optional bool EnableStableNodeNames = 122 [default = false];
optional bool EnableVolatileTransactionArbiters = 124 [default = false];
}