forked from apple/foundationdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathRestoreCommon.actor.h
398 lines (341 loc) · 15.9 KB
/
RestoreCommon.actor.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
/*
* RestoreCommon.actor.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// This file includes the code copied from the old restore in FDB 5.2
// The functions and structure declared in this file can be shared by
// the old restore and the new performant restore systems
#pragma once
#if defined(NO_INTELLISENSE) && !defined(FDBSERVER_RESTORECOMMON_ACTOR_G_H)
#define FDBSERVER_RESTORECOMMON_ACTOR_G_H
#include "fdbserver/RestoreCommon.actor.g.h"
#elif !defined(FDBSERVER_RESTORECOMMON_ACTOR_H)
#define FDBSERVER_RESTORECOMMON_ACTOR_H
#include "flow/flow.h"
#include "flow/genericactors.actor.h"
#include "fdbclient/Tuple.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbrpc/IAsyncFile.h"
#include "fdbclient/BackupAgent.actor.h"
#include "fdbserver/Knobs.h"
#include "flow/actorcompiler.h" // has to be last include
// RestoreConfig copied from FileBackupAgent.actor.cpp
// We copy RestoreConfig instead of using (and potentially changing) it in place
// to avoid conflict with the existing code.
// We also made minor changes to allow RestoreConfig to be ReferenceCounted
// TODO: Merge this RestoreConfig with the original RestoreConfig in FileBackupAgent.actor.cpp
// Shorthand for the restore state enum declared inside FileBackupAgent.
using ERestoreState = FileBackupAgent::ERestoreState;
// Forward declaration; the full definition of RestoreFileFR appears later in this header.
struct RestoreFileFR;
// RestoreConfigFR is a copy of the RestoreConfig defined in FileBackupAgent.actor.cpp. We copy it instead of using
// (and potentially changing) it in place to avoid conflicts with the existing code. The copy is split into a
// declaration in Restore.actor.h and an implementation in RestoreCommon.actor.cpp, so that it can be used by both
// the existing restore and the new fast restore subsystems. The class is used through a Reference<RestoreConfigFR>,
// which leads to some non-functional changes compared to the original RestoreConfig.
class RestoreConfigFR : public KeyBackedConfig, public ReferenceCounted<RestoreConfigFR> {
public:
	// Both constructors root the configuration at fileRestorePrefixRange.begin; it is identified either by a UID
	// or by a Task.
	RestoreConfigFR(UID uid = UID()) : KeyBackedConfig(fileRestorePrefixRange.begin, uid) {}
	RestoreConfigFR(Reference<Task> task) : KeyBackedConfig(fileRestorePrefixRange.begin, task) {}

	// ---- Key-backed properties of this restore configuration ----
	KeyBackedProperty<ERestoreState> stateEnum();
	Future<StringRef> stateText(Reference<ReadYourWritesTransaction> tr);
	KeyBackedProperty<Key> addPrefix();
	KeyBackedProperty<Key> removePrefix();

	// XXX: restoreRange() has been superseded by restoreRanges(); remove it once it is safe to do so.
	KeyBackedProperty<KeyRange> restoreRange();
	KeyBackedProperty<std::vector<KeyRange>> restoreRanges();
	KeyBackedProperty<Key> batchFuture();
	KeyBackedProperty<Version> restoreVersion();

	KeyBackedProperty<Reference<IBackupContainer>> sourceContainer();
	// The source container as a bare URL; reading it does not create a container instance.
	KeyBackedProperty<Value> sourceContainerURL();

	// ---- Progress counters ----
	// Sum of the bytes written by all log and range restore tasks.
	KeyBackedBinaryValue<int64_t> bytesWritten();
	// Number of file blocks for which the Dispatch task has already created tasks.
	KeyBackedBinaryValue<int64_t> filesBlocksDispatched();
	// Number of file blocks whose tasks have completed.
	KeyBackedBinaryValue<int64_t> fileBlocksFinished();
	// Number of files in the fileMap.
	KeyBackedBinaryValue<int64_t> fileCount();
	// Number of file blocks in the fileMap.
	KeyBackedBinaryValue<int64_t> fileBlockCount();

	Future<std::vector<KeyRange>> getRestoreRangesOrDefault(Reference<ReadYourWritesTransaction> tr);
	ACTOR static Future<std::vector<KeyRange>> getRestoreRangesOrDefault_impl(RestoreConfigFR* self,
	                                                                          Reference<ReadYourWritesTransaction> tr);

	// One file from which blocks are loaded during restore. The ordering is by version first and fileName second,
	// which allows walking the map incrementally while persisting the version and path of the next starting point.
	struct RestoreFile {
		Version version;
		std::string fileName;
		bool isRange; // false for log file
		int64_t blockSize;
		int64_t fileSize;
		Version endVersion; // not meaningful for range files

		// Serializes the fields into a Tuple. Note the field order: fileSize is written before blockSize.
		Tuple pack() const {
			Tuple packed;
			packed.append(version);
			packed.append(StringRef(fileName));
			packed.append(isRange);
			packed.append(fileSize);
			packed.append(blockSize);
			packed.append(endVersion);
			return packed;
		}

		// Inverse of pack(): reads the six fields back from their fixed tuple positions.
		static RestoreFile unpack(Tuple const& t) {
			RestoreFile file;
			file.version = t.getInt(0);
			file.fileName = t.getString(1).toString();
			file.isRange = t.getInt(2) != 0;
			file.fileSize = t.getInt(3);
			file.blockSize = t.getInt(4);
			file.endVersion = t.getInt(5);
			return file;
		}
	};

	// typedef KeyBackedSet<RestoreFile> FileSetT;
	KeyBackedSet<RestoreFile> fileSet();

	Future<bool> isRunnable(Reference<ReadYourWritesTransaction> tr);
	Future<Void> logError(Database cx, Error e, std::string const& details, void* taskInstance = nullptr);

	// ---- Mutation-apply bookkeeping ----
	Key mutationLogPrefix();
	Key applyMutationsMapPrefix();
	ACTOR Future<int64_t> getApplyVersionLag_impl(Reference<ReadYourWritesTransaction> tr, UID uid);
	Future<int64_t> getApplyVersionLag(Reference<ReadYourWritesTransaction> tr);
	void initApplyMutations(Reference<ReadYourWritesTransaction> tr, Key addPrefix, Key removePrefix);
	void clearApplyMutationsKeys(Reference<ReadYourWritesTransaction> tr);
	void setApplyBeginVersion(Reference<ReadYourWritesTransaction> tr, Version ver);
	void setApplyEndVersion(Reference<ReadYourWritesTransaction> tr, Version ver);
	Future<Version> getApplyEndVersion(Reference<ReadYourWritesTransaction> tr);

	// ---- Human-readable status ----
	ACTOR static Future<std::string> getProgress_impl(Reference<RestoreConfigFR> restore,
	                                                  Reference<ReadYourWritesTransaction> tr);
	Future<std::string> getProgress(Reference<ReadYourWritesTransaction> tr);
	ACTOR static Future<std::string> getFullStatus_impl(Reference<RestoreConfigFR> restore,
	                                                    Reference<ReadYourWritesTransaction> tr);
	Future<std::string> getFullStatus(Reference<ReadYourWritesTransaction> tr);
	std::string toString(); // Added by Meng
};
// typedef RestoreConfigFR::RestoreFile RestoreFile;
// Describes a file to load blocks from during restore. Ordered by version and then fileName to enable
// incrementally advancing through the map, saving the version and path of the next starting point.
// NOTE: The struct RestoreFileFR must NOT be named RestoreFile. With that name the code still compiles, but the
// linker gets confused about which RestoreFile to use, and hard-to-diagnose segmentation faults occur at runtime.
struct RestoreFileFR {
Version version;
std::string fileName;
bool isRange; // false for log file
int64_t blockSize;
int64_t fileSize;
Version endVersion; // not meaningful for range files
Version beginVersion; // range file's beginVersion == endVersion; log file contains mutations in version
// [beginVersion, endVersion)
int64_t cursor; // The start block location to be restored. All blocks before cursor have been scheduled to load and
// restore
int fileIndex; // index of backup file. Must be identical per file.
int partitionId = -1; // Partition ID (Log Router Tag ID) for mutation files.
Tuple pack() const {
return Tuple()
.append(version)
.append(StringRef(fileName))
.append(isRange)
.append(fileSize)
.append(blockSize)
.append(endVersion)
.append(beginVersion)
.append(cursor)
.append(fileIndex)
.append(partitionId);
}
static RestoreFileFR unpack(Tuple const& t) {
RestoreFileFR r;
int i = 0;
r.version = t.getInt(i++);
r.fileName = t.getString(i++).toString();
r.isRange = t.getInt(i++) != 0;
r.fileSize = t.getInt(i++);
r.blockSize = t.getInt(i++);
r.endVersion = t.getInt(i++);
r.beginVersion = t.getInt(i++);
r.cursor = t.getInt(i++);
r.fileIndex = t.getInt(i++);
r.partitionId = t.getInt(i++);
return r;
}
bool operator<(const RestoreFileFR& rhs) const {
return std::tie(beginVersion, endVersion, fileIndex, fileName) <
std::tie(rhs.beginVersion, rhs.endVersion, rhs.fileIndex, rhs.fileName);
}
RestoreFileFR()
: version(invalidVersion), isRange(false), blockSize(0), fileSize(0), endVersion(invalidVersion),
beginVersion(invalidVersion), cursor(0), fileIndex(0) {}
explicit RestoreFileFR(const RangeFile& f)
: version(f.version), fileName(f.fileName), isRange(true), blockSize(f.blockSize), fileSize(f.fileSize),
endVersion(f.version), beginVersion(f.version), cursor(0), fileIndex(0) {}
explicit RestoreFileFR(const LogFile& f)
: version(f.beginVersion), fileName(f.fileName), isRange(false), blockSize(f.blockSize), fileSize(f.fileSize),
endVersion(f.endVersion), beginVersion(f.beginVersion), cursor(0), fileIndex(0), partitionId(f.tagId) {}
std::string toString() const {
std::stringstream ss;
ss << "version:" << version << " fileName:" << fileName << " isRange:" << isRange << " blockSize:" << blockSize
<< " fileSize:" << fileSize << " endVersion:" << endVersion << " beginVersion:" << beginVersion
<< " cursor:" << cursor << " fileIndex:" << fileIndex << " partitionId:" << partitionId;
return ss.str();
}
};
namespace parallelFileRestore {
// Declaration only; the actor body is not part of this header.
// Takes a file reference, an offset and a length, and asynchronously produces a Standalone vector of key-value pairs.
// NOTE(review): judging by the name, this presumably decodes the `len` bytes found at `offset` of a backup log
// file -- confirm against the implementation.
ACTOR Future<Standalone<VectorRef<KeyValueRef>>> decodeLogFileBlock(Reference<IAsyncFile> file,
int64_t offset,
int len);
} // namespace parallelFileRestore
// Sends every request in `requests` through `channel` of the interface registered under the request's UID and waits
// until all of them have been answered.
//   channel             - pointer to the RequestStream member of Interface that carries the requests
//   interfaces          - interfaces keyed by UID; the UID stored with a request (pair.first) selects its target
//   requests            - (target UID, request) pairs; the actor returns immediately when this is empty
//   replies             - if non-null, all replies are appended in request order once the whole batch succeeded
//   taskID              - priority passed to getReply()
//   trackRequestLatency - together with the FASTRESTORE_TRACK_REQUEST_LATENCY knob enables latency tracing
// Error handling: operation_cancelled ends the actor quietly; any other error is traced, resetReply() is called on
// every request, and the whole batch is sent again.
ACTOR template <class Interface, class Request>
Future<Void> getBatchReplies(RequestStream<Request> Interface::*channel,
                             std::map<UID, Interface> interfaces,
                             std::vector<std::pair<UID, Request>> requests,
                             std::vector<REPLY_TYPE(Request)>* replies,
                             TaskPriority taskID = TaskPriority::Low,
                             bool trackRequestLatency = true) {
	if (requests.empty()) {
		return Void();
	}

	state double start = now();
	state int outstandingReplies = requests.size();

	loop {
		try {
			// Every attempt (including a retry after an error) re-sends all requests, so all replies are
			// outstanding again. Without this reset the counter would keep the decrements of a failed attempt and
			// the consistency ASSERT below would fire on the retry.
			outstandingReplies = requests.size();

			state std::vector<Future<REPLY_TYPE(Request)>> cmdReplies;
			state std::vector<std::tuple<UID, Request, double>> replyDurations; // double is end time of the request
			for (auto& request : requests) {
				RequestStream<Request> const* stream = &(interfaces[request.first].*channel);
				cmdReplies.push_back(stream->getReply(request.second, taskID));
				replyDurations.emplace_back(request.first, request.second, 0);
			}

			state std::vector<Future<REPLY_TYPE(Request)>> ongoingReplies;
			state std::vector<int> ongoingRepliesIndex;
			loop {
				// Rebuild the list of replies that are still pending, remembering their index in cmdReplies.
				ongoingReplies.clear();
				ongoingRepliesIndex.clear();
				for (int i = 0; i < cmdReplies.size(); ++i) {
					if (SERVER_KNOBS->FASTRESTORE_REQBATCH_LOG) {
						TraceEvent(SevInfo, "FastRestoreGetBatchReplies")
						    .suppressFor(1.0)
						    .detail("Requests", requests.size())
						    .detail("OutstandingReplies", outstandingReplies)
						    .detail("ReplyIndex", i)
						    .detail("ReplyIsReady", cmdReplies[i].isReady())
						    .detail("ReplyIsError", cmdReplies[i].isError())
						    .detail("RequestNode", requests[i].first)
						    .detail("Request", requests[i].second.toString());
					}
					if (!cmdReplies[i].isReady()) { // still wait for reply
						ongoingReplies.push_back(cmdReplies[i]);
						ongoingRepliesIndex.push_back(i);
					}
				}
				ASSERT(ongoingReplies.size() == outstandingReplies);
				if (ongoingReplies.empty()) {
					break;
				} else {
					// Wait for a quorum of at most FASTRESTORE_REQBATCH_PARALLEL of the pending replies.
					wait(quorum(
					    ongoingReplies,
					    std::min((int)SERVER_KNOBS->FASTRESTORE_REQBATCH_PARALLEL, (int)ongoingReplies.size())));
				}
				// At least one reply is received; Calculate the reply duration
				for (int j = 0; j < ongoingReplies.size(); ++j) {
					if (ongoingReplies[j].isReady()) {
						std::get<2>(replyDurations[ongoingRepliesIndex[j]]) = now();
						--outstandingReplies;
					} else if (ongoingReplies[j].isError()) {
						// When this happens,
						// the above assertion ASSERT(ongoingReplies.size() == outstandingReplies) will fail
						TraceEvent(SevError, "FastRestoreGetBatchRepliesReplyError")
						    .detail("OngoingReplyIndex", j)
						    .detail("FutureError", ongoingReplies[j].getError().what());
					}
				}
			}
			ASSERT(outstandingReplies == 0);

			if (trackRequestLatency && SERVER_KNOBS->FASTRESTORE_TRACK_REQUEST_LATENCY) {
				// Calculate the latest end time for each interface
				std::map<UID, double> maxEndTime;
				UID batchID = deterministicRandom()->randomUniqueID();
				for (int i = 0; i < replyDurations.size(); ++i) {
					double endTime = std::get<2>(replyDurations[i]);
					TraceEvent(SevInfo, "ProfileSendRequestBatchLatency", batchID)
					    .detail("Node", std::get<0>(replyDurations[i]))
					    .detail("Request", std::get<1>(replyDurations[i]).toString())
					    .detail("Duration", endTime - start);
					auto item = maxEndTime.emplace(std::get<0>(replyDurations[i]), endTime);
					item.first->second = std::max(item.first->second, endTime);
				}
				// Check the time gap between the earliest and latest node
				double earliest = std::numeric_limits<double>::max();
				// lowest(), not min(): numeric_limits<double>::min() is the smallest *positive* double and is
				// therefore not a valid seed for a running maximum.
				double latest = std::numeric_limits<double>::lowest();
				UID earliestNode, latestNode;
				for (auto& endTime : maxEndTime) {
					if (earliest > endTime.second) {
						earliest = endTime.second;
						earliestNode = endTime.first;
					}
					if (latest < endTime.second) {
						latest = endTime.second;
						latestNode = endTime.first;
					}
				}
				if (latest - earliest > SERVER_KNOBS->FASTRESTORE_STRAGGLER_THRESHOLD_SECONDS) {
					// The detail keys below are runtime strings and are kept exactly as they were.
					TraceEvent(SevWarn, "ProfileSendRequestBatchLatencyFoundStraggler", batchID)
					    .detail("SlowestNode", latestNode)
					    .detail("FatestNode", earliestNode)
					    .detail("EarliestEndtime", earliest)
					    .detail("LagTime", latest - earliest);
				}
			}

			// Update replies
			if (replies != nullptr) {
				for (int i = 0; i < cmdReplies.size(); ++i) {
					replies->emplace_back(cmdReplies[i].get());
				}
			}
			break;
		} catch (Error& e) {
			if (e.code() == error_code_operation_cancelled)
				break;
			// fprintf(stdout, "sendBatchRequests Error code:%d, error message:%s\n", e.code(), e.what());
			TraceEvent(SevWarn, "FastRestoreSendBatchRequests").error(e);
			// Log every request of the failed batch and reset its reply so that it can be sent again.
			for (auto& request : requests) {
				TraceEvent(SevWarn, "FastRestoreSendBatchRequests")
				    .detail("SendBatchRequests", requests.size())
				    .detail("RequestID", request.first)
				    .detail("Request", request.second.toString());
				resetReply(request.second);
			}
		}
	}

	return Void();
}
// Variant of getBatchReplies for callers that do not need the reply contents: it forwards all arguments to
// getBatchReplies with a null `replies` pointer and completes when that actor completes.
ACTOR template <class Interface, class Request>
Future<Void> sendBatchRequests(RequestStream<Request> Interface::*channel,
                               std::map<UID, Interface> interfaces,
                               std::vector<std::pair<UID, Request>> requests,
                               TaskPriority taskID = TaskPriority::Low,
                               bool trackRequestLatency = true) {
	// A null sink tells getBatchReplies not to store the replies anywhere.
	std::vector<REPLY_TYPE(Request)>* const discardedReplies = nullptr;
	wait(getBatchReplies(channel, interfaces, requests, discardedReplies, taskID, trackRequestLatency));
	return Void();
}
#include "flow/unactorcompiler.h"
#endif // FDBSERVER_RESTORECOMMON_ACTOR_H