Skip to content

Commit

Permalink
Merge pull request apple#4000 from xumengpanda/mengxu/ha-code-read
Browse files Browse the repository at this point in the history
Add comments  to TLog, SS, and DD related code
  • Loading branch information
jzhou77 authored Nov 14, 2020
2 parents f0f6920 + 4b0fba6 commit 569ab46
Show file tree
Hide file tree
Showing 9 changed files with 123 additions and 47 deletions.
13 changes: 9 additions & 4 deletions fdbclient/FDBTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,23 @@ typedef StringRef ValueRef;
typedef int64_t Generation;

enum {
tagLocalitySpecial = -1,
tagLocalitySpecial = -1, // tag with this locality means it is invalidTag (id=0), txsTag (id=1), or cacheTag (id=2)
tagLocalityLogRouter = -2,
tagLocalityRemoteLog = -3,
tagLocalityRemoteLog = -3, // tag created by log router for remote tLogs
tagLocalityUpgraded = -4,
tagLocalitySatellite = -5,
tagLocalityLogRouterMapped = -6,
tagLocalityLogRouterMapped = -6, // The pseudo tag used by log routers to pop the real LogRouter tag (i.e., -2)
tagLocalityTxs = -7,
tagLocalityInvalid = -99
}; //The TLog and LogRouter require these number to be as compact as possible
}; // The TLog and LogRouter require these number to be as compact as possible

#pragma pack(push, 1)
struct Tag {
// if locality > 0,
// locality decides which DC id the tLog is in;
// id decides which SS owns the tag; id <-> SS mapping is in the system keyspace: serverTagKeys.
// if locality < 0, locality decides the type of tLog set: satellite, LR, or remote tLog, etc.
// id decides which tLog in the tLog type will be used.
int8_t locality;
uint16_t id;

Expand Down
6 changes: 6 additions & 0 deletions fdbclient/NativeAPI.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1230,6 +1230,12 @@ ACTOR Future< vector< pair<KeyRange,Reference<LocationInfo>> > > getKeyRangeLoca
}
}

// Get the SS locations for each shard in the 'keys' key-range;
// Returned vector size is the number of shards in the input keys key-range.
// Returned vector element is <ShardRange, storage server location info> pairs, where
// ShardRange is the whole shard key-range, not a part of the given key range.
// Example: If query the function with key range (b, d), the returned list of pairs could be something like:
// [([a, b1), locationInfo), ([b1, c), locationInfo), ([c, d1), locationInfo)].
template <class F>
Future< vector< pair<KeyRange,Reference<LocationInfo>> > > getKeyRangeLocations( Database const& cx, KeyRange const& keys, int limit, bool reverse, F StorageServerInterface::*member, TransactionInfo const& info ) {
ASSERT (!keys.empty());
Expand Down
13 changes: 5 additions & 8 deletions fdbrpc/LoadBalance.actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,11 @@ void addLaggingRequest(Future<Optional<Reply>> reply, Promise<Void> requestFinis
// failMon's information for load balancing and avoiding failed servers
// If ALL the servers are failed and the list of servers is not fresh, throws an exception to let the caller refresh the list of servers
ACTOR template <class Interface, class Request, class Multi>
Future< REPLY_TYPE(Request) > loadBalance(
Reference<MultiInterface<Multi>> alternatives,
RequestStream<Request> Interface::* channel,
Request request = Request(),
TaskPriority taskID = TaskPriority::DefaultPromiseEndpoint,
bool atMostOnce = false, // if true, throws request_maybe_delivered() instead of retrying automatically
QueueModel* model = NULL)
{
Future<REPLY_TYPE(Request)> loadBalance(
Reference<MultiInterface<Multi>> alternatives, RequestStream<Request> Interface::*channel,
Request request = Request(), TaskPriority taskID = TaskPriority::DefaultPromiseEndpoint,
bool atMostOnce = false, // if true, throws request_maybe_delivered() instead of retrying automatically
QueueModel* model = NULL) {
state Future<Optional<REPLY_TYPE(Request)>> firstRequest;
state Optional<uint64_t> firstRequestEndpoint;
state Future<Optional<REPLY_TYPE(Request)>> secondRequest;
Expand Down
2 changes: 1 addition & 1 deletion fdbserver/DataDistribution.actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ struct InitialDataDistribution : ReferenceCounted<InitialDataDistribution> {
struct ShardMetrics {
StorageMetrics metrics;
double lastLowBandwidthStartTime;
int shardCount;
int shardCount; // number of smaller shards whose metrics are aggregated in the ShardMetrics

bool operator==(ShardMetrics const& rhs) const {
return metrics == rhs.metrics && lastLowBandwidthStartTime == rhs.lastLowBandwidthStartTime &&
Expand Down
23 changes: 17 additions & 6 deletions fdbserver/DataDistributionTracker.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ ACTOR Future<Void> trackShardBytes(

loop {
Transaction tr(self->cx);
// metrics.second is the number of key-ranges (i.e., shards) in the 'keys' key-range
std::pair<Optional<StorageMetrics>, int> metrics = wait( tr.waitStorageMetrics( keys, bounds.min, bounds.max, bounds.permittedError, CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT, shardCount ) );
if(metrics.first.present()) {
BandwidthStatus newBandwidthStatus = getBandwidthStatus( metrics.first.get() );
Expand Down Expand Up @@ -486,6 +487,8 @@ Future<Void> shardMerger(
shardsMerged++;

auto shardBounds = getShardSizeBounds( merged, maxShardSize );
// If we just recently get the current shard's metrics (i.e., less than DD_LOW_BANDWIDTH_DELAY ago), it means
// the shard's metric may not be stable yet. So we cannot continue merging in this direction.
if( endingStats.bytes >= shardBounds.min.bytes ||
getBandwidthStatus( endingStats ) != BandwidthStatusLow ||
now() - lastLowBandwidthStartTime < SERVER_KNOBS->DD_LOW_BANDWIDTH_DELAY ||
Expand Down Expand Up @@ -516,13 +519,21 @@ Future<Void> shardMerger(
//restarting shard tracker will derefenced values in the shard map, so make a copy
KeyRange mergeRange = merged;

// OldKeys: Shards in the key range are merged as one shard defined by NewKeys;
// NewKeys: New key range after shards are merged;
// EndingSize: The new merged shard size in bytes;
// BatchedMerges: The number of shards merged. Each shard is defined in self->shards;
// LastLowBandwidthStartTime: When does a shard's bandwidth status becomes BandwidthStatusLow. If a shard's status
// becomes BandwidthStatusLow less than DD_LOW_BANDWIDTH_DELAY ago, the merging logic will stop at the shard;
// ShardCount: The number of non-splittable shards that are merged. Each shard is defined in self->shards may have
// more than 1 shards.
TraceEvent("RelocateShardMergeMetrics", self->distributorId)
.detail("OldKeys", keys)
.detail("NewKeys", mergeRange)
.detail("EndingSize", endingStats.bytes)
.detail("BatchedMerges", shardsMerged)
.detail("LastLowBandwidthStartTime", lastLowBandwidthStartTime)
.detail("ShardCount", shardCount);
.detail("OldKeys", keys)
.detail("NewKeys", mergeRange)
.detail("EndingSize", endingStats.bytes)
.detail("BatchedMerges", shardsMerged)
.detail("LastLowBandwidthStartTime", lastLowBandwidthStartTime)
.detail("ShardCount", shardCount);

if(mergeRange.begin < systemKeys.begin) {
self->systemSizeEstimate -= systemBytes;
Expand Down
39 changes: 29 additions & 10 deletions fdbserver/LogRouter.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,20 +78,23 @@ struct LogRouterData {
UID dbgid;
Reference<AsyncVar<Reference<ILogSystem>>> logSystem;
Optional<UID> primaryPeekLocation;
NotifiedVersion version;
NotifiedVersion minPopped;
NotifiedVersion version; // The largest version at which the log router has peeked mutations
// from satellite tLog or primary tLogs.
NotifiedVersion minPopped; // The minimum version among all tags that has been popped by remote tLogs.
Version startVersion;
Version minKnownCommittedVersion;
Version minKnownCommittedVersion; // The minimum durable version among all LRs.
// A LR's durable version is the maximum version of mutations that have been
// popped by remote tLog.
Version poppedVersion;
Deque<std::pair<Version, Standalone<VectorRef<uint8_t>>>> messageBlocks;
Tag routerTag;
bool allowPops;
LogSet logSet;
bool foundEpochEnd;
double waitForVersionTime = 0;
double maxWaitForVersionTime = 0;
double getMoreTime = 0;
double maxGetMoreTime = 0;
bool foundEpochEnd; // Cluster is not fully recovered yet. LR has to handle recovery
double waitForVersionTime = 0; // The total amount of time LR waits for remote tLog to peek and pop its data.
double maxWaitForVersionTime = 0; // The max one-instance wait time when LR must wait for remote tLog to pop data.
double getMoreTime = 0; // The total amount of time LR waits for satellite tLog's data to become available.
double maxGetMoreTime = 0; // The max wait time LR spent in a pull-data-request to satellite tLog.
int64_t generation = -1;
Reference<Histogram> peekLatencyDist;

Expand All @@ -103,7 +106,9 @@ struct LogRouterData {
std::map<UID, PeekTrackerData> peekTracker;

CounterCollection cc;
Counter getMoreCount, getMoreBlockedCount;
Counter getMoreCount; // Increase by 1 when LR tries to pull data from satellite tLog.
Counter
getMoreBlockedCount; // Increase by 1 if data is not available when LR tries to pull data from satellite tLog.
Future<Void> logger;
Reference<EventCacheHolder> eventCacheHolder;

Expand Down Expand Up @@ -148,8 +153,10 @@ struct LogRouterData {

eventCacheHolder = Reference<EventCacheHolder>( new EventCacheHolder(dbgid.shortString() + ".PeekLocation") );

specialCounter(cc, "Version", [this](){ return this->version.get(); });
// FetchedVersions: How many version of mutations buffered at LR and have not been popped by remote tLogs
specialCounter(cc, "Version", [this]() { return this->version.get(); });
specialCounter(cc, "MinPopped", [this](){ return this->minPopped.get(); });
// TODO: Add minPopped locality and minPoppedId, similar as tLog Metrics
specialCounter(cc, "FetchedVersions", [this](){ return std::max<Version>(0, std::min<Version>(SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS, this->version.get() - this->minPopped.get())); });
specialCounter(cc, "MinKnownCommittedVersion", [this](){ return this->minKnownCommittedVersion; });
specialCounter(cc, "PoppedVersion", [this](){ return this->poppedVersion; });
Expand Down Expand Up @@ -222,15 +229,25 @@ ACTOR Future<Void> waitForVersion( LogRouterData *self, Version ver ) {
// Since one set of log routers is created per generation of transaction logs, the gap caused by epoch end will be within MAX_VERSIONS_IN_FLIGHT of the log routers start version.
state double startTime = now();
if(self->version.get() < self->startVersion) {
// Log router needs to wait for remote tLogs to process data, whose version is less than self->startVersion,
// before the log router can pull more data (i.e., data after self->startVersion) from satellite tLog;
// This prevents LR from getting OOM due to it pulls too much data from satellite tLog at once;
// Note: each commit writes data to both primary tLog and satellite tLog. Satellite tLog can be viewed as
// a part of primary tLogs.
if(ver > self->startVersion) {
self->version.set(self->startVersion);
// Wait for remote tLog to peek and pop from LR,
// so that LR's minPopped version can increase to self->startVersion
wait(self->minPopped.whenAtLeast(self->version.get()));
}
self->waitForVersionTime += now() - startTime;
self->maxWaitForVersionTime = std::max(self->maxWaitForVersionTime, now() - startTime);
return Void();
}
if(!self->foundEpochEnd) {
// Similar to proxy that does not keep more than MAX_READ_TRANSACTION_LIFE_VERSIONS transactions oustanding;
// Log router does not keep more than MAX_READ_TRANSACTION_LIFE_VERSIONS transactions outstanding because
// remote SS cannot roll back to more than MAX_READ_TRANSACTION_LIFE_VERSIONS ago.
wait(self->minPopped.whenAtLeast(std::min(self->version.get(), ver - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS)));
} else {
while(self->minPopped.get() + SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS < ver) {
Expand All @@ -250,6 +267,7 @@ ACTOR Future<Void> waitForVersion( LogRouterData *self, Version ver ) {
return Void();
}

// Log router pull data from satellite tLog
ACTOR Future<Void> pullAsyncData( LogRouterData *self ) {
state Future<Void> dbInfoChange = Void();
state Reference<ILogSystem::IPeekCursor> r;
Expand Down Expand Up @@ -582,6 +600,7 @@ ACTOR Future<Void> logRouterCore(
addActor.send( logRouterPeekMessages( &logRouterData, req ) );
}
when( TLogPopRequest req = waitNext( interf.popMessages.getFuture() ) ) {
// Request from remote tLog to pop data from LR
addActor.send( logRouterPop( &logRouterData, req ) );
}
when (wait(error)) {}
Expand Down
39 changes: 29 additions & 10 deletions fdbserver/TLogServer.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ struct TLogData : NonCopyable {
FlowLock concurrentLogRouterReads;
FlowLock persistentDataCommitLock;

// Beginning of fields used by snapshot based backup and restore
bool ignorePopRequest; // ignore pop request from storage servers
double ignorePopDeadline; // time until which the ignorePopRequest will be
// honored
Expand All @@ -337,6 +338,8 @@ struct TLogData : NonCopyable {
std::map<Tag, Version> toBePopped; // map of Tag->Version for all the pops
// that came when ignorePopRequest was set
Reference<AsyncVar<bool>> degraded;
// End of fields used by snapshot based backup and restore

std::vector<TagsAndMessage> tempTagMessages;

Reference<Histogram> commitLatencyDist;
Expand Down Expand Up @@ -438,13 +441,19 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
bool stopped, initialized;
DBRecoveryCount recoveryCount;

// If persistentDataVersion != persistentDurableDataVersion,
// then spilling is happening from persistentDurableDataVersion to persistentDataVersion.
// Data less than persistentDataDurableVersion is spilled on disk (or fully popped from the TLog);
VersionMetricHandle persistentDataVersion, persistentDataDurableVersion; // The last version number in the portion of the log (written|durable) to persistentData
NotifiedVersion version, queueCommittedVersion;
NotifiedVersion version;
NotifiedVersion queueCommittedVersion; // The disk queue has committed up until the queueCommittedVersion version.
Version queueCommittingVersion;
Version knownCommittedVersion, durableKnownCommittedVersion, minKnownCommittedVersion;
Version queuePoppedVersion;
Version knownCommittedVersion; // The maximum version that a proxy has told us that is committed (all TLogs have
// ack'd a commit for this version).
Version durableKnownCommittedVersion, minKnownCommittedVersion;
Version queuePoppedVersion; // The disk queue has been popped up until the location which represents this version.
Version minPoppedTagVersion;
Tag minPoppedTag;
Tag minPoppedTag; // The tag that makes tLog hold its data and cause tLog's disk queue increasing.

Deque<std::pair<Version, Standalone<VectorRef<uint8_t>>>> messageBlocks;
std::vector<std::vector<Reference<TagData>>> tag_data; //tag.locality | tag.id
Expand Down Expand Up @@ -487,7 +496,8 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
Version unrecoveredBefore, recoveredAt;

struct PeekTrackerData {
std::map<int, Promise<std::pair<Version, bool>>> sequence_version;
std::map<int, Promise<std::pair<Version, bool>>>
sequence_version; // second: Version is peeked begin version. bool is onlySpilled
double lastUpdate;

Tag tag;
Expand Down Expand Up @@ -560,12 +570,15 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
queueCommittedVersion.initMetric(LiteralStringRef("TLog.QueueCommittedVersion"), cc.id);

specialCounter(cc, "Version", [this](){ return this->version.get(); });
specialCounter(cc, "QueueCommittedVersion", [this](){ return this->queueCommittedVersion.get(); });
specialCounter(cc, "QueueCommittedVersion", [this]() { return this->queueCommittedVersion.get(); });
specialCounter(cc, "PersistentDataVersion", [this](){ return this->persistentDataVersion; });
specialCounter(cc, "PersistentDataDurableVersion", [this](){ return this->persistentDataDurableVersion; });
specialCounter(cc, "KnownCommittedVersion", [this](){ return this->knownCommittedVersion; });
specialCounter(cc, "QueuePoppedVersion", [this](){ return this->queuePoppedVersion; });
specialCounter(cc, "MinPoppedTagVersion", [this](){ return this->minPoppedTagVersion; });
specialCounter(cc, "MinPoppedTagVersion", [this]() { return this->minPoppedTagVersion; });
// The locality and id of the tag that is responsible for making the TLog hold onto its oldest piece of data.
// If disk queues are growing and no one is sure why, then you shall look at this to find the tag responsible
// for why the TLog thinks it can't throw away data.
specialCounter(cc, "MinPoppedTagLocality", [this](){ return this->minPoppedTag.locality; });
specialCounter(cc, "MinPoppedTagId", [this](){ return this->minPoppedTag.id; });
specialCounter(cc, "SharedBytesInput", [tLogData](){ return tLogData->bytesInput; });
Expand Down Expand Up @@ -766,6 +779,9 @@ ACTOR Future<Void> updatePoppedLocation( TLogData* self, Reference<LogData> logD
return Void();
}

// It runs against the oldest TLog instance, calculates the first location in the disk queue that contains un-popped
// data, and then issues a pop to the disk queue at that location so that anything earlier can be
// removed/forgotten/overwritten. In effect, it applies the effect of TLogPop RPCs to disk.
ACTOR Future<Void> popDiskQueue( TLogData* self, Reference<LogData> logData ) {
if (!logData->initialized) return Void();

Expand Down Expand Up @@ -980,9 +996,11 @@ ACTOR Future<Void> updatePersistentData( TLogData* self, Reference<LogData> logD
return Void();
}

// This function (and updatePersistentData, which is called by this function) run at a low priority and can soak up all CPU resources.
// For this reason, they employ aggressive use of yields to avoid causing slow tasks that could introduce latencies for more important
// work (e.g. commits).
// This function (and updatePersistentData, which is called by this function) run at a low priority and can soak up all
// CPU resources. For this reason, they employ aggressive use of yields to avoid causing slow tasks that could introduce
// latencies for more important work (e.g. commits).
// This actor is just a loop that calls updatePersistentData and popDiskQueue whenever
// (a) there's data to be spilled or (b) we should update metadata after some commits have been fully popped.
ACTOR Future<Void> updateStorage( TLogData* self ) {
while(self->spillOrder.size() && !self->id_data.count(self->spillOrder.front())) {
self->spillOrder.pop_front();
Expand Down Expand Up @@ -2250,6 +2268,7 @@ void removeLog( TLogData* self, Reference<LogData> logData ) {
}
}

// remote tLog pull data from log routers
ACTOR Future<Void> pullAsyncData( TLogData* self, Reference<LogData> logData, std::vector<Tag> tags, Version beginVersion, Optional<Version> endVersion, bool poppedIsKnownCommitted ) {
state Future<Void> dbInfoChange = Void();
state Reference<ILogSystem::IPeekCursor> r;
Expand Down
Loading

0 comments on commit 569ab46

Please sign in to comment.