Skip to content

Commit 9f0c221

Browse files
authored
Merge 1579ac1 into 4ba7107
2 parents 4ba7107 + 1579ac1 commit 9f0c221

File tree

2 files changed

+63
-12
lines changed

2 files changed

+63
-12
lines changed

ydb/library/yql/providers/generic/actors/yql_generic_base_actor.h

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@ namespace NYql::NDq {
2121
EvReadSplitsPart,
2222
EvReadSplitsFinished,
2323
EvError,
24+
EvRetry,
2425
EvEnd
2526
};
2627

@@ -89,6 +90,12 @@ namespace NYql::NDq {
8990
NConnector::NApi::TError Error;
9091
};
9192

93+
// Local event that carries no payload (no members, empty constructor).
// Used by the lookup actor to ask itself for another attempt of a failed request.
struct TEvRetry: NActors::TEventLocal<TEvRetry, EvRetry> {
94+
explicit TEvRetry()
95+
{
96+
}
97+
};
98+
9299
protected: // TODO move common logic here
93100
};
94101

ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp

Lines changed: 56 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,7 @@ namespace NYql::NDq {
2828
using namespace NActors;
2929

3030
namespace {
31+
constexpr ui32 RequestRetriesLimit = 10; // TODO lookup parameters or PRAGMA?
3132

3233
const NKikimr::NMiniKQL::TStructType* MergeStructTypes(const NKikimr::NMiniKQL::TTypeEnvironment& env, const NKikimr::NMiniKQL::TStructType* t1, const NKikimr::NMiniKQL::TStructType* t2) {
3334
Y_ABORT_UNLESS(t1);
@@ -45,7 +46,7 @@ namespace NYql::NDq {
4546
template <typename T>
4647
T ExtractFromConstFuture(const NThreading::TFuture<T>& f) {
4748
// We want to avoid making a copy of data stored in a future.
48-
// But there is no direct way to extract data from a const future5
49+
// But there is no direct way to extract data from a const future
4950
// So, we make a copy of the future, that is cheap. Then, extract the value from this copy.
5051
// It destructs the value in the original future, but this trick is legal and documented here:
5152
// https://docs.yandex-team.ru/arcadia-cpp/cookbook/concurrency
@@ -155,20 +156,25 @@ namespace NYql::NDq {
155156
hFunc(TEvReadSplitsPart, Handle);
156157
hFunc(TEvReadSplitsFinished, Handle);
157158
hFunc(TEvError, Handle);
159+
hFunc(TEvRetry, Handle);
158160
hFunc(NActors::TEvents::TEvPoison, Handle);)
159161

160162
void Handle(TEvListSplitsIterator::TPtr ev) {
161163
auto& iterator = ev->Get()->Iterator;
162164
iterator->ReadNext().Subscribe(
163-
[actorSystem = TActivationContext::ActorSystem(), selfId = SelfId()](const NConnector::TAsyncResult<NConnector::NApi::TListSplitsResponse>& asyncResult) {
165+
[
166+
actorSystem = TActivationContext::ActorSystem(),
167+
selfId = SelfId(),
168+
retriesRemaining = RetriesRemaining
169+
](const NConnector::TAsyncResult<NConnector::NApi::TListSplitsResponse>& asyncResult) {
164170
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << selfId << " Got TListSplitsResponse from Connector";
165171
auto result = ExtractFromConstFuture(asyncResult);
166172
if (result.Status.Ok()) {
167173
Y_ABORT_UNLESS(result.Response);
168174
auto ev = new TEvListSplitsPart(std::move(*result.Response));
169175
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
170176
} else {
171-
SendError(actorSystem, selfId, result.Status);
177+
SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining);
172178
}
173179
});
174180
}
@@ -189,14 +195,18 @@ namespace NYql::NDq {
189195
*readRequest.add_splits() = split;
190196
readRequest.Setformat(NConnector::NApi::TReadSplitsRequest_EFormat::TReadSplitsRequest_EFormat_ARROW_IPC_STREAMING);
191197
readRequest.set_filtering(NConnector::NApi::TReadSplitsRequest::FILTERING_MANDATORY);
192-
Connector->ReadSplits(readRequest).Subscribe([actorSystem = TActivationContext::ActorSystem(), selfId = SelfId()](const NConnector::TReadSplitsStreamIteratorAsyncResult& asyncResult) {
198+
Connector->ReadSplits(readRequest).Subscribe([
199+
actorSystem = TActivationContext::ActorSystem(),
200+
selfId = SelfId(),
201+
retriesRemaining = RetriesRemaining
202+
](const NConnector::TReadSplitsStreamIteratorAsyncResult& asyncResult) {
193203
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << selfId << " Got ReadSplitsStreamIterator from Connector";
194204
auto result = ExtractFromConstFuture(asyncResult);
195205
if (result.Status.Ok()) {
196206
auto ev = new TEvReadSplitsIterator(std::move(result.Iterator));
197207
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
198208
} else {
199-
SendError(actorSystem, selfId, result.Status);
209+
SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining);
200210
}
201211
});
202212
}
@@ -225,6 +235,13 @@ namespace NYql::NDq {
225235
actorSystem->Send(new NActors::IEventHandle(ParentId, SelfId(), errEv.release()));
226236
}
227237

238+
// Handles a scheduled TEvRetry: spends one attempt from RetriesRemaining and
// re-issues the request through SendRequest().
void Handle(TEvRetry::TPtr) {
239+
// The allocator guard stays held for the whole handler.
auto guard = Guard(*Alloc);
240+
// A retry event is only scheduled while attempts remain, so the budget must be non-zero here.
Y_ENSURE(RetriesRemaining > 0);
241+
--RetriesRemaining;
242+
SendRequest();
243+
}
244+
228245
void Handle(NActors::TEvents::TEvPoison::TPtr) {
229246
PassAway();
230247
}
@@ -243,18 +260,22 @@ namespace NYql::NDq {
243260
if (!request) {
244261
return;
245262
}
246-
auto startCycleCount = GetCycleCountFast();
247263
SentTime = TInstant::Now();
248264
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << SelfId() << " Got LookupRequest for " << request->size() << " keys";
249265
Y_ABORT_IF(request->size() == 0 || request->size() > MaxKeysInRequest);
250-
251266
if (Count) {
252267
Count->Inc();
253268
InFlight->Inc();
254269
Keys->Add(request->size());
255270
}
256271

257272
Request = std::move(request);
273+
RetriesRemaining = RequestRetriesLimit;
274+
SendRequest();
275+
}
276+
277+
void SendRequest() {
278+
auto startCycleCount = GetCycleCountFast();
258279
NConnector::NApi::TListSplitsRequest splitRequest;
259280

260281
auto error = FillSelect(*splitRequest.add_selects());
@@ -264,15 +285,19 @@ namespace NYql::NDq {
264285
};
265286

266287
splitRequest.Setmax_split_count(1);
267-
Connector->ListSplits(splitRequest).Subscribe([actorSystem = TActivationContext::ActorSystem(), selfId = SelfId()](const NConnector::TListSplitsStreamIteratorAsyncResult& asyncResult) {
288+
Connector->ListSplits(splitRequest).Subscribe([
289+
actorSystem = TActivationContext::ActorSystem(),
290+
selfId = SelfId(),
291+
retriesRemaining = RetriesRemaining
292+
](const NConnector::TListSplitsStreamIteratorAsyncResult& asyncResult) {
268293
auto result = ExtractFromConstFuture(asyncResult);
269294
if (result.Status.Ok()) {
270295
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << selfId << " Got TListSplitsStreamIterator";
271296
Y_ABORT_UNLESS(result.Iterator, "Uninitialized iterator");
272297
auto ev = new TEvListSplitsIterator(std::move(result.Iterator));
273298
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
274299
} else {
275-
SendError(actorSystem, selfId, result.Status);
300+
SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining);
276301
}
277302
});
278303
if (CpuTime) {
@@ -282,12 +307,17 @@ namespace NYql::NDq {
282307

283308
void ReadNextData() {
284309
ReadSplitsIterator->ReadNext().Subscribe(
285-
[actorSystem = TActivationContext::ActorSystem(), selfId = SelfId()](const NConnector::TAsyncResult<NConnector::NApi::TReadSplitsResponse>& asyncResult) {
310+
[
311+
actorSystem = TActivationContext::ActorSystem(),
312+
selfId = SelfId(),
313+
retriesRemaining = RetriesRemaining
314+
](const NConnector::TAsyncResult<NConnector::NApi::TReadSplitsResponse>& asyncResult) {
286315
auto result = ExtractFromConstFuture(asyncResult);
287316
if (result.Status.Ok()) {
288317
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << selfId << " Got DataChunk";
289318
Y_ABORT_UNLESS(result.Response);
290319
auto& response = *result.Response;
320+
// TODO: retry on some YDB errors
291321
if (NConnector::IsSuccess(response)) {
292322
auto ev = new TEvReadSplitsPart(std::move(response));
293323
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
@@ -299,7 +329,7 @@ namespace NYql::NDq {
299329
auto ev = new TEvReadSplitsFinished(std::move(result.Status));
300330
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
301331
} else {
302-
SendError(actorSystem, selfId, result.Status);
332+
SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining);
303333
}
304334
});
305335
}
@@ -365,7 +395,18 @@ namespace NYql::NDq {
365395
new TEvError(std::move(error)));
366396
}
367397

368-
static void SendError(NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NYdbGrpc::TGrpcStatus& status) {
398+
// Reacts to a failed Connector call.
// If the gRPC status is reported as retriable by NConnector::GrpcStatusNeedsRetry() and
// retriesRemaining is non-zero, schedules a delayed TEvRetry to selfId and returns.
// Otherwise (status not retriable, or no attempts left) converts the status with
// NConnector::ErrorFromGRPCStatus() and forwards it through SendError().
//   actorSystem      - actor system used to schedule the retry event
//   selfId           - id of the actor that receives TEvRetry
//   status           - gRPC status of the failed call
//   retriesRemaining - attempts left, as captured by the caller
static void SendRetryOrError(NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NYdbGrpc::TGrpcStatus& status, const ui32 retriesRemaining) {
399+
if (NConnector::GrpcStatusNeedsRetry(status)) {
400+
if (retriesRemaining) {
401+
// Zero-based index of the upcoming retry; assumes retriesRemaining <= RequestRetriesLimit - TODO confirm.
const auto retry = RequestRetriesLimit - retriesRemaining;
402+
// FIXME: the backoff schedule below is provisional and still needs tuning.
403+
const auto delay = TDuration::MilliSeconds(1u << retry); // Exponential delay: 1ms, 2ms, ... up to 512ms when RequestRetriesLimit is 10 (retry is 0..9)
404+
YQL_CLOG(WARN, ProviderGeneric) << "ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString() << ", retry " << (retry + 1) << " of " << RequestRetriesLimit << ", scheduled in " << delay;
405+
actorSystem->Schedule(delay, new IEventHandle(selfId, selfId, new TEvRetry()));
406+
return;
407+
}
408+
// Retriable status, but the retry budget is exhausted: log it and fall through to the error path.
YQL_CLOG(ERROR, ProviderGeneric) << "ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString() << ", retry count exceed limit " << RequestRetriesLimit;
409+
}
369410
SendError(actorSystem, selfId, NConnector::ErrorFromGRPCStatus(status));
370411
}
371412

@@ -422,6 +463,8 @@ namespace NYql::NDq {
422463

423464
NConnector::NApi::TPredicate_TDisjunction disjunction;
424465
for (const auto& [k, _] : *Request) {
466+
// TODO consider skipping already retrieved keys
467+
// ... but careful, can we end up with zero? TODO
425468
NConnector::NApi::TPredicate_TConjunction conjunction;
426469
for (ui32 c = 0; c != KeyType->GetMembersCount(); ++c) {
427470
NConnector::NApi::TPredicate_TComparison eq;
@@ -454,6 +497,7 @@ namespace NYql::NDq {
454497
std::shared_ptr<IDqAsyncLookupSource::TUnboxedValueMap> Request;
455498
NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator; // TODO move me to TEvReadSplitsPart
456499
NKikimr::NMiniKQL::TKeyPayloadPairVector LookupResult;
500+
ui32 RetriesRemaining;
457501
::NMonitoring::TDynamicCounters::TCounterPtr Count;
458502
::NMonitoring::TDynamicCounters::TCounterPtr Keys;
459503
::NMonitoring::TDynamicCounters::TCounterPtr ResultRows;

0 commit comments

Comments
 (0)