Skip to content

Commit 8adf99d

Browse files
authored
Support KV patching in TestShard (#1699)
1 parent 0d3d3df commit 8adf99d

File tree

6 files changed

+105
-6
lines changed

6 files changed

+105
-6
lines changed

ydb/core/protos/msgbus.proto

+1
Original file line numberDiff line numberDiff line change
@@ -773,6 +773,7 @@ message TTestShardControlRequest {
773773
repeated TSizeInterval Sizes = 6; // distrubution of generated value size
774774
repeated TTimeInterval WritePeriods = 7; // time between two events
775775
repeated TTimeInterval RestartPeriods = 8; // time between automatic restarts
776+
optional uint32 PatchRequestsFractionPPM = 12;
776777
}
777778

778779
optional uint64 TabletId = 1;

ydb/core/test_tablet/load_actor_impl.cpp

+21-6
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ namespace NKikimr::NTestShard {
5959
return;
6060
}
6161
if (StallCounter > 500) {
62-
if (WritesInFlight.empty() && DeletesInFlight.empty() && ReadsInFlight.empty() && TransitionInFlight.empty()) {
62+
if (WritesInFlight.empty() && PatchesInFlight.empty() && DeletesInFlight.empty() && ReadsInFlight.empty() &&
63+
TransitionInFlight.empty()) {
6364
StallCounter = 0;
6465
} else {
6566
return;
@@ -70,17 +71,23 @@ namespace NKikimr::NTestShard {
7071
barrier = Settings.GetValidateAfterBytes();
7172
}
7273
if (BytesProcessed > barrier) { // time to perform validation
73-
if (WritesInFlight.empty() && DeletesInFlight.empty() && ReadsInFlight.empty() && TransitionInFlight.empty()) {
74+
if (WritesInFlight.empty() && PatchesInFlight.empty() && DeletesInFlight.empty() && ReadsInFlight.empty() &&
75+
TransitionInFlight.empty()) {
7476
RunValidation(false);
7577
}
7678
} else { // resume load
7779
const TMonotonic now = TActivationContext::Monotonic();
7880

7981
bool canWriteMore = false;
80-
if (WritesInFlight.size() < Settings.GetMaxInFlight()) {
82+
if (WritesInFlight.size() + PatchesInFlight.size() < Settings.GetMaxInFlight()) {
8183
if (NextWriteTimestamp <= now) {
82-
IssueWrite();
83-
if (WritesInFlight.size() < Settings.GetMaxInFlight() || !Settings.GetResetWritePeriodOnFull()) {
84+
if (Settings.HasPatchRequestsFractionPPM() && !ConfirmedKeys.empty() &&
85+
RandomNumber(1'000'000u) < Settings.GetPatchRequestsFractionPPM()) {
86+
IssuePatch();
87+
} else {
88+
IssueWrite();
89+
}
90+
if (WritesInFlight.size() + PatchesInFlight.size() < Settings.GetMaxInFlight() || !Settings.GetResetWritePeriodOnFull()) {
8491
NextWriteTimestamp += GenerateRandomInterval(Settings.GetWritePeriods());
8592
canWriteMore = NextWriteTimestamp <= now;
8693
} else {
@@ -177,6 +184,13 @@ namespace NKikimr::NTestShard {
177184
}
178185
WritesInFlight.erase(it);
179186
}
187+
if (auto nh = PatchesInFlight.extract(record.GetCookie())) {
188+
const TString& key = nh.mapped();
189+
const auto it = Keys.find(key);
190+
Y_VERIFY_S(it != Keys.end(), "Key# " << key << " not found in Keys dict");
191+
STLOG(PRI_WARN, TEST_SHARD, TS27, "patch failed", (TabletId, TabletId), (Key, key));
192+
RegisterTransition(*it, ::NTestShard::TStateServer::WRITE_PENDING, ::NTestShard::TStateServer::DELETED);
193+
}
180194
if (const auto it = DeletesInFlight.find(record.GetCookie()); it != DeletesInFlight.end()) {
181195
for (const TString& key : it->second.KeysInQuery) {
182196
const auto it = Keys.find(key);
@@ -209,10 +223,11 @@ namespace NKikimr::NTestShard {
209223
};
210224
STLOG(PRI_INFO, TEST_SHARD, TS04, "TEvKeyValue::TEvResponse", (TabletId, TabletId), (Msg, makeResponse()));
211225
ProcessWriteResult(record.GetCookie(), record.GetWriteResult());
226+
ProcessPatchResult(record.GetCookie(), record.GetPatchResult());
212227
ProcessDeleteResult(record.GetCookie(), record.GetDeleteRangeResult());
213228
ProcessReadResult(record.GetCookie(), record.GetReadResult(), *ev->Get());
214229
}
215-
if (WritesInFlight.size() != Settings.GetMaxInFlight() && NextWriteTimestamp == TMonotonic::Max()) {
230+
if (WritesInFlight.size() + PatchesInFlight.size() != Settings.GetMaxInFlight() && NextWriteTimestamp == TMonotonic::Max()) {
216231
NextWriteTimestamp = TMonotonic::Now() + GenerateRandomInterval(Settings.GetWritePeriods());
217232
}
218233
Action();

ydb/core/test_tablet/load_actor_impl.h

+3
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ namespace NKikimr::NTestShard {
119119
ui64 BytesOfData = 0;
120120

121121
std::unordered_map<ui64, TWriteInfo> WritesInFlight; // cookie -> TWriteInfo
122+
std::unordered_map<ui64, TString> PatchesInFlight;
122123
ui32 KeysWritten = 0;
123124
static constexpr TDuration WriteSpeedWindow = TDuration::Seconds(10);
124125
static constexpr TDuration ReadSpeedWindow = TDuration::Seconds(10);
@@ -136,7 +137,9 @@ namespace NKikimr::NTestShard {
136137

137138
void GenerateKeyValue(TString *key, TString *value, bool *isInline);
138139
void IssueWrite();
140+
void IssuePatch();
139141
void ProcessWriteResult(ui64 cookie, const google::protobuf::RepeatedPtrField<NKikimrClient::TKeyValueResponse::TWriteResult>& results);
142+
void ProcessPatchResult(ui64 cookie, const google::protobuf::RepeatedPtrField<NKikimrClient::TKeyValueResponse::TPatchResult>& results);
140143
void TrimBytesWritten(TInstant now);
141144
void HandleWriteOnTime();
142145
void HandleDoSomeAction();

ydb/core/test_tablet/load_actor_mon.cpp

+5
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,11 @@ namespace NKikimr::NTestShard {
155155
TABLED() { str << self->WritesInFlight.size(); }
156156
}
157157

158+
TABLER() {
159+
TABLED() { str << "Patches in flight"; }
160+
TABLED() { str << self->PatchesInFlight.size(); }
161+
}
162+
158163
TABLER() {
159164
TABLED() { str << "Reads in flight"; }
160165
TABLED() { str << self->ReadsInFlight.size() << '/' << self->KeysBeingRead.size(); }

ydb/core/test_tablet/load_actor_read_validate.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ namespace NKikimr::NTestShard {
5353
{
5454
// ensure no concurrent operations are running
5555
Y_ABORT_UNLESS(self.WritesInFlight.empty());
56+
Y_ABORT_UNLESS(self.PatchesInFlight.empty());
5657
Y_ABORT_UNLESS(self.DeletesInFlight.empty());
5758
Y_ABORT_UNLESS(self.TransitionInFlight.empty());
5859
for (auto& [key, info] : KeysBefore) {

ydb/core/test_tablet/load_actor_write.cpp

+74
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,67 @@ namespace NKikimr::NTestShard {
4545
BytesProcessed += value.size();
4646
}
4747

48+
void TLoadActor::IssuePatch() {
49+
Y_ABORT_UNLESS(!ConfirmedKeys.empty());
50+
const size_t index = RandomNumber(ConfirmedKeys.size());
51+
const TString originalKey = ConfirmedKeys[index];
52+
53+
// extract length from the original key -- it may not change
54+
ui64 len, seed, id;
55+
StringSplitter(originalKey).Split(',').CollectInto(&len, &seed, &id);
56+
TString originalValue = FastGenDataForLZ4(len, seed);
57+
58+
// generate patched key
59+
seed = RandomNumber<ui64>();
60+
id = RandomNumber<ui64>();
61+
const TString patchedKey = TStringBuilder() << len << ',' << seed << ',' << id;
62+
63+
// generate random value for the new key
64+
TString patchedValue = FastGenDataForLZ4(len, seed);
65+
66+
auto ev = CreateRequest();
67+
auto& r = ev->Record;
68+
auto *patch = r.AddCmdPatch();
69+
patch->SetOriginalKey(originalKey);
70+
patch->SetPatchedKey(patchedKey);
71+
72+
TRope rope(patchedValue);
73+
ui64 offset = 0;
74+
for (size_t chunks = 0; offset != len; ++chunks) {
75+
// skip matching parts
76+
while (offset + 1 < len && originalValue[offset] == patchedValue[offset]) {
77+
++offset;
78+
}
79+
Y_ABORT_UNLESS(offset < len);
80+
81+
// add patched part
82+
size_t pos = offset + 1;
83+
while (pos < len && originalValue[pos] != patchedValue[pos]) {
84+
++pos;
85+
}
86+
const size_t size = (chunks < 8 ? pos : len) - offset;
87+
88+
auto *diff = patch->AddDiffs();
89+
diff->SetOffset(offset);
90+
if (RandomNumber(2u)) {
91+
diff->SetPayloadId(ev->AddPayload(TRope(rope.Position(offset), rope.Position(offset + size))));
92+
} else {
93+
diff->SetValue(patchedValue.substr(offset, size));
94+
}
95+
offset += size;
96+
}
97+
98+
auto [pifIt, pifInserted] = PatchesInFlight.try_emplace(r.GetCookie(), patchedKey);
99+
Y_ABORT_UNLESS(pifInserted);
100+
101+
auto [it, inserted] = Keys.try_emplace(patchedKey, len);
102+
Y_ABORT_UNLESS(inserted);
103+
RegisterTransition(*it, ::NTestShard::TStateServer::ABSENT, ::NTestShard::TStateServer::WRITE_PENDING, std::move(ev));
104+
105+
++KeysWritten;
106+
BytesProcessed += len;
107+
}
108+
48109
void TLoadActor::ProcessWriteResult(ui64 cookie, const google::protobuf::RepeatedPtrField<NKikimrClient::TKeyValueResponse::TWriteResult>& results) {
49110
if (const auto wifIt = WritesInFlight.find(cookie); wifIt != WritesInFlight.end()) {
50111
TWriteInfo& info = wifIt->second;
@@ -69,4 +130,17 @@ namespace NKikimr::NTestShard {
69130
}
70131
}
71132

133+
void TLoadActor::ProcessPatchResult(ui64 cookie, const google::protobuf::RepeatedPtrField<NKikimrClient::TKeyValueResponse::TPatchResult>& results) {
134+
if (auto nh = PatchesInFlight.extract(cookie)) {
135+
Y_ABORT_UNLESS(results.size() == 1);
136+
const auto& res = results[0];
137+
Y_VERIFY_S(res.GetStatus() == NKikimrProto::OK, "TabletId# " << TabletId << " CmdPatch failed Status# "
138+
<< NKikimrProto::EReplyStatus_Name(NKikimrProto::EReplyStatus(res.GetStatus())));
139+
const TString& key = nh.mapped();
140+
const auto it = Keys.find(key);
141+
Y_VERIFY_S(it != Keys.end(), "Key# " << key << " not found in Keys dict");
142+
RegisterTransition(*it, ::NTestShard::TStateServer::WRITE_PENDING, ::NTestShard::TStateServer::CONFIRMED);
143+
}
144+
}
145+
72146
} // NKikimr::NTestShard

0 commit comments

Comments
 (0)