23
23
#include < yql/essentials/utils/yql_panic.h>
24
24
#include < ydb/core/formats/arrow/serializer/abstract.h>
25
25
26
+ #include < library/cpp/retry/retry_policy.h>
27
+
26
28
namespace NYql ::NDq {
27
29
28
30
using namespace NActors ;
@@ -61,6 +63,13 @@ namespace NYql::NDq {
61
63
public TGenericBaseActor<TGenericLookupActor> {
62
64
using TBase = TGenericBaseActor<TGenericLookupActor>;
63
65
66
+ using ILookupRetryPolicy = IRetryPolicy<const NYdbGrpc::TGrpcStatus&>;
67
+ using ILookupRetryState = ILookupRetryPolicy::IRetryState;
68
+
69
+ struct TEvLookupRetry : NActors::TEventLocal<TEvLookupRetry, EvRetry> {
70
+ typedef typename THandle::TPtr TPtr;
71
+ };
72
+
64
73
public:
65
74
TGenericLookupActor (
66
75
NConnector::IClient::TPtr connectorClient,
@@ -87,6 +96,24 @@ namespace NYql::NDq {
87
96
, HolderFactory(holderFactory)
88
97
, ColumnDestinations(CreateColumnDestination())
89
98
, MaxKeysInRequest(maxKeysInRequest)
99
+ , RetryPolicy(
100
+ ILookupRetryPolicy::GetExponentialBackoffPolicy (
101
+ /* retryClassFunction */
102
+ [](const NYdbGrpc::TGrpcStatus& status) {
103
+ if (NConnector::GrpcStatusNeedsRetry (status)) {
104
+ return ERetryErrorClass::ShortRetry;
105
+ }
106
+ if (status.GRpcStatusCode == grpc::DEADLINE_EXCEEDED) {
107
+ return ERetryErrorClass::ShortRetry; // TODO LongRetry?
108
+ }
109
+ return ERetryErrorClass::NoRetry;
110
+ },
111
+ /* minDelay */ TDuration::MilliSeconds(1 ),
112
+ /* minLongRetryDelay */ TDuration::MilliSeconds(500 ),
113
+ /* maxDelay */ TDuration::Seconds(1 ),
114
+ /* maxRetries */ RequestRetriesLimit,
115
+ /* maxTime */ TDuration::Minutes(5 ),
116
+ /* scaleFactor */ 2 ))
90
117
{
91
118
InitMonCounters (taskCounters);
92
119
}
@@ -157,7 +184,7 @@ namespace NYql::NDq {
157
184
hFunc (TEvReadSplitsPart, Handle);
158
185
hFunc (TEvReadSplitsFinished, Handle);
159
186
hFunc (TEvError, Handle);
160
- hFunc (TEvRetry , Handle);
187
+ hFunc (TEvLookupRetry , Handle);
161
188
hFunc (NActors::TEvents::TEvPoison, Handle);)
162
189
163
190
void Handle (TEvListSplitsIterator::TPtr ev) {
@@ -166,7 +193,7 @@ namespace NYql::NDq {
166
193
[
167
194
actorSystem = TActivationContext::ActorSystem (),
168
195
selfId = SelfId (),
169
- retriesRemaining = RetriesRemaining
196
+ retryState = RetryState
170
197
](const NConnector::TAsyncResult<NConnector::NApi::TListSplitsResponse>& asyncResult) {
171
198
YQL_CLOG (DEBUG, ProviderGeneric) << " ActorId=" << selfId << " Got TListSplitsResponse from Connector" ;
172
199
auto result = ExtractFromConstFuture (asyncResult);
@@ -175,7 +202,7 @@ namespace NYql::NDq {
175
202
auto ev = new TEvListSplitsPart (std::move (*result.Response ));
176
203
actorSystem->Send (new NActors::IEventHandle (selfId, selfId, ev));
177
204
} else {
178
- SendRetryOrError (actorSystem, selfId, result.Status , retriesRemaining );
205
+ SendRetryOrError (actorSystem, selfId, result.Status , retryState );
179
206
}
180
207
});
181
208
}
@@ -199,15 +226,15 @@ namespace NYql::NDq {
199
226
Connector->ReadSplits (readRequest, RequestTimeout).Subscribe ([
200
227
actorSystem = TActivationContext::ActorSystem (),
201
228
selfId = SelfId (),
202
- retriesRemaining = RetriesRemaining
229
+ retryState = RetryState
203
230
](const NConnector::TReadSplitsStreamIteratorAsyncResult& asyncResult) {
204
231
YQL_CLOG (DEBUG, ProviderGeneric) << " ActorId=" << selfId << " Got ReadSplitsStreamIterator from Connector" ;
205
232
auto result = ExtractFromConstFuture (asyncResult);
206
233
if (result.Status .Ok ()) {
207
234
auto ev = new TEvReadSplitsIterator (std::move (result.Iterator ));
208
235
actorSystem->Send (new NActors::IEventHandle (selfId, selfId, ev));
209
236
} else {
210
- SendRetryOrError (actorSystem, selfId, result.Status , retriesRemaining );
237
+ SendRetryOrError (actorSystem, selfId, result.Status , retryState );
211
238
}
212
239
});
213
240
}
@@ -236,9 +263,8 @@ namespace NYql::NDq {
236
263
actorSystem->Send (new NActors::IEventHandle (ParentId, SelfId (), errEv.release ()));
237
264
}
238
265
239
- void Handle (TEvRetry ::TPtr ev ) {
266
+ void Handle (TEvLookupRetry ::TPtr) {
240
267
auto guard = Guard (*Alloc);
241
- RetriesRemaining = ev->Get ()->NextRetries ;
242
268
SendRequest ();
243
269
}
244
270
@@ -270,7 +296,7 @@ namespace NYql::NDq {
270
296
}
271
297
272
298
Request = std::move (request);
273
- RetriesRemaining = RequestRetriesLimit ;
299
+ RetryState = std::shared_ptr<ILookupRetryState>(RetryPolicy-> CreateRetryState ()) ;
274
300
SendRequest ();
275
301
}
276
302
@@ -288,7 +314,7 @@ namespace NYql::NDq {
288
314
Connector->ListSplits (splitRequest, RequestTimeout).Subscribe ([
289
315
actorSystem = TActivationContext::ActorSystem (),
290
316
selfId = SelfId (),
291
- retriesRemaining = RetriesRemaining
317
+ retryState = RetryState
292
318
](const NConnector::TListSplitsStreamIteratorAsyncResult& asyncResult) {
293
319
auto result = ExtractFromConstFuture (asyncResult);
294
320
if (result.Status .Ok ()) {
@@ -297,7 +323,7 @@ namespace NYql::NDq {
297
323
auto ev = new TEvListSplitsIterator (std::move (result.Iterator ));
298
324
actorSystem->Send (new NActors::IEventHandle (selfId, selfId, ev));
299
325
} else {
300
- SendRetryOrError (actorSystem, selfId, result.Status , retriesRemaining );
326
+ SendRetryOrError (actorSystem, selfId, result.Status , retryState );
301
327
}
302
328
});
303
329
if (CpuTime) {
@@ -310,7 +336,7 @@ namespace NYql::NDq {
310
336
[
311
337
actorSystem = TActivationContext::ActorSystem (),
312
338
selfId = SelfId (),
313
- retriesRemaining = RetriesRemaining
339
+ retryState = RetryState
314
340
](const NConnector::TAsyncResult<NConnector::NApi::TReadSplitsResponse>& asyncResult) {
315
341
auto result = ExtractFromConstFuture (asyncResult);
316
342
if (result.Status .Ok ()) {
@@ -329,7 +355,7 @@ namespace NYql::NDq {
329
355
auto ev = new TEvReadSplitsFinished (std::move (result.Status ));
330
356
actorSystem->Send (new NActors::IEventHandle (selfId, selfId, ev));
331
357
} else {
332
- SendRetryOrError (actorSystem, selfId, result.Status , retriesRemaining );
358
+ SendRetryOrError (actorSystem, selfId, result.Status , retryState );
333
359
}
334
360
});
335
361
}
@@ -395,22 +421,12 @@ namespace NYql::NDq {
395
421
new TEvError (std::move (error)));
396
422
}
397
423
398
- static void SendRetryOrError (NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NYdbGrpc::TGrpcStatus& status, ui32 retriesRemaining) {
399
- if (NConnector::GrpcStatusNeedsRetry (status) || status.GRpcStatusCode == grpc::DEADLINE_EXCEEDED) {
400
- if (retriesRemaining) {
401
- const auto retry = RequestRetriesLimit - retriesRemaining;
402
- const auto delay = TDuration::MilliSeconds (1u << retry); // Exponential delay from 1ms to ~0.5s
403
- // << TODO tune/tweak
404
- YQL_CLOG (WARN, ProviderGeneric) << " ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString () << " , retry " << (retry + 1 ) << " of " << RequestRetriesLimit << " , scheduled in " << delay;
405
- --retriesRemaining;
406
- if (status.GRpcStatusCode == grpc::DEADLINE_EXCEEDED) {
407
- // if error was deadline, retry only once
408
- retriesRemaining = 0 ; // TODO tune/tweak
409
- }
410
- actorSystem->Schedule (delay, new IEventHandle (selfId, selfId, new TEvRetry (retriesRemaining)));
411
- return ;
412
- }
413
- YQL_CLOG (ERROR, ProviderGeneric) << " ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString () << " , retry count exceed limit " << RequestRetriesLimit;
424
+ static void SendRetryOrError (NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NYdbGrpc::TGrpcStatus& status, std::shared_ptr<ILookupRetryState> retryState) {
425
+ auto nextRetry = retryState->GetNextRetryDelay (status);
426
+ if (nextRetry) {
427
+ YQL_CLOG (WARN, ProviderGeneric) << " ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString () << " , retry scheduled in " << *nextRetry;
428
+ actorSystem->Schedule (*nextRetry, new IEventHandle (selfId, selfId, new TEvLookupRetry ()));
429
+ return ;
414
430
}
415
431
SendError (actorSystem, selfId, NConnector::ErrorFromGRPCStatus (status));
416
432
}
@@ -510,7 +526,8 @@ namespace NYql::NDq {
510
526
std::shared_ptr<IDqAsyncLookupSource::TUnboxedValueMap> Request;
511
527
NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator; // TODO move me to TEvReadSplitsPart
512
528
NKikimr::NMiniKQL::TKeyPayloadPairVector LookupResult;
513
- ui32 RetriesRemaining;
529
+ ILookupRetryPolicy::TPtr RetryPolicy;
530
+ std::shared_ptr<ILookupRetryState> RetryState;
514
531
::NMonitoring::TDynamicCounters::TCounterPtr Count;
515
532
::NMonitoring::TDynamicCounters::TCounterPtr Keys;
516
533
::NMonitoring::TDynamicCounters::TCounterPtr ResultRows;
0 commit comments