Skip to content

Commit 81552aa

Browse files
committed
generic lookup retries: add tests
1 parent 141b39d commit 81552aa

File tree

2 files changed

+199
-2
lines changed

2 files changed

+199
-2
lines changed

ydb/library/yql/providers/generic/actors/ut/yql_generic_lookup_actor_ut.cpp

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,4 +226,189 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
226226
}
227227
}
228228

229+
Y_UNIT_TEST(LookupWithErrors) {
230+
auto alloc = std::make_shared<NKikimr::NMiniKQL::TScopedAlloc>(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false);
231+
NKikimr::NMiniKQL::TMemoryUsageInfo memUsage("TestMemUsage");
232+
NKikimr::NMiniKQL::THolderFactory holderFactory(alloc->Ref(), memUsage);
233+
NKikimr::NMiniKQL::TTypeEnvironment typeEnv(*alloc);
234+
NKikimr::NMiniKQL::TTypeBuilder typeBuilder(typeEnv);
235+
236+
auto loggerConfig = NYql::NProto::TLoggingConfig();
237+
loggerConfig.set_allcomponentslevel(::NYql::NProto::TLoggingConfig_ELevel::TLoggingConfig_ELevel_TRACE);
238+
NYql::NLog::InitLogger(loggerConfig, false);
239+
240+
TTestActorRuntimeBase runtime(1, 1, true);
241+
runtime.Initialize();
242+
auto edge = runtime.AllocateEdgeActor();
243+
244+
NYql::TGenericDataSourceInstance dsi;
245+
dsi.Setkind(NYql::EGenericDataSourceKind::YDB);
246+
dsi.mutable_endpoint()->Sethost("some_host");
247+
dsi.mutable_endpoint()->Setport(2135);
248+
dsi.Setdatabase("some_db");
249+
dsi.Setuse_tls(true);
250+
dsi.set_protocol(::NYql::EGenericProtocol::NATIVE);
251+
auto token = dsi.mutable_credentials()->mutable_token();
252+
token->Settype("IAM");
253+
token->Setvalue("TEST_TOKEN");
254+
255+
auto connectorMock = std::make_shared<NYql::NConnector::NTest::TConnectorClientMock>();
256+
257+
// clang-format off
258+
// step 1: ListSplits
259+
{
260+
::testing::InSequence seq;
261+
for (grpc::StatusCode readStatus : { grpc::StatusCode::UNAVAILABLE, grpc::StatusCode::OK }) {
262+
for (grpc::StatusCode listStatus : { grpc::StatusCode::UNAVAILABLE, readStatus == grpc::StatusCode::OK ? grpc::StatusCode::DEADLINE_EXCEEDED : grpc::StatusCode::UNAVAILABLE, grpc::StatusCode::OK }) {
263+
auto listBuilder = connectorMock->ExpectListSplits();
264+
listBuilder
265+
.Select()
266+
.DataSourceInstance(dsi)
267+
.What()
268+
.Column("id", Ydb::Type::UINT64)
269+
.NullableColumn("optional_id", Ydb::Type::UINT64)
270+
.NullableColumn("string_value", Ydb::Type::STRING)
271+
.Done()
272+
.Table("lookup_test")
273+
.Where()
274+
.Filter()
275+
.Disjunction()
276+
.Operand()
277+
.Conjunction()
278+
.Operand().Equal().Column("id").Value<ui64>(2).Done().Done()
279+
.Operand().Equal().Column("optional_id").OptionalValue<ui64>(102).Done().Done()
280+
.Done()
281+
.Done()
282+
.Operand()
283+
.Conjunction()
284+
.Operand().Equal().Column("id").Value<ui64>(1).Done().Done()
285+
.Operand().Equal().Column("optional_id").OptionalValue<ui64>(101).Done().Done()
286+
.Done()
287+
.Done()
288+
.Operand()
289+
.Conjunction()
290+
.Operand().Equal().Column("id").Value<ui64>(0).Done().Done()
291+
.Operand().Equal().Column("optional_id").OptionalValue<ui64>(100).Done().Done()
292+
.Done()
293+
.Done()
294+
.Done()
295+
.Done()
296+
.Done()
297+
.Done()
298+
.MaxSplitCount(1)
299+
;
300+
if (listStatus != grpc::StatusCode::OK) {
301+
listBuilder
302+
.Status(NYdbGrpc::TGrpcStatus(listStatus, "Mocked Error"))
303+
;
304+
continue;
305+
}
306+
listBuilder
307+
.Result()
308+
.AddResponse(NewSuccess())
309+
.Description("Actual split info is not important")
310+
;
311+
}
312+
313+
auto readBuilder = connectorMock->ExpectReadSplits();
314+
readBuilder
315+
.DataSourceInstance(dsi)
316+
.Filtering(NYql::NConnector::NApi::TReadSplitsRequest::FILTERING_MANDATORY)
317+
.Split()
318+
.Description("Actual split info is not important")
319+
.Done()
320+
;
321+
if (readStatus != grpc::StatusCode::OK) {
322+
readBuilder
323+
.Status(NYdbGrpc::TGrpcStatus(readStatus, "Mocked Error"))
324+
;
325+
continue;
326+
}
327+
readBuilder
328+
.Result()
329+
.AddResponse(
330+
MakeRecordBatch(
331+
MakeArray<arrow::UInt64Builder, uint64_t>("id", {0, 1, 2}, arrow::uint64()),
332+
MakeArray<arrow::UInt64Builder, uint64_t>("optional_id", {100, 101, 103}, arrow::uint64()), // the last value is intentially wrong
333+
MakeArray<arrow::StringBuilder, std::string>("string_value", {"a", "b", "c"}, arrow::utf8())
334+
),
335+
NewSuccess()
336+
)
337+
;
338+
}
339+
}
340+
// clang-format on
341+
342+
NYql::Generic::TLookupSource lookupSourceSettings;
343+
*lookupSourceSettings.mutable_data_source_instance() = dsi;
344+
lookupSourceSettings.Settable("lookup_test");
345+
lookupSourceSettings.SetServiceAccountId("testsaid");
346+
lookupSourceSettings.SetServiceAccountIdSignature("fake_signature");
347+
348+
google::protobuf::Any packedLookupSource;
349+
Y_ABORT_UNLESS(packedLookupSource.PackFrom(lookupSourceSettings));
350+
351+
NKikimr::NMiniKQL::TStructTypeBuilder keyTypeBuilder{typeEnv};
352+
keyTypeBuilder.Add("id", typeBuilder.NewDataType(NYql::NUdf::EDataSlot::Uint64, false));
353+
keyTypeBuilder.Add("optional_id", typeBuilder.NewDataType(NYql::NUdf::EDataSlot::Uint64, true));
354+
NKikimr::NMiniKQL::TStructTypeBuilder outputypeBuilder{typeEnv};
355+
outputypeBuilder.Add("string_value", typeBuilder.NewDataType(NYql::NUdf::EDataSlot::String, true));
356+
357+
auto guard = Guard(*alloc.get());
358+
auto keyTypeHelper = std::make_shared<NYql::NDq::IDqAsyncLookupSource::TKeyTypeHelper>(keyTypeBuilder.Build());
359+
360+
auto [lookupSource, actor] = NYql::NDq::CreateGenericLookupActor(
361+
connectorMock,
362+
std::make_shared<NYql::NTestCreds::TSecuredServiceAccountCredentialsFactory>(),
363+
edge,
364+
nullptr,
365+
alloc,
366+
keyTypeHelper,
367+
std::move(lookupSourceSettings),
368+
keyTypeBuilder.Build(),
369+
outputypeBuilder.Build(),
370+
typeEnv,
371+
holderFactory,
372+
1'000'000);
373+
auto lookupActor = runtime.Register(actor);
374+
375+
auto request = std::make_shared<NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap>(3, keyTypeHelper->GetValueHash(), keyTypeHelper->GetValueEqual());
376+
for (size_t i = 0; i != 3; ++i) {
377+
NYql::NUdf::TUnboxedValue* keyItems;
378+
auto key = holderFactory.CreateDirectArrayHolder(2, keyItems);
379+
keyItems[0] = NYql::NUdf::TUnboxedValuePod(ui64(i));
380+
keyItems[1] = NYql::NUdf::TUnboxedValuePod(ui64(100 + i));
381+
request->emplace(std::move(key), NYql::NUdf::TUnboxedValue{});
382+
}
383+
384+
guard.Release(); // let actors use alloc
385+
386+
auto callLookupActor = new TCallLookupActor(alloc, lookupActor, request);
387+
runtime.Register(callLookupActor);
388+
389+
auto ev = runtime.GrabEdgeEventRethrow<NYql::NDq::IDqAsyncLookupSource::TEvLookupResult>(edge);
390+
auto guard2 = Guard(*alloc.get());
391+
auto lookupResult = ev->Get()->Result.lock();
392+
UNIT_ASSERT(lookupResult);
393+
394+
UNIT_ASSERT_EQUAL(3, lookupResult->size());
395+
{
396+
const auto* v = lookupResult->FindPtr(CreateStructValue(holderFactory, {0, 100}));
397+
UNIT_ASSERT(v);
398+
NYql::NUdf::TUnboxedValue val = v->GetElement(0);
399+
UNIT_ASSERT(val.AsStringRef() == TStringBuf("a"));
400+
}
401+
{
402+
const auto* v = lookupResult->FindPtr(CreateStructValue(holderFactory, {1, 101}));
403+
UNIT_ASSERT(v);
404+
NYql::NUdf::TUnboxedValue val = v->GetElement(0);
405+
UNIT_ASSERT(val.AsStringRef() == TStringBuf("b"));
406+
}
407+
{
408+
const auto* v = lookupResult->FindPtr(CreateStructValue(holderFactory, {2, 102}));
409+
UNIT_ASSERT(v);
410+
UNIT_ASSERT(!*v);
411+
}
412+
}
413+
229414
} // Y_UNIT_TEST_SUITE(GenericProviderLookupActor)

ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -681,19 +681,25 @@ namespace NYql::NConnector::NTest {
681681
return TListSplitsResultBuilder<TBuilder>(ResponseResult_, this);
682682
}
683683

684+
auto& Status(const NYdbGrpc::TGrpcStatus& status) {
685+
ResponseStatus_ = status;
686+
return *this;
687+
}
688+
684689
void FillWithDefaults() {
685690
Result();
686691
}
687692

688693
private:
689694
void SetExpectation() {
690695
EXPECT_CALL(*Mock_, ListSplitsImpl(ProtobufRequestMatcher(*Result_)))
691-
.WillOnce(Return(TIteratorResult<IListSplitsStreamIterator>{NYdbGrpc::TGrpcStatus(), ResponseResult_}));
696+
.WillOnce(Return(TIteratorResult<IListSplitsStreamIterator>{ResponseStatus_, ResponseResult_}));
692697
}
693698

694699
private:
695700
TConnectorClientMock* Mock_ = nullptr;
696701
TListSplitsStreamIteratorMock::TPtr ResponseResult_ = std::make_shared<TListSplitsStreamIteratorMock>();
702+
NYdbGrpc::TGrpcStatus ResponseStatus_ {};
697703
};
698704

699705
template <class TParent = void /* no parent by default */>
@@ -751,19 +757,25 @@ namespace NYql::NConnector::NTest {
751757
return TReadSplitsResultBuilder<TBuilder>(ResponseResult_, this);
752758
}
753759

760+
auto& Status(const NYdbGrpc::TGrpcStatus& status) {
761+
ResponseStatus_ = status;
762+
return *this;
763+
}
764+
754765
void FillWithDefaults() {
755766
Format(NApi::TReadSplitsRequest::ARROW_IPC_STREAMING);
756767
}
757768

758769
private:
759770
void SetExpectation() {
760771
EXPECT_CALL(*Mock_, ReadSplitsImpl(ProtobufRequestMatcher(*Result_)))
761-
.WillOnce(Return(TIteratorResult<IReadSplitsStreamIterator>{NYdbGrpc::TGrpcStatus(), ResponseResult_}));
772+
.WillOnce(Return(TIteratorResult<IReadSplitsStreamIterator>{ResponseStatus_, ResponseResult_}));
762773
}
763774

764775
private:
765776
TConnectorClientMock* Mock_ = nullptr;
766777
TReadSplitsStreamIteratorMock::TPtr ResponseResult_ = std::make_shared<TReadSplitsStreamIteratorMock>();
778+
NYdbGrpc::TGrpcStatus ResponseStatus_ {};
767779
};
768780

769781
TDescribeTableExpectationBuilder ExpectDescribeTable() {

0 commit comments

Comments
 (0)