Add rate limiter priority to ReadOptions #9424

Closed
wants to merge 33 commits
Changes from 32 commits
Commits
3c7107a
add ReadOptions::priority; basic plumbing to RandomAccessFileReader
ajkr Jan 24, 2022
044adfd
test nonzero compaction_readahead_bytes (path where FilePrefetchBuffe…
ajkr Jan 24, 2022
a7c9ee1
remove TODO for rate limiting a utility (SstFileDumper) code path
ajkr Jan 24, 2022
d21036d
add rate_limit_user_ops db_bench flag
ajkr Jan 24, 2022
63fba1c
add benchmarks for VerifyChecksum(), VerifyFileChecksums()
ajkr Jan 24, 2022
62adb1f
bug fix for VerifyChecksum() to use correct ReadOptions, rate_limiter
ajkr Jan 24, 2022
8268f3d
support more ReadOptions in new benchmarks
ajkr Jan 24, 2022
ab6ff84
db_bench option for file_checksum
ajkr Jan 24, 2022
eda989a
finish important parts of BlobFileReader
ajkr Jan 24, 2022
6274abf
remove impractical TODO
ajkr Jan 24, 2022
dc1d50c
more specific TODOs
ajkr Jan 24, 2022
7114e4b
clarify more TODOs
ajkr Jan 24, 2022
dcbe527
clarify api doc
ajkr Jan 24, 2022
797ea3b
update HISTORY.md
ajkr Jan 24, 2022
6532cdd
make format
ajkr Jan 24, 2022
b4f717a
complete list missing parts in API doc
ajkr Jan 24, 2022
c134a67
fix lite build
ajkr Jan 24, 2022
9ac84cc
basic test
ajkr Jan 25, 2022
36dde9a
fix lite
ajkr Jan 25, 2022
d333d11
try fix for no direct I/O support
ajkr Jan 25, 2022
a72a194
add MultiGet tests
ajkr Feb 2, 2022
649c99a
update doc for MultiGet
ajkr Feb 2, 2022
c3af9e6
make format
ajkr Feb 2, 2022
f7d91e9
add test for VerifyChecksum, VerifyFileChecksums
ajkr Feb 2, 2022
0e73a59
s/operation/option/
ajkr Feb 2, 2022
064e22d
remove API doc ref to non-public old BlobDB
ajkr Feb 2, 2022
855f161
API doc code reference for cuckoo/plain table
ajkr Feb 2, 2022
46a033d
API doc move small read details
ajkr Feb 2, 2022
9081582
update DBOptions::rate_limiter doc
ajkr Feb 2, 2022
9cda4cc
fix lite
ajkr Feb 2, 2022
b14fb92
rename priority -> rate_limiter_priority
ajkr Feb 3, 2022
10574a5
address comments
ajkr Feb 17, 2022
3c11f2b
fix visual studio compiler error
ajkr Feb 17, 2022
1 change: 1 addition & 0 deletions CMakeLists.txt
@@ -1196,6 +1196,7 @@ if(WITH_TESTS)
         db/db_options_test.cc
         db/db_properties_test.cc
         db/db_range_del_test.cc
+        db/db_rate_limiter_test.cc
         db/db_secondary_test.cc
         db/db_sst_test.cc
         db/db_statistics_test.cc
1 change: 1 addition & 0 deletions HISTORY.md
@@ -12,6 +12,7 @@
 
 ### Public API changes
 * Require C++17 compatible compiler (GCC >= 7, Clang >= 5, Visual Studio >= 2017). See #9388.
+* Added `ReadOptions::rate_limiter_priority`. When set to something other than `Env::IO_TOTAL`, the internal rate limiter (`DBOptions::rate_limiter`) will be charged at the specified priority for file reads associated with the API to which the `ReadOptions` was provided.
 * Remove HDFS support from main repo.
 * Remove librados support from main repo.
 * Remove obsolete backupable_db.h and type alias `BackupableDBOptions`. Use backup_engine.h and `BackupEngineOptions`. Similar renamings are in the C and Java APIs.
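For illustration only (not part of this diff), a caller could opt its own reads into the shared rate limiter roughly as follows. This is a minimal sketch that assumes a DB already opened with DBOptions::rate_limiter set to a limiter whose mode covers reads; the function name RateLimitedGet is made up here.

#include <cassert>
#include <string>

#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"

// Sketch: charge the file reads done on behalf of this Get() to
// DBOptions::rate_limiter at user priority. The ReadOptions default,
// Env::IO_TOTAL, means "do not charge this read" and preserves the old
// behavior.
std::string RateLimitedGet(rocksdb::DB* db, const std::string& key) {
  rocksdb::ReadOptions read_options;
  read_options.rate_limiter_priority = rocksdb::Env::IO_USER;

  std::string value;
  rocksdb::Status s = db->Get(read_options, key, &value);
  assert(s.ok() || s.IsNotFound());
  return value;
}

Leaving the option at its default, Env::IO_TOTAL, keeps the pre-existing behavior of not charging reads to the limiter at all.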
3 changes: 3 additions & 0 deletions Makefile
@@ -1533,6 +1533,9 @@ db_options_test: $(OBJ_DIR)/db/db_options_test.o $(TEST_LIBRARY) $(LIBRARY)
 db_range_del_test: $(OBJ_DIR)/db/db_range_del_test.o $(TEST_LIBRARY) $(LIBRARY)
 	$(AM_LINK)
 
+db_rate_limiter_test: $(OBJ_DIR)/db/db_rate_limiter_test.o $(TEST_LIBRARY) $(LIBRARY)
+	$(AM_LINK)
+
 db_sst_test: $(OBJ_DIR)/db/db_sst_test.o $(TEST_LIBRARY) $(LIBRARY)
 	$(AM_LINK)
 
7 changes: 7 additions & 0 deletions TARGETS
@@ -1441,6 +1441,13 @@ ROCKS_TESTS = [
         [],
         [],
     ],
+    [
+        "db_rate_limiter_test",
+        "db/db_rate_limiter_test.cc",
+        "parallel",
+        [],
+        [],
+    ],
     [
         "db_secondary_test",
         "db/db_secondary_test.cc",
30 changes: 18 additions & 12 deletions db/blob/blob_file_reader.cc
@@ -148,9 +148,10 @@ Status BlobFileReader::ReadHeader(const RandomAccessFileReader* file_reader,
   constexpr uint64_t read_offset = 0;
   constexpr size_t read_size = BlobLogHeader::kSize;
 
-  const Status s =
-      ReadFromFile(file_reader, read_offset, read_size, statistics,
-                   &header_slice, &buf, &aligned_buf);
+  // TODO: rate limit reading headers from blob files.
+  const Status s = ReadFromFile(file_reader, read_offset, read_size,
+                                statistics, &header_slice, &buf, &aligned_buf,
+                                Env::IO_TOTAL /* rate_limiter_priority */);
   if (!s.ok()) {
     return s;
   }
@@ -198,9 +199,10 @@ Status BlobFileReader::ReadFooter(const RandomAccessFileReader* file_reader,
   const uint64_t read_offset = file_size - BlobLogFooter::kSize;
   constexpr size_t read_size = BlobLogFooter::kSize;
 
-  const Status s =
-      ReadFromFile(file_reader, read_offset, read_size, statistics,
-                   &footer_slice, &buf, &aligned_buf);
+  // TODO: rate limit reading footers from blob files.
+  const Status s = ReadFromFile(file_reader, read_offset, read_size,
+                                statistics, &footer_slice, &buf, &aligned_buf,
+                                Env::IO_TOTAL /* rate_limiter_priority */);
   if (!s.ok()) {
     return s;
   }
@@ -230,7 +232,8 @@ Status BlobFileReader::ReadFooter(const RandomAccessFileReader* file_reader,
 Status BlobFileReader::ReadFromFile(const RandomAccessFileReader* file_reader,
                                     uint64_t read_offset, size_t read_size,
                                     Statistics* statistics, Slice* slice,
-                                    Buffer* buf, AlignedBuf* aligned_buf) {
+                                    Buffer* buf, AlignedBuf* aligned_buf,
+                                    Env::IOPriority rate_limiter_priority) {
   assert(slice);
   assert(buf);
   assert(aligned_buf);
@@ -245,13 +248,13 @@ Status BlobFileReader::ReadFromFile(const RandomAccessFileReader* file_reader,
     constexpr char* scratch = nullptr;
 
     s = file_reader->Read(IOOptions(), read_offset, read_size, slice, scratch,
-                          aligned_buf);
+                          aligned_buf, rate_limiter_priority);
   } else {
     buf->reset(new char[read_size]);
     constexpr AlignedBuf* aligned_scratch = nullptr;
 
     s = file_reader->Read(IOOptions(), read_offset, read_size, slice,
-                          buf->get(), aligned_scratch);
+                          buf->get(), aligned_scratch, rate_limiter_priority);
   }
 
   if (!s.ok()) {
@@ -323,7 +326,8 @@ Status BlobFileReader::GetBlob(const ReadOptions& read_options,
 
     prefetched = prefetch_buffer->TryReadFromCache(
         IOOptions(), file_reader_.get(), record_offset,
-        static_cast<size_t>(record_size), &record_slice, &s, for_compaction);
+        static_cast<size_t>(record_size), &record_slice, &s,
+        read_options.rate_limiter_priority, for_compaction);
     if (!s.ok()) {
       return s;
     }
@@ -334,7 +338,8 @@ Status BlobFileReader::GetBlob(const ReadOptions& read_options,
 
     const Status s = ReadFromFile(file_reader_.get(), record_offset,
                                   static_cast<size_t>(record_size), statistics_,
                                   &record_slice, &buf, &aligned_buf);
+                                  &record_slice, &buf, &aligned_buf,
+                                  read_options.rate_limiter_priority);
     if (!s.ok()) {
       return s;
     }
@@ -424,7 +429,8 @@ void BlobFileReader::MultiGetBlob(
   }
   TEST_SYNC_POINT("BlobFileReader::MultiGetBlob:ReadFromFile");
   s = file_reader_->MultiRead(IOOptions(), read_reqs.data(), read_reqs.size(),
-                              direct_io ? &aligned_buf : nullptr);
+                              direct_io ? &aligned_buf : nullptr,
+                              read_options.rate_limiter_priority);
   if (!s.ok()) {
     for (auto& req : read_reqs) {
       req.status.PermitUncheckedError();
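The blob reader changes above simply pass the priority through to RandomAccessFileReader. Conceptually, the pattern being plumbed is "charge the limiter before issuing the read"; the following is a simplified, standalone sketch of that pattern, not the actual RocksDB reader code (ChargeReadToRateLimiter is a hypothetical helper name).

#include <algorithm>
#include <cstddef>
#include <cstdint>

#include "rocksdb/env.h"
#include "rocksdb/rate_limiter.h"

// Simplified sketch of the pattern this PR plumbs into the file readers:
// before issuing a read of `bytes`, charge the DB's rate limiter in
// burst-sized chunks at the caller-requested priority.
void ChargeReadToRateLimiter(rocksdb::RateLimiter* rate_limiter,
                             rocksdb::Env::IOPriority rate_limiter_priority,
                             size_t bytes) {
  if (rate_limiter == nullptr ||
      rate_limiter_priority == rocksdb::Env::IO_TOTAL ||
      !rate_limiter->IsRateLimited(rocksdb::RateLimiter::OpType::kRead)) {
    // IO_TOTAL (the ReadOptions default) means the read is not charged, and
    // a limiter in writes-only mode ignores read charges.
    return;
  }
  const size_t burst = std::max<size_t>(
      1, static_cast<size_t>(rate_limiter->GetSingleBurstBytes()));
  size_t remaining = bytes;
  while (remaining > 0) {
    const size_t chunk = std::min(remaining, burst);
    // Blocks until `chunk` bytes of read budget are available at this
    // priority.
    rate_limiter->Request(static_cast<int64_t>(chunk), rate_limiter_priority,
                          nullptr /* stats */,
                          rocksdb::RateLimiter::OpType::kRead);
    remaining -= chunk;
  }
}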
3 changes: 2 additions & 1 deletion db/blob/blob_file_reader.h
@@ -83,7 +83,8 @@ class BlobFileReader {
   static Status ReadFromFile(const RandomAccessFileReader* file_reader,
                              uint64_t read_offset, size_t read_size,
                              Statistics* statistics, Slice* slice, Buffer* buf,
-                             AlignedBuf* aligned_buf);
+                             AlignedBuf* aligned_buf,
+                             Env::IOPriority rate_limiter_priority);
 
   static Status VerifyBlob(const Slice& record_slice, const Slice& user_key,
                            uint64_t value_size);
6 changes: 4 additions & 2 deletions db/blob/blob_log_sequential_reader.cc
@@ -28,8 +28,10 @@ Status BlobLogSequentialReader::ReadSlice(uint64_t size, Slice* slice,
   assert(file_);
 
   StopWatch read_sw(clock_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS);
-  Status s = file_->Read(IOOptions(), next_byte_, static_cast<size_t>(size),
-                         slice, buf, nullptr);
+  // TODO: rate limit `BlobLogSequentialReader` reads (it appears unused?)
+  Status s =
+      file_->Read(IOOptions(), next_byte_, static_cast<size_t>(size), slice,
+                  buf, nullptr, Env::IO_TOTAL /* rate_limiter_priority */);
   next_byte_ += size;
   if (!s.ok()) {
     return s;
1 change: 1 addition & 0 deletions db/compaction/compaction_job.cc
@@ -1264,6 +1264,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   ReadOptions read_options;
   read_options.verify_checksums = true;
   read_options.fill_cache = false;
+  read_options.rate_limiter_priority = Env::IO_LOW;
   // Compaction iterators shouldn't be confined to a single prefix.
   // Compactions use Seek() for
   // (a) concurrent compactions,
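The compaction read path above now tags its reads with Env::IO_LOW. Those charges only take effect if the DB was opened with a rate limiter whose mode covers reads; a minimal sketch under that assumption follows (the helper name and the 64 MB/s figure are illustrative, not from this diff).

#include <string>

#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/rate_limiter.h"

// Sketch: open a DB whose shared rate limiter also covers reads, so
// compaction reads charged at Env::IO_LOW actually consume budget.
// NewGenericRateLimiter defaults to Mode::kWritesOnly, in which case read
// charges are expected to be no-ops.
rocksdb::Status OpenWithReadWriteRateLimit(const std::string& path,
                                           rocksdb::DB** db) {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.rate_limiter.reset(rocksdb::NewGenericRateLimiter(
      64 << 20 /* rate_bytes_per_sec: 64 MB/s */,
      100 * 1000 /* refill_period_us */, 10 /* fairness */,
      rocksdb::RateLimiter::Mode::kAllIo /* rate limit reads and writes */));
  return rocksdb::DB::Open(options, path, db);
}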
5 changes: 4 additions & 1 deletion db/convenience.cc
@@ -56,7 +56,10 @@ Status VerifySstFileChecksum(const Options& options,
   }
   std::unique_ptr<TableReader> table_reader;
   std::unique_ptr<RandomAccessFileReader> file_reader(
-      new RandomAccessFileReader(std::move(file), file_path));
+      new RandomAccessFileReader(
+          std::move(file), file_path, ioptions.clock, nullptr /* io_tracer */,
+          nullptr /* stats */, 0 /* hist_type */, nullptr /* file_read_hist */,
+          ioptions.rate_limiter.get()));
   const bool kImmortal = true;
   s = ioptions.table_factory->NewTableReader(
       TableReaderOptions(ioptions, options.prefix_extractor, env_options,
3 changes: 2 additions & 1 deletion db/db_impl/db_impl.cc
@@ -5175,7 +5175,8 @@ Status DBImpl::VerifyFullFileChecksum(const std::string& file_checksum_expected,
       fs_.get(), fname, immutable_db_options_.file_checksum_gen_factory.get(),
       func_name_expected, &file_checksum, &func_name,
       read_options.readahead_size, immutable_db_options_.allow_mmap_reads,
-      io_tracer_, immutable_db_options_.rate_limiter.get());
+      io_tracer_, immutable_db_options_.rate_limiter.get(),
+      read_options.rate_limiter_priority);
   if (s.ok()) {
     assert(func_name_expected == func_name);
     if (file_checksum != file_checksum_expected) {
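The VerifyFullFileChecksum() change above threads the caller's priority into checksum file reads. As a usage illustration (not from this diff), checksum verification could be run at a background priority like so, assuming the DB has a read-covering rate limiter and a file checksum generator configured; VerifyAtLowPriority is a made-up name.

#include <cassert>

#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"

// Sketch: run checksum verification while charging its file reads to the
// shared rate limiter at a background priority, so verification does not
// starve foreground I/O.
void VerifyAtLowPriority(rocksdb::DB* db) {
  rocksdb::ReadOptions read_options;
  read_options.rate_limiter_priority = rocksdb::Env::IO_LOW;

  // Full-file checksums recorded in the MANIFEST; requires
  // DBOptions::file_checksum_gen_factory to have been set.
  rocksdb::Status s = db->VerifyFileChecksums(read_options);
  assert(s.ok());

  // Per-block checksums of all SST files.
  s = db->VerifyChecksum(read_options);
  assert(s.ok());
}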