Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/fq/libs/init/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ void Init(

if (protoConfig.GetPrivateApi().GetEnabled()) {
const auto& s3readConfig = protoConfig.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig();
auto s3HttpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(TDuration::Max());
auto s3HttpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.MaxTime = TDuration::Max(), .RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
NYql::NDq::TS3ReadActorFactoryConfig readActorFactoryCfg;
if (const ui64 rowsInBatch = s3readConfig.GetRowsInBatch()) {
readActorFactoryCfg.RowsInBatch = rowsInBatch;
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,9 @@ NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(
RegisterSequencerActorFactory(*factory, counters);

if (federatedQuerySetup) {
RegisterS3ReadActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway);
RegisterS3WriteActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway);
auto s3HttpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
RegisterS3ReadActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy);
RegisterS3WriteActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy);

if (federatedQuerySetup->ConnectorClient) {
RegisterGenericProviderFactories(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->ConnectorClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,50 @@

namespace NYql {

IHTTPGateway::TRetryPolicy::TPtr GetHTTPDefaultRetryPolicy(TDuration maxTime, size_t maxRetries) {
std::unordered_set<CURLcode> YqlRetriedCurlCodes() {
return {
CURLE_COULDNT_CONNECT,
CURLE_WEIRD_SERVER_REPLY,
CURLE_WRITE_ERROR,
CURLE_READ_ERROR,
CURLE_OPERATION_TIMEDOUT,
CURLE_SSL_CONNECT_ERROR,
CURLE_BAD_DOWNLOAD_RESUME,
CURLE_SEND_ERROR,
CURLE_RECV_ERROR,
CURLE_NO_CONNECTION_AVAILABLE
};
}

std::unordered_set<CURLcode> FqRetriedCurlCodes() {
return {
CURLE_COULDNT_CONNECT,
CURLE_WEIRD_SERVER_REPLY,
CURLE_WRITE_ERROR,
CURLE_READ_ERROR,
CURLE_OPERATION_TIMEDOUT,
CURLE_SSL_CONNECT_ERROR,
CURLE_BAD_DOWNLOAD_RESUME,
CURLE_SEND_ERROR,
CURLE_RECV_ERROR,
CURLE_NO_CONNECTION_AVAILABLE,
CURLE_GOT_NOTHING
};
}

IHTTPGateway::TRetryPolicy::TPtr GetHTTPDefaultRetryPolicy(THttpRetryPolicyOptions&& options) {
auto maxTime = options.MaxTime;
auto maxRetries = options.MaxRetries;
if (!maxTime) {
maxTime = TDuration::Minutes(5);
}
return IHTTPGateway::TRetryPolicy::GetExponentialBackoffPolicy([](CURLcode curlCode, long httpCode) {

switch (curlCode) {
case CURLE_OK:
// look to http code
break;
case CURLE_COULDNT_CONNECT:
case CURLE_WEIRD_SERVER_REPLY:
case CURLE_WRITE_ERROR:
case CURLE_READ_ERROR:
case CURLE_OPERATION_TIMEDOUT:
case CURLE_SSL_CONNECT_ERROR:
case CURLE_BAD_DOWNLOAD_RESUME:
case CURLE_SEND_ERROR:
case CURLE_RECV_ERROR:
case CURLE_NO_CONNECTION_AVAILABLE:
// retry small number of known errors
return ERetryErrorClass::ShortRetry;
default:
// do not retry others
return ERetryErrorClass::NoRetry;
return IHTTPGateway::TRetryPolicy::GetExponentialBackoffPolicy([options = std::move(options)](CURLcode curlCode, long httpCode) {
if (curlCode == CURLE_OK) {
// pass
} else if (options.RetriedCurlCodes.contains(curlCode)) {
return ERetryErrorClass::ShortRetry;
} else {
return ERetryErrorClass::NoRetry;
}

switch (httpCode) {
Expand All @@ -52,4 +71,8 @@ IHTTPGateway::TRetryPolicy::TPtr GetHTTPDefaultRetryPolicy(TDuration maxTime, si
maxTime); // maxTime
}

IHTTPGateway::TRetryPolicy::TPtr GetHTTPDefaultRetryPolicy(TDuration maxTime, size_t maxRetries) {
return GetHTTPDefaultRetryPolicy(THttpRetryPolicyOptions{.MaxTime = maxTime, .MaxRetries = maxRetries});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,22 @@

#include "yql_http_gateway.h"

#include <curl/curl.h>
#include <unordered_set>

namespace NYql {

IHTTPGateway::TRetryPolicy::TPtr GetHTTPDefaultRetryPolicy(TDuration maxTime = TDuration::Zero(), size_t maxRetries = std::numeric_limits<size_t>::max()); // Zero means default maxTime
std::unordered_set<CURLcode> YqlRetriedCurlCodes();
std::unordered_set<CURLcode> FqRetriedCurlCodes();

struct THttpRetryPolicyOptions {
TDuration MaxTime = TDuration::Zero(); // Zero means default maxTime
size_t MaxRetries = std::numeric_limits<size_t>::max();
std::unordered_set<CURLcode> RetriedCurlCodes = YqlRetriedCurlCodes();
};

IHTTPGateway::TRetryPolicy::TPtr GetHTTPDefaultRetryPolicy(THttpRetryPolicyOptions&& options = {});

IHTTPGateway::TRetryPolicy::TPtr GetHTTPDefaultRetryPolicy(TDuration maxTime, size_t maxRetries = std::numeric_limits<size_t>::max()); // Zero means default maxTime

}
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ class TS3ApplicatorActor : public NActors::TActorBootstrapped<TS3ApplicatorActor
, CredentialsFactory(credentialsFactory)
, ExternalEffect(externalEffect)
, ActorSystem(NActors::TActivationContext::ActorSystem())
, RetryPolicy(NYql::GetHTTPDefaultRetryPolicy(TDuration::Zero(), 3))
, RetryPolicy(NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.MaxRetries = 3, .RetriedCurlCodes = NYql::FqRetriedCurlCodes()}))
, RetryCount(GLOBAL_RETRY_LIMIT) {
// ^^^ 3 retries in HTTP GW per operation
// up to 100 retries at app level for all operations ^^^
Expand Down