Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/protos/msgbus.proto
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,7 @@ message TTestShardControlRequest {
repeated TSizeInterval Sizes = 6; // distrubution of generated value size
repeated TTimeInterval WritePeriods = 7; // time between two events
repeated TTimeInterval RestartPeriods = 8; // time between automatic restarts
optional uint32 PatchRequestsFractionPPM = 12;
}

optional uint64 TabletId = 1;
Expand Down
27 changes: 21 additions & 6 deletions ydb/core/test_tablet/load_actor_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ namespace NKikimr::NTestShard {
return;
}
if (StallCounter > 500) {
if (WritesInFlight.empty() && DeletesInFlight.empty() && ReadsInFlight.empty() && TransitionInFlight.empty()) {
if (WritesInFlight.empty() && PatchesInFlight.empty() && DeletesInFlight.empty() && ReadsInFlight.empty() &&
TransitionInFlight.empty()) {
StallCounter = 0;
} else {
return;
Expand All @@ -70,17 +71,23 @@ namespace NKikimr::NTestShard {
barrier = Settings.GetValidateAfterBytes();
}
if (BytesProcessed > barrier) { // time to perform validation
if (WritesInFlight.empty() && DeletesInFlight.empty() && ReadsInFlight.empty() && TransitionInFlight.empty()) {
if (WritesInFlight.empty() && PatchesInFlight.empty() && DeletesInFlight.empty() && ReadsInFlight.empty() &&
TransitionInFlight.empty()) {
RunValidation(false);
}
} else { // resume load
const TMonotonic now = TActivationContext::Monotonic();

bool canWriteMore = false;
if (WritesInFlight.size() < Settings.GetMaxInFlight()) {
if (WritesInFlight.size() + PatchesInFlight.size() < Settings.GetMaxInFlight()) {
if (NextWriteTimestamp <= now) {
IssueWrite();
if (WritesInFlight.size() < Settings.GetMaxInFlight() || !Settings.GetResetWritePeriodOnFull()) {
if (Settings.HasPatchRequestsFractionPPM() && !ConfirmedKeys.empty() &&
RandomNumber(1'000'000u) < Settings.GetPatchRequestsFractionPPM()) {
IssuePatch();
} else {
IssueWrite();
}
if (WritesInFlight.size() + PatchesInFlight.size() < Settings.GetMaxInFlight() || !Settings.GetResetWritePeriodOnFull()) {
NextWriteTimestamp += GenerateRandomInterval(Settings.GetWritePeriods());
canWriteMore = NextWriteTimestamp <= now;
} else {
Expand Down Expand Up @@ -177,6 +184,13 @@ namespace NKikimr::NTestShard {
}
WritesInFlight.erase(it);
}
if (auto nh = PatchesInFlight.extract(record.GetCookie())) {
const TString& key = nh.mapped();
const auto it = Keys.find(key);
Y_VERIFY_S(it != Keys.end(), "Key# " << key << " not found in Keys dict");
STLOG(PRI_WARN, TEST_SHARD, TS27, "patch failed", (TabletId, TabletId), (Key, key));
RegisterTransition(*it, ::NTestShard::TStateServer::WRITE_PENDING, ::NTestShard::TStateServer::DELETED);
}
if (const auto it = DeletesInFlight.find(record.GetCookie()); it != DeletesInFlight.end()) {
for (const TString& key : it->second.KeysInQuery) {
const auto it = Keys.find(key);
Expand Down Expand Up @@ -209,10 +223,11 @@ namespace NKikimr::NTestShard {
};
STLOG(PRI_INFO, TEST_SHARD, TS04, "TEvKeyValue::TEvResponse", (TabletId, TabletId), (Msg, makeResponse()));
ProcessWriteResult(record.GetCookie(), record.GetWriteResult());
ProcessPatchResult(record.GetCookie(), record.GetPatchResult());
ProcessDeleteResult(record.GetCookie(), record.GetDeleteRangeResult());
ProcessReadResult(record.GetCookie(), record.GetReadResult(), *ev->Get());
}
if (WritesInFlight.size() != Settings.GetMaxInFlight() && NextWriteTimestamp == TMonotonic::Max()) {
if (WritesInFlight.size() + PatchesInFlight.size() != Settings.GetMaxInFlight() && NextWriteTimestamp == TMonotonic::Max()) {
NextWriteTimestamp = TMonotonic::Now() + GenerateRandomInterval(Settings.GetWritePeriods());
}
Action();
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/test_tablet/load_actor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ namespace NKikimr::NTestShard {
ui64 BytesOfData = 0;

std::unordered_map<ui64, TWriteInfo> WritesInFlight; // cookie -> TWriteInfo
std::unordered_map<ui64, TString> PatchesInFlight;
ui32 KeysWritten = 0;
static constexpr TDuration WriteSpeedWindow = TDuration::Seconds(10);
static constexpr TDuration ReadSpeedWindow = TDuration::Seconds(10);
Expand All @@ -136,7 +137,9 @@ namespace NKikimr::NTestShard {

void GenerateKeyValue(TString *key, TString *value, bool *isInline);
void IssueWrite();
void IssuePatch();
void ProcessWriteResult(ui64 cookie, const google::protobuf::RepeatedPtrField<NKikimrClient::TKeyValueResponse::TWriteResult>& results);
void ProcessPatchResult(ui64 cookie, const google::protobuf::RepeatedPtrField<NKikimrClient::TKeyValueResponse::TPatchResult>& results);
void TrimBytesWritten(TInstant now);
void HandleWriteOnTime();
void HandleDoSomeAction();
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/test_tablet/load_actor_mon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ namespace NKikimr::NTestShard {
TABLED() { str << self->WritesInFlight.size(); }
}

TABLER() {
TABLED() { str << "Patches in flight"; }
TABLED() { str << self->PatchesInFlight.size(); }
}

TABLER() {
TABLED() { str << "Reads in flight"; }
TABLED() { str << self->ReadsInFlight.size() << '/' << self->KeysBeingRead.size(); }
Expand Down
1 change: 1 addition & 0 deletions ydb/core/test_tablet/load_actor_read_validate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ namespace NKikimr::NTestShard {
{
// ensure no concurrent operations are running
Y_ABORT_UNLESS(self.WritesInFlight.empty());
Y_ABORT_UNLESS(self.PatchesInFlight.empty());
Y_ABORT_UNLESS(self.DeletesInFlight.empty());
Y_ABORT_UNLESS(self.TransitionInFlight.empty());
for (auto& [key, info] : KeysBefore) {
Expand Down
74 changes: 74 additions & 0 deletions ydb/core/test_tablet/load_actor_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,67 @@ namespace NKikimr::NTestShard {
BytesProcessed += value.size();
}

void TLoadActor::IssuePatch() {
Y_ABORT_UNLESS(!ConfirmedKeys.empty());
const size_t index = RandomNumber(ConfirmedKeys.size());
const TString originalKey = ConfirmedKeys[index];

// extract length from the original key -- it may not change
ui64 len, seed, id;
StringSplitter(originalKey).Split(',').CollectInto(&len, &seed, &id);
TString originalValue = FastGenDataForLZ4(len, seed);

// generate patched key
seed = RandomNumber<ui64>();
id = RandomNumber<ui64>();
const TString patchedKey = TStringBuilder() << len << ',' << seed << ',' << id;

// generate random value for the new key
TString patchedValue = FastGenDataForLZ4(len, seed);

auto ev = CreateRequest();
auto& r = ev->Record;
auto *patch = r.AddCmdPatch();
patch->SetOriginalKey(originalKey);
patch->SetPatchedKey(patchedKey);

TRope rope(patchedValue);
ui64 offset = 0;
for (size_t chunks = 0; offset != len; ++chunks) {
// skip matching parts
while (offset + 1 < len && originalValue[offset] == patchedValue[offset]) {
++offset;
}
Y_ABORT_UNLESS(offset < len);

// add patched part
size_t pos = offset + 1;
while (pos < len && originalValue[pos] != patchedValue[pos]) {
++pos;
}
const size_t size = (chunks < 8 ? pos : len) - offset;

auto *diff = patch->AddDiffs();
diff->SetOffset(offset);
if (RandomNumber(2u)) {
diff->SetPayloadId(ev->AddPayload(TRope(rope.Position(offset), rope.Position(offset + size))));
} else {
diff->SetValue(patchedValue.substr(offset, size));
}
offset += size;
}

auto [pifIt, pifInserted] = PatchesInFlight.try_emplace(r.GetCookie(), patchedKey);
Y_ABORT_UNLESS(pifInserted);

auto [it, inserted] = Keys.try_emplace(patchedKey, len);
Y_ABORT_UNLESS(inserted);
RegisterTransition(*it, ::NTestShard::TStateServer::ABSENT, ::NTestShard::TStateServer::WRITE_PENDING, std::move(ev));

++KeysWritten;
BytesProcessed += len;
}

void TLoadActor::ProcessWriteResult(ui64 cookie, const google::protobuf::RepeatedPtrField<NKikimrClient::TKeyValueResponse::TWriteResult>& results) {
if (const auto wifIt = WritesInFlight.find(cookie); wifIt != WritesInFlight.end()) {
TWriteInfo& info = wifIt->second;
Expand All @@ -69,4 +130,17 @@ namespace NKikimr::NTestShard {
}
}

void TLoadActor::ProcessPatchResult(ui64 cookie, const google::protobuf::RepeatedPtrField<NKikimrClient::TKeyValueResponse::TPatchResult>& results) {
if (auto nh = PatchesInFlight.extract(cookie)) {
Y_ABORT_UNLESS(results.size() == 1);
const auto& res = results[0];
Y_VERIFY_S(res.GetStatus() == NKikimrProto::OK, "TabletId# " << TabletId << " CmdPatch failed Status# "
<< NKikimrProto::EReplyStatus_Name(NKikimrProto::EReplyStatus(res.GetStatus())));
const TString& key = nh.mapped();
const auto it = Keys.find(key);
Y_VERIFY_S(it != Keys.end(), "Key# " << key << " not found in Keys dict");
RegisterTransition(*it, ::NTestShard::TStateServer::WRITE_PENDING, ::NTestShard::TStateServer::CONFIRMED);
}
}

} // NKikimr::NTestShard