Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@

#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_log.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.h>
#include <library/cpp/testing/unittest/registar.h>

#undef IS_CTX_LOG_PRIORITY_ENABLED
#define IS_CTX_LOG_PRIORITY_ENABLED(actorCtxOrSystem, priority, component, sampleBy) false
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h>

namespace NYql::NDq {

Y_UNIT_TEST_SUITE(TComputeActorAsyncInputHelperTest) {

struct TDummyDqComputeActorAsyncInput: IDqComputeActorAsyncInput {
TDummyDqComputeActorAsyncInput() {
Batch.emplace_back(NUdf::TUnboxedValue{});
Batch.emplace_back(NUdf::TUnboxedValue{});
}
ui64 GetInputIndex() const override {
return 4;
}

const TDqAsyncStats& GetIngressStats() const override{
static TDqAsyncStats stats;
return stats;
}

i64 GetAsyncInputData(
NKikimr::NMiniKQL::TUnboxedValueBatch& batch,
TMaybe<TInstant>& watermark,
bool& finished,
i64 freeSpace) override
{
Y_ABORT_IF(Batch.empty());
batch = Batch;
Y_UNUSED(watermark);
Y_UNUSED(finished);
Y_UNUSED(freeSpace);
return 2;
}

// Checkpointing.
void SaveState(const NDqProto::TCheckpoint& checkpoint, NDqProto::TSourceState& state) override {
Y_UNUSED(checkpoint);
Y_UNUSED(state);
}
void CommitState(const NDqProto::TCheckpoint& checkpoint) override {
Y_UNUSED(checkpoint);
}
void LoadState(const NDqProto::TSourceState& state) override {
Y_UNUSED(state);
}

void PassAway() override {}
NKikimr::NMiniKQL::TUnboxedValueBatch Batch;
};

struct TDummyAsyncInputHelper: TComputeActorAsyncInputHelper{
using TComputeActorAsyncInputHelper::TComputeActorAsyncInputHelper;
i64 GetFreeSpace() const override{
return 10;
}
void AsyncInputPush(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 space, bool finished) override{
batch.clear();
Y_UNUSED(space);
Y_UNUSED(finished);
return;
}
};

Y_UNIT_TEST(PollAsyncInput) {
NKikimr::NMiniKQL::TScopedAlloc alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, true);
TDummyDqComputeActorAsyncInput input;
TDummyAsyncInputHelper helper("MyPrefix", 13, NDqProto::EWatermarksMode::WATERMARKS_MODE_DISABLED);
helper.AsyncInput = &input;
TDqComputeActorMetrics metrics{NMonitoring::TDynamicCounterPtr{}};
TDqComputeActorWatermarks watermarks(NActors::TActorIdentity{NActors::TActorId{}}, TTxId{}, 7);
auto result = helper.PollAsyncInput(metrics, watermarks, 20);
UNIT_ASSERT(result && EResumeSource::CAPollAsync == *result);
}
}

} //namespace NYql::NDq
3 changes: 3 additions & 0 deletions ydb/library/yql/dq/actors/compute/ut/ya.make
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
UNITTEST_FOR(ydb/library/yql/dq/actors/compute)

SRCS(
dq_compute_actor_async_input_helper_ut.cpp
dq_compute_issues_buffer_ut.cpp
dq_source_watermark_tracker_ut.cpp
)
Expand All @@ -12,4 +13,6 @@ PEERDIR(
ydb/library/yql/sql/pg_dummy
)

YQL_LAST_ABI_VERSION()

END()