Skip to content

Commit 12883ed

Browse files
committed
generic lookup retries: add tests
1 parent 3872b37 commit 12883ed

File tree

3 files changed

+203
-2
lines changed

3 files changed

+203
-2
lines changed

ydb/library/yql/providers/generic/actors/ut/yql_generic_lookup_actor_ut.cpp

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,4 +226,191 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
226226
}
227227
}
228228

229+
Y_UNIT_TEST(LookupWithErrors) {
230+
auto alloc = std::make_shared<NKikimr::NMiniKQL::TScopedAlloc>(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false);
231+
NKikimr::NMiniKQL::TMemoryUsageInfo memUsage("TestMemUsage");
232+
NKikimr::NMiniKQL::THolderFactory holderFactory(alloc->Ref(), memUsage);
233+
NKikimr::NMiniKQL::TTypeEnvironment typeEnv(*alloc);
234+
NKikimr::NMiniKQL::TTypeBuilder typeBuilder(typeEnv);
235+
236+
auto loggerConfig = NYql::NProto::TLoggingConfig();
237+
loggerConfig.set_allcomponentslevel(::NYql::NProto::TLoggingConfig_ELevel::TLoggingConfig_ELevel_TRACE);
238+
NYql::NLog::InitLogger(loggerConfig, false);
239+
240+
TTestActorRuntimeBase runtime(1, 1, true);
241+
runtime.Initialize();
242+
auto edge = runtime.AllocateEdgeActor();
243+
244+
NYql::TGenericDataSourceInstance dsi;
245+
dsi.Setkind(NYql::EGenericDataSourceKind::YDB);
246+
dsi.mutable_endpoint()->Sethost("some_host");
247+
dsi.mutable_endpoint()->Setport(2135);
248+
dsi.Setdatabase("some_db");
249+
dsi.Setuse_tls(true);
250+
dsi.set_protocol(::NYql::EGenericProtocol::NATIVE);
251+
auto token = dsi.mutable_credentials()->mutable_token();
252+
token->Settype("IAM");
253+
token->Setvalue("TEST_TOKEN");
254+
255+
auto connectorMock = std::make_shared<NYql::NConnector::NTest::TConnectorClientMock>();
256+
257+
// clang-format off
258+
// step 1: ListSplits
259+
{
260+
::testing::InSequence seq;
261+
for (grpc::StatusCode readStatus : { grpc::StatusCode::UNAVAILABLE, grpc::StatusCode::DEADLINE_EXCEEDED, grpc::StatusCode::OK }) {
262+
for (grpc::StatusCode listStatus : { grpc::StatusCode::UNAVAILABLE, grpc::StatusCode::DEADLINE_EXCEEDED, grpc::StatusCode::OK }) {
263+
auto listBuilder = connectorMock->ExpectListSplits();
264+
listBuilder
265+
.Select()
266+
.DataSourceInstance(dsi)
267+
.What()
268+
.Column("id", Ydb::Type::UINT64)
269+
.NullableColumn("optional_id", Ydb::Type::UINT64)
270+
.NullableColumn("string_value", Ydb::Type::STRING)
271+
.Done()
272+
.Table("lookup_test")
273+
.Where()
274+
.Filter()
275+
.Disjunction()
276+
.Operand()
277+
.Conjunction()
278+
.Operand().Equal().Column("id").Value<ui64>(2).Done().Done()
279+
.Operand().Equal().Column("optional_id").OptionalValue<ui64>(102).Done().Done()
280+
.Done()
281+
.Done()
282+
.Operand()
283+
.Conjunction()
284+
.Operand().Equal().Column("id").Value<ui64>(1).Done().Done()
285+
.Operand().Equal().Column("optional_id").OptionalValue<ui64>(101).Done().Done()
286+
.Done()
287+
.Done()
288+
.Operand()
289+
.Conjunction()
290+
.Operand().Equal().Column("id").Value<ui64>(0).Done().Done()
291+
.Operand().Equal().Column("optional_id").OptionalValue<ui64>(100).Done().Done()
292+
.Done()
293+
.Done()
294+
.Done()
295+
.Done()
296+
.Done()
297+
.Done()
298+
.MaxSplitCount(1)
299+
;
300+
Cerr << "Simulate " << (int)listStatus << Endl;
301+
if (listStatus != grpc::StatusCode::OK) {
302+
listBuilder
303+
.Status(NYdbGrpc::TGrpcStatus(listStatus, "Mocked Error"))
304+
;
305+
continue;
306+
}
307+
listBuilder
308+
.Result()
309+
.AddResponse(NewSuccess())
310+
.Description("Actual split info is not important")
311+
;
312+
}
313+
314+
auto readBuilder = connectorMock->ExpectReadSplits();
315+
readBuilder
316+
.DataSourceInstance(dsi)
317+
.Filtering(NYql::NConnector::NApi::TReadSplitsRequest::FILTERING_MANDATORY)
318+
.Split()
319+
.Description("Actual split info is not important")
320+
.Done()
321+
;
322+
Cerr << "Simulate " << (int)readStatus << Endl;
323+
if (readStatus != grpc::StatusCode::OK) {
324+
readBuilder
325+
.Status(NYdbGrpc::TGrpcStatus(readStatus, "Mocked Error"))
326+
;
327+
continue;
328+
}
329+
readBuilder
330+
.Result()
331+
.AddResponse(
332+
MakeRecordBatch(
333+
MakeArray<arrow::UInt64Builder, uint64_t>("id", {0, 1, 2}, arrow::uint64()),
334+
MakeArray<arrow::UInt64Builder, uint64_t>("optional_id", {100, 101, 103}, arrow::uint64()), // the last value is intentially wrong
335+
MakeArray<arrow::StringBuilder, std::string>("string_value", {"a", "b", "c"}, arrow::utf8())
336+
),
337+
NewSuccess()
338+
)
339+
;
340+
}
341+
}
342+
// clang-format on
343+
344+
NYql::Generic::TLookupSource lookupSourceSettings;
345+
*lookupSourceSettings.mutable_data_source_instance() = dsi;
346+
lookupSourceSettings.Settable("lookup_test");
347+
lookupSourceSettings.SetServiceAccountId("testsaid");
348+
lookupSourceSettings.SetServiceAccountIdSignature("fake_signature");
349+
350+
google::protobuf::Any packedLookupSource;
351+
Y_ABORT_UNLESS(packedLookupSource.PackFrom(lookupSourceSettings));
352+
353+
NKikimr::NMiniKQL::TStructTypeBuilder keyTypeBuilder{typeEnv};
354+
keyTypeBuilder.Add("id", typeBuilder.NewDataType(NYql::NUdf::EDataSlot::Uint64, false));
355+
keyTypeBuilder.Add("optional_id", typeBuilder.NewDataType(NYql::NUdf::EDataSlot::Uint64, true));
356+
NKikimr::NMiniKQL::TStructTypeBuilder outputypeBuilder{typeEnv};
357+
outputypeBuilder.Add("string_value", typeBuilder.NewDataType(NYql::NUdf::EDataSlot::String, true));
358+
359+
auto guard = Guard(*alloc.get());
360+
auto keyTypeHelper = std::make_shared<NYql::NDq::IDqAsyncLookupSource::TKeyTypeHelper>(keyTypeBuilder.Build());
361+
362+
auto [lookupSource, actor] = NYql::NDq::CreateGenericLookupActor(
363+
connectorMock,
364+
std::make_shared<NYql::NTestCreds::TSecuredServiceAccountCredentialsFactory>(),
365+
edge,
366+
nullptr,
367+
alloc,
368+
keyTypeHelper,
369+
std::move(lookupSourceSettings),
370+
keyTypeBuilder.Build(),
371+
outputypeBuilder.Build(),
372+
typeEnv,
373+
holderFactory,
374+
1'000'000);
375+
auto lookupActor = runtime.Register(actor);
376+
377+
auto request = std::make_shared<NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap>(3, keyTypeHelper->GetValueHash(), keyTypeHelper->GetValueEqual());
378+
for (size_t i = 0; i != 3; ++i) {
379+
NYql::NUdf::TUnboxedValue* keyItems;
380+
auto key = holderFactory.CreateDirectArrayHolder(2, keyItems);
381+
keyItems[0] = NYql::NUdf::TUnboxedValuePod(ui64(i));
382+
keyItems[1] = NYql::NUdf::TUnboxedValuePod(ui64(100 + i));
383+
request->emplace(std::move(key), NYql::NUdf::TUnboxedValue{});
384+
}
385+
386+
guard.Release(); // let actors use alloc
387+
388+
auto callLookupActor = new TCallLookupActor(alloc, lookupActor, request);
389+
runtime.Register(callLookupActor);
390+
391+
auto ev = runtime.GrabEdgeEventRethrow<NYql::NDq::IDqAsyncLookupSource::TEvLookupResult>(edge);
392+
auto guard2 = Guard(*alloc.get());
393+
auto lookupResult = ev->Get()->Result.lock();
394+
UNIT_ASSERT(lookupResult);
395+
396+
UNIT_ASSERT_EQUAL(3, lookupResult->size());
397+
{
398+
const auto* v = lookupResult->FindPtr(CreateStructValue(holderFactory, {0, 100}));
399+
UNIT_ASSERT(v);
400+
NYql::NUdf::TUnboxedValue val = v->GetElement(0);
401+
UNIT_ASSERT(val.AsStringRef() == TStringBuf("a"));
402+
}
403+
{
404+
const auto* v = lookupResult->FindPtr(CreateStructValue(holderFactory, {1, 101}));
405+
UNIT_ASSERT(v);
406+
NYql::NUdf::TUnboxedValue val = v->GetElement(0);
407+
UNIT_ASSERT(val.AsStringRef() == TStringBuf("b"));
408+
}
409+
{
410+
const auto* v = lookupResult->FindPtr(CreateStructValue(holderFactory, {2, 102}));
411+
UNIT_ASSERT(v);
412+
UNIT_ASSERT(!*v);
413+
}
414+
}
415+
229416
} // Y_UNIT_TEST_SUITE(GenericProviderLookupActor)

ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,8 +397,10 @@ namespace NYql::NDq {
397397
static void SendRetryOrError(NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NYdbGrpc::TGrpcStatus& status, const ui32 retriesRemaining) {
398398
if (NConnector::GrpcStatusNeedsRetry(status) || status.GRpcStatusCode == grpc::DEADLINE_EXCEEDED) {
399399
if (retriesRemaining) {
400+
[[maybe_unused]]
400401
const auto retry = RequestRetriesLimit - retriesRemaining;
401402
// XXX FIXME tune/tweak
403+
[[maybe_unused]]
402404
const auto delay = TDuration::MilliSeconds(1u << retry); // Exponential delay from 1ms to 1s
403405
YQL_CLOG(WARN, ProviderGeneric) << "ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString() << ", retry " << (retry + 1) << " of " << RequestRetriesLimit << ", scheduled in " << delay;
404406
actorSystem->Schedule(delay, new IEventHandle(selfId, selfId, new TEvRetry()));

ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -681,19 +681,25 @@ namespace NYql::NConnector::NTest {
681681
return TListSplitsResultBuilder<TBuilder>(ResponseResult_, this);
682682
}
683683

684+
auto& Status(const NYdbGrpc::TGrpcStatus &status) {
685+
ResponseStatus_ = status;
686+
return *this;
687+
}
688+
684689
void FillWithDefaults() {
685690
Result();
686691
}
687692

688693
private:
689694
void SetExpectation() {
690695
EXPECT_CALL(*Mock_, ListSplitsImpl(ProtobufRequestMatcher(*Result_)))
691-
.WillOnce(Return(TIteratorResult<IListSplitsStreamIterator>{NYdbGrpc::TGrpcStatus(), ResponseResult_}));
696+
.WillOnce(Return(TIteratorResult<IListSplitsStreamIterator>{ResponseStatus_, ResponseResult_}));
692697
}
693698

694699
private:
695700
TConnectorClientMock* Mock_ = nullptr;
696701
TListSplitsStreamIteratorMock::TPtr ResponseResult_ = std::make_shared<TListSplitsStreamIteratorMock>();
702+
NYdbGrpc::TGrpcStatus ResponseStatus_ {};
697703
};
698704

699705
template <class TParent = void /* no parent by default */>
@@ -751,19 +757,25 @@ namespace NYql::NConnector::NTest {
751757
return TReadSplitsResultBuilder<TBuilder>(ResponseResult_, this);
752758
}
753759

760+
auto& Status(const NYdbGrpc::TGrpcStatus &status) {
761+
ResponseStatus_ = status;
762+
return *this;
763+
}
764+
754765
void FillWithDefaults() {
755766
Format(NApi::TReadSplitsRequest::ARROW_IPC_STREAMING);
756767
}
757768

758769
private:
759770
void SetExpectation() {
760771
EXPECT_CALL(*Mock_, ReadSplitsImpl(ProtobufRequestMatcher(*Result_)))
761-
.WillOnce(Return(TIteratorResult<IReadSplitsStreamIterator>{NYdbGrpc::TGrpcStatus(), ResponseResult_}));
772+
.WillOnce(Return(TIteratorResult<IReadSplitsStreamIterator>{ResponseStatus_, ResponseResult_}));
762773
}
763774

764775
private:
765776
TConnectorClientMock* Mock_ = nullptr;
766777
TReadSplitsStreamIteratorMock::TPtr ResponseResult_ = std::make_shared<TReadSplitsStreamIteratorMock>();
778+
NYdbGrpc::TGrpcStatus ResponseStatus_ {};
767779
};
768780

769781
TDescribeTableExpectationBuilder ExpectDescribeTable() {

0 commit comments

Comments
 (0)