@@ -28,6 +28,7 @@ namespace NYql::NDq {
2828 using namespace NActors ;
2929
3030 namespace {
31+ constexpr ui32 RequestRetriesLimit = 10 ; // TODO lookup parameters or PRAGMA?
3132
3233 const NKikimr::NMiniKQL::TStructType* MergeStructTypes (const NKikimr::NMiniKQL::TTypeEnvironment& env, const NKikimr::NMiniKQL::TStructType* t1, const NKikimr::NMiniKQL::TStructType* t2) {
3334 Y_ABORT_UNLESS (t1);
@@ -45,7 +46,7 @@ namespace NYql::NDq {
4546 template <typename T>
4647 T ExtractFromConstFuture (const NThreading::TFuture<T>& f) {
4748 // We want to avoid making a copy of data stored in a future.
48- // But there is no direct way to extract data from a const future5
49+ // But there is no direct way to extract data from a const future
4950 // So, we make a copy of the future, that is cheap. Then, extract the value from this copy.
5051 // It destructs the value in the original future, but this trick is legal and documented here:
5152 // https://docs.yandex-team.ru/arcadia-cpp/cookbook/concurrency
@@ -155,20 +156,25 @@ namespace NYql::NDq {
155156 hFunc (TEvReadSplitsPart, Handle);
156157 hFunc (TEvReadSplitsFinished, Handle);
157158 hFunc (TEvError, Handle);
159+ hFunc (TEvRetry, Handle);
158160 hFunc (NActors::TEvents::TEvPoison, Handle);)
159161
160162 void Handle (TEvListSplitsIterator::TPtr ev) {
161163 auto & iterator = ev->Get ()->Iterator ;
162164 iterator->ReadNext ().Subscribe (
163- [actorSystem = TActivationContext::ActorSystem (), selfId = SelfId ()](const NConnector::TAsyncResult<NConnector::NApi::TListSplitsResponse>& asyncResult) {
165+ [
166+ actorSystem = TActivationContext::ActorSystem (),
167+ selfId = SelfId (),
168+ retriesRemaining = RetriesRemaining
169+ ](const NConnector::TAsyncResult<NConnector::NApi::TListSplitsResponse>& asyncResult) {
164170 YQL_CLOG (DEBUG, ProviderGeneric) << " ActorId=" << selfId << " Got TListSplitsResponse from Connector" ;
165171 auto result = ExtractFromConstFuture (asyncResult);
166172 if (result.Status .Ok ()) {
167173 Y_ABORT_UNLESS (result.Response );
168174 auto ev = new TEvListSplitsPart (std::move (*result.Response ));
169175 actorSystem->Send (new NActors::IEventHandle (selfId, selfId, ev));
170176 } else {
171- SendError (actorSystem, selfId, result.Status );
177+ SendRetryOrError (actorSystem, selfId, result.Status , retriesRemaining );
172178 }
173179 });
174180 }
@@ -189,14 +195,18 @@ namespace NYql::NDq {
189195 *readRequest.add_splits () = split;
190196 readRequest.Setformat (NConnector::NApi::TReadSplitsRequest_EFormat::TReadSplitsRequest_EFormat_ARROW_IPC_STREAMING);
191197 readRequest.set_filtering (NConnector::NApi::TReadSplitsRequest::FILTERING_MANDATORY);
192- Connector->ReadSplits (readRequest).Subscribe ([actorSystem = TActivationContext::ActorSystem (), selfId = SelfId ()](const NConnector::TReadSplitsStreamIteratorAsyncResult& asyncResult) {
198+ Connector->ReadSplits (readRequest).Subscribe ([
199+ actorSystem = TActivationContext::ActorSystem (),
200+ selfId = SelfId (),
201+ retriesRemaining = RetriesRemaining
202+ ](const NConnector::TReadSplitsStreamIteratorAsyncResult& asyncResult) {
193203 YQL_CLOG (DEBUG, ProviderGeneric) << " ActorId=" << selfId << " Got ReadSplitsStreamIterator from Connector" ;
194204 auto result = ExtractFromConstFuture (asyncResult);
195205 if (result.Status .Ok ()) {
196206 auto ev = new TEvReadSplitsIterator (std::move (result.Iterator ));
197207 actorSystem->Send (new NActors::IEventHandle (selfId, selfId, ev));
198208 } else {
199- SendError (actorSystem, selfId, result.Status );
209+ SendRetryOrError (actorSystem, selfId, result.Status , retriesRemaining );
200210 }
201211 });
202212 }
@@ -225,6 +235,12 @@ namespace NYql::NDq {
225235 actorSystem->Send (new NActors::IEventHandle (ParentId, SelfId (), errEv.release ()));
226236 }
227237
238+ void Handle (TEvRetry::TPtr ev) {
239+ auto guard = Guard (*Alloc);
240+ RetriesRemaining = ev->Get ()->NextRetries ;
241+ SendRequest ();
242+ }
243+
228244 void Handle (NActors::TEvents::TEvPoison::TPtr) {
229245 PassAway ();
230246 }
@@ -243,18 +259,22 @@ namespace NYql::NDq {
243259 if (!request) {
244260 return ;
245261 }
246- auto startCycleCount = GetCycleCountFast ();
247262 SentTime = TInstant::Now ();
248263 YQL_CLOG (DEBUG, ProviderGeneric) << " ActorId=" << SelfId () << " Got LookupRequest for " << request->size () << " keys" ;
249264 Y_ABORT_IF (request->size () == 0 || request->size () > MaxKeysInRequest);
250-
251265 if (Count) {
252266 Count->Inc ();
253267 InFlight->Inc ();
254268 Keys->Add (request->size ());
255269 }
256270
257271 Request = std::move (request);
272+ RetriesRemaining = RequestRetriesLimit;
273+ SendRequest ();
274+ }
275+
276+ void SendRequest () {
277+ auto startCycleCount = GetCycleCountFast ();
258278 NConnector::NApi::TListSplitsRequest splitRequest;
259279
260280 auto error = FillSelect (*splitRequest.add_selects ());
@@ -264,15 +284,19 @@ namespace NYql::NDq {
264284 };
265285
266286 splitRequest.Setmax_split_count (1 );
267- Connector->ListSplits (splitRequest).Subscribe ([actorSystem = TActivationContext::ActorSystem (), selfId = SelfId ()](const NConnector::TListSplitsStreamIteratorAsyncResult& asyncResult) {
287+ Connector->ListSplits (splitRequest).Subscribe ([
288+ actorSystem = TActivationContext::ActorSystem (),
289+ selfId = SelfId (),
290+ retriesRemaining = RetriesRemaining
291+ ](const NConnector::TListSplitsStreamIteratorAsyncResult& asyncResult) {
268292 auto result = ExtractFromConstFuture (asyncResult);
269293 if (result.Status .Ok ()) {
270294 YQL_CLOG (DEBUG, ProviderGeneric) << " ActorId=" << selfId << " Got TListSplitsStreamIterator" ;
271295 Y_ABORT_UNLESS (result.Iterator , " Uninitialized iterator" );
272296 auto ev = new TEvListSplitsIterator (std::move (result.Iterator ));
273297 actorSystem->Send (new NActors::IEventHandle (selfId, selfId, ev));
274298 } else {
275- SendError (actorSystem, selfId, result.Status );
299+ SendRetryOrError (actorSystem, selfId, result.Status , retriesRemaining );
276300 }
277301 });
278302 if (CpuTime) {
@@ -282,12 +306,17 @@ namespace NYql::NDq {
282306
283307 void ReadNextData () {
284308 ReadSplitsIterator->ReadNext ().Subscribe (
285- [actorSystem = TActivationContext::ActorSystem (), selfId = SelfId ()](const NConnector::TAsyncResult<NConnector::NApi::TReadSplitsResponse>& asyncResult) {
309+ [
310+ actorSystem = TActivationContext::ActorSystem (),
311+ selfId = SelfId (),
312+ retriesRemaining = RetriesRemaining
313+ ](const NConnector::TAsyncResult<NConnector::NApi::TReadSplitsResponse>& asyncResult) {
286314 auto result = ExtractFromConstFuture (asyncResult);
287315 if (result.Status .Ok ()) {
288316 YQL_CLOG (DEBUG, ProviderGeneric) << " ActorId=" << selfId << " Got DataChunk" ;
289317 Y_ABORT_UNLESS (result.Response );
290318 auto & response = *result.Response ;
319+ // TODO: retry on some YDB errors
291320 if (NConnector::IsSuccess (response)) {
292321 auto ev = new TEvReadSplitsPart (std::move (response));
293322 actorSystem->Send (new NActors::IEventHandle (selfId, selfId, ev));
@@ -299,7 +328,7 @@ namespace NYql::NDq {
299328 auto ev = new TEvReadSplitsFinished (std::move (result.Status ));
300329 actorSystem->Send (new NActors::IEventHandle (selfId, selfId, ev));
301330 } else {
302- SendError (actorSystem, selfId, result.Status );
331+ SendRetryOrError (actorSystem, selfId, result.Status , retriesRemaining );
303332 }
304333 });
305334 }
@@ -365,7 +394,19 @@ namespace NYql::NDq {
365394 new TEvError (std::move (error)));
366395 }
367396
368- static void SendError (NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NYdbGrpc::TGrpcStatus& status) {
397+ static void SendRetryOrError (NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NYdbGrpc::TGrpcStatus& status, ui32 retriesRemaining) {
398+ if (NConnector::GrpcStatusNeedsRetry (status)) {
399+ if (retriesRemaining) {
400+ const auto retry = RequestRetriesLimit - retriesRemaining;
401+ const auto delay = TDuration::MilliSeconds (1u << retry); // Exponential delay from 1ms to ~0.5s
402+ // <<< TODO tune/tweak
403+ YQL_CLOG (WARN, ProviderGeneric) << " ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString () << " , retry " << (retry + 1 ) << " of " << RequestRetriesLimit << " , scheduled in " << delay;
404+ --retriesRemaining;
405+ actorSystem->Schedule (delay, new IEventHandle (selfId, selfId, new TEvRetry (retriesRemaining)));
406+ return ;
407+ }
408+ YQL_CLOG (ERROR, ProviderGeneric) << " ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString () << " , retry count exceed limit " << RequestRetriesLimit;
409+ }
369410 SendError (actorSystem, selfId, NConnector::ErrorFromGRPCStatus (status));
370411 }
371412
@@ -422,6 +463,8 @@ namespace NYql::NDq {
422463
423464 NConnector::NApi::TPredicate_TDisjunction disjunction;
424465 for (const auto & [k, _] : *Request) {
466+ // TODO consider skipping already retrieved keys
467+ // ... but careful, can we end up with zero? TODO
425468 NConnector::NApi::TPredicate_TConjunction conjunction;
426469 for (ui32 c = 0 ; c != KeyType->GetMembersCount (); ++c) {
427470 NConnector::NApi::TPredicate_TComparison eq;
@@ -454,6 +497,7 @@ namespace NYql::NDq {
454497 std::shared_ptr<IDqAsyncLookupSource::TUnboxedValueMap> Request;
455498 NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator; // TODO move me to TEvReadSplitsPart
456499 NKikimr::NMiniKQL::TKeyPayloadPairVector LookupResult;
500+ ui32 RetriesRemaining;
457501 ::NMonitoring::TDynamicCounters::TCounterPtr Count;
458502 ::NMonitoring::TDynamicCounters::TCounterPtr Keys;
459503 ::NMonitoring::TDynamicCounters::TCounterPtr ResultRows;
0 commit comments