Skip to content

Commit

Permalink
Add TimedPut to stress test (#12559)
Browse files Browse the repository at this point in the history
Summary:
This also updates WriteBatch's protection info to include write time since there are several places in memtable that by default protects the whole value slice.

This PR is stacked on #12543

Pull Request resolved: #12559

Reviewed By: pdillinger

Differential Revision: D56308285

Pulled By: jowlyzhang

fbshipit-source-id: 5524339fe0dd6c918dc940ca2f0657b5f2111c56
  • Loading branch information
jowlyzhang authored and facebook-github-bot committed Apr 30, 2024
1 parent abd6751 commit 8b3d9e6
Show file tree
Hide file tree
Showing 11 changed files with 174 additions and 36 deletions.
30 changes: 23 additions & 7 deletions db/compaction/tiered_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1584,7 +1584,16 @@ TEST_F(PrecludeLastLevelTest, SmallPrecludeTime) {
Close();
}

TEST_F(PrecludeLastLevelTest, FastTrackTimedPutToLastLevel) {
// Test Param: protection_bytes_per_key for WriteBatch
class TimedPutPrecludeLastLevelTest
: public PrecludeLastLevelTest,
public testing::WithParamInterface<size_t> {
public:
TimedPutPrecludeLastLevelTest()
: PrecludeLastLevelTest("timed_put_preclude_last_level_test") {}
};

TEST_P(TimedPutPrecludeLastLevelTest, FastTrackTimedPutToLastLevel) {
const int kNumTrigger = 4;
const int kNumLevels = 7;
const int kNumKeys = 100;
Expand All @@ -1598,6 +1607,8 @@ TEST_F(PrecludeLastLevelTest, FastTrackTimedPutToLastLevel) {
options.num_levels = kNumLevels;
options.last_level_temperature = Temperature::kCold;
DestroyAndReopen(options);
WriteOptions wo;
wo.protection_bytes_per_key = GetParam();

Random rnd(301);

Expand All @@ -1606,7 +1617,7 @@ TEST_F(PrecludeLastLevelTest, FastTrackTimedPutToLastLevel) {
});

for (int i = 0; i < kNumKeys / 2; i++) {
ASSERT_OK(Put(Key(i), rnd.RandomString(100)));
ASSERT_OK(Put(Key(i), rnd.RandomString(100), wo));
dbfull()->TEST_WaitForPeriodicTaskRun([&] {
mock_clock_->MockSleepForSeconds(static_cast<int>(rnd.Uniform(2)));
});
Expand All @@ -1620,7 +1631,7 @@ TEST_F(PrecludeLastLevelTest, FastTrackTimedPutToLastLevel) {
// These data are eligible to be put on the last level once written to db
// and compaction will fast track them to the last level.
for (int i = kNumKeys / 2; i < kNumKeys; i++) {
ASSERT_OK(TimedPut(0, Key(i), rnd.RandomString(100), 50));
ASSERT_OK(TimedPut(0, Key(i), rnd.RandomString(100), 50, wo));
}
ASSERT_OK(Flush());

Expand All @@ -1640,7 +1651,7 @@ TEST_F(PrecludeLastLevelTest, FastTrackTimedPutToLastLevel) {
Close();
}

TEST_F(PrecludeLastLevelTest, PreserveTimedPutOnPenultimateLevel) {
TEST_P(TimedPutPrecludeLastLevelTest, PreserveTimedPutOnPenultimateLevel) {
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.disable_auto_compactions = true;
Expand All @@ -1651,6 +1662,8 @@ TEST_F(PrecludeLastLevelTest, PreserveTimedPutOnPenultimateLevel) {
options.last_level_temperature = Temperature::kCold;
options.default_write_temperature = Temperature::kHot;
DestroyAndReopen(options);
WriteOptions wo;
wo.protection_bytes_per_key = GetParam();

// Creating a snapshot to manually control when preferred sequence number is
// swapped in. An entry's preferred seqno won't get swapped in until it's
Expand All @@ -1659,9 +1672,9 @@ TEST_F(PrecludeLastLevelTest, PreserveTimedPutOnPenultimateLevel) {
// the seqno in the internal keys.
auto* snap1 = db_->GetSnapshot();
// Start time: kMockStartTime = 10000000;
ASSERT_OK(TimedPut(0, Key(0), "v0", kMockStartTime - 1 * 24 * 60 * 60));
ASSERT_OK(TimedPut(0, Key(1), "v1", kMockStartTime - 1 * 24 * 60 * 60));
ASSERT_OK(TimedPut(0, Key(2), "v2", kMockStartTime - 1 * 24 * 60 * 60));
ASSERT_OK(TimedPut(0, Key(0), "v0", kMockStartTime - 1 * 24 * 60 * 60, wo));
ASSERT_OK(TimedPut(0, Key(1), "v1", kMockStartTime - 1 * 24 * 60 * 60, wo));
ASSERT_OK(TimedPut(0, Key(2), "v2", kMockStartTime - 1 * 24 * 60 * 60, wo));
ASSERT_OK(Flush());

// Should still be in penultimate level.
Expand Down Expand Up @@ -1696,6 +1709,9 @@ TEST_F(PrecludeLastLevelTest, PreserveTimedPutOnPenultimateLevel) {
Close();
}

INSTANTIATE_TEST_CASE_P(TimedPutPrecludeLastLevelTest,
TimedPutPrecludeLastLevelTest, ::testing::Values(0, 8));

TEST_F(PrecludeLastLevelTest, LastLevelOnlyCompactionPartial) {
const int kNumTrigger = 4;
const int kNumLevels = 7;
Expand Down
4 changes: 3 additions & 1 deletion db/db_test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -769,7 +769,9 @@ Status DBTestBase::TimedPut(const Slice& k, const Slice& v,

Status DBTestBase::TimedPut(int cf, const Slice& k, const Slice& v,
uint64_t write_unix_time, WriteOptions wo) {
WriteBatch wb;
WriteBatch wb(/*reserved_bytes=*/0, /*max_bytes=*/0,
wo.protection_bytes_per_key,
/*default_cf_ts_sz=*/0);
ColumnFamilyHandle* cfh;
if (cf != 0) {
cfh = handles_[cf];
Expand Down
48 changes: 35 additions & 13 deletions db/write_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -484,13 +484,22 @@ Status ReadRecordFromWriteBatch(Slice* input, char* tag,
return Status::Corruption("bad WriteBatch TimedPut");
}
FALLTHROUGH_INTENDED;
case kTypeValuePreferredSeqno:
case kTypeValuePreferredSeqno: {
Slice packed_value;
if (!GetLengthPrefixedSlice(input, key) ||
!GetLengthPrefixedSlice(input, value) ||
!GetFixed64(input, write_unix_time)) {
!GetLengthPrefixedSlice(input, &packed_value)) {
return Status::Corruption("bad WriteBatch TimedPut");
}
if (write_unix_time) {
std::tie(*value, *write_unix_time) =
ParsePackedValueWithWriteTime(packed_value);
} else {
// Caller doesn't want to unpack write_unix_time, so keep it packed in
// the value.
*value = packed_value;
}
break;
}
default:
return Status::Corruption("unknown WriteBatch tag");
}
Expand Down Expand Up @@ -883,12 +892,11 @@ Status WriteBatchInternal::TimedPut(WriteBatch* b, uint32_t column_family_id,
b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValuePreferredSeqno));
PutVarint32(&b->rep_, column_family_id);
}
std::string value_buf;
Slice packed_value =
PackValueAndWriteTime(value, write_unix_time, &value_buf);
PutLengthPrefixedSlice(&b->rep_, key);
PutLengthPrefixedSlice(&b->rep_, value);
// For a kTypeValuePreferredSeqno entry, its write time is encoded separately
// from value in an encoded WriteBatch. They are packed into one value Slice
// once it's written to the database.
PutFixed64(&b->rep_, write_unix_time);
PutLengthPrefixedSlice(&b->rep_, packed_value);

b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_TIMED_PUT,
Expand All @@ -899,7 +907,7 @@ Status WriteBatchInternal::TimedPut(WriteBatch* b, uint32_t column_family_id,
// `kTypeColumnFamilyValuePreferredSeqno` here.
b->prot_info_->entries_.emplace_back(
ProtectionInfo64()
.ProtectKVO(key, value, kTypeValuePreferredSeqno)
.ProtectKVO(key, packed_value, kTypeValuePreferredSeqno)
.ProtectC(column_family_id));
}
return save.commit();
Expand Down Expand Up @@ -1779,7 +1787,6 @@ Status WriteBatch::VerifyChecksum() const {
Slice input(rep_.data() + WriteBatchInternal::kHeader,
rep_.size() - WriteBatchInternal::kHeader);
Slice key, value, blob, xid;
uint64_t unix_write_time = 0;
char tag = 0;
uint32_t column_family = 0; // default
Status s;
Expand All @@ -1792,7 +1799,7 @@ Status WriteBatch::VerifyChecksum() const {
value.clear();
column_family = 0;
s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value,
&blob, &xid, &unix_write_time);
&blob, &xid, /*write_unix_time=*/nullptr);
if (!s.ok()) {
return s;
}
Expand Down Expand Up @@ -3167,8 +3174,12 @@ class ProtectionInfoUpdater : public WriteBatch::Handler {
}

Status TimedPutCF(uint32_t cf, const Slice& key, const Slice& val,
uint64_t /*unix_write_time*/) override {
return UpdateProtInfo(cf, key, val, kTypeValuePreferredSeqno);
uint64_t unix_write_time) override {
std::string encoded_write_time;
PutFixed64(&encoded_write_time, unix_write_time);
std::array<Slice, 2> value_with_time{{val, encoded_write_time}};
SliceParts packed_value(value_with_time.data(), 2);
return UpdateProtInfo(cf, key, packed_value, kTypeValuePreferredSeqno);
}

Status PutEntityCF(uint32_t cf, const Slice& key,
Expand Down Expand Up @@ -3227,6 +3238,17 @@ class ProtectionInfoUpdater : public WriteBatch::Handler {
return Status::OK();
}

Status UpdateProtInfo(uint32_t cf, const Slice& key, const SliceParts& val,
const ValueType op_type) {
if (prot_info_) {
prot_info_->entries_.emplace_back(
ProtectionInfo64()
.ProtectKVO(SliceParts(&key, 1), val, op_type)
.ProtectC(cf));
}
return Status::OK();
}

// No copy or move.
ProtectionInfoUpdater(const ProtectionInfoUpdater&) = delete;
ProtectionInfoUpdater(ProtectionInfoUpdater&&) = delete;
Expand Down
26 changes: 18 additions & 8 deletions db_stress_tool/batched_ops_stress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class BatchedOpsStressTest : public StressTest {
ColumnFamilyHandle* const cfh = column_families_[rand_column_families[0]];
assert(cfh);

Status status;
for (int i = 9; i >= 0; --i) {
const std::string num = std::to_string(i);

Expand All @@ -51,28 +52,37 @@ class BatchedOpsStressTest : public StressTest {
// batched, non-batched, CF consistency).
const std::string k = num + key_body;
const std::string v = value_body + num;

if (FLAGS_use_put_entity_one_in > 0 &&
(value_base % FLAGS_use_put_entity_one_in) == 0) {
batch.PutEntity(cfh, k, GenerateWideColumns(value_base, v));
status = batch.PutEntity(cfh, k, GenerateWideColumns(value_base, v));
} else if (FLAGS_use_timed_put_one_in > 0 &&
((value_base + kLargePrimeForCommonFactorSkew) %
FLAGS_use_timed_put_one_in) == 0) {
uint64_t write_unix_time = GetWriteUnixTime(thread);
status = batch.TimedPut(cfh, k, v, write_unix_time);
} else if (FLAGS_use_merge) {
batch.Merge(cfh, k, v);
status = batch.Merge(cfh, k, v);
} else {
batch.Put(cfh, k, v);
status = batch.Put(cfh, k, v);
}
if (!status.ok()) {
break;
}
}

const Status s = db_->Write(write_opts, &batch);
if (status.ok()) {
status = db_->Write(write_opts, &batch);
}

if (!s.ok()) {
fprintf(stderr, "multiput error: %s\n", s.ToString().c_str());
if (!status.ok()) {
fprintf(stderr, "multiput error: %s\n", status.ToString().c_str());
thread->stats.AddErrors(1);
} else {
// we did 10 writes each of size sz + 1
thread->stats.AddBytesForWrites(10, (sz + 1) * 10);
}

return s;
return status;
}

// Given a key K, this deletes ("0"+K), ("1"+K), ..., ("9"+K)
Expand Down
26 changes: 19 additions & 7 deletions db_stress_tool/cf_consistency_stress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,31 +36,43 @@ class CfConsistencyStressTest : public StressTest {

WriteBatch batch;

Status status;
for (auto cf : rand_column_families) {
ColumnFamilyHandle* const cfh = column_families_[cf];
assert(cfh);

if (FLAGS_use_put_entity_one_in > 0 &&
(value_base % FLAGS_use_put_entity_one_in) == 0) {
batch.PutEntity(cfh, k, GenerateWideColumns(value_base, v));
status = batch.PutEntity(cfh, k, GenerateWideColumns(value_base, v));
} else if (FLAGS_use_timed_put_one_in > 0 &&
((value_base + kLargePrimeForCommonFactorSkew) %
FLAGS_use_timed_put_one_in) == 0) {
uint64_t write_unix_time = GetWriteUnixTime(thread);
status = batch.TimedPut(cfh, k, v, write_unix_time);
} else if (FLAGS_use_merge) {
batch.Merge(cfh, k, v);
status = batch.Merge(cfh, k, v);
} else {
batch.Put(cfh, k, v);
status = batch.Put(cfh, k, v);
}
if (!status.ok()) {
break;
}
}

Status s = db_->Write(write_opts, &batch);
if (status.ok()) {
status = db_->Write(write_opts, &batch);
}

if (!s.ok()) {
fprintf(stderr, "multi put or merge error: %s\n", s.ToString().c_str());
if (!status.ok()) {
fprintf(stderr, "multi put or merge error: %s\n",
status.ToString().c_str());
thread->stats.AddErrors(1);
} else {
auto num = static_cast<long>(rand_column_families.size());
thread->stats.AddBytesForWrites(num, (sz + 1) * num);
}

return s;
return status;
}

Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
Expand Down
21 changes: 21 additions & 0 deletions db_stress_tool/db_stress_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,27 @@ std::string GetNowNanos() {
return ret;
}

uint64_t GetWriteUnixTime(ThreadState* thread) {
static uint64_t kPreserveSeconds =
std::max(FLAGS_preserve_internal_time_seconds,
FLAGS_preclude_last_level_data_seconds);
static uint64_t kFallbackTime = std::numeric_limits<uint64_t>::max();
int64_t write_time = 0;
Status s = db_stress_env->GetCurrentTime(&write_time);
uint32_t write_time_mode = thread->rand.Uniform(3);
if (write_time_mode == 0 || !s.ok()) {
return kFallbackTime;
} else if (write_time_mode == 1) {
uint64_t delta = kPreserveSeconds > 0
? static_cast<uint64_t>(thread->rand.Uniform(
static_cast<int>(kPreserveSeconds)))
: 0;
return static_cast<uint64_t>(write_time) - delta;
} else {
return static_cast<uint64_t>(write_time) - kPreserveSeconds;
}
}

namespace {

class MyXXH64Checksum : public FileChecksumGenerator {
Expand Down
4 changes: 4 additions & 0 deletions db_stress_tool/db_stress_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ DECLARE_uint32(bottommost_file_compaction_delay);
// Tiered storage
DECLARE_int64(preclude_last_level_data_seconds);
DECLARE_int64(preserve_internal_time_seconds);
DECLARE_uint32(use_timed_put_one_in);

DECLARE_int32(verify_iterator_with_expected_state_one_in);
DECLARE_bool(preserve_unverified_changes);
Expand Down Expand Up @@ -414,6 +415,7 @@ DECLARE_bool(inplace_update_support);
constexpr long KB = 1024;
constexpr int kRandomValueMaxFactor = 3;
constexpr int kValueMaxLen = 100;
constexpr uint32_t kLargePrimeForCommonFactorSkew = 1872439133;

// wrapped posix environment
extern ROCKSDB_NAMESPACE::Env* db_stress_env;
Expand Down Expand Up @@ -766,6 +768,8 @@ int64_t GetOneHotKeyID(double rand_seed, int64_t max_key);

std::string GetNowNanos();

uint64_t GetWriteUnixTime(ThreadState* thread);

std::shared_ptr<FileChecksumGenFactory> GetFileChecksumImpl(
const std::string& name);

Expand Down
4 changes: 4 additions & 0 deletions db_stress_tool/db_stress_gflags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,10 @@ DEFINE_int64(
preserve_internal_time_seconds, 0,
"Preserve internal time information which is attached to each SST.");

DEFINE_uint32(use_timed_put_one_in, 0,
"If greater than zero, TimedPut is used per every N write ops on "
"on average.");

static const bool FLAGS_subcompactions_dummy __attribute__((__unused__)) =
RegisterFlagValidator(&FLAGS_subcompactions, &ValidateUint32Range);

Expand Down
22 changes: 22 additions & 0 deletions db_stress_tool/expected_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,28 @@ class ExpectedStateTraceRecordHandler : public TraceRecord::Handler,
return Status::OK();
}

Status TimedPutCF(uint32_t column_family_id, const Slice& key_with_ts,
const Slice& value, uint64_t write_unix_time) override {
Slice key =
StripTimestampFromUserKey(key_with_ts, FLAGS_user_timestamp_size);
uint64_t key_id;
if (!GetIntVal(key.ToString(), &key_id)) {
return Status::Corruption("unable to parse key", key.ToString());
}
uint32_t value_base = GetValueBase(value);

bool should_buffer_write = !(buffered_writes_ == nullptr);
if (should_buffer_write) {
return WriteBatchInternal::TimedPut(buffered_writes_.get(),
column_family_id, key, value,
write_unix_time);
}

state_->SyncPut(column_family_id, static_cast<int64_t>(key_id), value_base);
++num_write_ops_;
return Status::OK();
}

Status PutEntityCF(uint32_t column_family_id, const Slice& key_with_ts,
const Slice& entity) override {
Slice key =
Expand Down
Loading

0 comments on commit 8b3d9e6

Please sign in to comment.