Skip to content

Commit 25d045b

Browse files
authored
Merge 2bb4035 into 22e791d
2 parents 22e791d + 2bb4035 commit 25d045b

File tree

2 files changed

+63
-12
lines changed

2 files changed

+63
-12
lines changed

ydb/library/yql/providers/generic/actors/yql_generic_base_actor.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ namespace NYql::NDq {
2121
EvReadSplitsPart,
2222
EvReadSplitsFinished,
2323
EvError,
24+
EvRetry,
2425
EvEnd
2526
};
2627

@@ -89,6 +90,12 @@ namespace NYql::NDq {
8990
NConnector::NApi::TError Error;
9091
};
9192

93+
struct TEvRetry: NActors::TEventLocal<TEvRetry, EvRetry> {
94+
explicit TEvRetry()
95+
{
96+
}
97+
};
98+
9299
protected: // TODO move common logic here
93100
};
94101

ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp

Lines changed: 56 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ namespace NYql::NDq {
2828
using namespace NActors;
2929

3030
namespace {
31+
constexpr ui32 RequestRetriesLimit = 10; // TODO: make configurable via lookup parameters or a PRAGMA?
3132

3233
const NKikimr::NMiniKQL::TStructType* MergeStructTypes(const NKikimr::NMiniKQL::TTypeEnvironment& env, const NKikimr::NMiniKQL::TStructType* t1, const NKikimr::NMiniKQL::TStructType* t2) {
3334
Y_ABORT_UNLESS(t1);
@@ -45,7 +46,7 @@ namespace NYql::NDq {
4546
template <typename T>
4647
T ExtractFromConstFuture(const NThreading::TFuture<T>& f) {
4748
// We want to avoid making a copy of data stored in a future.
48-
// But there is no direct way to extract data from a const future5
49+
// But there is no direct way to extract data from a const future
4950
// So, we make a copy of the future, that is cheap. Then, extract the value from this copy.
5051
// It destructs the value in the original future, but this trick is legal and documented here:
5152
// https://docs.yandex-team.ru/arcadia-cpp/cookbook/concurrency
@@ -150,20 +151,25 @@ namespace NYql::NDq {
150151
hFunc(TEvReadSplitsPart, Handle);
151152
hFunc(TEvReadSplitsFinished, Handle);
152153
hFunc(TEvError, Handle);
154+
hFunc(TEvRetry, Handle);
153155
hFunc(NActors::TEvents::TEvPoison, Handle);)
154156

155157
void Handle(TEvListSplitsIterator::TPtr ev) {
156158
auto& iterator = ev->Get()->Iterator;
157159
iterator->ReadNext().Subscribe(
158-
[actorSystem = TActivationContext::ActorSystem(), selfId = SelfId()](const NConnector::TAsyncResult<NConnector::NApi::TListSplitsResponse>& asyncResult) {
160+
[
161+
actorSystem = TActivationContext::ActorSystem(),
162+
selfId = SelfId(),
163+
retriesRemaining = RetriesRemaining
164+
](const NConnector::TAsyncResult<NConnector::NApi::TListSplitsResponse>& asyncResult) {
159165
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << selfId << " Got TListSplitsResponse from Connector";
160166
auto result = ExtractFromConstFuture(asyncResult);
161167
if (result.Status.Ok()) {
162168
Y_ABORT_UNLESS(result.Response);
163169
auto ev = new TEvListSplitsPart(std::move(*result.Response));
164170
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
165171
} else {
166-
SendError(actorSystem, selfId, result.Status);
172+
SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining);
167173
}
168174
});
169175
}
@@ -184,14 +190,18 @@ namespace NYql::NDq {
184190
*readRequest.add_splits() = split;
185191
readRequest.Setformat(NConnector::NApi::TReadSplitsRequest_EFormat::TReadSplitsRequest_EFormat_ARROW_IPC_STREAMING);
186192
readRequest.set_filtering(NConnector::NApi::TReadSplitsRequest::FILTERING_MANDATORY);
187-
Connector->ReadSplits(readRequest).Subscribe([actorSystem = TActivationContext::ActorSystem(), selfId = SelfId()](const NConnector::TReadSplitsStreamIteratorAsyncResult& asyncResult) {
193+
Connector->ReadSplits(readRequest).Subscribe([
194+
actorSystem = TActivationContext::ActorSystem(),
195+
selfId = SelfId(),
196+
retriesRemaining = RetriesRemaining
197+
](const NConnector::TReadSplitsStreamIteratorAsyncResult& asyncResult) {
188198
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << selfId << " Got ReadSplitsStreamIterator from Connector";
189199
auto result = ExtractFromConstFuture(asyncResult);
190200
if (result.Status.Ok()) {
191201
auto ev = new TEvReadSplitsIterator(std::move(result.Iterator));
192202
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
193203
} else {
194-
SendError(actorSystem, selfId, result.Status);
204+
SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining);
195205
}
196206
});
197207
}
@@ -220,6 +230,13 @@ namespace NYql::NDq {
220230
actorSystem->Send(new NActors::IEventHandle(ParentId, SelfId(), errEv.release()));
221231
}
222232

233+
void Handle(TEvRetry::TPtr ev) {
234+
auto guard = Guard(*Alloc);
235+
Y_ENSURE(RetriesRemaining > 0);
236+
--RetriesRemaining;
237+
SendRequest();
238+
}
239+
223240
void Handle(NActors::TEvents::TEvPoison::TPtr) {
224241
PassAway();
225242
}
@@ -238,17 +255,21 @@ namespace NYql::NDq {
238255
if (!request) {
239256
return;
240257
}
241-
auto startCycleCount = GetCycleCountFast();
242258
SentTime = TInstant::Now();
243259
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << SelfId() << " Got LookupRequest for " << request->size() << " keys";
244260
Y_ABORT_IF(request->size() == 0 || request->size() > MaxKeysInRequest);
245-
246261
if (Count) {
247262
Count->Inc();
248263
Keys->Add(request->size());
249264
}
250265

251266
Request = std::move(request);
267+
RetriesRemaining = RequestRetriesLimit;
268+
SendRequest();
269+
}
270+
271+
void SendRequest() {
272+
auto startCycleCount = GetCycleCountFast();
252273
NConnector::NApi::TListSplitsRequest splitRequest;
253274

254275
auto error = FillSelect(*splitRequest.add_selects());
@@ -258,15 +279,19 @@ namespace NYql::NDq {
258279
};
259280

260281
splitRequest.Setmax_split_count(1);
261-
Connector->ListSplits(splitRequest).Subscribe([actorSystem = TActivationContext::ActorSystem(), selfId = SelfId()](const NConnector::TListSplitsStreamIteratorAsyncResult& asyncResult) {
282+
Connector->ListSplits(splitRequest).Subscribe([
283+
actorSystem = TActivationContext::ActorSystem(),
284+
selfId = SelfId(),
285+
retriesRemaining = RetriesRemaining
286+
](const NConnector::TListSplitsStreamIteratorAsyncResult& asyncResult) {
262287
auto result = ExtractFromConstFuture(asyncResult);
263288
if (result.Status.Ok()) {
264289
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << selfId << " Got TListSplitsStreamIterator";
265290
Y_ABORT_UNLESS(result.Iterator, "Uninitialized iterator");
266291
auto ev = new TEvListSplitsIterator(std::move(result.Iterator));
267292
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
268293
} else {
269-
SendError(actorSystem, selfId, result.Status);
294+
SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining);
270295
}
271296
});
272297
if (CpuTime) {
@@ -276,12 +301,17 @@ namespace NYql::NDq {
276301

277302
void ReadNextData() {
278303
ReadSplitsIterator->ReadNext().Subscribe(
279-
[actorSystem = TActivationContext::ActorSystem(), selfId = SelfId()](const NConnector::TAsyncResult<NConnector::NApi::TReadSplitsResponse>& asyncResult) {
304+
[
305+
actorSystem = TActivationContext::ActorSystem(),
306+
selfId = SelfId(),
307+
retriesRemaining = RetriesRemaining
308+
](const NConnector::TAsyncResult<NConnector::NApi::TReadSplitsResponse>& asyncResult) {
280309
auto result = ExtractFromConstFuture(asyncResult);
281310
if (result.Status.Ok()) {
282311
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << selfId << " Got DataChunk";
283312
Y_ABORT_UNLESS(result.Response);
284313
auto& response = *result.Response;
314+
// TODO: retry on some YDB errors
285315
if (NConnector::IsSuccess(response)) {
286316
auto ev = new TEvReadSplitsPart(std::move(response));
287317
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
@@ -293,7 +323,7 @@ namespace NYql::NDq {
293323
auto ev = new TEvReadSplitsFinished(std::move(result.Status));
294324
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
295325
} else {
296-
SendError(actorSystem, selfId, result.Status);
326+
SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining);
297327
}
298328
});
299329
}
@@ -358,7 +388,18 @@ namespace NYql::NDq {
358388
new TEvError(std::move(error)));
359389
}
360390

361-
static void SendError(NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NYdbGrpc::TGrpcStatus& status) {
391+
static void SendRetryOrError(NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NYdbGrpc::TGrpcStatus& status, const ui32 retriesRemaining) {
392+
if (NConnector::GrpcStatusNeedsRetry(status)) {
393+
if (retriesRemaining) {
394+
const auto retry = RequestRetriesLimit - retriesRemaining;
395+
// XXX FIXME tune/tweak
396+
const auto delay = TDuration::MilliSeconds(1u << retry); // Exponential delay: 1ms on the first retry, doubling up to 512ms on the last (retry is 0..9)
397+
YQL_CLOG(WARN, ProviderGeneric) << "ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString() << ", retry " << (retry + 1) << " of " << RequestRetriesLimit << ", scheduled in " << delay;
398+
actorSystem->Schedule(delay, new IEventHandle(selfId, selfId, new TEvRetry()));
399+
return;
400+
}
401+
YQL_CLOG(ERROR, ProviderGeneric) << "ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString() << ", retry count exceed limit " << RequestRetriesLimit;
402+
}
362403
SendError(actorSystem, selfId, NConnector::ErrorFromGRPCStatus(status));
363404
}
364405

@@ -415,6 +456,8 @@ namespace NYql::NDq {
415456

416457
NConnector::NApi::TPredicate_TDisjunction disjunction;
417458
for (const auto& [k, _] : *Request) {
459+
// TODO consider skipping already retrieved keys
460+
// ... but careful: could that leave zero keys to request? TODO
418461
NConnector::NApi::TPredicate_TConjunction conjunction;
419462
for (ui32 c = 0; c != KeyType->GetMembersCount(); ++c) {
420463
NConnector::NApi::TPredicate_TComparison eq;
@@ -447,6 +490,7 @@ namespace NYql::NDq {
447490
std::shared_ptr<IDqAsyncLookupSource::TUnboxedValueMap> Request;
448491
NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator; // TODO move me to TEvReadSplitsPart
449492
NKikimr::NMiniKQL::TKeyPayloadPairVector LookupResult;
493+
ui32 RetriesRemaining;
450494
::NMonitoring::TDynamicCounters::TCounterPtr Count;
451495
::NMonitoring::TDynamicCounters::TCounterPtr Keys;
452496
::NMonitoring::TDynamicCounters::TCounterPtr ResultRows;

0 commit comments

Comments
 (0)