Skip to content

Commit

Permalink
Fix periodic_task unable to re-register the same task type (#10379)
Browse files Browse the repository at this point in the history
Summary:
Timer has a limitation that it cannot re-register a task with the same name,
because the cancel only mark the task as invalid and wait for the Timer thread
to clean it up later, before the task is cleaned up, the same task name cannot
be added. Which makes the task option update likely to fail, which basically
cancel and re-register the same task name. Change the periodic task name to a
random unique id and store it in periodic_task_scheduler.

Also refactor the `periodic_work` to `periodic_task` to make each job function
as a `task`.

Pull Request resolved: facebook/rocksdb#10379

Test Plan: unittests

Reviewed By: ajkr

Differential Revision: D38000615

Pulled By: jay-zhuang

fbshipit-source-id: e4135f9422e3b53aaec8eda54f4e18ce633a279e
  • Loading branch information
jay-zhuang authored and facebook-github-bot committed Aug 26, 2022
1 parent 3f57d84 commit d9e71fb
Show file tree
Hide file tree
Showing 20 changed files with 457 additions and 490 deletions.
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,7 @@ set(SOURCES
db/merge_helper.cc
db/merge_operator.cc
db/output_validator.cc
db/periodic_work_scheduler.cc
db/periodic_task_scheduler.cc
db/range_del_aggregator.cc
db/range_tombstone_fragmenter.cc
db/repair.cc
Expand Down Expand Up @@ -1297,7 +1297,7 @@ if(WITH_TESTS)
db/merge_test.cc
db/options_file_test.cc
db/perf_context_test.cc
db/periodic_work_scheduler_test.cc
db/periodic_task_scheduler_test.cc
db/plain_table_db_test.cc
db/seqno_time_test.cc
db/prefix_test.cc
Expand Down
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
## Unreleased
### Bug Fixes
* Fixed bug where `FlushWAL(true /* sync */)` (used by `GetLiveFilesStorageInfo()`, which is used by checkpoint and backup) could cause parallel writes at the tail of a WAL file to never be synced.
* Fix periodic_task unable to re-register the same task type, which may cause `SetOptions()` fail to update periodical_task time like: `stats_dump_period_sec`, `stats_persist_period_sec`.

### Public API changes
* Add `rocksdb_column_family_handle_get_id`, `rocksdb_column_family_handle_get_name` to get name, id of column family in C API
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -1882,7 +1882,7 @@ blob_garbage_meter_test: $(OBJ_DIR)/db/blob/blob_garbage_meter_test.o $(TEST_LIB
timer_test: $(OBJ_DIR)/util/timer_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

periodic_work_scheduler_test: $(OBJ_DIR)/db/periodic_work_scheduler_test.o $(TEST_LIBRARY) $(LIBRARY)
periodic_task_scheduler_test: $(OBJ_DIR)/db/periodic_task_scheduler_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

testutil_test: $(OBJ_DIR)/test_util/testutil_test.o $(TEST_LIBRARY) $(LIBRARY)
Expand Down
8 changes: 4 additions & 4 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"db/merge_helper.cc",
"db/merge_operator.cc",
"db/output_validator.cc",
"db/periodic_work_scheduler.cc",
"db/periodic_task_scheduler.cc",
"db/range_del_aggregator.cc",
"db/range_tombstone_fragmenter.cc",
"db/repair.cc",
Expand Down Expand Up @@ -421,7 +421,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[
"db/merge_helper.cc",
"db/merge_operator.cc",
"db/output_validator.cc",
"db/periodic_work_scheduler.cc",
"db/periodic_task_scheduler.cc",
"db/range_del_aggregator.cc",
"db/range_tombstone_fragmenter.cc",
"db/repair.cc",
Expand Down Expand Up @@ -5600,8 +5600,8 @@ cpp_unittest_wrapper(name="perf_context_test",
extra_compiler_flags=[])


cpp_unittest_wrapper(name="periodic_work_scheduler_test",
srcs=["db/periodic_work_scheduler_test.cc"],
cpp_unittest_wrapper(name="periodic_task_scheduler_test",
srcs=["db/periodic_task_scheduler_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])

Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/compacted_db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ Status CompactedDBImpl::Open(const Options& options,
std::unique_ptr<CompactedDBImpl> db(new CompactedDBImpl(db_options, dbname));
Status s = db->Init(options);
if (s.ok()) {
s = db->StartPeriodicWorkScheduler();
s = db->StartPeriodicTaskScheduler();
}
if (s.ok()) {
ROCKS_LOG_INFO(db->immutable_db_options_.info_log,
Expand Down
141 changes: 88 additions & 53 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/periodic_work_scheduler.h"
#include "db/periodic_task_scheduler.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/table_cache.h"
#include "db/table_properties_collector.h"
Expand Down Expand Up @@ -217,7 +217,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
refitting_level_(false),
opened_successfully_(false),
#ifndef ROCKSDB_LITE
periodic_work_scheduler_(nullptr),
periodic_task_scheduler_(),
#endif // ROCKSDB_LITE
two_write_queues_(options.two_write_queues),
manual_wal_flush_(options.manual_wal_flush),
Expand Down Expand Up @@ -260,6 +260,18 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
SetDbSessionId();
assert(!db_session_id_.empty());

#ifndef ROCKSDB_LITE
periodic_task_functions_.emplace(PeriodicTaskType::kDumpStats,
[this]() { this->DumpStats(); });
periodic_task_functions_.emplace(PeriodicTaskType::kPersistStats,
[this]() { this->PersistStats(); });
periodic_task_functions_.emplace(PeriodicTaskType::kFlushInfoLog,
[this]() { this->FlushInfoLog(); });
periodic_task_functions_.emplace(
PeriodicTaskType::kRecordSeqnoTime,
[this]() { this->RecordSeqnoToTimeMapping(); });
#endif // ROCKSDB_LITE

versions_.reset(new VersionSet(dbname_, &immutable_db_options_, file_options_,
table_cache_.get(), write_buffer_manager_,
&write_controller_, &block_cache_tracer_,
Expand Down Expand Up @@ -480,9 +492,15 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
"Shutdown: canceling all background work");

#ifndef ROCKSDB_LITE
if (periodic_work_scheduler_ != nullptr) {
periodic_work_scheduler_->Unregister(this);
periodic_work_scheduler_->UnregisterRecordSeqnoTimeWorker(this);
for (uint8_t task_type = 0;
task_type < static_cast<uint8_t>(PeriodicTaskType::kMax); task_type++) {
Status s = periodic_task_scheduler_.Unregister(
static_cast<PeriodicTaskType>(task_type));
if (!s.ok()) {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"Failed to unregister periodic task %d, status: %s",
task_type, s.ToString().c_str());
}
}
#endif // !ROCKSDB_LITE

Expand Down Expand Up @@ -767,40 +785,57 @@ void DBImpl::PrintStatistics() {
}
}

Status DBImpl::StartPeriodicWorkScheduler() {
Status DBImpl::StartPeriodicTaskScheduler() {
#ifndef ROCKSDB_LITE

#ifndef NDEBUG
// It only used by test to disable scheduler
bool disable_scheduler = false;
TEST_SYNC_POINT_CALLBACK(
"DBImpl::StartPeriodicWorkScheduler:DisableScheduler",
"DBImpl::StartPeriodicTaskScheduler:DisableScheduler",
&disable_scheduler);
if (disable_scheduler) {
return Status::OK();
}
#endif // !NDEBUG

{
InstrumentedMutexLock l(&mutex_);
periodic_work_scheduler_ = PeriodicWorkScheduler::Default();
TEST_SYNC_POINT_CALLBACK("DBImpl::StartPeriodicWorkScheduler:Init",
&periodic_work_scheduler_);
TEST_SYNC_POINT_CALLBACK("DBImpl::StartPeriodicTaskScheduler:Init",
&periodic_task_scheduler_);
}

return periodic_work_scheduler_->Register(
this, mutable_db_options_.stats_dump_period_sec,
mutable_db_options_.stats_persist_period_sec);
#endif // !NDEBUG
if (mutable_db_options_.stats_dump_period_sec > 0) {
Status s = periodic_task_scheduler_.Register(
PeriodicTaskType::kDumpStats,
periodic_task_functions_.at(PeriodicTaskType::kDumpStats),
mutable_db_options_.stats_dump_period_sec);
if (!s.ok()) {
return s;
}
}
if (mutable_db_options_.stats_persist_period_sec > 0) {
Status s = periodic_task_scheduler_.Register(
PeriodicTaskType::kPersistStats,
periodic_task_functions_.at(PeriodicTaskType::kPersistStats),
mutable_db_options_.stats_persist_period_sec);
if (!s.ok()) {
return s;
}
}

Status s = periodic_task_scheduler_.Register(
PeriodicTaskType::kFlushInfoLog,
periodic_task_functions_.at(PeriodicTaskType::kFlushInfoLog));

return s;
#else
return Status::OK();
#endif // !ROCKSDB_LITE
}

Status DBImpl::RegisterRecordSeqnoTimeWorker() {
#ifndef ROCKSDB_LITE
if (!periodic_work_scheduler_) {
return Status::OK();
}
uint64_t min_time_duration = std::numeric_limits<uint64_t>::max();
uint64_t max_time_duration = std::numeric_limits<uint64_t>::min();
{
Expand Down Expand Up @@ -828,26 +863,13 @@ Status DBImpl::RegisterRecordSeqnoTimeWorker() {
}

Status s;
if (seqno_time_cadence != record_seqno_time_cadence_) {
if (seqno_time_cadence == 0) {
periodic_work_scheduler_->UnregisterRecordSeqnoTimeWorker(this);
} else {
s = periodic_work_scheduler_->RegisterRecordSeqnoTimeWorker(
this, seqno_time_cadence);
}

if (s.ok()) {
record_seqno_time_cadence_ = seqno_time_cadence;
}

if (s.IsNotSupported()) {
// TODO: Fix the timer cannot cancel and re-add the same task
ROCKS_LOG_WARN(
immutable_db_options_.info_log,
"Updating seqno to time worker cadence is not supported yet, to make "
"the change effective, please reopen the DB instance.");
s = Status::OK();
}
if (seqno_time_cadence == 0) {
s = periodic_task_scheduler_.Unregister(PeriodicTaskType::kRecordSeqnoTime);
} else {
s = periodic_task_scheduler_.Register(
PeriodicTaskType::kRecordSeqnoTime,
periodic_task_functions_.at(PeriodicTaskType::kRecordSeqnoTime),
seqno_time_cadence);
}

return s;
Expand Down Expand Up @@ -1087,6 +1109,10 @@ void DBImpl::DumpStats() {
PrintStatistics();
}

// Periodically flush info log out of application buffer at a low frequency.
// This improves debuggability in case of RocksDB hanging since it ensures the
// log messages leading up to the hang will eventually become visible in the
// log.
void DBImpl::FlushInfoLog() {
if (shutdown_initiated_) {
return;
Expand Down Expand Up @@ -1279,22 +1305,36 @@ Status DBImpl::SetDBOptions(
MaybeScheduleFlushOrCompaction();
}

if (new_options.stats_dump_period_sec !=
mutable_db_options_.stats_dump_period_sec ||
new_options.stats_persist_period_sec !=
mutable_db_options_.stats_persist_period_sec) {
mutex_.Unlock();
periodic_work_scheduler_->Unregister(this);
s = periodic_work_scheduler_->Register(
this, new_options.stats_dump_period_sec,
new_options.stats_persist_period_sec);
mutex_.Lock();
mutex_.Unlock();
if (new_options.stats_dump_period_sec == 0) {
s = periodic_task_scheduler_.Unregister(PeriodicTaskType::kDumpStats);
} else {
s = periodic_task_scheduler_.Register(
PeriodicTaskType::kDumpStats,
periodic_task_functions_.at(PeriodicTaskType::kDumpStats),
new_options.stats_dump_period_sec);
}
if (new_options.max_total_wal_size !=
mutable_db_options_.max_total_wal_size) {
max_total_wal_size_.store(new_options.max_total_wal_size,
std::memory_order_release);
}
if (s.ok()) {
if (new_options.stats_persist_period_sec == 0) {
s = periodic_task_scheduler_.Unregister(
PeriodicTaskType::kPersistStats);
} else {
s = periodic_task_scheduler_.Register(
PeriodicTaskType::kPersistStats,
periodic_task_functions_.at(PeriodicTaskType::kPersistStats),
new_options.stats_persist_period_sec);
}
}
mutex_.Lock();
if (!s.ok()) {
return s;
}

write_controller_.set_max_delayed_write_rate(
new_options.delayed_write_rate);
table_cache_.get()->SetCapacity(new_options.max_open_files == -1
Expand Down Expand Up @@ -3043,12 +3083,7 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
} // InstrumentedMutexLock l(&mutex_)

if (cf_options.preclude_last_level_data_seconds > 0) {
// TODO(zjay): Fix the timer issue and re-enable this.
ROCKS_LOG_ERROR(
immutable_db_options_.info_log,
"Creating column family with `preclude_last_level_data_seconds` needs "
"to restart DB to take effect");
// s = RegisterRecordSeqnoTimeWorker();
s = RegisterRecordSeqnoTimeWorker();
}
sv_context.Clean();
// this is outside the mutex
Expand Down
24 changes: 9 additions & 15 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "db/log_writer.h"
#include "db/logs_with_prep_tracker.h"
#include "db/memtable_list.h"
#include "db/periodic_task_scheduler.h"
#include "db/post_memtable_callback.h"
#include "db/pre_release_callback.h"
#include "db/range_del_aggregator.h"
Expand Down Expand Up @@ -75,10 +76,6 @@ class ArenaWrappedDBIter;
class InMemoryStatsHistoryIterator;
class MemTable;
class PersistentStatsHistoryIterator;
class PeriodicWorkScheduler;
#ifndef NDEBUG
class PeriodicWorkTestScheduler;
#endif // !NDEBUG
class TableCache;
class TaskLimiterToken;
class Version;
Expand Down Expand Up @@ -1147,7 +1144,7 @@ class DBImpl : public DB {
int TEST_BGCompactionsAllowed() const;
int TEST_BGFlushesAllowed() const;
size_t TEST_GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;
void TEST_WaitForPeridicWorkerRun(std::function<void()> callback) const;
void TEST_WaitForPeridicTaskRun(std::function<void()> callback) const;
SeqnoToTimeMapping TEST_GetSeqnoToTimeMapping() const;
size_t TEST_EstimateInMemoryStatsHistorySize() const;

Expand All @@ -1162,7 +1159,7 @@ class DBImpl : public DB {
}

#ifndef ROCKSDB_LITE
PeriodicWorkTestScheduler* TEST_GetPeriodicWorkScheduler() const;
const PeriodicTaskScheduler& TEST_GetPeriodicTaskScheduler() const;
#endif // !ROCKSDB_LITE

#endif // NDEBUG
Expand Down Expand Up @@ -2069,7 +2066,7 @@ class DBImpl : public DB {
LogBuffer* log_buffer);

// Schedule background tasks
Status StartPeriodicWorkScheduler();
Status StartPeriodicTaskScheduler();

Status RegisterRecordSeqnoTimeWorker();

Expand Down Expand Up @@ -2611,14 +2608,11 @@ class DBImpl : public DB {

#ifndef ROCKSDB_LITE
// Scheduler to run DumpStats(), PersistStats(), and FlushInfoLog().
// Currently, it always use a global instance from
// PeriodicWorkScheduler::Default(). Only in unittest, it can be overrided by
// PeriodicWorkTestScheduler.
PeriodicWorkScheduler* periodic_work_scheduler_;

// Current cadence of the periodic worker for recording sequence number to
// time.
uint64_t record_seqno_time_cadence_ = 0;
// Currently, internally it has a global timer instance for running the tasks.
PeriodicTaskScheduler periodic_task_scheduler_;

// It contains the implementations for each periodic task.
std::map<PeriodicTaskType, const PeriodicTaskFunc> periodic_task_functions_;
#endif

// When set, we use a separate queue for writes that don't write to memtable.
Expand Down
Loading

0 comments on commit d9e71fb

Please sign in to comment.