Skip to content

Commit ea09754

Browse files
authored
Generic lookup add timeouts (#13239)
1 parent 798b717 commit ea09754

File tree

6 files changed

+227
-24
lines changed

6 files changed

+227
-24
lines changed

ydb/library/yql/providers/generic/actors/ut/yql_generic_lookup_actor_ut.cpp

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,4 +226,189 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
226226
}
227227
}
228228

229+
Y_UNIT_TEST(LookupWithErrors) {
230+
auto alloc = std::make_shared<NKikimr::NMiniKQL::TScopedAlloc>(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false);
231+
NKikimr::NMiniKQL::TMemoryUsageInfo memUsage("TestMemUsage");
232+
NKikimr::NMiniKQL::THolderFactory holderFactory(alloc->Ref(), memUsage);
233+
NKikimr::NMiniKQL::TTypeEnvironment typeEnv(*alloc);
234+
NKikimr::NMiniKQL::TTypeBuilder typeBuilder(typeEnv);
235+
236+
auto loggerConfig = NYql::NProto::TLoggingConfig();
237+
loggerConfig.set_allcomponentslevel(::NYql::NProto::TLoggingConfig_ELevel::TLoggingConfig_ELevel_TRACE);
238+
NYql::NLog::InitLogger(loggerConfig, false);
239+
240+
TTestActorRuntimeBase runtime(1, 1, true);
241+
runtime.Initialize();
242+
auto edge = runtime.AllocateEdgeActor();
243+
244+
NYql::TGenericDataSourceInstance dsi;
245+
dsi.Setkind(NYql::EGenericDataSourceKind::YDB);
246+
dsi.mutable_endpoint()->Sethost("some_host");
247+
dsi.mutable_endpoint()->Setport(2135);
248+
dsi.Setdatabase("some_db");
249+
dsi.Setuse_tls(true);
250+
dsi.set_protocol(::NYql::EGenericProtocol::NATIVE);
251+
auto token = dsi.mutable_credentials()->mutable_token();
252+
token->Settype("IAM");
253+
token->Setvalue("TEST_TOKEN");
254+
255+
auto connectorMock = std::make_shared<NYql::NConnector::NTest::TConnectorClientMock>();
256+
257+
// clang-format off
258+
// step 1: ListSplits
259+
{
260+
::testing::InSequence seq;
261+
for (grpc::StatusCode readStatus : { grpc::StatusCode::UNAVAILABLE, grpc::StatusCode::OK }) {
262+
for (grpc::StatusCode listStatus : { grpc::StatusCode::UNAVAILABLE, readStatus == grpc::StatusCode::OK ? grpc::StatusCode::DEADLINE_EXCEEDED : grpc::StatusCode::UNAVAILABLE, grpc::StatusCode::OK }) {
263+
auto listBuilder = connectorMock->ExpectListSplits();
264+
listBuilder
265+
.Select()
266+
.DataSourceInstance(dsi)
267+
.What()
268+
.Column("id", Ydb::Type::UINT64)
269+
.NullableColumn("optional_id", Ydb::Type::UINT64)
270+
.NullableColumn("string_value", Ydb::Type::STRING)
271+
.Done()
272+
.Table("lookup_test")
273+
.Where()
274+
.Filter()
275+
.Disjunction()
276+
.Operand()
277+
.Conjunction()
278+
.Operand().Equal().Column("id").Value<ui64>(2).Done().Done()
279+
.Operand().Equal().Column("optional_id").OptionalValue<ui64>(102).Done().Done()
280+
.Done()
281+
.Done()
282+
.Operand()
283+
.Conjunction()
284+
.Operand().Equal().Column("id").Value<ui64>(1).Done().Done()
285+
.Operand().Equal().Column("optional_id").OptionalValue<ui64>(101).Done().Done()
286+
.Done()
287+
.Done()
288+
.Operand()
289+
.Conjunction()
290+
.Operand().Equal().Column("id").Value<ui64>(0).Done().Done()
291+
.Operand().Equal().Column("optional_id").OptionalValue<ui64>(100).Done().Done()
292+
.Done()
293+
.Done()
294+
.Done()
295+
.Done()
296+
.Done()
297+
.Done()
298+
.MaxSplitCount(1)
299+
;
300+
if (listStatus != grpc::StatusCode::OK) {
301+
listBuilder
302+
.Status(NYdbGrpc::TGrpcStatus(listStatus, "Mocked Error"))
303+
;
304+
continue;
305+
}
306+
listBuilder
307+
.Result()
308+
.AddResponse(NewSuccess())
309+
.Description("Actual split info is not important")
310+
;
311+
}
312+
313+
auto readBuilder = connectorMock->ExpectReadSplits();
314+
readBuilder
315+
.DataSourceInstance(dsi)
316+
.Filtering(NYql::NConnector::NApi::TReadSplitsRequest::FILTERING_MANDATORY)
317+
.Split()
318+
.Description("Actual split info is not important")
319+
.Done()
320+
;
321+
if (readStatus != grpc::StatusCode::OK) {
322+
readBuilder
323+
.Status(NYdbGrpc::TGrpcStatus(readStatus, "Mocked Error"))
324+
;
325+
continue;
326+
}
327+
readBuilder
328+
.Result()
329+
.AddResponse(
330+
MakeRecordBatch(
331+
MakeArray<arrow::UInt64Builder, uint64_t>("id", {0, 1, 2}, arrow::uint64()),
332+
MakeArray<arrow::UInt64Builder, uint64_t>("optional_id", {100, 101, 103}, arrow::uint64()), // the last value is intentially wrong
333+
MakeArray<arrow::StringBuilder, std::string>("string_value", {"a", "b", "c"}, arrow::utf8())
334+
),
335+
NewSuccess()
336+
)
337+
;
338+
}
339+
}
340+
// clang-format on
341+
342+
NYql::Generic::TLookupSource lookupSourceSettings;
343+
*lookupSourceSettings.mutable_data_source_instance() = dsi;
344+
lookupSourceSettings.Settable("lookup_test");
345+
lookupSourceSettings.SetServiceAccountId("testsaid");
346+
lookupSourceSettings.SetServiceAccountIdSignature("fake_signature");
347+
348+
google::protobuf::Any packedLookupSource;
349+
Y_ABORT_UNLESS(packedLookupSource.PackFrom(lookupSourceSettings));
350+
351+
NKikimr::NMiniKQL::TStructTypeBuilder keyTypeBuilder{typeEnv};
352+
keyTypeBuilder.Add("id", typeBuilder.NewDataType(NYql::NUdf::EDataSlot::Uint64, false));
353+
keyTypeBuilder.Add("optional_id", typeBuilder.NewDataType(NYql::NUdf::EDataSlot::Uint64, true));
354+
NKikimr::NMiniKQL::TStructTypeBuilder outputypeBuilder{typeEnv};
355+
outputypeBuilder.Add("string_value", typeBuilder.NewDataType(NYql::NUdf::EDataSlot::String, true));
356+
357+
auto guard = Guard(*alloc.get());
358+
auto keyTypeHelper = std::make_shared<NYql::NDq::IDqAsyncLookupSource::TKeyTypeHelper>(keyTypeBuilder.Build());
359+
360+
auto [lookupSource, actor] = NYql::NDq::CreateGenericLookupActor(
361+
connectorMock,
362+
std::make_shared<NYql::NTestCreds::TSecuredServiceAccountCredentialsFactory>(),
363+
edge,
364+
nullptr,
365+
alloc,
366+
keyTypeHelper,
367+
std::move(lookupSourceSettings),
368+
keyTypeBuilder.Build(),
369+
outputypeBuilder.Build(),
370+
typeEnv,
371+
holderFactory,
372+
1'000'000);
373+
auto lookupActor = runtime.Register(actor);
374+
375+
auto request = std::make_shared<NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap>(3, keyTypeHelper->GetValueHash(), keyTypeHelper->GetValueEqual());
376+
for (size_t i = 0; i != 3; ++i) {
377+
NYql::NUdf::TUnboxedValue* keyItems;
378+
auto key = holderFactory.CreateDirectArrayHolder(2, keyItems);
379+
keyItems[0] = NYql::NUdf::TUnboxedValuePod(ui64(i));
380+
keyItems[1] = NYql::NUdf::TUnboxedValuePod(ui64(100 + i));
381+
request->emplace(std::move(key), NYql::NUdf::TUnboxedValue{});
382+
}
383+
384+
guard.Release(); // let actors use alloc
385+
386+
auto callLookupActor = new TCallLookupActor(alloc, lookupActor, request);
387+
runtime.Register(callLookupActor);
388+
389+
auto ev = runtime.GrabEdgeEventRethrow<NYql::NDq::IDqAsyncLookupSource::TEvLookupResult>(edge);
390+
auto guard2 = Guard(*alloc.get());
391+
auto lookupResult = ev->Get()->Result.lock();
392+
UNIT_ASSERT(lookupResult);
393+
394+
UNIT_ASSERT_EQUAL(3, lookupResult->size());
395+
{
396+
const auto* v = lookupResult->FindPtr(CreateStructValue(holderFactory, {0, 100}));
397+
UNIT_ASSERT(v);
398+
NYql::NUdf::TUnboxedValue val = v->GetElement(0);
399+
UNIT_ASSERT(val.AsStringRef() == TStringBuf("a"));
400+
}
401+
{
402+
const auto* v = lookupResult->FindPtr(CreateStructValue(holderFactory, {1, 101}));
403+
UNIT_ASSERT(v);
404+
NYql::NUdf::TUnboxedValue val = v->GetElement(0);
405+
UNIT_ASSERT(val.AsStringRef() == TStringBuf("b"));
406+
}
407+
{
408+
const auto* v = lookupResult->FindPtr(CreateStructValue(holderFactory, {2, 102}));
409+
UNIT_ASSERT(v);
410+
UNIT_ASSERT(!*v);
411+
}
412+
}
413+
229414
} // Y_UNIT_TEST_SUITE(GenericProviderLookupActor)

ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ namespace NYql::NDq {
2929

3030
namespace {
3131
constexpr ui32 RequestRetriesLimit = 10; // TODO lookup parameters or PRAGMA?
32+
constexpr TDuration RequestTimeout = TDuration::Minutes(3); // TODO lookup parameters or PRAGMA?
3233

3334
const NKikimr::NMiniKQL::TStructType* MergeStructTypes(const NKikimr::NMiniKQL::TTypeEnvironment& env, const NKikimr::NMiniKQL::TStructType* t1, const NKikimr::NMiniKQL::TStructType* t2) {
3435
Y_ABORT_UNLESS(t1);
@@ -195,7 +196,7 @@ namespace NYql::NDq {
195196
*readRequest.add_splits() = split;
196197
readRequest.Setformat(NConnector::NApi::TReadSplitsRequest_EFormat::TReadSplitsRequest_EFormat_ARROW_IPC_STREAMING);
197198
readRequest.set_filtering(NConnector::NApi::TReadSplitsRequest::FILTERING_MANDATORY);
198-
Connector->ReadSplits(readRequest).Subscribe([
199+
Connector->ReadSplits(readRequest, RequestTimeout).Subscribe([
199200
actorSystem = TActivationContext::ActorSystem(),
200201
selfId = SelfId(),
201202
retriesRemaining = RetriesRemaining
@@ -284,7 +285,7 @@ namespace NYql::NDq {
284285
};
285286

286287
splitRequest.Setmax_split_count(1);
287-
Connector->ListSplits(splitRequest).Subscribe([
288+
Connector->ListSplits(splitRequest, RequestTimeout).Subscribe([
288289
actorSystem = TActivationContext::ActorSystem(),
289290
selfId = SelfId(),
290291
retriesRemaining = RetriesRemaining
@@ -395,13 +396,17 @@ namespace NYql::NDq {
395396
}
396397

397398
static void SendRetryOrError(NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NYdbGrpc::TGrpcStatus& status, ui32 retriesRemaining) {
398-
if (NConnector::GrpcStatusNeedsRetry(status)) {
399+
if (NConnector::GrpcStatusNeedsRetry(status) || status.GRpcStatusCode == grpc::DEADLINE_EXCEEDED) {
399400
if (retriesRemaining) {
400401
const auto retry = RequestRetriesLimit - retriesRemaining;
401402
const auto delay = TDuration::MilliSeconds(1u << retry); // Exponential delay from 1ms to ~0.5s
402403
// <<< TODO tune/tweak
403404
YQL_CLOG(WARN, ProviderGeneric) << "ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString() << ", retry " << (retry + 1) << " of " << RequestRetriesLimit << ", scheduled in " << delay;
404405
--retriesRemaining;
406+
if (status.GRpcStatusCode == grpc::DEADLINE_EXCEEDED) {
407+
// if error was deadline, retry only once
408+
retriesRemaining = 0; // TODO tune/tweak
409+
}
405410
actorSystem->Schedule(delay, new IEventHandle(selfId, selfId, new TEvRetry(retriesRemaining)));
406411
return;
407412
}

ydb/library/yql/providers/generic/connector/libcpp/client.cpp

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -53,16 +53,16 @@ namespace NYql::NConnector {
5353
});
5454
}
5555

56-
virtual TDescribeTableAsyncResult DescribeTable(const NApi::TDescribeTableRequest& request) override {
57-
return UnaryCall<NApi::TDescribeTableRequest, NApi::TDescribeTableResponse>(request, &NApi::Connector::Stub::AsyncDescribeTable);
56+
virtual TDescribeTableAsyncResult DescribeTable(const NApi::TDescribeTableRequest& request, TDuration timeout = {}) override {
57+
return UnaryCall<NApi::TDescribeTableRequest, NApi::TDescribeTableResponse>(request, &NApi::Connector::Stub::AsyncDescribeTable, timeout);
5858
}
5959

60-
virtual TListSplitsStreamIteratorAsyncResult ListSplits(const NApi::TListSplitsRequest& request) override {
61-
return ServerSideStreamingCall<NApi::TListSplitsRequest, NApi::TListSplitsResponse>(request, &NApi::Connector::Stub::AsyncListSplits);
60+
virtual TListSplitsStreamIteratorAsyncResult ListSplits(const NApi::TListSplitsRequest& request, TDuration timeout = {}) override {
61+
return ServerSideStreamingCall<NApi::TListSplitsRequest, NApi::TListSplitsResponse>(request, &NApi::Connector::Stub::AsyncListSplits, timeout);
6262
}
6363

64-
virtual TReadSplitsStreamIteratorAsyncResult ReadSplits(const NApi::TReadSplitsRequest& request) override {
65-
return ServerSideStreamingCall<NApi::TReadSplitsRequest, NApi::TReadSplitsResponse>(request, &NApi::Connector::Stub::AsyncReadSplits);
64+
virtual TReadSplitsStreamIteratorAsyncResult ReadSplits(const NApi::TReadSplitsRequest& request, TDuration timeout = {}) override {
65+
return ServerSideStreamingCall<NApi::TReadSplitsRequest, NApi::TReadSplitsResponse>(request, &NApi::Connector::Stub::AsyncReadSplits, timeout);
6666
}
6767

6868
~TClientGRPC() {
@@ -80,7 +80,7 @@ namespace NYql::NConnector {
8080
template <class TRequest, class TResponse>
8181
TAsyncResult<TResponse> UnaryCall(
8282
const TRequest& request,
83-
typename NYdbGrpc::TSimpleRequestProcessor<NApi::Connector::Stub, TRequest, TResponse>::TAsyncRequest rpc) {
83+
typename NYdbGrpc::TSimpleRequestProcessor<NApi::Connector::Stub, TRequest, TResponse>::TAsyncRequest rpc, TDuration timeout = {}) {
8484
auto context = GrpcClient_->CreateContext();
8585
if (!context) {
8686
throw yexception() << "Client is being shutted down";
@@ -95,7 +95,7 @@ namespace NYql::NConnector {
9595
std::move(request),
9696
std::move(callback),
9797
rpc,
98-
{},
98+
{ .Timeout = timeout },
9999
context.get());
100100

101101
return promise.GetFuture();
@@ -104,7 +104,8 @@ namespace NYql::NConnector {
104104
template <class TRequest, class TResponse>
105105
TIteratorAsyncResult<IStreamIterator<TResponse>> ServerSideStreamingCall(
106106
const TRequest& request,
107-
TStreamRpc<NApi::Connector::Stub, TRequest, TResponse, NYdbGrpc::TStreamRequestReadProcessor> rpc) {
107+
TStreamRpc<NApi::Connector::Stub, TRequest, TResponse, NYdbGrpc::TStreamRequestReadProcessor> rpc,
108+
TDuration timeout = {}) {
108109
using TStreamProcessorPtr = typename NYdbGrpc::IStreamRequestReadProcessor<TResponse>::TPtr;
109110
using TStreamInitResult = std::pair<NYdbGrpc::TGrpcStatus, TStreamProcessorPtr>;
110111

@@ -121,7 +122,7 @@ namespace NYql::NConnector {
121122
promise.SetValue({std::move(status), streamProcessor});
122123
},
123124
rpc,
124-
{},
125+
{ .Timeout = timeout },
125126
context.get());
126127

127128
// TODO: async handling YQ-2513

ydb/library/yql/providers/generic/connector/libcpp/client.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,9 @@ namespace NYql::NConnector {
8989
public:
9090
using TPtr = std::shared_ptr<IClient>;
9191

92-
virtual TDescribeTableAsyncResult DescribeTable(const NApi::TDescribeTableRequest& request) = 0;
93-
virtual TListSplitsStreamIteratorAsyncResult ListSplits(const NApi::TListSplitsRequest& request) = 0;
94-
virtual TReadSplitsStreamIteratorAsyncResult ReadSplits(const NApi::TReadSplitsRequest& request) = 0;
92+
virtual TDescribeTableAsyncResult DescribeTable(const NApi::TDescribeTableRequest& request, TDuration timeout = {}) = 0;
93+
virtual TListSplitsStreamIteratorAsyncResult ListSplits(const NApi::TListSplitsRequest& request, TDuration timeout = {}) = 0;
94+
virtual TReadSplitsStreamIteratorAsyncResult ReadSplits(const NApi::TReadSplitsRequest& request, TDuration timeout = {}) = 0;
9595
virtual ~IClient() = default;
9696
};
9797

0 commit comments

Comments
 (0)