
Add blob cache tickers, perf context statistics, and DB properties (facebook#10203)

Summary:
In order to be able to monitor the performance of the new blob cache, we made the following changes (see the usage sketch after this list):
- Add blob cache hit/miss/insertion tickers (see https://github.com/facebook/rocksdb/wiki/Statistics)
- Extend the perf context similarly (see https://github.com/facebook/rocksdb/wiki/Perf-Context-and-IO-Stats-Context)
- Implement new DB properties (see e.g. https://github.com/facebook/rocksdb/blob/main/include/rocksdb/db.h#L1042-L1051) that expose the capacity and current usage of the blob cache.
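
The sketch below shows how these three surfaces might be consumed from application code. It is not part of the commit: the property strings, ticker enums, and perf context fields are the ones introduced in the diff below, while the open/read boilerplate around them (path, cache size, keys) is illustrative only, and `options.blob_cache` assumes the blob cache option from the facebook#10156 series is available.

```cpp
#include <cassert>
#include <iostream>
#include <string>

#include "rocksdb/cache.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/perf_context.h"
#include "rocksdb/perf_level.h"
#include "rocksdb/statistics.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.enable_blob_files = true;
  options.blob_cache = rocksdb::NewLRUCache(64 << 20);  // 64 MiB, arbitrary
  options.statistics = rocksdb::CreateDBStatistics();

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/blob_cache_demo", &db);
  assert(s.ok());

  // Perf context is collected per thread and must be enabled explicitly.
  rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);
  rocksdb::get_perf_context()->Reset();

  std::string value;
  db->Get(rocksdb::ReadOptions(), "some_key", &value).PermitUncheckedError();

  // (1) New DB properties: capacity and current usage of the blob cache.
  std::string usage;
  db->GetProperty("rocksdb.blob-cache-usage", &usage);
  std::cout << "blob cache usage: " << usage << '\n';

  // (2) New tickers: process-wide counters on the statistics object.
  std::cout << "blob cache hits: "
            << options.statistics->getTickerCount(rocksdb::BLOB_DB_CACHE_HIT)
            << '\n';

  // (3) New perf context statistics: per-thread, per-operation counters.
  std::cout << "blob reads: " << rocksdb::get_perf_context()->blob_read_count
            << '\n';

  delete db;
  return 0;
}
```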

This PR is part of facebook#10156

Pull Request resolved: facebook#10203

Reviewed By: ltamasi

Differential Revision: D37478658

Pulled By: gangliao

fbshipit-source-id: d8ee3f41d47315ef725e4551226330b4b6832e40
gangliao authored and facebook-github-bot committed Jun 28, 2022
1 parent c6055cb commit d7ebb58
Showing 18 changed files with 584 additions and 56 deletions.
4 changes: 4 additions & 0 deletions HISTORY.md
@@ -55,6 +55,10 @@
 * Add support for timestamped snapshots (#9879)
 * Provide support for AbortIO in posix to cancel submitted asynchronous requests using io_uring.
 * Add support for rate-limiting batched `MultiGet()` APIs
+* Added several new tickers, perf context statistics, and DB properties to BlobDB
+* Added new DB properties "rocksdb.blob-cache-capacity", "rocksdb.blob-cache-usage", "rocksdb.blob-cache-pinned-usage" to show blob cache usage.
+* Added new perf context statistics `blob_cache_hit_count`, `blob_read_count`, `blob_read_byte`, `blob_read_time`, `blob_checksum_time` and `blob_decompress_time`.
+* Added new tickers `BLOB_DB_CACHE_MISS`, `BLOB_DB_CACHE_HIT`, `BLOB_DB_CACHE_ADD`, `BLOB_DB_CACHE_ADD_FAILURES`, `BLOB_DB_CACHE_BYTES_READ` and `BLOB_DB_CACHE_BYTES_WRITE`.
 
 ### Behavior changes
 * DB::Open(), DB::OpenAsSecondary() will fail if a Logger cannot be created (#9984)
9 changes: 8 additions & 1 deletion db/blob/blob_file_reader.cc
@@ -335,7 +335,9 @@ Status BlobFileReader::GetBlob(const ReadOptions& read_options,

   if (!prefetched) {
     TEST_SYNC_POINT("BlobFileReader::GetBlob:ReadFromFile");
-
+    PERF_COUNTER_ADD(blob_read_count, 1);
+    PERF_COUNTER_ADD(blob_read_byte, record_size);
+    PERF_TIMER_GUARD(blob_read_time);
     const Status s = ReadFromFile(file_reader_.get(), record_offset,
                                   static_cast<size_t>(record_size), statistics_,
                                   &record_slice, &buf, &aligned_buf,
@@ -428,6 +430,8 @@ void BlobFileReader::MultiGetBlob(
     }
   }
   TEST_SYNC_POINT("BlobFileReader::MultiGetBlob:ReadFromFile");
+  PERF_COUNTER_ADD(blob_read_count, num_blobs);
+  PERF_COUNTER_ADD(blob_read_byte, total_len);
   s = file_reader_->MultiRead(IOOptions(), read_reqs.data(), read_reqs.size(),
                               direct_io ? &aligned_buf : nullptr,
                               read_options.rate_limiter_priority);
@@ -483,6 +487,8 @@

 Status BlobFileReader::VerifyBlob(const Slice& record_slice,
                                   const Slice& user_key, uint64_t value_size) {
+  PERF_TIMER_GUARD(blob_checksum_time);
+
   BlobLogRecord record;
 
   const Slice header_slice(record_slice.data(), BlobLogRecord::kHeaderSize);
@@ -547,6 +553,7 @@ Status BlobFileReader::UncompressBlobIfNeeded(const Slice& value_slice,
   CacheAllocationPtr output;
 
   {
+    PERF_TIMER_GUARD(blob_decompress_time);
     StopWatch stop_watch(clock, statistics, BLOB_DB_DECOMPRESSION_MICROS);
     output = UncompressData(info, value_slice.data(), value_slice.size(),
                             &uncompressed_size, compression_format_version,
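A note on the instrumentation above: `PERF_COUNTER_ADD` and `PERF_TIMER_GUARD` only record when the calling thread's perf level permits it, and the timer-based fields need a level that measures time. A minimal read-side sketch using the standard perf context API; the field names are the ones this commit adds, everything else is assumed boilerplate:

```cpp
#include <cstdint>

#include "rocksdb/perf_context.h"
#include "rocksdb/perf_level.h"

void DumpBlobReadPerfCounters() {
  // kEnableTime (or kEnableTimeExceptForMutex) is required for the
  // PERF_TIMER_GUARD-based fields to accumulate.
  rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTime);
  rocksdb::PerfContext* ctx = rocksdb::get_perf_context();
  ctx->Reset();

  // ... issue Get()/MultiGet() calls that touch blob files here ...

  // Incremented by PERF_COUNTER_ADD in BlobFileReader::GetBlob/MultiGetBlob.
  const uint64_t reads = ctx->blob_read_count;
  const uint64_t bytes = ctx->blob_read_byte;
  // Accumulated by PERF_TIMER_GUARD in the read, verify, and decompress paths.
  const uint64_t read_time = ctx->blob_read_time;
  const uint64_t checksum_time = ctx->blob_checksum_time;
  const uint64_t decompress_time = ctx->blob_decompress_time;
  (void)reads; (void)bytes; (void)read_time; (void)checksum_time;
  (void)decompress_time;
}
```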
59 changes: 50 additions & 9 deletions db/blob/blob_source.cc
@@ -10,7 +10,9 @@

#include "db/blob/blob_file_reader.h"
#include "db/blob/blob_log_format.h"
#include "monitoring/statistics.h"
#include "options/cf_options.h"
#include "table/get_context.h"
#include "table/multiget_context.h"

namespace ROCKSDB_NAMESPACE {
@@ -78,6 +80,38 @@ Status BlobSource::PutBlobIntoCache(const Slice& cache_key,
   return s;
 }
 
+Cache::Handle* BlobSource::GetEntryFromCache(const Slice& key) const {
+  Cache::Handle* cache_handle = nullptr;
+  cache_handle = blob_cache_->Lookup(key, statistics_);
+  if (cache_handle != nullptr) {
+    PERF_COUNTER_ADD(blob_cache_hit_count, 1);
+    RecordTick(statistics_, BLOB_DB_CACHE_HIT);
+    RecordTick(statistics_, BLOB_DB_CACHE_BYTES_READ,
+               blob_cache_->GetUsage(cache_handle));
+  } else {
+    RecordTick(statistics_, BLOB_DB_CACHE_MISS);
+  }
+  return cache_handle;
+}
+
+Status BlobSource::InsertEntryIntoCache(const Slice& key, std::string* value,
+                                        size_t charge,
+                                        Cache::Handle** cache_handle,
+                                        Cache::Priority priority) const {
+  const Status s =
+      blob_cache_->Insert(key, value, charge, &DeleteCacheEntry<std::string>,
+                          cache_handle, priority);
+  if (s.ok()) {
+    assert(*cache_handle != nullptr);
+    RecordTick(statistics_, BLOB_DB_CACHE_ADD);
+    RecordTick(statistics_, BLOB_DB_CACHE_BYTES_WRITE,
+               blob_cache_->GetUsage(*cache_handle));
+  } else {
+    RecordTick(statistics_, BLOB_DB_CACHE_ADD_FAILURES);
+  }
+  return s;
+}
+
 Status BlobSource::GetBlob(const ReadOptions& read_options,
                            const Slice& user_key, uint64_t file_number,
                            uint64_t offset, uint64_t file_size,
@@ -100,18 +134,21 @@ Status BlobSource::GetBlob(const ReadOptions& read_options,
     Slice key = cache_key.AsSlice();
     s = GetBlobFromCache(key, &blob_entry);
     if (s.ok() && blob_entry.GetValue()) {
-      value->PinSelf(*blob_entry.GetValue());
 
       // For consistency, the size of on-disk (possibly compressed) blob record
       // is assigned to bytes_read.
+      uint64_t adjustment =
+          read_options.verify_checksums
+              ? BlobLogRecord::CalculateAdjustmentForRecordHeader(
+                    user_key.size())
+              : 0;
+      assert(offset >= adjustment);
+
+      uint64_t record_size = value_size + adjustment;
       if (bytes_read) {
-        uint64_t adjustment =
-            read_options.verify_checksums
-                ? BlobLogRecord::CalculateAdjustmentForRecordHeader(
-                      user_key.size())
-                : 0;
-        assert(offset >= adjustment);
-        *bytes_read = value_size + adjustment;
+        *bytes_read = record_size;
       }
+
+      value->PinSelf(*blob_entry.GetValue());
       return s;
     }
   }
@@ -139,12 +176,16 @@ Status BlobSource::GetBlob(const ReadOptions& read_options,
       return Status::Corruption("Compression type mismatch when reading blob");
     }
 
+    uint64_t read_size = 0;
     s = blob_file_reader.GetValue()->GetBlob(
         read_options, user_key, offset, value_size, compression_type,
-        prefetch_buffer, value, bytes_read);
+        prefetch_buffer, value, &read_size);
     if (!s.ok()) {
       return s;
     }
+    if (bytes_read) {
+      *bytes_read = read_size;
+    }
   }
 
   if (blob_cache_ && read_options.fill_cache) {
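Note the byte-ticker semantics in the helpers above: `BLOB_DB_CACHE_BYTES_READ` and `BLOB_DB_CACHE_BYTES_WRITE` are charged with `Cache::GetUsage()` of the handle, i.e. the in-memory footprint of the cached blob rather than its on-disk size. A sketch of polling the new tickers through the standard `Statistics` API; only the ticker names come from this commit:

```cpp
#include <cstdint>
#include <cstdio>
#include <memory>

#include "rocksdb/statistics.h"

void ReportBlobCacheTickers(
    const std::shared_ptr<rocksdb::Statistics>& stats) {
  const uint64_t hits = stats->getTickerCount(rocksdb::BLOB_DB_CACHE_HIT);
  const uint64_t misses = stats->getTickerCount(rocksdb::BLOB_DB_CACHE_MISS);
  const uint64_t adds = stats->getTickerCount(rocksdb::BLOB_DB_CACHE_ADD);
  const uint64_t add_failures =
      stats->getTickerCount(rocksdb::BLOB_DB_CACHE_ADD_FAILURES);
  // Hit rate over all lookups recorded by BlobSource::GetEntryFromCache.
  const double hit_rate =
      (hits + misses) ? static_cast<double>(hits) / (hits + misses) : 0.0;
  std::printf("blob cache: %.2f%% hit rate, %llu adds (%llu failed)\n",
              100.0 * hit_rate, (unsigned long long)adds,
              (unsigned long long)add_failures);
}
```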
19 changes: 6 additions & 13 deletions db/blob/blob_source.h
@@ -90,26 +90,19 @@ class BlobSource {
                           CachableEntry<std::string>* cached_blob,
                           PinnableSlice* blob) const;
 
+  Cache::Handle* GetEntryFromCache(const Slice& key) const;
+
+  Status InsertEntryIntoCache(const Slice& key, std::string* value,
+                              size_t charge, Cache::Handle** cache_handle,
+                              Cache::Priority priority) const;
+
   inline CacheKey GetCacheKey(uint64_t file_number, uint64_t file_size,
                               uint64_t offset) const {
     OffsetableCacheKey base_cache_key(db_id_, db_session_id_, file_number,
                                       file_size);
     return base_cache_key.WithOffset(offset);
   }
 
-  inline Cache::Handle* GetEntryFromCache(const Slice& key) const {
-    return blob_cache_->Lookup(key, statistics_);
-  }
-
-  inline Status InsertEntryIntoCache(const Slice& key, std::string* value,
-                                     size_t charge,
-                                     Cache::Handle** cache_handle,
-                                     Cache::Priority priority) const {
-    return blob_cache_->Insert(key, value, charge,
-                               &DeleteCacheEntry<std::string>, cache_handle,
-                               priority);
-  }
-
   const std::string& db_id_;
   const std::string& db_session_id_;
 
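This header change moves the cache accessors out of line so their definitions in blob_source.cc can record tickers and perf context counters without pulling the monitoring headers into the header file. Finally, the new DB properties can be polled like any other. A sketch, assuming a DB opened with a blob cache configured; the property strings are confirmed by the HISTORY.md entry above, while `GetIntProperty()` support is my assumption based on the existing `rocksdb.block-cache-*` properties:

```cpp
#include <cstdint>

#include "rocksdb/db.h"

void ReportBlobCacheOccupancy(rocksdb::DB* db) {
  uint64_t capacity = 0;
  uint64_t usage = 0;
  uint64_t pinned_usage = 0;
  db->GetIntProperty("rocksdb.blob-cache-capacity", &capacity);
  db->GetIntProperty("rocksdb.blob-cache-usage", &usage);
  db->GetIntProperty("rocksdb.blob-cache-pinned-usage", &pinned_usage);
  // usage <= capacity; pinned_usage covers entries currently referenced.
  (void)capacity; (void)usage; (void)pinned_usage;
}
```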
