Skip to content

Commit f186070

Browse files
authored
YQ-3097 Checkpoint states > 2GB (#4002)
1 parent 38281eb commit f186070

File tree

55 files changed

+686
-376
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+686
-376
lines changed

ydb/core/fq/libs/checkpoint_storage/state_storage.h

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22

33
#include <ydb/core/fq/libs/checkpointing_common/defs.h>
44

5-
#include <ydb/library/yql/dq/proto/dq_checkpoint.pb.h>
5+
#include <ydb/library/yql/dq/actors/compute/dq_checkpoints_states.h>
6+
67
#include <ydb/library/yql/public/issue/yql_issue.h>
78

89
#include <library/cpp/threading/future/core/future.h>
@@ -15,16 +16,17 @@ namespace NFq {
1516

1617
class IStateStorage : public virtual TThrRefBase {
1718
public:
18-
using TGetStateResult = std::pair<std::vector<NYql::NDqProto::TComputeActorState>, NYql::TIssues>;
19+
using TGetStateResult = std::pair<std::vector<NYql::NDq::TComputeActorState>, NYql::TIssues>;
20+
using TSaveStateResult = std::pair<size_t, NYql::TIssues>;
1921
using TCountStatesResult = std::pair<size_t, NYql::TIssues>;
2022

2123
virtual NThreading::TFuture<NYql::TIssues> Init() = 0;
2224

23-
virtual NThreading::TFuture<NYql::TIssues> SaveState(
25+
virtual NThreading::TFuture<TSaveStateResult> SaveState(
2426
ui64 taskId,
2527
const TString& graphId,
2628
const TCheckpointId& checkpointId,
27-
const NYql::NDqProto::TComputeActorState& state) = 0;
29+
const NYql::NDq::TComputeActorState& state) = 0;
2830

2931
virtual NThreading::TFuture<TGetStateResult> GetState(
3032
const std::vector<ui64>& taskIds,

ydb/core/fq/libs/checkpoint_storage/storage_proxy.cpp

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -347,14 +347,13 @@ void TStorageProxy::Handle(NYql::NDq::TEvDqCompute::TEvSaveTaskState::TPtr& ev)
347347
taskId = event->TaskId,
348348
cookie = ev->Cookie,
349349
sender = ev->Sender,
350-
stateSize = stateSize,
351-
actorSystem = TActivationContext::ActorSystem()](const NThreading::TFuture<NYql::TIssues>& futureResult) {
350+
actorSystem = TActivationContext::ActorSystem()](const NThreading::TFuture<IStateStorage::TSaveStateResult>& futureResult) {
352351
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(*actorSystem, "[" << graphId << "] [" << checkpointId << "] TEvSaveTaskState Apply: task: " << taskId)
353-
const auto& issues = futureResult.GetValue();
352+
const auto& issues = futureResult.GetValue().second;
354353
auto response = std::make_unique<NYql::NDq::TEvDqCompute::TEvSaveTaskStateResult>();
355354
response->Record.MutableCheckpoint()->SetGeneration(checkpointId.CoordinatorGeneration);
356355
response->Record.MutableCheckpoint()->SetId(checkpointId.SeqNo);
357-
response->Record.SetStateSizeBytes(stateSize);
356+
response->Record.SetStateSizeBytes(futureResult.GetValue().first);
358357
response->Record.SetTaskId(taskId);
359358

360359
if (issues) {

ydb/core/fq/libs/checkpoint_storage/ut/gc_ut.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ namespace {
3232

3333
////////////////////////////////////////////////////////////////////////////////
3434

35-
NYql::NDqProto::TComputeActorState MakeStateFromBlob(size_t blobSize, bool isIncrement = false) {
35+
NYql::NDq::TComputeActorState MakeStateFromBlob(size_t blobSize, bool isIncrement = false) {
3636
TString blob;
3737
blob.reserve(blobSize);
3838
for (size_t i = 0; i < blobSize; ++i) {
@@ -51,8 +51,8 @@ NYql::NDqProto::TComputeActorState MakeStateFromBlob(size_t blobSize, bool isInc
5151
const TStringBuf savedBuf = value.AsStringRef();
5252
TString result;
5353
NKikimr::NMiniKQL::TNodeStateHelper::AddNodeState(result, savedBuf);
54-
NYql::NDqProto::TComputeActorState state;
55-
state.MutableMiniKqlProgram()->MutableData()->MutableStateData()->SetBlob(result);
54+
NYql::NDq::TComputeActorState state;
55+
state.MiniKqlProgram.ConstructInPlace().Data.Blob = result;
5656
return state;
5757
}
5858

ydb/core/fq/libs/checkpoint_storage/ut/storage_service_ydb_ut.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ void SaveState(
217217
checkpoint.SetGeneration(checkpointId.CoordinatorGeneration);
218218
checkpoint.SetId(checkpointId.SeqNo);
219219
auto request = std::make_unique<NYql::NDq::TEvDqCompute::TEvSaveTaskState>(GraphId, taskId, checkpoint);
220-
request->State.MutableMiniKqlProgram()->MutableData()->MutableStateData()->SetBlob(blob);
220+
request->State.MiniKqlProgram.ConstructInPlace().Data.Blob = blob;
221221
runtime->Send(new IEventHandle(NYql::NDq::MakeCheckpointStorageID(), sender, request.release()));
222222

223223
TAutoPtr<IEventHandle> handle;
@@ -247,7 +247,7 @@ TString GetState(
247247
UNIT_ASSERT(event->Issues.Empty());
248248
UNIT_ASSERT(!event->States.empty());
249249

250-
return event->States[0].GetMiniKqlProgram().GetData().GetStateData().GetBlob();
250+
return event->States[0].MiniKqlProgram->Data.Blob;
251251
}
252252

253253
void CreateCompletedCheckpoint(

ydb/core/fq/libs/checkpoint_storage/ut/ydb_state_storage_ut.cpp

Lines changed: 59 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -52,15 +52,15 @@ class TFixture : public NUnitTest::TBaseFixture {
5252
return storage;
5353
}
5454

55-
NYql::NDqProto::TComputeActorState MakeState(NYql::NUdf::TUnboxedValuePod&& value) {
55+
NYql::NDq::TComputeActorState MakeState(NYql::NUdf::TUnboxedValuePod&& value) {
5656
TString result;
5757
NKikimr::NMiniKQL::TNodeStateHelper::AddNodeState(result, value.AsStringRef());
58-
NYql::NDqProto::TComputeActorState state;
59-
state.MutableMiniKqlProgram()->MutableData()->MutableStateData()->SetBlob(result);
58+
NYql::NDq::TComputeActorState state;
59+
state.MiniKqlProgram.ConstructInPlace().Data.Blob = result;
6060
return state;
6161
}
6262

63-
NYql::NDqProto::TComputeActorState MakeStateFromBlob(size_t blobSize) {
63+
NYql::NDq::TComputeActorState MakeStateFromBlob(size_t blobSize) {
6464
TString blob;
6565
blob.reserve(blobSize);
6666
for (size_t i = 0; i < blobSize; ++i) {
@@ -69,7 +69,7 @@ class TFixture : public NUnitTest::TBaseFixture {
6969
return MakeState(NKikimr::NMiniKQL::TOutputSerializer::MakeSimpleBlobState(blob, 0));
7070
}
7171

72-
NYql::NDqProto::TComputeActorState MakeIncrementState(size_t miniKqlPStateSize) {
72+
NYql::NDq::TComputeActorState MakeIncrementState(size_t miniKqlPStateSize) {
7373
std::map<TString, TString> map;
7474
size_t itemCount = 4;
7575
for (size_t i = 0; i < itemCount; ++i) {
@@ -78,7 +78,7 @@ class TFixture : public NUnitTest::TBaseFixture {
7878
return MakeState(NKikimr::NMiniKQL::TOutputSerializer::MakeSnapshotState(map, 0));
7979
}
8080

81-
NYql::NDqProto::TComputeActorState MakeIncrementState(
81+
NYql::NDq::TComputeActorState MakeIncrementState(
8282
const std::map<TString, TString>& snapshot,
8383
const std::map<TString, TString>& increment,
8484
const std::set<TString>& deleted)
@@ -94,13 +94,14 @@ class TFixture : public NUnitTest::TBaseFixture {
9494
ui64 taskId,
9595
const TString& graphId,
9696
const TCheckpointId& checkpointId,
97-
const NYql::NDqProto::TComputeActorState& state)
97+
const NYql::NDq::TComputeActorState& state)
9898
{
99-
auto issues = storage->SaveState(taskId, graphId, checkpointId, state).GetValueSync();
99+
auto [size, issues] = storage->SaveState(taskId, graphId, checkpointId, state).GetValueSync();
100100
UNIT_ASSERT_C(issues.Empty(), issues.ToString());
101+
UNIT_ASSERT(size > 0);
101102
}
102103

103-
NYql::NDqProto::TComputeActorState GetState(
104+
NYql::NDq::TComputeActorState GetState(
104105
TStateStoragePtr storage,
105106
const ui64 taskId,
106107
const TString& graphId,
@@ -112,16 +113,43 @@ class TFixture : public NUnitTest::TBaseFixture {
112113
return states[0];
113114
}
114115

115-
void ShouldSaveGetStateImpl(const char* tablePrefix, const NYql::NDqProto::TComputeActorState& state) {
116+
void ShouldSaveGetStateImpl(const char* tablePrefix, const NYql::NDq::TComputeActorState& state) {
116117
auto storage = GetStateStorage(tablePrefix);
117-
auto issues = storage->SaveState(1, "graph1", CheckpointId1, state).GetValueSync();
118+
auto [size, issues] = storage->SaveState(1, "graph1", CheckpointId1, state).GetValueSync();
118119
UNIT_ASSERT_C(issues.Empty(), issues.ToString());
120+
UNIT_ASSERT(size > 0);
119121

120122
auto [states, getIssues] = storage->GetState({1}, "graph1", CheckpointId1).GetValueSync();
121123
UNIT_ASSERT_C(getIssues.Empty(), getIssues.ToString());
122124
UNIT_ASSERT(!states.empty());
123-
UNIT_ASSERT(google::protobuf::util::MessageDifferencer::Equals(state, states[0]));
125+
CheckEquals(state, states[0]);
124126
}
127+
128+
void CheckEquals(const NYql::NDq::TComputeActorState& state1, const NYql::NDq::TComputeActorState& state2) {
129+
UNIT_ASSERT_VALUES_EQUAL(state1.MiniKqlProgram.Empty(), state2.MiniKqlProgram.Empty());
130+
if (state1.MiniKqlProgram) {
131+
UNIT_ASSERT_VALUES_EQUAL(state1.MiniKqlProgram->Data.Blob, state2.MiniKqlProgram->Data.Blob);
132+
UNIT_ASSERT_VALUES_EQUAL(state1.MiniKqlProgram->Data.Version, state2.MiniKqlProgram->Data.Version);
133+
UNIT_ASSERT_VALUES_EQUAL(state1.MiniKqlProgram->RuntimeVersion, state2.MiniKqlProgram->RuntimeVersion);
134+
}
135+
UNIT_ASSERT_VALUES_EQUAL(state1.Sources.size(), state2.Sources.size());
136+
UNIT_ASSERT(std::equal(std::begin(state1.Sources), std::end(state1.Sources), std::begin(state2.Sources), std::end(state2.Sources),
137+
[](const NYql::NDq::TSourceState& state1, const NYql::NDq::TSourceState& state2) {
138+
UNIT_ASSERT_VALUES_EQUAL(state1.InputIndex, state2.InputIndex);
139+
UNIT_ASSERT_VALUES_EQUAL(state1.Data.size(), state2.Data.size());
140+
return true;
141+
}));
142+
143+
UNIT_ASSERT_VALUES_EQUAL(state1.Sinks.size(), state2.Sinks.size());
144+
UNIT_ASSERT(std::equal(std::begin(state1.Sinks), std::end(state1.Sinks), std::begin(state2.Sinks), std::end(state2.Sinks),
145+
[](const NYql::NDq::TSinkState& state1, const NYql::NDq::TSinkState& state2) {
146+
UNIT_ASSERT_VALUES_EQUAL(state1.OutputIndex, state2.OutputIndex);
147+
UNIT_ASSERT_VALUES_EQUAL(state1.Data.Blob, state2.Data.Blob);
148+
UNIT_ASSERT_VALUES_EQUAL(state1.Data.Version, state2.Data.Version);
149+
return true;
150+
}));
151+
}
152+
125153
private:
126154
NKikimr::NMiniKQL::TScopedAlloc Alloc;
127155
NKikimr::TActorSystemStub ActorSystemStub;
@@ -147,8 +175,8 @@ Y_UNIT_TEST_SUITE(TStateStorageTest) {
147175
auto state2 = NKikimr::NMiniKQL::TOutputSerializer::MakeSimpleBlobState(TString(20, 'b'), 0);
148176
NKikimr::NMiniKQL::TNodeStateHelper::AddNodeState(result, state1.AsStringRef());
149177
NKikimr::NMiniKQL::TNodeStateHelper::AddNodeState(result, state2.AsStringRef());
150-
NYql::NDqProto::TComputeActorState state;
151-
state.MutableMiniKqlProgram()->MutableData()->MutableStateData()->SetBlob(result);
178+
NYql::NDq::TComputeActorState state;
179+
state.MiniKqlProgram.ConstructInPlace().Data.Blob = result;
152180
ShouldSaveGetStateImpl("TStateStorageTestShouldSaveGetState", state);
153181
}
154182

@@ -300,8 +328,9 @@ Y_UNIT_TEST_SUITE(TStateStorageTest) {
300328
Y_UNIT_TEST_F(ShouldIssueErrorOnNonExistentState, TFixture) {
301329
auto storage = GetStateStorage("TStateStorageTestShouldIssueErrorOnNonExistentState");
302330

303-
auto issues = storage->SaveState(1, "graph1", CheckpointId1, MakeStateFromBlob(4)).GetValueSync();
331+
auto [size, issues] = storage->SaveState(1, "graph1", CheckpointId1, MakeStateFromBlob(4)).GetValueSync();
304332
UNIT_ASSERT(issues.Empty());
333+
UNIT_ASSERT(size > 0);
305334

306335
auto getResult = storage->GetState({1}, "graph1", CheckpointId1).GetValueSync();
307336
UNIT_ASSERT(getResult.second.Empty());
@@ -321,32 +350,33 @@ Y_UNIT_TEST_SUITE(TStateStorageTest) {
321350
auto state3 = MakeStateFromBlob(YdbRowSizeLimit * 6);
322351
auto state4 = MakeIncrementState(YdbRowSizeLimit * 3);
323352

324-
auto issues = storage->SaveState(1, "graph1", CheckpointId1, state1).GetValueSync();
353+
auto [size, issues] = storage->SaveState(1, "graph1", CheckpointId1, state1).GetValueSync();
325354
UNIT_ASSERT(issues.Empty());
326-
issues = storage->SaveState(42, "graph1", CheckpointId1, state2).GetValueSync();
355+
UNIT_ASSERT(size > 0);
356+
issues = storage->SaveState(42, "graph1", CheckpointId1, state2).GetValueSync().second;
327357
UNIT_ASSERT(issues.Empty());
328-
issues = storage->SaveState(7, "graph1", CheckpointId1, state3).GetValueSync();
358+
issues = storage->SaveState(7, "graph1", CheckpointId1, state3).GetValueSync().second;
329359
UNIT_ASSERT(issues.Empty());
330-
issues = storage->SaveState(13, "graph1", CheckpointId1, state4).GetValueSync();
360+
issues = storage->SaveState(13, "graph1", CheckpointId1, state4).GetValueSync().second;
331361
UNIT_ASSERT(issues.Empty());
332362

333363
auto [states, getIssues] = storage->GetState({1, 42, 7, 13}, "graph1", CheckpointId1).GetValueSync();
334364
UNIT_ASSERT_C(getIssues.Empty(), getIssues.ToString());
335365
UNIT_ASSERT_VALUES_EQUAL(states.size(), 4);
336366

337-
UNIT_ASSERT(google::protobuf::util::MessageDifferencer::Equals(state1, states[0]));
338-
UNIT_ASSERT(google::protobuf::util::MessageDifferencer::Equals(state2, states[1]));
339-
UNIT_ASSERT(google::protobuf::util::MessageDifferencer::Equals(state3, states[2]));
340-
UNIT_ASSERT(google::protobuf::util::MessageDifferencer::Equals(state4, states[3]));
367+
CheckEquals(state1, states[0]);
368+
CheckEquals(state2, states[1]);
369+
CheckEquals(state3, states[2]);
370+
CheckEquals(state4, states[3]);
341371

342372
// in different order
343373
auto [states2, getIssues2] = storage->GetState({42, 1, 13, 7}, "graph1", CheckpointId1).GetValueSync();
344374
UNIT_ASSERT(getIssues2.Empty());
345375
UNIT_ASSERT_VALUES_EQUAL(states2.size(), 4);
346-
UNIT_ASSERT(google::protobuf::util::MessageDifferencer::Equals(state2, states2[0]));
347-
UNIT_ASSERT(google::protobuf::util::MessageDifferencer::Equals(state1, states2[1]));
348-
UNIT_ASSERT(google::protobuf::util::MessageDifferencer::Equals(state4, states2[2]));
349-
UNIT_ASSERT(google::protobuf::util::MessageDifferencer::Equals(state3, states2[3]));
376+
CheckEquals(state2, states2[0]);
377+
CheckEquals(state1, states2[1]);
378+
CheckEquals(state4, states2[2]);
379+
CheckEquals(state3, states2[3]);
350380
}
351381

352382
Y_UNIT_TEST_F(ShouldLoadLastSnapshot, TFixture)
@@ -360,7 +390,7 @@ Y_UNIT_TEST_SUITE(TStateStorageTest) {
360390
SaveState(storage, 1, "graph1", CheckpointId2, state2);
361391

362392
auto state = GetState(storage, 1, "graph1", CheckpointId2);
363-
UNIT_ASSERT(google::protobuf::util::MessageDifferencer::Equals(state, state2));
393+
CheckEquals(state, state2);
364394
}
365395

366396
Y_UNIT_TEST_F(ShouldNotGetNonExistendSnaphotState, TFixture)
@@ -392,7 +422,7 @@ Y_UNIT_TEST_SUITE(TStateStorageTest) {
392422
auto expected = MakeIncrementState({{"key1", "value1-new"}, {"key3", "value3"}, {"key4", value4}}, {}, {});
393423

394424
auto actual = GetState(storage, 1, "graph1", CheckpointId3);
395-
UNIT_ASSERT(google::protobuf::util::MessageDifferencer::Equals(expected, actual));
425+
CheckEquals(expected, actual);
396426
}
397427

398428
};

0 commit comments

Comments
 (0)