Skip to content

Commit a9453b0

Browse files
authored
Merge 5ac7033 into e1ce008
2 parents e1ce008 + 5ac7033 commit a9453b0

File tree

2 files changed

+57
-12
lines changed

2 files changed

+57
-12
lines changed

ydb/library/yql/providers/generic/actors/yql_generic_base_actor.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ namespace NYql::NDq {
2121
EvReadSplitsPart,
2222
EvReadSplitsFinished,
2323
EvError,
24+
EvRetry,
2425
EvEnd
2526
};
2627

@@ -89,6 +90,12 @@ namespace NYql::NDq {
8990
NConnector::NApi::TError Error;
9091
};
9192

93+
struct TEvRetry: NActors::TEventLocal<TEvRetry, EvRetry> {
94+
explicit TEvRetry()
95+
{
96+
}
97+
};
98+
9299
protected: // TODO move common logic here
93100
};
94101

ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp

Lines changed: 50 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ namespace NYql::NDq {
2828
using namespace NActors;
2929

3030
namespace {
31+
constexpr ui32 RequestRetriesLimit = 10; // TODO lookup parameters or PRAGMA?
3132

3233
const NKikimr::NMiniKQL::TStructType* MergeStructTypes(const NKikimr::NMiniKQL::TTypeEnvironment& env, const NKikimr::NMiniKQL::TStructType* t1, const NKikimr::NMiniKQL::TStructType* t2) {
3334
Y_ABORT_UNLESS(t1);
@@ -150,20 +151,25 @@ namespace NYql::NDq {
150151
hFunc(TEvReadSplitsPart, Handle);
151152
hFunc(TEvReadSplitsFinished, Handle);
152153
hFunc(TEvError, Handle);
154+
hFunc(TEvRetry, Handle);
153155
hFunc(NActors::TEvents::TEvPoison, Handle);)
154156

155157
void Handle(TEvListSplitsIterator::TPtr ev) {
156158
auto& iterator = ev->Get()->Iterator;
157159
iterator->ReadNext().Subscribe(
158-
[actorSystem = TActivationContext::ActorSystem(), selfId = SelfId()](const NConnector::TAsyncResult<NConnector::NApi::TListSplitsResponse>& asyncResult) {
160+
[
161+
actorSystem = TActivationContext::ActorSystem(),
162+
selfId = SelfId(),
163+
retriesRemaining = RetriesRemaining
164+
](const NConnector::TAsyncResult<NConnector::NApi::TListSplitsResponse>& asyncResult) {
159165
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << selfId << " Got TListSplitsResponse from Connector";
160166
auto result = ExtractFromConstFuture(asyncResult);
161167
if (result.Status.Ok()) {
162168
Y_ABORT_UNLESS(result.Response);
163169
auto ev = new TEvListSplitsPart(std::move(*result.Response));
164170
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
165171
} else {
166-
SendError(actorSystem, selfId, result.Status);
172+
SendError(actorSystem, selfId, result.Status, retriesRemaining);
167173
}
168174
});
169175
}
@@ -184,14 +190,18 @@ namespace NYql::NDq {
184190
*readRequest.add_splits() = split;
185191
readRequest.Setformat(NConnector::NApi::TReadSplitsRequest_EFormat::TReadSplitsRequest_EFormat_ARROW_IPC_STREAMING);
186192
readRequest.set_filtering(NConnector::NApi::TReadSplitsRequest::FILTERING_MANDATORY);
187-
Connector->ReadSplits(readRequest).Subscribe([actorSystem = TActivationContext::ActorSystem(), selfId = SelfId()](const NConnector::TReadSplitsStreamIteratorAsyncResult& asyncResult) {
193+
Connector->ReadSplits(readRequest).Subscribe([
194+
actorSystem = TActivationContext::ActorSystem(),
195+
selfId = SelfId(),
196+
retriesRemaining = RetriesRemaining
197+
](const NConnector::TReadSplitsStreamIteratorAsyncResult& asyncResult) {
188198
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << selfId << " Got ReadSplitsStreamIterator from Connector";
189199
auto result = ExtractFromConstFuture(asyncResult);
190200
if (result.Status.Ok()) {
191201
auto ev = new TEvReadSplitsIterator(std::move(result.Iterator));
192202
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
193203
} else {
194-
SendError(actorSystem, selfId, result.Status);
204+
SendError(actorSystem, selfId, result.Status, retriesRemaining);
195205
}
196206
});
197207
}
@@ -220,6 +230,13 @@ namespace NYql::NDq {
220230
actorSystem->Send(new NActors::IEventHandle(ParentId, SelfId(), errEv.release()));
221231
}
222232

233+
void Handle(TEvRetry::TPtr ev) {
234+
auto guard = Guard(*Alloc);
235+
Y_ENSURE(RetriesRemaining > 0);
236+
--RetriesRemaining;
237+
SendRequest();
238+
}
239+
223240
void Handle(NActors::TEvents::TEvPoison::TPtr) {
224241
PassAway();
225242
}
@@ -238,17 +255,21 @@ namespace NYql::NDq {
238255
if (!request) {
239256
return;
240257
}
241-
auto startCycleCount = GetCycleCountFast();
242258
SentTime = TInstant::Now();
243259
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << SelfId() << " Got LookupRequest for " << request->size() << " keys";
244260
Y_ABORT_IF(request->size() == 0 || request->size() > MaxKeysInRequest);
245-
246261
if (Count) {
247262
Count->Inc();
248263
Keys->Add(request->size());
249264
}
250265

251266
Request = std::move(request);
267+
RetriesRemaining = RequestRetriesLimit;
268+
SendRequest();
269+
}
270+
271+
void SendRequest() {
272+
auto startCycleCount = GetCycleCountFast();
252273
NConnector::NApi::TListSplitsRequest splitRequest;
253274

254275
auto error = FillSelect(*splitRequest.add_selects());
@@ -258,15 +279,19 @@ namespace NYql::NDq {
258279
};
259280

260281
splitRequest.Setmax_split_count(1);
261-
Connector->ListSplits(splitRequest).Subscribe([actorSystem = TActivationContext::ActorSystem(), selfId = SelfId()](const NConnector::TListSplitsStreamIteratorAsyncResult& asyncResult) {
282+
Connector->ListSplits(splitRequest).Subscribe([
283+
actorSystem = TActivationContext::ActorSystem(),
284+
selfId = SelfId(),
285+
retriesRemaining = RetriesRemaining
286+
](const NConnector::TListSplitsStreamIteratorAsyncResult& asyncResult) {
262287
auto result = ExtractFromConstFuture(asyncResult);
263288
if (result.Status.Ok()) {
264289
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << selfId << " Got TListSplitsStreamIterator";
265290
Y_ABORT_UNLESS(result.Iterator, "Uninitialized iterator");
266291
auto ev = new TEvListSplitsIterator(std::move(result.Iterator));
267292
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
268293
} else {
269-
SendError(actorSystem, selfId, result.Status);
294+
SendError(actorSystem, selfId, result.Status, retriesRemaining);
270295
}
271296
});
272297
if (CpuTime) {
@@ -276,7 +301,11 @@ namespace NYql::NDq {
276301

277302
void ReadNextData() {
278303
ReadSplitsIterator->ReadNext().Subscribe(
279-
[actorSystem = TActivationContext::ActorSystem(), selfId = SelfId()](const NConnector::TAsyncResult<NConnector::NApi::TReadSplitsResponse>& asyncResult) {
304+
[
305+
actorSystem = TActivationContext::ActorSystem(),
306+
selfId = SelfId(),
307+
retriesRemaining = RetriesRemaining
308+
](const NConnector::TAsyncResult<NConnector::NApi::TReadSplitsResponse>& asyncResult) {
280309
auto result = ExtractFromConstFuture(asyncResult);
281310
if (result.Status.Ok()) {
282311
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << selfId << " Got DataChunk";
@@ -293,7 +322,7 @@ namespace NYql::NDq {
293322
auto ev = new TEvReadSplitsFinished(std::move(result.Status));
294323
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
295324
} else {
296-
SendError(actorSystem, selfId, result.Status);
325+
SendError(actorSystem, selfId, result.Status, retriesRemaining);
297326
}
298327
});
299328
}
@@ -358,8 +387,14 @@ namespace NYql::NDq {
358387
new TEvError(std::move(error)));
359388
}
360389

361-
static void SendError(NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NYdbGrpc::TGrpcStatus& status) {
362-
SendError(actorSystem, selfId, NConnector::ErrorFromGRPCStatus(status));
390+
static void SendError(NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NYdbGrpc::TGrpcStatus& status, const ui32 retriesRemaining) {
391+
if (retriesRemaining && NConnector::GrpcStatusNeedsRetry(status)) {
392+
actorSystem->Schedule(
393+
TDuration::MilliSeconds(1u<<(RequestRetriesLimit - retriesRemaining)), // XXX FIXME
394+
new IEventHandle(selfId, selfId, new TEvRetry()));
395+
} else {
396+
SendError(actorSystem, selfId, NConnector::ErrorFromGRPCStatus(status));
397+
}
363398
}
364399

365400
static void SendError(NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, TString error) {
@@ -415,6 +450,8 @@ namespace NYql::NDq {
415450

416451
NConnector::NApi::TPredicate_TDisjunction disjunction;
417452
for (const auto& [k, _] : *Request) {
453+
// TODO consider skipping already retrieved keys
454+
// ... but be careful: can we end up with zero keys left to request? TODO
418455
NConnector::NApi::TPredicate_TConjunction conjunction;
419456
for (ui32 c = 0; c != KeyType->GetMembersCount(); ++c) {
420457
NConnector::NApi::TPredicate_TComparison eq;
@@ -447,6 +484,7 @@ namespace NYql::NDq {
447484
std::shared_ptr<IDqAsyncLookupSource::TUnboxedValueMap> Request;
448485
NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator; // TODO move me to TEvReadSplitsPart
449486
NKikimr::NMiniKQL::TKeyPayloadPairVector LookupResult;
487+
ui32 RetriesRemaining;
450488
::NMonitoring::TDynamicCounters::TCounterPtr Count;
451489
::NMonitoring::TDynamicCounters::TCounterPtr Keys;
452490
::NMonitoring::TDynamicCounters::TCounterPtr ResultRows;

0 commit comments

Comments
 (0)