diff --git a/db_stress_tool/db_stress_common.cc b/db_stress_tool/db_stress_common.cc index 7ee722de3e2..9798ccca371 100644 --- a/db_stress_tool/db_stress_common.cc +++ b/db_stress_tool/db_stress_common.cc @@ -148,42 +148,6 @@ void DbVerificationThread(void* v) { } } -void TimestampedSnapshotsThread(void* v) { - assert(FLAGS_create_timestamped_snapshot_one_in > 0); - auto* thread = reinterpret_cast(v); - assert(thread); - SharedState* shared = thread->shared; - assert(shared); - StressTest* stress_test = shared->GetStressTest(); - assert(stress_test); - while (true) { - { - MutexLock l(shared->GetMutex()); - if (shared->ShouldStopBgThread()) { - shared->IncBgThreadsFinished(); - if (shared->BgThreadsFinished()) { - shared->GetCondVar()->SignalAll(); - } - return; - } - } - - uint64_t now = db_stress_env->NowNanos(); - std::pair> res = - stress_test->CreateTimestampedSnapshot(now); - if (res.first.ok()) { - assert(res.second); - assert(res.second->GetTimestamp() == now); - } else { - assert(!res.second); - } - constexpr uint64_t time_diff = static_cast(1000) * 1000 * 1000; - stress_test->ReleaseOldTimestampedSnapshots(now - time_diff); - - db_stress_env->SleepForMicroseconds(1000 * 1000); - } -} - void PrintKeyValue(int cf, uint64_t key, const char* value, size_t sz) { if (!FLAGS_verbose) { return; diff --git a/db_stress_tool/db_stress_driver.cc b/db_stress_tool/db_stress_driver.cc index 06aa1c8239a..009168ae376 100644 --- a/db_stress_tool/db_stress_driver.cc +++ b/db_stress_tool/db_stress_driver.cc @@ -84,10 +84,6 @@ bool RunStressTest(StressTest* stress) { shared.IncBgThreads(); } - if (FLAGS_create_timestamped_snapshot_one_in > 0) { - shared.IncBgThreads(); - } - std::vector threads(n); for (uint32_t i = 0; i < n; i++) { threads[i] = new ThreadState(i, &shared); @@ -105,12 +101,6 @@ bool RunStressTest(StressTest* stress) { &continuous_verification_thread); } - ThreadState timestamped_snapshots_thread(0, &shared); - if (FLAGS_create_timestamped_snapshot_one_in > 0) { - db_stress_env->StartThread(TimestampedSnapshotsThread, - ×tamped_snapshots_thread); - } - // Each thread goes through the following states: // initializing -> wait for others to init -> read/populate/depopulate // wait for others to operate -> verify -> done @@ -179,8 +169,7 @@ bool RunStressTest(StressTest* stress) { stress->PrintStatistics(); if (FLAGS_compaction_thread_pool_adjust_interval > 0 || - FLAGS_continuous_verification_interval > 0 || - FLAGS_create_timestamped_snapshot_one_in > 0) { + FLAGS_continuous_verification_interval > 0) { MutexLock l(shared.GetMutex()); shared.SetShouldStopBgThread(); while (!shared.BgThreadsFinished()) { diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 833ae9fc88c..cc40eac117f 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -421,35 +421,6 @@ void StressTest::PrintStatistics() { } } -void StressTest::ReleaseOldTimestampedSnapshots(uint64_t ts) { -#ifndef ROCKSDB_LITE - if (!txn_db_) { - return; - } - assert(txn_db_); - txn_db_->ReleaseTimestampedSnapshotsOlderThan(ts); -#else - (void)ts; - fprintf(stderr, "timestamped snapshots not supported in LITE mode\n"); - exit(1); -#endif // ROCKSDB_LITE -} - -std::pair> -StressTest::CreateTimestampedSnapshot(uint64_t ts) { -#ifndef ROCKSDB_LITE - if (!txn_db_) { - return std::make_pair(Status::InvalidArgument(), nullptr); - } - assert(txn_db_); - return txn_db_->CreateTimestampedSnapshot(ts); -#else - (void)ts; - fprintf(stderr, "timestamped snapshots not supported in LITE mode\n"); - exit(1); -#endif // ROCKSDB_LITE -} - // Currently PreloadDb has to be single-threaded. void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys, SharedState* shared) { @@ -594,6 +565,7 @@ Status StressTest::CommitTxn(Transaction* txn, ThreadState* thread) { if (!FLAGS_use_txn) { return Status::InvalidArgument("CommitTxn when FLAGS_use_txn is not set"); } + assert(txn_db_); Status s = txn->Prepare(); std::shared_ptr timestamped_snapshot; if (s.ok()) { @@ -602,10 +574,32 @@ Status StressTest::CommitTxn(Transaction* txn, ThreadState* thread) { uint64_t ts = db_stress_env->NowNanos(); s = txn->CommitAndTryCreateSnapshot(/*notifier=*/nullptr, ts, ×tamped_snapshot); + + std::pair> res; + if (thread->tid == 0) { + uint64_t now = db_stress_env->NowNanos(); + res = txn_db_->CreateTimestampedSnapshot(now); + if (res.first.ok()) { + assert(res.second); + assert(res.second->GetTimestamp() == now); + if (timestamped_snapshot) { + assert(res.second->GetTimestamp() > + timestamped_snapshot->GetTimestamp()); + } + } else { + assert(!res.second); + } + } } else { s = txn->Commit(); } } + if (thread && FLAGS_create_timestamped_snapshot_one_in > 0 && + thread->rand.OneInOpt(50000)) { + uint64_t now = db_stress_env->NowNanos(); + constexpr uint64_t time_diff = static_cast(1000) * 1000 * 1000; + txn_db_->ReleaseTimestampedSnapshotsOlderThan(now - time_diff); + } delete txn; return s; } diff --git a/db_stress_tool/db_stress_test_base.h b/db_stress_tool/db_stress_test_base.h index 94cefaeb7e7..a7c7c68ec3a 100644 --- a/db_stress_tool/db_stress_test_base.h +++ b/db_stress_tool/db_stress_test_base.h @@ -43,11 +43,6 @@ class StressTest { void PrintStatistics(); - void ReleaseOldTimestampedSnapshots(uint64_t ts); - - std::pair> CreateTimestampedSnapshot( - uint64_t ts); - protected: Status AssertSame(DB* db, ColumnFamilyHandle* cf, ThreadState::SnapshotState& snap_state); diff --git a/db_stress_tool/multi_ops_txns_stress.cc b/db_stress_tool/multi_ops_txns_stress.cc index 95f883ec08d..78df01cc344 100644 --- a/db_stress_tool/multi_ops_txns_stress.cc +++ b/db_stress_tool/multi_ops_txns_stress.cc @@ -1382,6 +1382,13 @@ Status MultiOpsTxnsStressTest::CommitAndCreateTimestampedSnapshotIfNeeded( } else { s = txn.Commit(); } + assert(txn_db_); + if (FLAGS_create_timestamped_snapshot_one_in > 0 && + thread->rand.OneInOpt(50000)) { + uint64_t now = db_stress_env->NowNanos(); + constexpr uint64_t time_diff = static_cast(1000) * 1000 * 1000; + txn_db_->ReleaseTimestampedSnapshotsOlderThan(now - time_diff); + } return s; }