forked from facebook/rocksdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathimport_column_family_job.cc
282 lines (243 loc) · 9.66 KB
/
import_column_family_job.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
#ifndef ROCKSDB_LITE
#include "db/import_column_family_job.h"
#include <algorithm>
#include <cinttypes>
#include <string>
#include <vector>
#include "db/version_edit.h"
#include "file/file_util.h"
#include "file/random_access_file_reader.h"
#include "table/merging_iterator.h"
#include "table/scoped_arena_iterator.h"
#include "table/sst_file_writer_collectors.h"
#include "table/table_builder.h"
#include "util/stop_watch.h"
namespace ROCKSDB_NAMESPACE {
// Validates the files described by `metadata_` and places them into the DB
// directory so that Run() can later register them.
//
// Steps:
//   1. Read per-file information (size, entry count, boundary keys) via
//      GetIngestedFileInfo() and append it to `files_to_import_`.
//   2. Reject an empty file list, files without entries and files whose
//      boundary keys are not valid.
//   3. For every level >= 1, verify that the files of that level do not
//      overlap.
//   4. Hard-link (when `import_options_.move_files` is set) or copy each file
//      into the DB, assigning consecutive file numbers starting at
//      `next_file_number`.
//
// @param next_file_number  File number given to the first imported file; each
//                          following file gets the next number.
// @param sv                Forwarded to GetIngestedFileInfo().
// @return OK on success; otherwise the first error encountered. If linking or
//         copying fails, the files already placed inside the DB by this call
//         are deleted again (best effort) before returning.
Status ImportColumnFamilyJob::Prepare(uint64_t next_file_number,
                                      SuperVersion* sv) {
  Status status;

  // Read the information of files we are importing
  for (const auto& file_metadata : metadata_) {
    const auto file_path = file_metadata.db_path + "/" + file_metadata.name;
    IngestedFileInfo file_to_import;
    status = GetIngestedFileInfo(file_path, &file_to_import, sv);
    if (!status.ok()) {
      return status;
    }
    files_to_import_.push_back(file_to_import);
  }

  auto num_files = files_to_import_.size();
  if (num_files == 0) {
    return Status::InvalidArgument("The list of files is empty");
  }

  // Reject empty files and files with invalid boundary keys *before* the
  // overlap check below: that check sorts by and compares the boundary keys,
  // so it must only ever see keys that have been validated.
  for (const auto& f : files_to_import_) {
    if (f.num_entries == 0) {
      return Status::InvalidArgument("File contain no entries");
    }

    if (!f.smallest_internal_key.Valid() || !f.largest_internal_key.Valid()) {
      return Status::Corruption("File has corrupted keys");
    }
  }

  if (num_files > 1) {
    // Verify that passed files don't have overlapping ranges in any particular
    // level.
    int min_level = 1;  // Check for overlaps in Level 1 and above.
    int max_level = -1;
    for (const auto& file_metadata : metadata_) {
      if (file_metadata.level > max_level) {
        max_level = file_metadata.level;
      }
    }
    for (int level = min_level; level <= max_level; ++level) {
      // Collect the files of this level; metadata_[i] describes
      // files_to_import_[i].
      autovector<const IngestedFileInfo*> sorted_files;
      for (size_t i = 0; i < num_files; i++) {
        if (metadata_[i].level == level) {
          sorted_files.push_back(&files_to_import_[i]);
        }
      }

      std::sort(
          sorted_files.begin(), sorted_files.end(),
          [this](const IngestedFileInfo* info1, const IngestedFileInfo* info2) {
            return cfd_->internal_comparator().Compare(
                       info1->smallest_internal_key,
                       info2->smallest_internal_key) < 0;
          });

      // Once sorted by smallest key, any overlap shows up between neighbours.
      for (size_t i = 0; i + 1 < sorted_files.size(); i++) {
        if (cfd_->internal_comparator().Compare(
                sorted_files[i]->largest_internal_key,
                sorted_files[i + 1]->smallest_internal_key) >= 0) {
          return Status::InvalidArgument("Files have overlapping ranges");
        }
      }
    }
  }

  // Copy/Move external files into DB
  auto hardlink_files = import_options_.move_files;
  for (auto& f : files_to_import_) {
    f.fd = FileDescriptor(next_file_number++, 0, f.file_size);

    const auto path_outside_db = f.external_file_path;
    const auto path_inside_db = TableFileName(
        cfd_->ioptions()->cf_paths, f.fd.GetNumber(), f.fd.GetPathId());

    if (hardlink_files) {
      status =
          fs_->LinkFile(path_outside_db, path_inside_db, IOOptions(), nullptr);
      if (status.IsNotSupported()) {
        // Original file is on a different FS, use copy instead of hard linking
        // (for this file and all the remaining ones).
        hardlink_files = false;
      }
    }
    if (!hardlink_files) {
      status = CopyFile(fs_.get(), path_outside_db, path_inside_db, 0,
                        db_options_.use_fsync, io_tracer_);
    }
    if (!status.ok()) {
      break;
    }
    f.copy_file = !hardlink_files;
    // A non-empty internal_file_path marks the file as placed inside the DB.
    f.internal_file_path = path_inside_db;
  }

  if (!status.ok()) {
    // We failed, remove all files that we copied into the db
    for (const auto& f : files_to_import_) {
      if (f.internal_file_path.empty()) {
        // Files are placed in order, so the first unplaced file ends the run.
        break;
      }
      const auto s =
          fs_->DeleteFile(f.internal_file_path, IOOptions(), nullptr);
      if (!s.ok()) {
        ROCKS_LOG_WARN(db_options_.info_log,
                       "AddFile() clean up for file %s failed : %s",
                       f.internal_file_path.c_str(), s.ToString().c_str());
      }
    }
  }

  return status;
}
// Records every prepared file in `edit_` for this column family and advances
// the sequence numbers in `versions_` whenever an imported file carries a
// larger sequence number than the current last sequence.
//
// REQUIRES: we have become the only writer by entering both write_thread_ and
// nonmem_write_thread_
Status ImportColumnFamilyJob::Run() {
  edit_.SetColumnFamily(cfd_->GetID());

  // The import time serves as both the oldest ancestor time and the file
  // creation time: it is the moment the data is written to this database.
  // If the clock cannot be read, the "unknown" marker is kept.
  uint64_t import_time = kUnknownOldestAncesterTime;
  int64_t clock_reading = 0;
  if (clock_->GetCurrentTime(&clock_reading).ok()) {
    import_time = static_cast<uint64_t>(clock_reading);
  }

  const size_t file_count = files_to_import_.size();
  for (size_t idx = 0; idx < file_count; ++idx) {
    // files_to_import_[idx] is paired with metadata_[idx].
    const auto& imported = files_to_import_[idx];
    const auto& meta = metadata_[idx];

    edit_.AddFile(meta.level, imported.fd.GetNumber(), imported.fd.GetPathId(),
                  imported.fd.GetFileSize(), imported.smallest_internal_key,
                  imported.largest_internal_key, meta.smallest_seqno,
                  meta.largest_seqno, false, kInvalidBlobFileNumber,
                  import_time, import_time, kUnknownFileChecksum,
                  kUnknownFileChecksumFuncName);

    // If incoming sequence number is higher, update local sequence number.
    if (meta.largest_seqno > versions_->LastSequence()) {
      versions_->SetLastAllocatedSequence(meta.largest_seqno);
      versions_->SetLastPublishedSequence(meta.largest_seqno);
      versions_->SetLastSequence(meta.largest_seqno);
    }
  }

  return Status::OK();
}
// Finishes the import according to its outcome.
//
// @param status  Result of the import.
//   - Not OK: the files that were linked/copied into the DB are deleted
//     again.
//   - OK and `import_options_.move_files` set: the original external files
//     are deleted.
// Deletion failures are only logged; nothing is reported to the caller.
void ImportColumnFamilyJob::Cleanup(const Status& status) {
  if (!status.ok()) {
    // We failed to add files to the database remove all the files we copied.
    for (const auto& f : files_to_import_) {
      if (f.internal_file_path.empty()) {
        // Prepare() only sets internal_file_path once a file has actually
        // been linked/copied into the DB, so there is nothing to delete here
        // and attempting it would just log a spurious failure.
        continue;
      }
      const auto s =
          fs_->DeleteFile(f.internal_file_path, IOOptions(), nullptr);
      if (!s.ok()) {
        ROCKS_LOG_WARN(db_options_.info_log,
                       "AddFile() clean up for file %s failed : %s",
                       f.internal_file_path.c_str(), s.ToString().c_str());
      }
    }
  } else if (import_options_.move_files) {
    // The files were moved and added successfully, remove original file links
    for (const auto& f : files_to_import_) {
      const auto s =
          fs_->DeleteFile(f.external_file_path, IOOptions(), nullptr);
      if (!s.ok()) {
        ROCKS_LOG_WARN(
            db_options_.info_log,
            "%s was added to DB successfully but failed to remove original "
            "file link : %s",
            f.external_file_path.c_str(), s.ToString().c_str());
      }
    }
  }
}
// Opens the table file at `external_file` and fills `file_to_import` with its
// path, size, table properties, entry count, column family id and smallest /
// largest internal keys.
//
// @param external_file   Path of the file to inspect.
// @param file_to_import  Output; must not be null.
// @param sv              Supplies the prefix extractor used to open and
//                        iterate the table.
// @return OK when the file was read; a non-OK status if the file cannot be
//         sized or opened, if iteration reports an error, or if a boundary key
//         cannot be parsed (Corruption). For a table that yields no keys, OK
//         is returned with the boundary keys left unset; Prepare() rejects
//         such files through its entry-count and key-validity checks.
Status ImportColumnFamilyJob::GetIngestedFileInfo(
    const std::string& external_file, IngestedFileInfo* file_to_import,
    SuperVersion* sv) {
  file_to_import->external_file_path = external_file;

  // Get external file size
  Status status = fs_->GetFileSize(external_file, IOOptions(),
                                   &file_to_import->file_size, nullptr);
  if (!status.ok()) {
    return status;
  }

  // Create TableReader for external file
  std::unique_ptr<TableReader> table_reader;
  std::unique_ptr<FSRandomAccessFile> sst_file;
  std::unique_ptr<RandomAccessFileReader> sst_file_reader;

  status = fs_->NewRandomAccessFile(external_file, env_options_, &sst_file,
                                    nullptr);
  if (!status.ok()) {
    return status;
  }
  sst_file_reader.reset(new RandomAccessFileReader(
      std::move(sst_file), external_file, nullptr /*Env*/, io_tracer_));

  status = cfd_->ioptions()->table_factory->NewTableReader(
      TableReaderOptions(*cfd_->ioptions(),
                         sv->mutable_cf_options.prefix_extractor.get(),
                         env_options_, cfd_->internal_comparator()),
      std::move(sst_file_reader), file_to_import->file_size, &table_reader);
  if (!status.ok()) {
    return status;
  }

  // Get the external file properties
  auto props = table_reader->GetTableProperties();

  // Set original_seqno to 0.
  file_to_import->original_seqno = 0;

  // Get number of entries in table
  file_to_import->num_entries = props->num_entries;

  // Filled in before iterating so they are populated on every return path
  // below.
  file_to_import->cf_id = static_cast<uint32_t>(props->column_family_id);
  file_to_import->table_properties = *props;

  ParsedInternalKey key;
  ReadOptions ro;
  // During reading the external file we can cache blocks that we read into
  // the block cache, if we later change the global seqno of this file, we will
  // have block in cache that will include keys with wrong seqno.
  // We need to disable fill_cache so that we read from the file without
  // updating the block cache.
  ro.fill_cache = false;
  std::unique_ptr<InternalIterator> iter(table_reader->NewIterator(
      ro, sv->mutable_cf_options.prefix_extractor.get(), /*arena=*/nullptr,
      /*skip_filters=*/false, TableReaderCaller::kExternalSSTIngestion));

  // Get first (smallest) key from file
  iter->SeekToFirst();
  if (!iter->Valid()) {
    // key() must not be read from an invalid iterator. Either the read failed
    // (non-OK status) or the table has no keys; in the latter case the
    // boundary keys stay unset and the caller rejects the file.
    return iter->status();
  }
  Status pik_status =
      ParseInternalKey(iter->key(), &key, db_options_.allow_data_in_errors);
  if (!pik_status.ok()) {
    return Status::Corruption("Corrupted Key in external file. ",
                              pik_status.getState());
  }
  file_to_import->smallest_internal_key.SetFrom(key);

  // Get last (largest) key from file
  iter->SeekToLast();
  if (!iter->Valid()) {
    // Same guard as above: never dereference an invalid iterator.
    return iter->status();
  }
  pik_status =
      ParseInternalKey(iter->key(), &key, db_options_.allow_data_in_errors);
  if (!pik_status.ok()) {
    return Status::Corruption("Corrupted Key in external file. ",
                              pik_status.getState());
  }
  file_to_import->largest_internal_key.SetFrom(key);

  return status;
}
} // namespace ROCKSDB_NAMESPACE
#endif // !ROCKSDB_LITE