Skip to content

generic lookup: use retry_policy library #13460

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Feb 24, 2025
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,6 @@ namespace NYql::NDq {
NConnector::NApi::TError Error;
};

struct TEvRetry: NActors::TEventLocal<TEvRetry, EvRetry> {
explicit TEvRetry(ui32 nextRetries)
: NextRetries(nextRetries)
{
}

ui32 NextRetries;
};

protected: // TODO move common logic here
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include <yql/essentials/utils/yql_panic.h>
#include <ydb/core/formats/arrow/serializer/abstract.h>

#include <library/cpp/retry/retry_policy.h>

namespace NYql::NDq {

using namespace NActors;
Expand Down Expand Up @@ -60,6 +62,12 @@ namespace NYql::NDq {
public TGenericBaseActor<TGenericLookupActor> {
using TBase = TGenericBaseActor<TGenericLookupActor>;

using ILookupRetryPolicy = IRetryPolicy<const NYdbGrpc::TGrpcStatus&>;
using ILookupRetryState = ILookupRetryPolicy::IRetryState;

struct TEvLookupRetry : NActors::TEventLocal<TEvLookupRetry, EvRetry> {
};

public:
TGenericLookupActor(
NConnector::IClient::TPtr connectorClient,
Expand All @@ -86,6 +94,24 @@ namespace NYql::NDq {
, HolderFactory(holderFactory)
, ColumnDestinations(CreateColumnDestination())
, MaxKeysInRequest(maxKeysInRequest)
, RetryPolicy(
ILookupRetryPolicy::GetExponentialBackoffPolicy(
/* retryClassFunction */
[](const NYdbGrpc::TGrpcStatus& status) {
if (NConnector::GrpcStatusNeedsRetry(status)) {
return ERetryErrorClass::ShortRetry;
}
if (status.GRpcStatusCode == grpc::DEADLINE_EXCEEDED) {
return ERetryErrorClass::ShortRetry; // TODO LongRetry?
}
return ERetryErrorClass::NoRetry;
},
/* minDelay */ TDuration::MilliSeconds(1),
/* minLongRetryDelay */ TDuration::MilliSeconds(500),
/* maxDelay */ TDuration::Seconds(1),
/* maxRetries */ RequestRetriesLimit,
/* maxTime */ TDuration::Minutes(5),
/* scaleFactor */ 2))
{
InitMonCounters(taskCounters);
}
Expand Down Expand Up @@ -156,7 +182,7 @@ namespace NYql::NDq {
hFunc(TEvReadSplitsPart, Handle);
hFunc(TEvReadSplitsFinished, Handle);
hFunc(TEvError, Handle);
hFunc(TEvRetry, Handle);
hFunc(TEvLookupRetry, Handle);
hFunc(NActors::TEvents::TEvPoison, Handle);)

void Handle(TEvListSplitsIterator::TPtr ev) {
Expand All @@ -165,7 +191,7 @@ namespace NYql::NDq {
[
actorSystem = TActivationContext::ActorSystem(),
selfId = SelfId(),
retriesRemaining = RetriesRemaining
retryState = RetryState
](const NConnector::TAsyncResult<NConnector::NApi::TListSplitsResponse>& asyncResult) {
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << selfId << " Got TListSplitsResponse from Connector";
auto result = ExtractFromConstFuture(asyncResult);
Expand All @@ -174,7 +200,7 @@ namespace NYql::NDq {
auto ev = new TEvListSplitsPart(std::move(*result.Response));
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
} else {
SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining);
SendRetryOrError(actorSystem, selfId, result.Status, retryState);
}
});
}
Expand All @@ -198,15 +224,15 @@ namespace NYql::NDq {
Connector->ReadSplits(readRequest, RequestTimeout).Subscribe([
actorSystem = TActivationContext::ActorSystem(),
selfId = SelfId(),
retriesRemaining = RetriesRemaining
retryState = RetryState
](const NConnector::TReadSplitsStreamIteratorAsyncResult& asyncResult) {
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << selfId << " Got ReadSplitsStreamIterator from Connector";
auto result = ExtractFromConstFuture(asyncResult);
if (result.Status.Ok()) {
auto ev = new TEvReadSplitsIterator(std::move(result.Iterator));
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
} else {
SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining);
SendRetryOrError(actorSystem, selfId, result.Status, retryState);
}
});
}
Expand Down Expand Up @@ -235,9 +261,8 @@ namespace NYql::NDq {
actorSystem->Send(new NActors::IEventHandle(ParentId, SelfId(), errEv.release()));
}

void Handle(TEvRetry::TPtr ev) {
void Handle(TEvLookupRetry::TPtr) {
auto guard = Guard(*Alloc);
RetriesRemaining = ev->Get()->NextRetries;
SendRequest();
}

Expand Down Expand Up @@ -269,7 +294,7 @@ namespace NYql::NDq {
}

Request = std::move(request);
RetriesRemaining = RequestRetriesLimit;
RetryState = std::shared_ptr<ILookupRetryState>(RetryPolicy->CreateRetryState());
SendRequest();
}

Expand All @@ -287,7 +312,7 @@ namespace NYql::NDq {
Connector->ListSplits(splitRequest, RequestTimeout).Subscribe([
actorSystem = TActivationContext::ActorSystem(),
selfId = SelfId(),
retriesRemaining = RetriesRemaining
retryState = RetryState
](const NConnector::TListSplitsStreamIteratorAsyncResult& asyncResult) {
auto result = ExtractFromConstFuture(asyncResult);
if (result.Status.Ok()) {
Expand All @@ -296,7 +321,7 @@ namespace NYql::NDq {
auto ev = new TEvListSplitsIterator(std::move(result.Iterator));
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
} else {
SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining);
SendRetryOrError(actorSystem, selfId, result.Status, retryState);
}
});
if (CpuTime) {
Expand All @@ -309,7 +334,7 @@ namespace NYql::NDq {
[
actorSystem = TActivationContext::ActorSystem(),
selfId = SelfId(),
retriesRemaining = RetriesRemaining
retryState = RetryState
](const NConnector::TAsyncResult<NConnector::NApi::TReadSplitsResponse>& asyncResult) {
auto result = ExtractFromConstFuture(asyncResult);
if (result.Status.Ok()) {
Expand All @@ -328,7 +353,7 @@ namespace NYql::NDq {
auto ev = new TEvReadSplitsFinished(std::move(result.Status));
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
} else {
SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining);
SendRetryOrError(actorSystem, selfId, result.Status, retryState);
}
});
}
Expand Down Expand Up @@ -394,22 +419,12 @@ namespace NYql::NDq {
new TEvError(std::move(error)));
}

static void SendRetryOrError(NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NYdbGrpc::TGrpcStatus& status, ui32 retriesRemaining) {
if (NConnector::GrpcStatusNeedsRetry(status) || status.GRpcStatusCode == grpc::DEADLINE_EXCEEDED) {
if (retriesRemaining) {
const auto retry = RequestRetriesLimit - retriesRemaining;
const auto delay = TDuration::MilliSeconds(1u << retry); // Exponential delay from 1ms to ~0.5s
// << TODO tune/tweak
YQL_CLOG(WARN, ProviderGeneric) << "ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString() << ", retry " << (retry + 1) << " of " << RequestRetriesLimit << ", scheduled in " << delay;
--retriesRemaining;
if (status.GRpcStatusCode == grpc::DEADLINE_EXCEEDED) {
// if error was deadline, retry only once
retriesRemaining = 0; // TODO tune/tweak
}
actorSystem->Schedule(delay, new IEventHandle(selfId, selfId, new TEvRetry(retriesRemaining)));
return;
}
YQL_CLOG(ERROR, ProviderGeneric) << "ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString() << ", retry count exceed limit " << RequestRetriesLimit;
static void SendRetryOrError(NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NYdbGrpc::TGrpcStatus& status, std::shared_ptr<ILookupRetryState> retryState) {
auto nextRetry = retryState->GetNextRetryDelay(status);
if (nextRetry) {
YQL_CLOG(WARN, ProviderGeneric) << "ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString() << ", retry scheduled in " << *nextRetry;
actorSystem->Schedule(*nextRetry, new IEventHandle(selfId, selfId, new TEvLookupRetry()));
return;
}
SendError(actorSystem, selfId, NConnector::ErrorFromGRPCStatus(status));
}
Expand Down Expand Up @@ -509,7 +524,8 @@ namespace NYql::NDq {
std::shared_ptr<IDqAsyncLookupSource::TUnboxedValueMap> Request;
NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator; // TODO move me to TEvReadSplitsPart
NKikimr::NMiniKQL::TKeyPayloadPairVector LookupResult;
ui32 RetriesRemaining;
ILookupRetryPolicy::TPtr RetryPolicy;
std::shared_ptr<ILookupRetryState> RetryState;
::NMonitoring::TDynamicCounters::TCounterPtr Count;
::NMonitoring::TDynamicCounters::TCounterPtr Keys;
::NMonitoring::TDynamicCounters::TCounterPtr ResultRows;
Expand Down
Loading