Skip to content

Commit a41d41d

Browse files
authored
Import from S3: check error is retriable KIKIMR-21102 (#2198)
1 parent 393f609 commit a41d41d

File tree

3 files changed

+45
-29
lines changed

3 files changed

+45
-29
lines changed

ydb/core/tx/datashard/export_s3_uploader.cpp

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -471,7 +471,7 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
471471
}
472472

473473
void Retry() {
474-
Delay = Min(Delay * ++Attempt, TDuration::Minutes(10));
474+
Delay = Min(Delay * ++Attempt, MaxDelay);
475475
const TDuration random = TDuration::FromValue(TAppData::RandomProvider->GenRand64() % Delay.MicroSeconds());
476476
this->Schedule(Delay + random, new TEvents::TEvWakeup());
477477
}
@@ -637,8 +637,10 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
637637
const ui32 Retries;
638638
ui32 Attempt;
639639

640-
TActorId Client;
641640
TDuration Delay;
641+
static constexpr TDuration MaxDelay = TDuration::Minutes(10);
642+
643+
TActorId Client;
642644
bool SchemeUploaded;
643645
bool MetadataUploaded;
644646
bool MultiPart;

ydb/core/tx/datashard/import_s3.cpp

Lines changed: 30 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -309,10 +309,6 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
309309

310310
}; // TUploadRowsRequestBuilder
311311

312-
enum class EWakeupTag: ui64 {
313-
Restart,
314-
};
315-
316312
void AllocateResource() {
317313
IMPORT_LOG_D("AllocateResource");
318314

@@ -386,7 +382,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
386382
default:
387383
IMPORT_LOG_E("Error at 'HeadObject'"
388384
<< ": error# " << result);
389-
return RestartOrFinish(result.GetError().GetMessage().c_str());
385+
return RetryOrFinish(result.GetError());
390386
}
391387

392388
CompressionCodec = NBackupRestoreTraits::NextCompressionCodec(CompressionCodec);
@@ -578,7 +574,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
578574
return Finish(false, record.GetErrorDescription());
579575

580576
default:
581-
return RestartOrFinish(record.GetErrorDescription());
577+
return RetryOrFinish(record.GetErrorDescription());
582578
}
583579
}
584580

@@ -590,7 +586,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
590586

591587
IMPORT_LOG_E("Error at '" << marker << "'"
592588
<< ": error# " << result);
593-
RestartOrFinish(result.GetError().GetMessage().c_str());
589+
RetryOrFinish(result.GetError());
594590

595591
return false;
596592
}
@@ -656,21 +652,35 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
656652
return true;
657653
}
658654

659-
void RestartOrFinish(const TString& error) {
660-
if (Attempt++ < Retries) {
661-
Delay = Min(Delay * Attempt, MaxDelay);
662-
const TDuration random = TDuration::FromValue(TAppData::RandomProvider->GenRand64() % Delay.MicroSeconds());
655+
static bool ShouldRetry(const Aws::S3::S3Error& error) {
656+
return error.ShouldRetry();
657+
}
663658

664-
Schedule(Delay + random, new TEvents::TEvWakeup(static_cast<ui64>(EWakeupTag::Restart)));
665-
} else {
666-
Finish(false, error);
667-
}
659+
static bool ShouldRetry(const TString&) {
660+
return true;
668661
}
669662

670-
void Handle(TEvents::TEvWakeup::TPtr& ev) {
671-
switch (static_cast<EWakeupTag>(ev->Get()->Tag)) {
672-
case EWakeupTag::Restart:
673-
return Restart();
663+
template <typename T>
664+
bool CanRetry(const T& error) const {
665+
return Attempt < Retries && ShouldRetry(error);
666+
}
667+
668+
void Retry() {
669+
Delay = Min(Delay * ++Attempt, MaxDelay);
670+
const TDuration random = TDuration::FromValue(TAppData::RandomProvider->GenRand64() % Delay.MicroSeconds());
671+
Schedule(Delay + random, new TEvents::TEvWakeup());
672+
}
673+
674+
template <typename T>
675+
void RetryOrFinish(const T& error) {
676+
if (CanRetry(error)) {
677+
Retry();
678+
} else {
679+
if constexpr (std::is_same_v<T, Aws::S3::S3Error>) {
680+
Finish(false, TStringBuilder() << "S3 error: " << error.GetMessage().c_str());
681+
} else {
682+
Finish(false, error);
683+
}
674684
}
675685
}
676686

@@ -754,7 +764,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
754764
hFunc(TEvDataShard::TEvS3DownloadInfo, Handle);
755765
hFunc(TEvDataShard::TEvS3UploadRowsResponse, Handle);
756766

757-
hFunc(TEvents::TEvWakeup, Handle);
767+
sFunc(TEvents::TEvWakeup, Restart);
758768
sFunc(TEvents::TEvPoisonPill, NotifyDied);
759769
}
760770
}

ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp

Lines changed: 11 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -94,16 +94,17 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
9494
LOG_E("Error at '" << marker << "'"
9595
<< ": self# " << SelfId()
9696
<< ", error# " << result);
97-
MaybeRetry(result.GetError().GetMessage().c_str());
97+
MaybeRetry(result.GetError());
9898

9999
return false;
100100
}
101101

102-
void MaybeRetry(const TString& error) {
103-
if (Attempt++ < Retries) {
104-
Schedule(TDuration::Minutes(1), new TEvents::TEvWakeup());
102+
void MaybeRetry(const Aws::S3::S3Error& error) {
103+
if (Attempt < Retries && error.ShouldRetry()) {
104+
Delay = Min(Delay * ++Attempt, MaxDelay);
105+
Schedule(Delay, new TEvents::TEvWakeup());
105106
} else {
106-
Reply(false, error);
107+
Reply(false, TStringBuilder() << "S3 error: " << error.GetMessage().c_str());
107108
}
108109
}
109110

@@ -148,8 +149,8 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
148149
hFunc(TEvExternalStorage::TEvHeadObjectResponse, Handle);
149150
hFunc(TEvExternalStorage::TEvGetObjectResponse, Handle);
150151

151-
cFunc(TEvents::TEvWakeup::EventType, Bootstrap);
152-
cFunc(TEvents::TEvPoisonPill::EventType, PassAway);
152+
sFunc(TEvents::TEvWakeup, Bootstrap);
153+
sFunc(TEvents::TEvPoisonPill, PassAway);
153154
}
154155
}
155156

@@ -164,6 +165,9 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
164165
const ui32 Retries;
165166
ui32 Attempt = 0;
166167

168+
TDuration Delay = TDuration::Minutes(1);
169+
static constexpr TDuration MaxDelay = TDuration::Minutes(10);
170+
167171
TActorId Client;
168172

169173
}; // TSchemeGetter

0 commit comments

Comments
 (0)