Skip to content

Commit d32b402

Browse files
yumkamazevaykin
authored andcommitted
generic lookup: handle retrievable errors in grpc (ydb-platform#13119)
1 parent 7bc8ae1 commit d32b402

File tree

2 files changed

+66
-12
lines changed

2 files changed

+66
-12
lines changed

ydb/library/yql/providers/generic/actors/yql_generic_base_actor.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ namespace NYql::NDq {
2121
EvReadSplitsPart,
2222
EvReadSplitsFinished,
2323
EvError,
24+
EvRetry,
2425
EvEnd
2526
};
2627

@@ -89,6 +90,15 @@ namespace NYql::NDq {
8990
NConnector::NApi::TError Error;
9091
};
9192

93+
struct TEvRetry: NActors::TEventLocal<TEvRetry, EvRetry> {
94+
explicit TEvRetry(ui32 nextRetries)
95+
: NextRetries(nextRetries)
96+
{
97+
}
98+
99+
ui32 NextRetries;
100+
};
101+
92102
protected: // TODO move common logic here
93103
};
94104

ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp

Lines changed: 56 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ namespace NYql::NDq {
2828
using namespace NActors;
2929

3030
namespace {
31+
constexpr ui32 RequestRetriesLimit = 10; // TODO lookup parameters or PRAGMA?
3132

3233
const NKikimr::NMiniKQL::TStructType* MergeStructTypes(const NKikimr::NMiniKQL::TTypeEnvironment& env, const NKikimr::NMiniKQL::TStructType* t1, const NKikimr::NMiniKQL::TStructType* t2) {
3334
Y_ABORT_UNLESS(t1);
@@ -45,7 +46,7 @@ namespace NYql::NDq {
4546
template <typename T>
4647
T ExtractFromConstFuture(const NThreading::TFuture<T>& f) {
4748
// We want to avoid making a copy of data stored in a future.
48-
// But there is no direct way to extract data from a const future5
49+
// But there is no direct way to extract data from a const future
4950
// So, we make a copy of the future, that is cheap. Then, extract the value from this copy.
5051
// It destructs the value in the original future, but this trick is legal and documented here:
5152
// https://docs.yandex-team.ru/arcadia-cpp/cookbook/concurrency
@@ -155,20 +156,25 @@ namespace NYql::NDq {
155156
hFunc(TEvReadSplitsPart, Handle);
156157
hFunc(TEvReadSplitsFinished, Handle);
157158
hFunc(TEvError, Handle);
159+
hFunc(TEvRetry, Handle);
158160
hFunc(NActors::TEvents::TEvPoison, Handle);)
159161

160162
void Handle(TEvListSplitsIterator::TPtr ev) {
161163
auto& iterator = ev->Get()->Iterator;
162164
iterator->ReadNext().Subscribe(
163-
[actorSystem = TActivationContext::ActorSystem(), selfId = SelfId()](const NConnector::TAsyncResult<NConnector::NApi::TListSplitsResponse>& asyncResult) {
165+
[
166+
actorSystem = TActivationContext::ActorSystem(),
167+
selfId = SelfId(),
168+
retriesRemaining = RetriesRemaining
169+
](const NConnector::TAsyncResult<NConnector::NApi::TListSplitsResponse>& asyncResult) {
164170
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << selfId << " Got TListSplitsResponse from Connector";
165171
auto result = ExtractFromConstFuture(asyncResult);
166172
if (result.Status.Ok()) {
167173
Y_ABORT_UNLESS(result.Response);
168174
auto ev = new TEvListSplitsPart(std::move(*result.Response));
169175
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
170176
} else {
171-
SendError(actorSystem, selfId, result.Status);
177+
SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining);
172178
}
173179
});
174180
}
@@ -189,14 +195,18 @@ namespace NYql::NDq {
189195
*readRequest.add_splits() = split;
190196
readRequest.Setformat(NConnector::NApi::TReadSplitsRequest_EFormat::TReadSplitsRequest_EFormat_ARROW_IPC_STREAMING);
191197
readRequest.set_filtering(NConnector::NApi::TReadSplitsRequest::FILTERING_MANDATORY);
192-
Connector->ReadSplits(readRequest).Subscribe([actorSystem = TActivationContext::ActorSystem(), selfId = SelfId()](const NConnector::TReadSplitsStreamIteratorAsyncResult& asyncResult) {
198+
Connector->ReadSplits(readRequest).Subscribe([
199+
actorSystem = TActivationContext::ActorSystem(),
200+
selfId = SelfId(),
201+
retriesRemaining = RetriesRemaining
202+
](const NConnector::TReadSplitsStreamIteratorAsyncResult& asyncResult) {
193203
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << selfId << " Got ReadSplitsStreamIterator from Connector";
194204
auto result = ExtractFromConstFuture(asyncResult);
195205
if (result.Status.Ok()) {
196206
auto ev = new TEvReadSplitsIterator(std::move(result.Iterator));
197207
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
198208
} else {
199-
SendError(actorSystem, selfId, result.Status);
209+
SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining);
200210
}
201211
});
202212
}
@@ -225,6 +235,12 @@ namespace NYql::NDq {
225235
actorSystem->Send(new NActors::IEventHandle(ParentId, SelfId(), errEv.release()));
226236
}
227237

238+
void Handle(TEvRetry::TPtr ev) {
239+
auto guard = Guard(*Alloc);
240+
RetriesRemaining = ev->Get()->NextRetries;
241+
SendRequest();
242+
}
243+
228244
void Handle(NActors::TEvents::TEvPoison::TPtr) {
229245
PassAway();
230246
}
@@ -243,18 +259,22 @@ namespace NYql::NDq {
243259
if (!request) {
244260
return;
245261
}
246-
auto startCycleCount = GetCycleCountFast();
247262
SentTime = TInstant::Now();
248263
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << SelfId() << " Got LookupRequest for " << request->size() << " keys";
249264
Y_ABORT_IF(request->size() == 0 || request->size() > MaxKeysInRequest);
250-
251265
if (Count) {
252266
Count->Inc();
253267
InFlight->Inc();
254268
Keys->Add(request->size());
255269
}
256270

257271
Request = std::move(request);
272+
RetriesRemaining = RequestRetriesLimit;
273+
SendRequest();
274+
}
275+
276+
void SendRequest() {
277+
auto startCycleCount = GetCycleCountFast();
258278
NConnector::NApi::TListSplitsRequest splitRequest;
259279

260280
auto error = FillSelect(*splitRequest.add_selects());
@@ -264,15 +284,19 @@ namespace NYql::NDq {
264284
};
265285

266286
splitRequest.Setmax_split_count(1);
267-
Connector->ListSplits(splitRequest).Subscribe([actorSystem = TActivationContext::ActorSystem(), selfId = SelfId()](const NConnector::TListSplitsStreamIteratorAsyncResult& asyncResult) {
287+
Connector->ListSplits(splitRequest).Subscribe([
288+
actorSystem = TActivationContext::ActorSystem(),
289+
selfId = SelfId(),
290+
retriesRemaining = RetriesRemaining
291+
](const NConnector::TListSplitsStreamIteratorAsyncResult& asyncResult) {
268292
auto result = ExtractFromConstFuture(asyncResult);
269293
if (result.Status.Ok()) {
270294
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << selfId << " Got TListSplitsStreamIterator";
271295
Y_ABORT_UNLESS(result.Iterator, "Uninitialized iterator");
272296
auto ev = new TEvListSplitsIterator(std::move(result.Iterator));
273297
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
274298
} else {
275-
SendError(actorSystem, selfId, result.Status);
299+
SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining);
276300
}
277301
});
278302
if (CpuTime) {
@@ -282,12 +306,17 @@ namespace NYql::NDq {
282306

283307
void ReadNextData() {
284308
ReadSplitsIterator->ReadNext().Subscribe(
285-
[actorSystem = TActivationContext::ActorSystem(), selfId = SelfId()](const NConnector::TAsyncResult<NConnector::NApi::TReadSplitsResponse>& asyncResult) {
309+
[
310+
actorSystem = TActivationContext::ActorSystem(),
311+
selfId = SelfId(),
312+
retriesRemaining = RetriesRemaining
313+
](const NConnector::TAsyncResult<NConnector::NApi::TReadSplitsResponse>& asyncResult) {
286314
auto result = ExtractFromConstFuture(asyncResult);
287315
if (result.Status.Ok()) {
288316
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << selfId << " Got DataChunk";
289317
Y_ABORT_UNLESS(result.Response);
290318
auto& response = *result.Response;
319+
// TODO: retry on some YDB errors
291320
if (NConnector::IsSuccess(response)) {
292321
auto ev = new TEvReadSplitsPart(std::move(response));
293322
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
@@ -299,7 +328,7 @@ namespace NYql::NDq {
299328
auto ev = new TEvReadSplitsFinished(std::move(result.Status));
300329
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
301330
} else {
302-
SendError(actorSystem, selfId, result.Status);
331+
SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining);
303332
}
304333
});
305334
}
@@ -365,7 +394,19 @@ namespace NYql::NDq {
365394
new TEvError(std::move(error)));
366395
}
367396

368-
static void SendError(NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NYdbGrpc::TGrpcStatus& status) {
397+
static void SendRetryOrError(NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NYdbGrpc::TGrpcStatus& status, ui32 retriesRemaining) {
398+
if (NConnector::GrpcStatusNeedsRetry(status)) {
399+
if (retriesRemaining) {
400+
const auto retry = RequestRetriesLimit - retriesRemaining;
401+
const auto delay = TDuration::MilliSeconds(1u << retry); // Exponential delay from 1ms to ~0.5s
402+
// <<< TODO tune/tweak
403+
YQL_CLOG(WARN, ProviderGeneric) << "ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString() << ", retry " << (retry + 1) << " of " << RequestRetriesLimit << ", scheduled in " << delay;
404+
--retriesRemaining;
405+
actorSystem->Schedule(delay, new IEventHandle(selfId, selfId, new TEvRetry(retriesRemaining)));
406+
return;
407+
}
408+
YQL_CLOG(ERROR, ProviderGeneric) << "ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString() << ", retry count exceed limit " << RequestRetriesLimit;
409+
}
369410
SendError(actorSystem, selfId, NConnector::ErrorFromGRPCStatus(status));
370411
}
371412

@@ -422,6 +463,8 @@ namespace NYql::NDq {
422463

423464
NConnector::NApi::TPredicate_TDisjunction disjunction;
424465
for (const auto& [k, _] : *Request) {
466+
// TODO consider skipping already retrieved keys
467+
// ... but careful, can we end up with zero? TODO
425468
NConnector::NApi::TPredicate_TConjunction conjunction;
426469
for (ui32 c = 0; c != KeyType->GetMembersCount(); ++c) {
427470
NConnector::NApi::TPredicate_TComparison eq;
@@ -454,6 +497,7 @@ namespace NYql::NDq {
454497
std::shared_ptr<IDqAsyncLookupSource::TUnboxedValueMap> Request;
455498
NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator; // TODO move me to TEvReadSplitsPart
456499
NKikimr::NMiniKQL::TKeyPayloadPairVector LookupResult;
500+
ui32 RetriesRemaining;
457501
::NMonitoring::TDynamicCounters::TCounterPtr Count;
458502
::NMonitoring::TDynamicCounters::TCounterPtr Keys;
459503
::NMonitoring::TDynamicCounters::TCounterPtr ResultRows;

0 commit comments

Comments
 (0)