Skip to content

generic lookup: handle retrievable errors in grpc #13119

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ namespace NYql::NDq {
EvReadSplitsPart,
EvReadSplitsFinished,
EvError,
EvRetry,
EvEnd
};

Expand Down Expand Up @@ -89,6 +90,15 @@ namespace NYql::NDq {
NConnector::NApi::TError Error;
};

// Event asking the receiver to repeat a failed request.
// NextRetries is the retry budget the receiver adopts when it handles the event.
struct TEvRetry: NActors::TEventLocal<TEvRetry, EvRetry> {
    // Number of retries still available after the attempt triggered by this event.
    ui32 NextRetries;

    explicit TEvRetry(ui32 nextRetries)
        : NextRetries{nextRetries}
    {
    }
};

protected: // TODO move common logic here
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ namespace NYql::NDq {
using namespace NActors;

namespace {
// Retry budget assigned to RetriesRemaining when a new lookup request is accepted;
// SendRetryOrError also uses it to derive the attempt number and the back-off delay.
constexpr ui32 RequestRetriesLimit = 10; // TODO lookup parameters or PRAGMA?

const NKikimr::NMiniKQL::TStructType* MergeStructTypes(const NKikimr::NMiniKQL::TTypeEnvironment& env, const NKikimr::NMiniKQL::TStructType* t1, const NKikimr::NMiniKQL::TStructType* t2) {
Y_ABORT_UNLESS(t1);
Expand All @@ -45,7 +46,7 @@ namespace NYql::NDq {
template <typename T>
T ExtractFromConstFuture(const NThreading::TFuture<T>& f) {
// We want to avoid making a copy of data stored in a future.
// But there is no direct way to extract data from a const future5
// But there is no direct way to extract data from a const future
// So, we make a copy of the future, that is cheap. Then, extract the value from this copy.
// It destructs the value in the original future, but this trick is legal and documented here:
// https://docs.yandex-team.ru/arcadia-cpp/cookbook/concurrency
Expand Down Expand Up @@ -155,20 +156,25 @@ namespace NYql::NDq {
hFunc(TEvReadSplitsPart, Handle);
hFunc(TEvReadSplitsFinished, Handle);
hFunc(TEvError, Handle);
hFunc(TEvRetry, Handle);
hFunc(NActors::TEvents::TEvPoison, Handle);)

// Handles the split-list iterator received from the Connector: requests the next
// TListSplitsResponse from it and subscribes a callback to the resulting future.
// The callback captures the actor system, this actor's id and the current RetriesRemaining by value.
// On an OK status it sends TEvListSplitsPart to this actor; otherwise it passes the status
// to SendRetryOrError together with the captured retry budget.
// NOTE(review): this hunk looks like a diff rendering without +/- markers, so the single-line
// lambda header and the SendError call appear to be the pre-change counterparts of the lines
// that immediately follow them — confirm against the actual file.
void Handle(TEvListSplitsIterator::TPtr ev) {
auto& iterator = ev->Get()->Iterator;
iterator->ReadNext().Subscribe(
[actorSystem = TActivationContext::ActorSystem(), selfId = SelfId()](const NConnector::TAsyncResult<NConnector::NApi::TListSplitsResponse>& asyncResult) {
[
actorSystem = TActivationContext::ActorSystem(),
selfId = SelfId(),
retriesRemaining = RetriesRemaining
](const NConnector::TAsyncResult<NConnector::NApi::TListSplitsResponse>& asyncResult) {
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << selfId << " Got TListSplitsResponse from Connector";
// Take the value out of the (const) future without copying it.
auto result = ExtractFromConstFuture(asyncResult);
if (result.Status.Ok()) {
Y_ABORT_UNLESS(result.Response);
auto ev = new TEvListSplitsPart(std::move(*result.Response));
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
} else {
SendError(actorSystem, selfId, result.Status);
SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining);
}
});
}
Expand All @@ -189,14 +195,18 @@ namespace NYql::NDq {
*readRequest.add_splits() = split;
readRequest.Setformat(NConnector::NApi::TReadSplitsRequest_EFormat::TReadSplitsRequest_EFormat_ARROW_IPC_STREAMING);
readRequest.set_filtering(NConnector::NApi::TReadSplitsRequest::FILTERING_MANDATORY);
Connector->ReadSplits(readRequest).Subscribe([actorSystem = TActivationContext::ActorSystem(), selfId = SelfId()](const NConnector::TReadSplitsStreamIteratorAsyncResult& asyncResult) {
Connector->ReadSplits(readRequest).Subscribe([
actorSystem = TActivationContext::ActorSystem(),
selfId = SelfId(),
retriesRemaining = RetriesRemaining
](const NConnector::TReadSplitsStreamIteratorAsyncResult& asyncResult) {
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << selfId << " Got ReadSplitsStreamIterator from Connector";
auto result = ExtractFromConstFuture(asyncResult);
if (result.Status.Ok()) {
auto ev = new TEvReadSplitsIterator(std::move(result.Iterator));
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
} else {
SendError(actorSystem, selfId, result.Status);
SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Оно будет работать так, но нужно ли заново начинать (ListSplits)? (я сам особо ничего не знаю как тут работает)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Не вполне понятно, в каком состоянии находится коннектор (особенно в случае таймаута), так что просто начинаем с нуля (ListSplits)

}
});
}
Expand Down Expand Up @@ -225,6 +235,12 @@ namespace NYql::NDq {
actorSystem->Send(new NActors::IEventHandle(ParentId, SelfId(), errEv.release()));
}

// Handles a scheduled retry: adopts the retry budget carried by the event
// and re-issues the request through SendRequest().
void Handle(TEvRetry::TPtr ev) {
    // The allocator guard stays alive until the handler returns, i.e. across SendRequest().
    auto allocGuard = Guard(*Alloc);
    const ui32 retriesLeft = ev->Get()->NextRetries;
    RetriesRemaining = retriesLeft;
    SendRequest();
}

// Poison event: the payload is ignored and the actor is shut down via PassAway().
void Handle(NActors::TEvents::TEvPoison::TPtr) {
PassAway();
}
Expand All @@ -243,18 +259,22 @@ namespace NYql::NDq {
if (!request) {
return;
}
auto startCycleCount = GetCycleCountFast();
SentTime = TInstant::Now();
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << SelfId() << " Got LookupRequest for " << request->size() << " keys";
Y_ABORT_IF(request->size() == 0 || request->size() > MaxKeysInRequest);

if (Count) {
Count->Inc();
InFlight->Inc();
Keys->Add(request->size());
}

Request = std::move(request);
RetriesRemaining = RequestRetriesLimit;
SendRequest();
}

void SendRequest() {
auto startCycleCount = GetCycleCountFast();
NConnector::NApi::TListSplitsRequest splitRequest;

auto error = FillSelect(*splitRequest.add_selects());
Expand All @@ -264,15 +284,19 @@ namespace NYql::NDq {
};

splitRequest.Setmax_split_count(1);
Connector->ListSplits(splitRequest).Subscribe([actorSystem = TActivationContext::ActorSystem(), selfId = SelfId()](const NConnector::TListSplitsStreamIteratorAsyncResult& asyncResult) {
Connector->ListSplits(splitRequest).Subscribe([
actorSystem = TActivationContext::ActorSystem(),
selfId = SelfId(),
retriesRemaining = RetriesRemaining
](const NConnector::TListSplitsStreamIteratorAsyncResult& asyncResult) {
auto result = ExtractFromConstFuture(asyncResult);
if (result.Status.Ok()) {
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << selfId << " Got TListSplitsStreamIterator";
Y_ABORT_UNLESS(result.Iterator, "Uninitialized iterator");
auto ev = new TEvListSplitsIterator(std::move(result.Iterator));
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
} else {
SendError(actorSystem, selfId, result.Status);
SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining);
}
});
if (CpuTime) {
Expand All @@ -282,12 +306,17 @@ namespace NYql::NDq {

void ReadNextData() {
ReadSplitsIterator->ReadNext().Subscribe(
[actorSystem = TActivationContext::ActorSystem(), selfId = SelfId()](const NConnector::TAsyncResult<NConnector::NApi::TReadSplitsResponse>& asyncResult) {
[
actorSystem = TActivationContext::ActorSystem(),
selfId = SelfId(),
retriesRemaining = RetriesRemaining
](const NConnector::TAsyncResult<NConnector::NApi::TReadSplitsResponse>& asyncResult) {
auto result = ExtractFromConstFuture(asyncResult);
if (result.Status.Ok()) {
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << selfId << " Got DataChunk";
Y_ABORT_UNLESS(result.Response);
auto& response = *result.Response;
// TODO: retry on some YDB errors
if (NConnector::IsSuccess(response)) {
auto ev = new TEvReadSplitsPart(std::move(response));
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
Expand All @@ -299,7 +328,7 @@ namespace NYql::NDq {
auto ev = new TEvReadSplitsFinished(std::move(result.Status));
actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev));
} else {
SendError(actorSystem, selfId, result.Status);
SendRetryOrError(actorSystem, selfId, result.Status, retriesRemaining);
}
});
}
Expand Down Expand Up @@ -365,7 +394,19 @@ namespace NYql::NDq {
new TEvError(std::move(error)));
}

static void SendError(NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NYdbGrpc::TGrpcStatus& status) {
// Reacts to a failed gRPC call to the Connector.
//  - If the status is retriable and retries remain, logs a warning and schedules a TEvRetry
//    (carrying the decremented retry budget) to the actor itself after a back-off delay.
//  - If the status is retriable but the budget is exhausted, logs an error and reports the failure.
//  - Otherwise reports the failure straight away.
// Failures are reported through SendError with the status converted by NConnector::ErrorFromGRPCStatus.
static void SendRetryOrError(NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NYdbGrpc::TGrpcStatus& status, ui32 retriesRemaining) {
    const bool retriable = NConnector::GrpcStatusNeedsRetry(status);

    if (retriable && retriesRemaining > 0) {
        // Zero-based index of the retry about to be scheduled.
        const auto attempt = RequestRetriesLimit - retriesRemaining;
        // Exponential delay from 1ms to ~0.5s
        // TODO tune/tweak
        const auto delay = TDuration::MilliSeconds(1u << attempt);
        YQL_CLOG(WARN, ProviderGeneric) << "ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString() << ", retry " << (attempt + 1) << " of " << RequestRetriesLimit << ", scheduled in " << delay;
        actorSystem->Schedule(delay, new IEventHandle(selfId, selfId, new TEvRetry(retriesRemaining - 1)));
        return;
    }

    if (retriable) {
        // Retriable in principle, but the budget is spent.
        YQL_CLOG(ERROR, ProviderGeneric) << "ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString() << ", retry count exceed limit " << RequestRetriesLimit;
    }

    SendError(actorSystem, selfId, NConnector::ErrorFromGRPCStatus(status));
}

Expand Down Expand Up @@ -422,6 +463,8 @@ namespace NYql::NDq {

NConnector::NApi::TPredicate_TDisjunction disjunction;
for (const auto& [k, _] : *Request) {
// TODO consider skipping already retrieved keys
// ... but careful, can we end up with zero? TODO
NConnector::NApi::TPredicate_TConjunction conjunction;
for (ui32 c = 0; c != KeyType->GetMembersCount(); ++c) {
NConnector::NApi::TPredicate_TComparison eq;
Expand Down Expand Up @@ -454,6 +497,7 @@ namespace NYql::NDq {
std::shared_ptr<IDqAsyncLookupSource::TUnboxedValueMap> Request;
NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator; // TODO move me to TEvReadSplitsPart
NKikimr::NMiniKQL::TKeyPayloadPairVector LookupResult;
ui32 RetriesRemaining;
::NMonitoring::TDynamicCounters::TCounterPtr Count;
::NMonitoring::TDynamicCounters::TCounterPtr Keys;
::NMonitoring::TDynamicCounters::TCounterPtr ResultRows;
Expand Down
Loading