23
23
#include < yql/essentials/utils/yql_panic.h>
24
24
#include < ydb/core/formats/arrow/serializer/abstract.h>
25
25
26
+ #include < library/cpp/retry/retry_policy.h>
27
+
26
28
namespace NYql ::NDq {
27
29
28
30
using namespace NActors ;
@@ -61,6 +63,13 @@ namespace NYql::NDq {
61
63
public TGenericBaseActor<TGenericLookupActor> {
62
64
using TBase = TGenericBaseActor<TGenericLookupActor>;
63
65
66
+ using ILookupRetryPolicy = IRetryPolicy<const NYdbGrpc::TGrpcStatus&>;
67
+ using ILookupRetryState = ILookupRetryPolicy::IRetryState;
68
+
69
+ struct TEvLookupRetry : NActors::TEventLocal<TEvLookupRetry, EvRetry> {
70
+ typedef typename THandle::TPtr TPtr;
71
+ };
72
+
64
73
public:
65
74
TGenericLookupActor (
66
75
NConnector::IClient::TPtr connectorClient,
@@ -87,6 +96,22 @@ namespace NYql::NDq {
87
96
, HolderFactory(holderFactory)
88
97
, ColumnDestinations(CreateColumnDestination())
89
98
, MaxKeysInRequest(maxKeysInRequest)
99
+ , RetryPolicy(
100
+ ILookupRetryPolicy::GetExponentialBackoffPolicy (
101
+ /* retryClassFunction */
102
+ [](const NYdbGrpc::TGrpcStatus& status) {
103
+ if (NConnector::GrpcStatusNeedsRetry (status))
104
+ return ERetryErrorClass::ShortRetry;
105
+ if (status.GRpcStatusCode == grpc::DEADLINE_EXCEEDED)
106
+ return ERetryErrorClass::ShortRetry; // TODO LongRetry?
107
+ return ERetryErrorClass::NoRetry;
108
+ },
109
+ /* minDelay */ TDuration::MilliSeconds(1 ),
110
+ /* minLongRetryDelay */ TDuration::MilliSeconds(500 ),
111
+ /* maxDelay */ TDuration::Seconds(1 ),
112
+ /* maxRetries */ RequestRetriesLimit,
113
+ /* maxTime */ TDuration::Minutes(5 ),
114
+ /* scaleFactor */ 2 ))
90
115
{
91
116
InitMonCounters (taskCounters);
92
117
}
@@ -157,7 +182,7 @@ namespace NYql::NDq {
157
182
hFunc (TEvReadSplitsPart, Handle);
158
183
hFunc (TEvReadSplitsFinished, Handle);
159
184
hFunc (TEvError, Handle);
160
- hFunc (TEvRetry , Handle);
185
+ hFunc (TEvLookupRetry , Handle);
161
186
hFunc (NActors::TEvents::TEvPoison, Handle);)
162
187
163
188
void Handle (TEvListSplitsIterator::TPtr ev) {
@@ -166,7 +191,7 @@ namespace NYql::NDq {
166
191
[
167
192
actorSystem = TActivationContext::ActorSystem (),
168
193
selfId = SelfId (),
169
- retriesRemaining = RetriesRemaining
194
+ retryState = RetryState
170
195
](const NConnector::TAsyncResult<NConnector::NApi::TListSplitsResponse>& asyncResult) {
171
196
YQL_CLOG (DEBUG, ProviderGeneric) << " ActorId=" << selfId << " Got TListSplitsResponse from Connector" ;
172
197
auto result = ExtractFromConstFuture (asyncResult);
@@ -175,7 +200,7 @@ namespace NYql::NDq {
175
200
auto ev = new TEvListSplitsPart (std::move (*result.Response ));
176
201
actorSystem->Send (new NActors::IEventHandle (selfId, selfId, ev));
177
202
} else {
178
- SendRetryOrError (actorSystem, selfId, result.Status , retriesRemaining );
203
+ SendRetryOrError (actorSystem, selfId, result.Status , retryState );
179
204
}
180
205
});
181
206
}
@@ -199,15 +224,15 @@ namespace NYql::NDq {
199
224
Connector->ReadSplits (readRequest, RequestTimeout).Subscribe ([
200
225
actorSystem = TActivationContext::ActorSystem (),
201
226
selfId = SelfId (),
202
- retriesRemaining = RetriesRemaining
227
+ retryState = RetryState
203
228
](const NConnector::TReadSplitsStreamIteratorAsyncResult& asyncResult) {
204
229
YQL_CLOG (DEBUG, ProviderGeneric) << " ActorId=" << selfId << " Got ReadSplitsStreamIterator from Connector" ;
205
230
auto result = ExtractFromConstFuture (asyncResult);
206
231
if (result.Status .Ok ()) {
207
232
auto ev = new TEvReadSplitsIterator (std::move (result.Iterator ));
208
233
actorSystem->Send (new NActors::IEventHandle (selfId, selfId, ev));
209
234
} else {
210
- SendRetryOrError (actorSystem, selfId, result.Status , retriesRemaining );
235
+ SendRetryOrError (actorSystem, selfId, result.Status , retryState );
211
236
}
212
237
});
213
238
}
@@ -236,9 +261,8 @@ namespace NYql::NDq {
236
261
actorSystem->Send (new NActors::IEventHandle (ParentId, SelfId (), errEv.release ()));
237
262
}
238
263
239
- void Handle (TEvRetry ::TPtr ev ) {
264
+ void Handle (TEvLookupRetry ::TPtr) {
240
265
auto guard = Guard (*Alloc);
241
- RetriesRemaining = ev->Get ()->NextRetries ;
242
266
SendRequest ();
243
267
}
244
268
@@ -270,7 +294,7 @@ namespace NYql::NDq {
270
294
}
271
295
272
296
Request = std::move (request);
273
- RetriesRemaining = RequestRetriesLimit ;
297
+ RetryState = std::shared_ptr<ILookupRetryState>(RetryPolicy-> CreateRetryState (). release ()) ;
274
298
SendRequest ();
275
299
}
276
300
@@ -288,7 +312,7 @@ namespace NYql::NDq {
288
312
Connector->ListSplits (splitRequest, RequestTimeout).Subscribe ([
289
313
actorSystem = TActivationContext::ActorSystem (),
290
314
selfId = SelfId (),
291
- retriesRemaining = RetriesRemaining
315
+ retryState = RetryState
292
316
](const NConnector::TListSplitsStreamIteratorAsyncResult& asyncResult) {
293
317
auto result = ExtractFromConstFuture (asyncResult);
294
318
if (result.Status .Ok ()) {
@@ -297,7 +321,7 @@ namespace NYql::NDq {
297
321
auto ev = new TEvListSplitsIterator (std::move (result.Iterator ));
298
322
actorSystem->Send (new NActors::IEventHandle (selfId, selfId, ev));
299
323
} else {
300
- SendRetryOrError (actorSystem, selfId, result.Status , retriesRemaining );
324
+ SendRetryOrError (actorSystem, selfId, result.Status , retryState );
301
325
}
302
326
});
303
327
if (CpuTime) {
@@ -310,7 +334,7 @@ namespace NYql::NDq {
310
334
[
311
335
actorSystem = TActivationContext::ActorSystem (),
312
336
selfId = SelfId (),
313
- retriesRemaining = RetriesRemaining
337
+ retryState = RetryState
314
338
](const NConnector::TAsyncResult<NConnector::NApi::TReadSplitsResponse>& asyncResult) {
315
339
auto result = ExtractFromConstFuture (asyncResult);
316
340
if (result.Status .Ok ()) {
@@ -329,7 +353,7 @@ namespace NYql::NDq {
329
353
auto ev = new TEvReadSplitsFinished (std::move (result.Status ));
330
354
actorSystem->Send (new NActors::IEventHandle (selfId, selfId, ev));
331
355
} else {
332
- SendRetryOrError (actorSystem, selfId, result.Status , retriesRemaining );
356
+ SendRetryOrError (actorSystem, selfId, result.Status , retryState );
333
357
}
334
358
});
335
359
}
@@ -395,22 +419,12 @@ namespace NYql::NDq {
395
419
new TEvError (std::move (error)));
396
420
}
397
421
398
- static void SendRetryOrError (NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NYdbGrpc::TGrpcStatus& status, ui32 retriesRemaining) {
399
- if (NConnector::GrpcStatusNeedsRetry (status) || status.GRpcStatusCode == grpc::DEADLINE_EXCEEDED) {
400
- if (retriesRemaining) {
401
- const auto retry = RequestRetriesLimit - retriesRemaining;
402
- const auto delay = TDuration::MilliSeconds (1u << retry); // Exponential delay from 1ms to ~0.5s
403
- // <<< TODO tune/tweak
404
- YQL_CLOG (WARN, ProviderGeneric) << " ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString () << " , retry " << (retry + 1 ) << " of " << RequestRetriesLimit << " , scheduled in " << delay;
405
- --retriesRemaining;
406
- if (status.GRpcStatusCode == grpc::DEADLINE_EXCEEDED) {
407
- // if error was deadline, retry only once
408
- retriesRemaining = 0 ; // TODO tune/tweak
409
- }
410
- actorSystem->Schedule (delay, new IEventHandle (selfId, selfId, new TEvRetry (retriesRemaining)));
411
- return ;
412
- }
413
- YQL_CLOG (ERROR, ProviderGeneric) << " ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString () << " , retry count exceed limit " << RequestRetriesLimit;
422
+ static void SendRetryOrError (NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NYdbGrpc::TGrpcStatus& status, std::shared_ptr<ILookupRetryState> retryState) {
423
+ auto nextRetry = retryState->GetNextRetryDelay (status);
424
+ if (nextRetry) {
425
+ YQL_CLOG (WARN, ProviderGeneric) << " ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString () << " , retry scheduled in " << *nextRetry;
426
+ actorSystem->Schedule (*nextRetry, new IEventHandle (selfId, selfId, new TEvLookupRetry ()));
427
+ return ;
414
428
}
415
429
SendError (actorSystem, selfId, NConnector::ErrorFromGRPCStatus (status));
416
430
}
@@ -502,7 +516,8 @@ namespace NYql::NDq {
502
516
std::shared_ptr<IDqAsyncLookupSource::TUnboxedValueMap> Request;
503
517
NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator; // TODO move me to TEvReadSplitsPart
504
518
NKikimr::NMiniKQL::TKeyPayloadPairVector LookupResult;
505
- ui32 RetriesRemaining;
519
+ ILookupRetryPolicy::TPtr RetryPolicy;
520
+ std::shared_ptr<ILookupRetryState> RetryState;
506
521
::NMonitoring::TDynamicCounters::TCounterPtr Count;
507
522
::NMonitoring::TDynamicCounters::TCounterPtr Keys;
508
523
::NMonitoring::TDynamicCounters::TCounterPtr ResultRows;
0 commit comments