Skip to content

Commit 67e35be

Browse files
authored
Add checkpoint support for streamlookup (backport #9299) (#9719)
1 parent 1ec7315 commit 67e35be

File tree

5 files changed

+36
-10
lines changed

5 files changed

+36
-10
lines changed

ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1369,8 +1369,20 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
13691369
}
13701370

13711371
void PollAsyncInput() {
1372+
if (!Running) {
1373+
CA_LOG_T("Skip polling inputs and sources because not running");
1374+
return;
1375+
}
1376+
1377+
CA_LOG_T("Poll inputs");
1378+
for (auto& [inputIndex, transform] : InputTransformsMap) {
1379+
if (auto resume = transform.PollAsyncInput(MetricsReporter, WatermarksTracker, RuntimeSettings.AsyncInputPushLimit)) {
1380+
ContinueExecute(*resume);
1381+
}
1382+
}
1383+
13721384
// Don't produce any input from sources if we're about to save checkpoint.
1373-
if (!Running || (Checkpoints && Checkpoints->HasPendingCheckpoint() && !Checkpoints->ComputeActorStateSaved())) {
1385+
if ((Checkpoints && Checkpoints->HasPendingCheckpoint() && !Checkpoints->ComputeActorStateSaved())) {
13741386
CA_LOG_T("Skip polling sources because of pending checkpoint");
13751387
return;
13761388
}
@@ -1381,13 +1393,6 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
13811393
ContinueExecute(*resume);
13821394
}
13831395
}
1384-
1385-
CA_LOG_T("Poll inputs");
1386-
for (auto& [inputIndex, transform] : InputTransformsMap) {
1387-
if (auto resume = transform.PollAsyncInput(MetricsReporter, WatermarksTracker, RuntimeSettings.AsyncInputPushLimit)) {
1388-
ContinueExecute(*resume);
1389-
}
1390-
}
13911396
}
13921397

13931398
void OnNewAsyncInputDataArrived(const IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived::TPtr& ev) {

ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ class TInputTransformStreamLookupBase
184184
}
185185
}
186186
finished = IsFinished();
187-
return 0;
187+
return AwaitingQueue.RowCount();
188188
}
189189

190190
TMaybe<google::protobuf::Any> ExtraData() override {

ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,18 @@ class TLocalTaskRunnerActor
143143
return false;
144144
}
145145
}
146+
for (const auto transformId: InputTransforms) {
147+
const auto t = TaskRunner->GetInputTransform(transformId);
148+
if (t) {
149+
auto [_, transform] = *t;
150+
if (!transform->Empty()) {
151+
return false;
152+
}
153+
if (transform->IsPending()) {
154+
return false;
155+
}
156+
}
157+
}
146158
return true;
147159
}
148160

@@ -436,6 +448,7 @@ class TLocalTaskRunnerActor
436448
for (auto i = 0; i != inputs.size(); ++i) {
437449
if (auto t = TaskRunner->GetInputTransform(i)) {
438450
inputTransforms[i] = *t;
451+
InputTransforms.emplace(i);
439452
}
440453
}
441454

@@ -488,6 +501,7 @@ class TLocalTaskRunnerActor
488501
const TTxId TxId;
489502
const ui64 TaskId;
490503
THashSet<ui32> Inputs;
504+
THashSet<ui32> InputTransforms;
491505
THashSet<ui32> Sources;
492506
TIntrusivePtr<NDq::IDqTaskRunner> TaskRunner;
493507
THashSet<ui32> InputChannelsWithDisabledCheckpoints;

ydb/library/yql/dq/runtime/dq_async_input.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ namespace NYql::NDq {
66
class TDqAsyncInputBuffer : public TDqInputImpl<TDqAsyncInputBuffer, IDqAsyncInputBuffer> {
77
using TBaseImpl = TDqInputImpl<TDqAsyncInputBuffer, IDqAsyncInputBuffer>;
88
friend TBaseImpl;
9+
bool Pending = false;
910
public:
1011
TDqAsyncInputBufferStats PushStats;
1112
TDqInputStats PopStats;
@@ -32,7 +33,7 @@ class TDqAsyncInputBuffer : public TDqInputImpl<TDqAsyncInputBuffer, IDqAsyncInp
3233
}
3334

3435
void Push(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 space) override {
35-
Y_ABORT_UNLESS(!batch.empty() || !space);
36+
Pending = space != 0;
3637
if (!batch.empty()) {
3738
AddBatch(std::move(batch), space);
3839
}
@@ -41,6 +42,10 @@ class TDqAsyncInputBuffer : public TDqInputImpl<TDqAsyncInputBuffer, IDqAsyncInp
4142
virtual void Push(TDqSerializedBatch&&, i64) override {
4243
YQL_ENSURE(!"Unimplemented");
4344
}
45+
46+
bool IsPending() const override {
47+
return Pending;
48+
}
4449
};
4550

4651
IDqAsyncInputBuffer::TPtr CreateDqAsyncInputBuffer(

ydb/library/yql/dq/runtime/dq_async_input.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ class IDqAsyncInputBuffer : public IDqInput {
2323
virtual void Push(TDqSerializedBatch&& batch, i64 space) = 0;
2424

2525
virtual void Finish() = 0;
26+
27+
virtual bool IsPending() const { return false; };
2628
};
2729

2830
IDqAsyncInputBuffer::TPtr CreateDqAsyncInputBuffer(ui64 inputIndex, const TString& type, NKikimr::NMiniKQL::TType* inputType, ui64 maxBufferBytes,

0 commit comments

Comments
 (0)