Commit

Merge branch 'main' of github.com:apple/foundationdb into fix-arm-nightly
sfc-gh-clin committed Feb 15, 2023
2 parents e6d5fa8 + bf85c9f commit 785206b
Showing 6 changed files with 137 additions and 49 deletions.
152 changes: 119 additions & 33 deletions fdbclient/BackupAgentBase.actor.cpp
@@ -23,6 +23,7 @@

#include "fdbclient/BackupAgent.actor.h"
#include "fdbclient/BlobCipher.h"
#include "fdbclient/CommitProxyInterface.h"
#include "fdbclient/CommitTransaction.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/GetEncryptCipherKeys.actor.h"
@@ -290,10 +291,28 @@ std::pair<Version, uint32_t> decodeBKMutationLogKey(Key key) {
bigEndian32(*(int32_t*)(key.begin() + backupLogPrefixBytes + sizeof(UID) + sizeof(uint8_t) + sizeof(int64_t))));
}

void _addResult(bool* tenantMapChanging,
VectorRef<MutationRef>* result,
int* mutationSize,
Arena* arena,
MutationRef logValue,
KeyRangeRef tenantMapRange) {
*tenantMapChanging = *tenantMapChanging || TenantAPI::tenantMapChanging(logValue, tenantMapRange);
result->push_back_deep(*arena, logValue);
*mutationSize += logValue.expectedSize();
}

/*
This actor is responsible for taking an original transaction which was added to the backup mutation log (represented
by the "value" parameter), breaking it up into the individual MutationRefs that constitute the transaction, decrypting
each mutation (if needed), and adding/removing prefixes from the mutations. The final mutations are then added to the
"result" vector alongside their encrypted counterparts (which are added to the "encryptedResult" vector).
*/
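A minimal, self-contained sketch of the prefix rewrite described above, using std::string in place of the FDB Key types (the helper name and example prefixes are hypothetical, not part of this change):

#include <cassert>
#include <string>

// Rewrite a restored key: strip the backup's removePrefix (which must be present),
// then prepend addPrefix -- the same effect as the prefix handling in decodeBackupLogValue.
std::string rewriteKey(const std::string& key, const std::string& removePrefix, const std::string& addPrefix) {
    assert(key.compare(0, removePrefix.size(), removePrefix) == 0); // assumption: key carries the old prefix
    return addPrefix + key.substr(removePrefix.size());
}

// Usage: rewriteKey("src/table/42", "src/", "dst/") yields "dst/table/42".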
ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,
VectorRef<MutationRef>* result,
VectorRef<Optional<MutationRef>>* encryptedResult,
int* mutationSize,
bool* tenantMapChanging,
Standalone<StringRef> value,
Key addPrefix,
Key removePrefix,
@@ -325,6 +344,7 @@ ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,

state int originalOffset = offset;
state DatabaseConfiguration config = wait(getDatabaseConfiguration(cx));
state KeyRangeRef tenantMapRange = TenantMetadata::tenantMap().subspace;

while (consumed < totalBytes) {
uint32_t type = 0;
@@ -410,8 +430,7 @@ ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,
logValue.param1 = logValue.param1.withPrefix(addPrefix, tempArena);
}
logValue.param2 = addPrefix == StringRef() ? allKeys.end : strinc(addPrefix, tempArena);
result->push_back_deep(*arena, logValue);
*mutationSize += logValue.expectedSize();
_addResult(tenantMapChanging, result, mutationSize, arena, logValue, tenantMapRange);
} else {
logValue.param1 = std::max(r.range().begin, range.begin);
logValue.param2 = minKey;
@@ -423,8 +442,7 @@ ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,
logValue.param1 = logValue.param1.withPrefix(addPrefix, tempArena);
logValue.param2 = logValue.param2.withPrefix(addPrefix, tempArena);
}
result->push_back_deep(*arena, logValue);
*mutationSize += logValue.expectedSize();
_addResult(tenantMapChanging, result, mutationSize, arena, logValue, tenantMapRange);
}
if (originalLogValue.param1 == logValue.param1 && originalLogValue.param2 == logValue.param2) {
encryptedResult->push_back_deep(*arena, encryptedLogValue);
@@ -443,8 +461,7 @@ ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,
if (addPrefix.size()) {
logValue.param1 = logValue.param1.withPrefix(addPrefix, tempArena);
}
result->push_back_deep(*arena, logValue);
*mutationSize += logValue.expectedSize();
_addResult(tenantMapChanging, result, mutationSize, arena, logValue, tenantMapRange);
// If we did not remove/add prefixes to the mutation then keep the original encrypted mutation so we
// do not have to re-encrypt unnecessarily
if (originalLogValue.param1 == logValue.param1 && originalLogValue.param2 == logValue.param2) {
@@ -695,6 +712,41 @@ Future<Void> readCommitted(Database cx,
cx, results, Void(), lock, range, groupBy, Terminator::True, AccessSystemKeys::True, LockAware::True);
}

ACTOR Future<Void> sendCommitTransactionRequest(CommitTransactionRequest req,
Key uid,
Version newBeginVersion,
Key rangeBegin,
NotifiedVersion* committedVersion,
int* totalBytes,
int* mutationSize,
PromiseStream<Future<Void>> addActor,
FlowLock* commitLock,
PublicRequestStream<CommitTransactionRequest> commit) {
Key applyBegin = uid.withPrefix(applyMutationsBeginRange.begin);
Key versionKey = BinaryWriter::toValue(newBeginVersion, Unversioned());
Key rangeEnd = getApplyKey(newBeginVersion, uid);

// Mutations and encrypted mutations (and their relationship) are described in greater detail in the definition of
// CommitTransactionRef in CommitTransaction.h
req.transaction.mutations.push_back_deep(req.arena, MutationRef(MutationRef::SetValue, applyBegin, versionKey));
req.transaction.encryptedMutations.push_back_deep(req.arena, Optional<MutationRef>());
req.transaction.write_conflict_ranges.push_back_deep(req.arena, singleKeyRange(applyBegin));
req.transaction.mutations.push_back_deep(req.arena, MutationRef(MutationRef::ClearRange, rangeBegin, rangeEnd));
req.transaction.encryptedMutations.push_back_deep(req.arena, Optional<MutationRef>());
req.transaction.write_conflict_ranges.push_back_deep(req.arena, singleKeyRange(rangeBegin));

// The commit request contains no read conflict ranges, so regardless of what read version we
// choose, it's impossible for us to get a transaction_too_old error back, and it's impossible
// for our transaction to be aborted due to conflicts.
req.transaction.read_snapshot = committedVersion->get();
req.flags = req.flags | CommitTransactionRequest::FLAG_IS_LOCK_AWARE;

*totalBytes += *mutationSize;
wait(commitLock->take(TaskPriority::DefaultYield, *mutationSize));
addActor.send(commitLock->releaseWhen(success(commit.getReply(req)), *mutationSize));
return Void();
}

ACTOR Future<int> kvMutationLogToTransactions(Database cx,
PromiseStream<RCGroup> results,
Reference<FlowLock> lock,
@@ -717,20 +769,26 @@ ACTOR Future<int> kvMutationLogToTransactions(Database cx,
state CommitTransactionRequest req;
state Version newBeginVersion = invalidVersion;
state int mutationSize = 0;
state bool tenantMapChanging = false;
loop {
try {
state RCGroup group = waitNext(results.getFuture());
state CommitTransactionRequest curReq;
lock->release(group.items.expectedSize());
state int curBatchMutationSize = 0;
tenantMapChanging = false;

BinaryWriter bw(Unversioned());
for (int i = 0; i < group.items.size(); ++i) {
bw.serializeBytes(group.items[i].value);
}
// Parse a single transaction from the backup mutation log
Standalone<StringRef> value = bw.toValue();
wait(decodeBackupLogValue(&req.arena,
&req.transaction.mutations,
&req.transaction.encryptedMutations,
&mutationSize,
wait(decodeBackupLogValue(&curReq.arena,
&curReq.transaction.mutations,
&curReq.transaction.encryptedMutations,
&curBatchMutationSize,
&tenantMapChanging,
value,
addPrefix,
removePrefix,
@@ -739,8 +797,48 @@ ACTOR Future<int> kvMutationLogToTransactions(Database cx,
cx,
tenantMap,
provisionalProxy));

// A single call to decodeBackupLogValue (above) will only parse mutations from a single transaction;
// however, the code below batches the results across several calls to decodeBackupLogValue and sends
// them in one big CommitTransactionRequest (so one CTR contains mutations from multiple transactions).
// Generally this is fine, since the mutations in the log are ordered (and thus so are the results
// after calling decodeBackupLogValue). However, the CommitProxy does not allow mutations which
// change the tenant map to appear alongside regular normalKey mutations in a single
// CommitTransactionRequest. Thus the code below immediately sends any mutations accumulated so
// far if the latest call to decodeBackupLogValue contained a transaction which changed the tenant map
// (before processing the mutations which caused the tenant map to change).
if (tenantMapChanging && req.transaction.mutations.size()) {
// If the tenantMap is changing send the previous CommitTransactionRequest to the CommitProxy
TraceEvent("MutationLogRestoreTenantMapChanging").detail("BeginVersion", newBeginVersion);
CODE_PROBE(true, "mutation log tenant map changing");
wait(sendCommitTransactionRequest(req,
uid,
newBeginVersion,
rangeBegin,
committedVersion,
&totalBytes,
&mutationSize,
addActor,
commitLock,
commit));
req = CommitTransactionRequest();
mutationSize = 0;
}

state int i;
for (i = 0; i < curReq.transaction.mutations.size(); i++) {
req.transaction.mutations.push_back_deep(req.arena, curReq.transaction.mutations[i]);
req.transaction.encryptedMutations.push_back_deep(req.arena,
curReq.transaction.encryptedMutations[i]);
}
mutationSize += curBatchMutationSize;
newBeginVersion = group.groupKey + 1;
if (mutationSize >= CLIENT_KNOBS->BACKUP_LOG_WRITE_BATCH_MAX_SIZE) {

// At this point, if the tenant map changed we have already sent any normalKey mutations
// accumulated thus far, so all that's left to do is send the mutations in the offending
// transaction that changed the tenant map. This is necessary so that we don't batch these tenant map
// mutations with future normalKey mutations (which would result in the same problem discussed above).
if (tenantMapChanging || mutationSize >= CLIENT_KNOBS->BACKUP_LOG_WRITE_BATCH_MAX_SIZE) {
break;
}
} catch (Error& e) {
@@ -756,28 +854,16 @@ ACTOR Future<int> kvMutationLogToTransactions(Database cx,
throw;
}
}

Key applyBegin = uid.withPrefix(applyMutationsBeginRange.begin);
Key versionKey = BinaryWriter::toValue(newBeginVersion, Unversioned());
Key rangeEnd = getApplyKey(newBeginVersion, uid);

req.transaction.mutations.push_back_deep(req.arena, MutationRef(MutationRef::SetValue, applyBegin, versionKey));
req.transaction.encryptedMutations.push_back_deep(req.arena, Optional<MutationRef>());
req.transaction.write_conflict_ranges.push_back_deep(req.arena, singleKeyRange(applyBegin));
req.transaction.mutations.push_back_deep(req.arena, MutationRef(MutationRef::ClearRange, rangeBegin, rangeEnd));
req.transaction.encryptedMutations.push_back_deep(req.arena, Optional<MutationRef>());
req.transaction.write_conflict_ranges.push_back_deep(req.arena, singleKeyRange(rangeBegin));

// The commit request contains no read conflict ranges, so regardless of what read version we
// choose, it's impossible for us to get a transaction_too_old error back, and it's impossible
// for our transaction to be aborted due to conflicts.
req.transaction.read_snapshot = committedVersion->get();
req.flags = req.flags | CommitTransactionRequest::FLAG_IS_LOCK_AWARE;

totalBytes += mutationSize;
wait(commitLock->take(TaskPriority::DefaultYield, mutationSize));
addActor.send(commitLock->releaseWhen(success(commit.getReply(req)), mutationSize));

wait(sendCommitTransactionRequest(req,
uid,
newBeginVersion,
rangeBegin,
committedVersion,
&totalBytes,
&mutationSize,
addActor,
commitLock,
commit));
if (endOfStream) {
return totalBytes;
}
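The batching rule described in the comments above can be summarized with a small stand-alone sketch (simplified types; flush() stands in for sendCommitTransactionRequest, and every name here is illustrative rather than the FDB API):

#include <functional>
#include <string>
#include <vector>

struct Mutation {
    std::string param1, param2;
};

// Accumulate decoded transactions into one batch, but never let a transaction that
// changes the tenant map share a batch with previously accumulated normal-key mutations:
// flush the pending batch first, then send the tenant-map transaction in its own batch.
void batchMutationGroups(const std::vector<std::vector<Mutation>>& groups,
                         const std::function<bool(const Mutation&)>& changesTenantMap,
                         const std::function<void(const std::vector<Mutation>&)>& flush,
                         size_t maxBatchBytes) {
    std::vector<Mutation> pending;
    size_t pendingBytes = 0;
    for (const auto& group : groups) {
        bool tenantMapChanging = false;
        size_t groupBytes = 0;
        for (const auto& m : group) {
            tenantMapChanging = tenantMapChanging || changesTenantMap(m);
            groupBytes += m.param1.size() + m.param2.size();
        }
        if (tenantMapChanging && !pending.empty()) {
            flush(pending); // send the normal-key mutations accumulated so far
            pending.clear();
            pendingBytes = 0;
        }
        pending.insert(pending.end(), group.begin(), group.end());
        pendingBytes += groupBytes;
        if (tenantMapChanging || pendingBytes >= maxBatchBytes) {
            flush(pending); // the tenant-map transaction (or a full batch) goes out on its own
            pending.clear();
            pendingBytes = 0;
        }
    }
    if (!pending.empty()) {
        flush(pending);
    }
}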
10 changes: 10 additions & 0 deletions fdbclient/TenantManagement.actor.cpp
@@ -67,6 +67,16 @@ int64_t extractTenantIdFromKeyRef(StringRef s) {
return TenantAPI::prefixToId(prefix, EnforceValidTenantId::False);
}

bool tenantMapChanging(MutationRef const& mutation, KeyRangeRef const& tenantMapRange) {
if (isSingleKeyMutation((MutationRef::Type)mutation.type) && mutation.param1.startsWith(tenantMapRange.begin)) {
return true;
} else if (mutation.type == MutationRef::ClearRange &&
tenantMapRange.intersects(KeyRangeRef(mutation.param1, mutation.param2))) {
return true;
}
return false;
}

// validates whether the lastTenantId and the nextTenantId share the same 2 byte prefix
bool nextTenantIdPrefixMatches(int64_t lastTenantId, int64_t nextTenantId) {
if (getTenantIdPrefix(nextTenantId) != getTenantIdPrefix(lastTenantId)) {
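The new TenantAPI::tenantMapChanging helper boils down to a prefix test for point mutations and a range-overlap test for clears; an equivalent stand-alone check over std::string keys (illustrative only, not the FDB types) looks like this:

#include <string>

// A point mutation touches the tenant map if its key starts with the tenant map prefix;
// a ClearRange touches it if [begin, end) overlaps the tenant map subspace [mapBegin, mapEnd).
bool touchesTenantMap(bool isClearRange,
                      const std::string& begin,
                      const std::string& end,
                      const std::string& mapBegin,
                      const std::string& mapEnd) {
    if (!isClearRange) {
        return begin.compare(0, mapBegin.size(), mapBegin) == 0; // startsWith(mapBegin)
    }
    return begin < mapEnd && mapBegin < end; // half-open interval overlap
}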
2 changes: 0 additions & 2 deletions fdbclient/include/fdbclient/BlobCipher.h
@@ -170,8 +170,6 @@ struct BlobCipherDetails {
const EncryptCipherRandomSalt& random)
: encryptDomainId(dId), baseCipherId(bId), salt(random) {}

bool isValid() const { return encryptDomainId != INVALID_ENCRYPT_DOMAIN_ID; }

bool operator==(const BlobCipherDetails& o) const {
return encryptDomainId == o.encryptDomainId && baseCipherId == o.baseCipherId && salt == o.salt;
}
1 change: 1 addition & 0 deletions fdbclient/include/fdbclient/TenantManagement.actor.h
@@ -126,6 +126,7 @@ Future<Void> checkTenantMode(Transaction tr, ClusterType expectedClusterType) {
TenantMode tenantModeForClusterType(ClusterType clusterType, TenantMode tenantMode);
int64_t extractTenantIdFromMutation(MutationRef m);
int64_t extractTenantIdFromKeyRef(StringRef s);
bool tenantMapChanging(MutationRef const& mutation, KeyRangeRef const& tenantMapRange);
bool nextTenantIdPrefixMatches(int64_t lastTenantId, int64_t nextTenantId);
int64_t getMaxAllowableTenantId(int64_t curTenantId);
int64_t getTenantIdPrefix(int64_t tenantId);
19 changes: 6 additions & 13 deletions fdbserver/CommitProxyServer.actor.cpp
@@ -1070,17 +1070,6 @@ bool validTenantAccess(MutationRef m, std::map<int64_t, TenantName> const& tenan
return true;
}

inline bool tenantMapChanging(MutationRef const& mutation) {
const KeyRangeRef tenantMapRange = TenantMetadata::tenantMap().subspace;
if (isSingleKeyMutation((MutationRef::Type)mutation.type) && mutation.param1.startsWith(tenantMapRange.begin)) {
return true;
} else if (mutation.type == MutationRef::ClearRange &&
tenantMapRange.intersects(KeyRangeRef(mutation.param1, mutation.param2))) {
return true;
}
return false;
}

// return an iterator to the first tenantId whose idToPrefix(id) >= prefix[0..8] in lexicographic order. If no such id,
// return tenantMap.end()
inline auto lowerBoundTenantId(const StringRef& prefix, const std::map<int64_t, TenantName>& tenantMap) {
@@ -1380,11 +1369,12 @@ Error validateAndProcessTenantAccess(Arena& arena,

std::vector<std::pair<int, std::vector<MutationRef>>> idxSplitMutations;
int newMutationSize = mutations.size();
KeyRangeRef tenantMapRange = TenantMetadata::tenantMap().subspace;
for (int i = 0; i < mutations.size(); ++i) {
auto& mutation = mutations[i];
Optional<int64_t> tenantId;
bool validAccess = true;
changeTenant = changeTenant || tenantMapChanging(mutation);
changeTenant = changeTenant || TenantAPI::tenantMapChanging(mutation, tenantMapRange);

if (mutation.type == MutationRef::ClearRange) {
int newClearSize = processClearRangeMutation(
@@ -1471,6 +1461,7 @@ Error validateAndProcessTenantAccess(CommitTransactionRequest& tr,
void applyMetadataEffect(CommitBatchContext* self) {
bool initialState = self->isMyFirstBatch;
self->firstStateMutations = self->isMyFirstBatch;
KeyRangeRef tenantMapRange = TenantMetadata::tenantMap().subspace;
for (int versionIndex = 0; versionIndex < self->resolution[0].stateMutations.size(); versionIndex++) {
// pProxyCommitData->logAdapter->setNextVersion( ??? ); << Ideally we would be telling the log adapter that the
// pushes in this commit will be in the version at which these state mutations were committed by another proxy,
@@ -1492,7 +1483,9 @@ void applyMetadataEffect(CommitBatchContext* self) {
// fail the transaction if it contains both tenant map changes and normal key writes
auto& mutations = self->resolution[0].stateMutations[versionIndex][transactionIndex].mutations;
committed =
tenantIds.get().empty() || std::none_of(mutations.begin(), mutations.end(), tenantMapChanging);
tenantIds.get().empty() || std::none_of(mutations.begin(), mutations.end(), [&](MutationRef m) {
return TenantAPI::tenantMapChanging(m, tenantMapRange);
});

// check if all tenant ids are valid if committed == true
committed = committed &&
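In applyMetadataEffect the rule is that a transaction may change the tenant map or write normal keys, but not both in the same commit; a simplified stand-alone version of that check (hypothetical key prefix, not the real tenant map subspace) might read:

#include <algorithm>
#include <string>
#include <vector>

// Mirrors the shape of the proxy-side check: commit only if the transaction touches no
// tenants at all, or if none of its mutations land in the tenant map subspace.
bool commitAllowed(bool touchesTenantKeys, const std::vector<std::string>& mutatedKeys) {
    const std::string tenantMapPrefix = "\xff/tenantMap/"; // illustrative placeholder prefix
    auto changesTenantMap = [&](const std::string& k) {
        return k.compare(0, tenantMapPrefix.size(), tenantMapPrefix) == 0;
    };
    return !touchesTenantKeys ||
           std::none_of(mutatedKeys.begin(), mutatedKeys.end(), changesTenantMap);
}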
2 changes: 1 addition & 1 deletion fdbserver/workloads/GetMappedRange.actor.cpp
@@ -332,7 +332,7 @@ struct GetMappedRangeWorkload : ApiWorkload {
}
expectedCnt = std::min(expectedCnt, boundByRecord);
std::cout << "boundByRecord: " << boundByRecord << std::endl;
ASSERT(result.size() == expectedCnt);
ASSERT_LE(result.size(), expectedCnt);
beginSelector = KeySelector(firstGreaterThan(result.back().key));
}
} else {
