Support prepopulating/warming the blob cache (#10298)
Summary:
Many workloads exhibit temporal locality: recently written items are read back within a short period of time. When using remote file systems, this is inefficient, since reading them back incurs network traffic and higher latency. Because of this, we would like to support prepopulating the blob cache during flush.

This task is part of facebook/rocksdb#10156

Pull Request resolved: facebook/rocksdb#10298

Reviewed By: ltamasi

Differential Revision: D37908743

Pulled By: gangliao

fbshipit-source-id: 9feaed234bc719d38f0c02975c1ad19fa4bb37d1
gangliao authored and facebook-github-bot committed Jul 17, 2022
1 parent f5ef36a commit ec4ebef
Showing 41 changed files with 730 additions and 58 deletions.
7 changes: 5 additions & 2 deletions HISTORY.md
@@ -8,8 +8,9 @@
* Provide support for ReadOptions.async_io with direct_io to improve Seek latency by using async IO to parallelize child iterator seek and doing asynchronous prefetching on sequential scans.
* Added support for blob caching in order to cache frequently used blobs for BlobDB.
* User can configure the new ColumnFamilyOptions `blob_cache` to enable/disable blob caching.
* Either sharing the backend cache with the block cache or using a completely separate cache is supported.
* A new abstraction interface called `BlobSource` for blob read logic gives all users access to blobs, whether they are in the blob cache, secondary cache, or (remote) storage. Blobs can be potentially read both while handling user reads (`Get`, `MultiGet`, or iterator) and during compaction (while dealing with compaction filters, Merges, or garbage collection) but eventually all blob reads go through `Version::GetBlob` or, for MultiGet, `Version::MultiGetBlob` (and then get dispatched to the interface -- `BlobSource`).
* Either sharing the backend cache with the block cache or using a completely separate cache is supported.
* A new abstraction interface called `BlobSource` for blob read logic gives all users access to blobs, whether they are in the blob cache, secondary cache, or (remote) storage. Blobs can be potentially read both while handling user reads (`Get`, `MultiGet`, or iterator) and during compaction (while dealing with compaction filters, Merges, or garbage collection) but eventually all blob reads go through `Version::GetBlob` or, for MultiGet, `Version::MultiGetBlob` (and then get dispatched to the interface -- `BlobSource`).
* Added `prepopulate_blob_cache` to ColumnFamilyOptions. If enabled, warm/hot blobs that are already in memory are prepopulated into the blob cache at the time of flush. During a flush, the blobs that are in memory (in memtables) get flushed to the device; when using Direct IO, additional IO would otherwise be incurred to read those blobs back into memory, which this option avoids. This helps most if the workload exhibits high temporal locality, where most of the reads go to recently written data. It also helps in the case of a remote file system, since reads there involve network traffic and higher latencies.
* Support using secondary cache with the blob cache. When creating a blob cache, the user can set a secondary blob cache by configuring `secondary_cache` in LRUCacheOptions.
* Add experimental tiered compaction feature `AdvancedColumnFamilyOptions::preclude_last_level_data_seconds`, which makes sure the new data inserted within preclude_last_level_data_seconds won't be placed on cold tier (the feature is not complete).

@@ -23,6 +24,8 @@
* When using block cache strict capacity limit (`LRUCache` with `strict_capacity_limit=true`), DB operations now fail with Status code `kAborted` subcode `kMemoryLimit` (`IsMemoryLimit()`) instead of `kIncomplete` (`IsIncomplete()`) when the capacity limit is reached, because Incomplete can mean other specific things for some operations. In more detail, `Cache::Insert()` now returns the updated Status code and this usually propagates through RocksDB to the user on failure.
* NewClockCache calls temporarily return an LRUCache (with similar characteristics as the desired ClockCache). This is because ClockCache is being replaced by a new version (the old one had unknown bugs) but this is still under development.
* Add two functions `int ReserveThreads(int threads_to_be_reserved)` and `int ReleaseThreads(int threads_to_be_released)` to the `Env` class. In the default implementation, both return 0. Newly added `xxxEnv` classes that inherit `Env` should implement these two functions for thread reservation/releasing features.
* Add `rocksdb_options_get_prepopulate_blob_cache` and `rocksdb_options_set_prepopulate_blob_cache` to C API.
* Add `prepopulateBlobCache` and `setPrepopulateBlobCache` to Java API.

### Bug Fixes
* Fix a bug in which backup/checkpoint can include a WAL deleted by RocksDB.
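Taken together, the changelog entries above add a small configuration surface. A hedged sketch of how a user might enable the new behavior (a C++ fragment, not a complete program; the cache size is illustrative):

```cpp
#include "rocksdb/cache.h"
#include "rocksdb/options.h"

// Configuration sketch: cache blobs and warm the blob cache during flush.
rocksdb::Options options;
options.enable_blob_files = true;
options.blob_cache = rocksdb::NewLRUCache(64 << 20);  // 64 MiB, illustrative
options.prepopulate_blob_cache = rocksdb::PrepopulateBlobCache::kFlushOnly;
```

With `kFlushOnly`, only blobs written during flush are prepopulated; blobs written by compaction are not.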
80 changes: 71 additions & 9 deletions db/blob/blob_file_builder.cc
@@ -32,27 +32,28 @@ BlobFileBuilder::BlobFileBuilder(
VersionSet* versions, FileSystem* fs,
const ImmutableOptions* immutable_options,
const MutableCFOptions* mutable_cf_options, const FileOptions* file_options,
int job_id, uint32_t column_family_id,
const std::string& column_family_name, Env::IOPriority io_priority,
Env::WriteLifeTimeHint write_hint,
std::string db_id, std::string db_session_id, int job_id,
uint32_t column_family_id, const std::string& column_family_name,
Env::IOPriority io_priority, Env::WriteLifeTimeHint write_hint,
const std::shared_ptr<IOTracer>& io_tracer,
BlobFileCompletionCallback* blob_callback,
BlobFileCreationReason creation_reason,
std::vector<std::string>* blob_file_paths,
std::vector<BlobFileAddition>* blob_file_additions)
: BlobFileBuilder([versions]() { return versions->NewFileNumber(); }, fs,
immutable_options, mutable_cf_options, file_options,
job_id, column_family_id, column_family_name, io_priority,
write_hint, io_tracer, blob_callback, creation_reason,
blob_file_paths, blob_file_additions) {}
db_id, db_session_id, job_id, column_family_id,
column_family_name, io_priority, write_hint, io_tracer,
blob_callback, creation_reason, blob_file_paths,
blob_file_additions) {}

BlobFileBuilder::BlobFileBuilder(
std::function<uint64_t()> file_number_generator, FileSystem* fs,
const ImmutableOptions* immutable_options,
const MutableCFOptions* mutable_cf_options, const FileOptions* file_options,
int job_id, uint32_t column_family_id,
const std::string& column_family_name, Env::IOPriority io_priority,
Env::WriteLifeTimeHint write_hint,
std::string db_id, std::string db_session_id, int job_id,
uint32_t column_family_id, const std::string& column_family_name,
Env::IOPriority io_priority, Env::WriteLifeTimeHint write_hint,
const std::shared_ptr<IOTracer>& io_tracer,
BlobFileCompletionCallback* blob_callback,
BlobFileCreationReason creation_reason,
@@ -64,7 +65,10 @@ BlobFileBuilder::BlobFileBuilder(
min_blob_size_(mutable_cf_options->min_blob_size),
blob_file_size_(mutable_cf_options->blob_file_size),
blob_compression_type_(mutable_cf_options->blob_compression_type),
prepopulate_blob_cache_(mutable_cf_options->prepopulate_blob_cache),
file_options_(file_options),
db_id_(std::move(db_id)),
db_session_id_(std::move(db_session_id)),
job_id_(job_id),
column_family_id_(column_family_id),
column_family_name_(column_family_name),
@@ -133,6 +137,16 @@ Status BlobFileBuilder::Add(const Slice& key, const Slice& value,
}
}

{
const Status s =
PutBlobIntoCacheIfNeeded(value, blob_file_number, blob_offset);
if (!s.ok()) {
ROCKS_LOG_WARN(immutable_options_->info_log,
"Failed to pre-populate the blob into blob cache: %s",
s.ToString().c_str());
}
}

BlobIndex::EncodeBlob(blob_index, blob_file_number, blob_offset, blob.size(),
blob_compression_type_);

@@ -372,4 +386,52 @@ void BlobFileBuilder::Abandon(const Status& s) {
blob_count_ = 0;
blob_bytes_ = 0;
}

Status BlobFileBuilder::PutBlobIntoCacheIfNeeded(const Slice& blob,
uint64_t blob_file_number,
uint64_t blob_offset) const {
Status s = Status::OK();

auto blob_cache = immutable_options_->blob_cache;
auto statistics = immutable_options_->statistics.get();
bool warm_cache =
prepopulate_blob_cache_ == PrepopulateBlobCache::kFlushOnly &&
creation_reason_ == BlobFileCreationReason::kFlush;

if (blob_cache && warm_cache) {
    // During flush, the exact size of the blob file under construction is not
    // yet known. Therefore, we set the file size to
    // kMaxOffsetStandardEncoding. For any max_offset <= this value, the same
    // encoding scheme is guaranteed.
const OffsetableCacheKey base_cache_key(
db_id_, db_session_id_, blob_file_number,
OffsetableCacheKey::kMaxOffsetStandardEncoding);
const CacheKey cache_key = base_cache_key.WithOffset(blob_offset);
const Slice key = cache_key.AsSlice();

const Cache::Priority priority = Cache::Priority::LOW;

// Objects to be put into the cache have to be heap-allocated and
// self-contained, i.e. own their contents. The Cache has to be able to
// take unique ownership of them. Therefore, we copy the blob into a
// string directly, and insert that into the cache.
std::unique_ptr<std::string> buf = std::make_unique<std::string>();
buf->assign(blob.data(), blob.size());

// TODO: support custom allocators and provide a better estimated memory
// usage using malloc_usable_size.
s = blob_cache->Insert(key, buf.get(), buf->size(),
&DeleteCacheEntry<std::string>,
nullptr /* cache_handle */, priority);
if (s.ok()) {
RecordTick(statistics, BLOB_DB_CACHE_ADD);
RecordTick(statistics, BLOB_DB_CACHE_BYTES_WRITE, buf->size());
buf.release();
} else {
RecordTick(statistics, BLOB_DB_CACHE_ADD_FAILURES);
}
}

return s;
}

} // namespace ROCKSDB_NAMESPACE
13 changes: 11 additions & 2 deletions db/blob/blob_file_builder.h
@@ -10,6 +10,7 @@
#include <string>
#include <vector>

#include "rocksdb/advanced_options.h"
#include "rocksdb/compression_type.h"
#include "rocksdb/env.h"
#include "rocksdb/rocksdb_namespace.h"
@@ -35,7 +36,8 @@ class BlobFileBuilder {
BlobFileBuilder(VersionSet* versions, FileSystem* fs,
const ImmutableOptions* immutable_options,
const MutableCFOptions* mutable_cf_options,
const FileOptions* file_options, int job_id,
const FileOptions* file_options, std::string db_id,
std::string db_session_id, int job_id,
uint32_t column_family_id,
const std::string& column_family_name,
Env::IOPriority io_priority,
@@ -49,7 +51,8 @@
BlobFileBuilder(std::function<uint64_t()> file_number_generator,
FileSystem* fs, const ImmutableOptions* immutable_options,
const MutableCFOptions* mutable_cf_options,
const FileOptions* file_options, int job_id,
const FileOptions* file_options, std::string db_id,
std::string db_session_id, int job_id,
uint32_t column_family_id,
const std::string& column_family_name,
Env::IOPriority io_priority,
@@ -78,13 +81,19 @@
Status CloseBlobFile();
Status CloseBlobFileIfNeeded();

Status PutBlobIntoCacheIfNeeded(const Slice& blob, uint64_t blob_file_number,
uint64_t blob_offset) const;

std::function<uint64_t()> file_number_generator_;
FileSystem* fs_;
const ImmutableOptions* immutable_options_;
uint64_t min_blob_size_;
uint64_t blob_file_size_;
CompressionType blob_compression_type_;
PrepopulateBlobCache prepopulate_blob_cache_;
const FileOptions* file_options_;
const std::string db_id_;
const std::string db_session_id_;
int job_id_;
uint32_t column_family_id_;
std::string column_family_name_;
35 changes: 21 additions & 14 deletions db/blob/blob_file_builder_test.cc
@@ -144,8 +144,9 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckOneFile) {

BlobFileBuilder builder(
TestFileNumberGenerator(), fs_, &immutable_options, &mutable_cf_options,
&file_options_, job_id, column_family_id, column_family_name, io_priority,
write_hint, nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/,
&file_options_, "" /*db_id*/, "" /*db_session_id*/, job_id,
column_family_id, column_family_name, io_priority, write_hint,
nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/,
BlobFileCreationReason::kFlush, &blob_file_paths, &blob_file_additions);

std::vector<std::pair<std::string, std::string>> expected_key_value_pairs(
@@ -228,8 +229,9 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckMultipleFiles) {

BlobFileBuilder builder(
TestFileNumberGenerator(), fs_, &immutable_options, &mutable_cf_options,
&file_options_, job_id, column_family_id, column_family_name, io_priority,
write_hint, nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/,
&file_options_, "" /*db_id*/, "" /*db_session_id*/, job_id,
column_family_id, column_family_name, io_priority, write_hint,
nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/,
BlobFileCreationReason::kFlush, &blob_file_paths, &blob_file_additions);

std::vector<std::pair<std::string, std::string>> expected_key_value_pairs(
@@ -315,8 +317,9 @@ TEST_F(BlobFileBuilderTest, InlinedValues) {

BlobFileBuilder builder(
TestFileNumberGenerator(), fs_, &immutable_options, &mutable_cf_options,
&file_options_, job_id, column_family_id, column_family_name, io_priority,
write_hint, nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/,
&file_options_, "" /*db_id*/, "" /*db_session_id*/, job_id,
column_family_id, column_family_name, io_priority, write_hint,
nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/,
BlobFileCreationReason::kFlush, &blob_file_paths, &blob_file_additions);

for (size_t i = 0; i < number_of_blobs; ++i) {
@@ -369,8 +372,9 @@ TEST_F(BlobFileBuilderTest, Compression) {

BlobFileBuilder builder(
TestFileNumberGenerator(), fs_, &immutable_options, &mutable_cf_options,
&file_options_, job_id, column_family_id, column_family_name, io_priority,
write_hint, nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/,
&file_options_, "" /*db_id*/, "" /*db_session_id*/, job_id,
column_family_id, column_family_name, io_priority, write_hint,
nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/,
BlobFileCreationReason::kFlush, &blob_file_paths, &blob_file_additions);

const std::string key("1");
@@ -452,8 +456,9 @@ TEST_F(BlobFileBuilderTest, CompressionError) {

BlobFileBuilder builder(
TestFileNumberGenerator(), fs_, &immutable_options, &mutable_cf_options,
&file_options_, job_id, column_family_id, column_family_name, io_priority,
write_hint, nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/,
&file_options_, "" /*db_id*/, "" /*db_session_id*/, job_id,
column_family_id, column_family_name, io_priority, write_hint,
nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/,
BlobFileCreationReason::kFlush, &blob_file_paths, &blob_file_additions);

SyncPoint::GetInstance()->SetCallBack("CompressData:TamperWithReturnValue",
@@ -531,8 +536,9 @@ TEST_F(BlobFileBuilderTest, Checksum) {

BlobFileBuilder builder(
TestFileNumberGenerator(), fs_, &immutable_options, &mutable_cf_options,
&file_options_, job_id, column_family_id, column_family_name, io_priority,
write_hint, nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/,
&file_options_, "" /*db_id*/, "" /*db_session_id*/, job_id,
column_family_id, column_family_name, io_priority, write_hint,
nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/,
BlobFileCreationReason::kFlush, &blob_file_paths, &blob_file_additions);

const std::string key("1");
@@ -628,8 +634,9 @@ TEST_P(BlobFileBuilderIOErrorTest, IOError) {

BlobFileBuilder builder(
TestFileNumberGenerator(), fs_, &immutable_options, &mutable_cf_options,
&file_options_, job_id, column_family_id, column_family_name, io_priority,
write_hint, nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/,
&file_options_, "" /*db_id*/, "" /*db_session_id*/, job_id,
column_family_id, column_family_name, io_priority, write_hint,
nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/,
BlobFileCreationReason::kFlush, &blob_file_paths, &blob_file_additions);

SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* arg) {
5 changes: 3 additions & 2 deletions db/blob/blob_source.cc
@@ -63,15 +63,16 @@ Status BlobSource::PutBlobIntoCache(const Slice& cache_key,
// self-contained, i.e. own their contents. The Cache has to be able to take
// unique ownership of them. Therefore, we copy the blob into a string
// directly, and insert that into the cache.
std::string* buf = new std::string();
std::unique_ptr<std::string> buf = std::make_unique<std::string>();
buf->assign(blob->data(), blob->size());

// TODO: support custom allocators and provide a better estimated memory
// usage using malloc_usable_size.
Cache::Handle* cache_handle = nullptr;
s = InsertEntryIntoCache(cache_key, buf, buf->size(), &cache_handle,
s = InsertEntryIntoCache(cache_key, buf.get(), buf->size(), &cache_handle,
priority);
if (s.ok()) {
buf.release();
assert(cache_handle != nullptr);
*cached_blob =
CacheHandleGuard<std::string>(blob_cache_.get(), cache_handle);
(Diff truncated; the remaining changed files are not shown.)
