@@ -273,16 +273,43 @@ class TS3ApplicatorActor : public NActors::TActorBootstrapped<TS3ApplicatorActor
273
273
hFunc (TEvPrivate::TEvListParts, Handle);
274
274
)
275
275
276
- bool RetryOperation (CURLcode curlResponseCode, ui32 httpResponseCode, const TString& url, const TString& operationName) {
277
- auto result = RetryCount && RetryPolicy->CreateRetryState ()->GetNextRetryDelay (curlResponseCode, httpResponseCode);
278
- Issues.AddIssue (TStringBuilder () << " Retry operation " << operationName << " , curl error: " << curl_easy_strerror (curlResponseCode) << " , http code: " << httpResponseCode << " , url: " << url);
276
+ bool RetryOperation (IHTTPGateway::TResult&& operationResult, const TString& url, const TString& operationName) {
277
+ const auto curlResponseCode = operationResult.CurlResponseCode ;
278
+ const auto httpResponseCode = operationResult.Content .HttpResponseCode ;
279
+ const auto result = RetryCount && GetRetryState (operationName)->GetNextRetryDelay (curlResponseCode, httpResponseCode);
280
+
281
+ NYql::TIssues issues = std::move (operationResult.Issues );
282
+ TStringBuilder errorMessage = TStringBuilder () << " Retry operation " << operationName << " , curl error: " << curl_easy_strerror (curlResponseCode) << " , url: " << url;
283
+ if (const TString errorText = operationResult.Content .Extract ()) {
284
+ TString errorCode;
285
+ TString message;
286
+ if (!ParseS3ErrorResponse (errorText, errorCode, message)) {
287
+ message = errorText;
288
+ }
289
+ issues.AddIssues (BuildIssues (httpResponseCode, errorCode, message));
290
+ } else {
291
+ errorMessage << " , HTTP code: " << httpResponseCode;
292
+ }
293
+
294
+ if (issues) {
295
+ RetryIssues.AddIssues (NS3Util::AddParentIssue (errorMessage, std::move (issues)));
296
+ } else {
297
+ RetryIssues.AddIssue (errorMessage);
298
+ }
299
+
279
300
if (result) {
280
301
RetryCount--;
281
302
} else {
282
- Finish (true , RetryCount
283
- ? TString (" Number of retries exceeded limit per operation" )
284
- : TStringBuilder () << " Number of retries exceeded global limit in " << GLOBAL_RETRY_LIMIT << " retries" );
303
+ Issues.AddIssues (NS3Util::AddParentIssue (
304
+ RetryCount
305
+ ? TStringBuilder () << " Number of retries exceeded limit for operation " << operationName
306
+ : TStringBuilder () << " Number of retries exceeded global limit in " << GLOBAL_RETRY_LIMIT << " retries" ,
307
+ NYql::TIssues (RetryIssues)
308
+ ));
309
+ RetryIssues.Clear ();
310
+ Finish (true );
285
311
}
312
+
286
313
return result;
287
314
}
288
315
@@ -377,7 +404,7 @@ class TS3ApplicatorActor : public NActors::TActorBootstrapped<TS3ApplicatorActor
377
404
}
378
405
const TString& url = ev->Get ()->State ->BuildUrl ();
379
406
LOG_D (" CommitMultipartUpload ERROR " << url);
380
- if (RetryOperation (result. CurlResponseCode , result. Content . HttpResponseCode , url, " CommitMultipartUpload" )) {
407
+ if (RetryOperation (std::move ( result) , url, " CommitMultipartUpload" )) {
381
408
PushCommitMultipartUpload (ev->Get ()->State );
382
409
}
383
410
}
@@ -422,7 +449,9 @@ class TS3ApplicatorActor : public NActors::TActorBootstrapped<TS3ApplicatorActor
422
449
auto prefix = ev->Get ()->State ->Url + ev->Get ()->State ->Prefix ;
423
450
if (!UnknownPrefixes.contains (prefix)) {
424
451
UnknownPrefixes.insert (prefix);
425
- Issues.AddIssue (TIssue (" Unknown uncommitted upload with prefix: " + prefix));
452
+ TIssue issue (TStringBuilder () << " Unknown uncommitted upload with prefix: " << prefix);
453
+ issue.SetCode (NYql::DEFAULT_ERROR, NYql::TSeverityIds::S_INFO);
454
+ Issues.AddIssue (std::move (issue));
426
455
}
427
456
} else {
428
457
pos += KeyPrefix.size ();
@@ -452,7 +481,7 @@ class TS3ApplicatorActor : public NActors::TActorBootstrapped<TS3ApplicatorActor
452
481
}
453
482
const TString& url = ev->Get ()->State ->BuildUrl ();
454
483
LOG_D (" ListMultipartUploads ERROR " << url);
455
- if (RetryOperation (result. CurlResponseCode , result. Content . HttpResponseCode , url, " ListMultipartUploads" )) {
484
+ if (RetryOperation (std::move ( result) , url, " ListMultipartUploads" )) {
456
485
PushListMultipartUploads (ev->Get ()->State );
457
486
}
458
487
}
@@ -476,7 +505,7 @@ class TS3ApplicatorActor : public NActors::TActorBootstrapped<TS3ApplicatorActor
476
505
}
477
506
const TString& url = ev->Get ()->State ->BuildUrl ();
478
507
LOG_D (" AbortMultipartUpload ERROR " << url);
479
- if (RetryOperation (result. CurlResponseCode , result. Content . HttpResponseCode , url, " AbortMultipartUpload" )) {
508
+ if (RetryOperation (std::move ( result) , url, " AbortMultipartUpload" )) {
480
509
PushAbortMultipartUpload (ev->Get ()->State );
481
510
}
482
511
}
@@ -527,7 +556,7 @@ class TS3ApplicatorActor : public NActors::TActorBootstrapped<TS3ApplicatorActor
527
556
}
528
557
const TString& url = ev->Get ()->State ->BuildUrl ();
529
558
LOG_D (" ListParts ERROR " << url);
530
- if (RetryOperation (result. CurlResponseCode , result. Content . HttpResponseCode , url, " ListParts" )) {
559
+ if (RetryOperation (std::move ( result) , url, " ListParts" )) {
531
560
PushListParts (ev->Get ()->State );
532
561
}
533
562
}
@@ -597,6 +626,13 @@ class TS3ApplicatorActor : public NActors::TActorBootstrapped<TS3ApplicatorActor
597
626
actorSystem->Send (new NActors::IEventHandle (selfId, {}, new TEvPrivate::TEvListParts (state, std::move (result))));
598
627
}
599
628
629
+ IHTTPGateway::TRetryPolicy::IRetryState::TPtr& GetRetryState (const TString& operationName) {
630
+ if (const auto it = RetryStates.find (operationName); it != RetryStates.end ()) {
631
+ return it->second ;
632
+ }
633
+ return RetryStates.insert ({operationName, RetryPolicy->CreateRetryState ()}).first ->second ;
634
+ }
635
+
600
636
private:
601
637
NActors::TActorId ParentId;
602
638
IHTTPGateway::TPtr Gateway;
@@ -609,11 +645,13 @@ class TS3ApplicatorActor : public NActors::TActorBootstrapped<TS3ApplicatorActor
609
645
NYql::NDqProto::TExternalEffect ExternalEffect;
610
646
NActors::TActorSystem* const ActorSystem;
611
647
const IHTTPGateway::TRetryPolicy::TPtr RetryPolicy;
648
+ std::unordered_map<TString, IHTTPGateway::TRetryPolicy::IRetryState::TPtr> RetryStates;
612
649
ui64 HttpRequestInflight = 0 ;
613
650
ui64 RetryCount;
614
651
THashSet<TString> UnknownPrefixes;
615
652
THashSet<TString> CommitUploads;
616
653
NYql::TIssues Issues;
654
+ NYql::TIssues RetryIssues;
617
655
std::queue<TObjectStorageRequest> RequestQueue;
618
656
bool ApplicationFinished = false ;
619
657
};
0 commit comments