Skip to content

Commit 23682ef

Browse files
committed
Fixed result writer actor
1 parent a7c0728 commit 23682ef

File tree

2 files changed

+44
-12
lines changed

2 files changed

+44
-12
lines changed

ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp

Lines changed: 43 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "base_compute_actor.h"
22

3+
#include <ydb/core/fq/libs/common/rows_proto_splitter.h>
34
#include <ydb/core/fq/libs/common/util.h>
45
#include <ydb/core/fq/libs/compute/common/metrics.h>
56
#include <ydb/core/fq/libs/compute/common/retry_actor.h>
@@ -22,6 +23,8 @@
2223
#include <ydb/library/actors/core/log.h>
2324
#include <library/cpp/protobuf/interop/cast.h>
2425

26+
#include <queue>
27+
2528
#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ResultWriter] QueryId: " << Params.QueryId << " OperationId: " << ProtoToString(OperationId) << " " << stream)
2629
#define LOG_W(stream) LOG_WARN_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ResultWriter] QueryId: " << Params.QueryId << " OperationId: " << ProtoToString(OperationId) << " " << stream)
2730
#define LOG_I(stream) LOG_INFO_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ResultWriter] QueryId: " << Params.QueryId << " OperationId: " << ProtoToString(OperationId) << " " << stream)
@@ -92,28 +95,39 @@ class TResultSetWriterActor : public TBaseComputeActor<TResultSetWriterActor> {
9295
)
9396

9497
void Handle(const TEvYdbCompute::TEvFetchScriptResultResponse::TPtr& ev) {
95-
const auto& response = *ev.Get()->Get();
98+
auto& response = *ev.Get()->Get();
9699
if (response.Status != NYdb::EStatus::SUCCESS) {
97100
LOG_E("ResultSetId: " << ResultSetId << " Can't fetch script result: " << ev->Get()->Issues.ToOneLineString());
98101
Send(Parent, new TEvYdbCompute::TEvResultSetWriterResponse(ResultSetId, ev->Get()->Issues, NYdb::EStatus::INTERNAL_ERROR));
99102
FailedAndPassAway();
100103
return;
101104
}
102105

103-
auto startTime = TInstant::Now();
106+
LOG_I("ResultSetId: " << ResultSetId << " FetchToken: " << FetchToken << " Successfully fetched " << response.ResultSet->RowsCount() << " rows");
104107
Truncated |= response.ResultSet->Truncated();
105108
FetchToken = response.NextFetchToken;
106109
auto emptyResultSet = response.ResultSet->RowsCount() == 0;
107-
const auto resultSetProto = NYdb::TProtoAccessor::GetProto(*response.ResultSet);
110+
auto resultSetProto = NYdb::TProtoAccessor::GetProto(std::move(*response.ResultSet));
108111

109112
if (!emptyResultSet) {
110-
auto chunk = CreateProtoRequestWithoutResultSet(Offset);
111-
WriterInflight[Cookie] = {Offset, startTime};
112-
Offset += response.ResultSet->RowsCount();
113-
*chunk.mutable_result_set() = resultSetProto;
114-
auto writeResultCounters = Counters.GetCounters(ERequestType::RT_WRITE_RESULT_SET);
115-
writeResultCounters->InFly->Inc();
116-
Send(NFq::MakeInternalServiceActorId(), new NFq::TEvInternalService::TEvWriteResultRequest(std::move(chunk)), 0, Cookie++);
113+
NFq::TRowsProtoSplitter rowsSplitter(std::move(resultSetProto), ProtoMessageLimit, BaseProtoByteSize, MaxRowsCountPerChunk);
114+
auto splittedResultSets = rowsSplitter.Split();
115+
116+
if (!splittedResultSets.Success) {
117+
LOG_E("ResultSetId: " << ResultSetId << " Can't split script result: " << splittedResultSets.Issues.ToOneLineString());
118+
Send(Parent, new TEvYdbCompute::TEvResultSetWriterResponse(ResultSetId, splittedResultSets.Issues, NYdb::EStatus::BAD_REQUEST));
119+
FailedAndPassAway();
120+
return;
121+
}
122+
123+
for (auto& resultSet : splittedResultSets.ResultSets) {
124+
auto protoReq = CreateProtoRequestWithoutResultSet(Offset);
125+
Offset += resultSet.rows().size();
126+
protoReq.mutable_result_set()->Swap(&resultSet);
127+
ResultChunks.emplace(std::move(protoReq));
128+
}
129+
130+
TryStartResultWriters();
117131
}
118132

119133
if (WriterInflight.empty()) {
@@ -149,17 +163,31 @@ class TResultSetWriterActor : public TBaseComputeActor<TResultSetWriterActor> {
149163
return;
150164
}
151165

166+
TryStartResultWriters();
167+
152168
writeResultCounters->Ok->Inc();
153169
LOG_I("ResultSetId: " << ResultSetId << " Cookie: " << cookie << " Result successfully written for offset " << meta.Offset);
154170
if (FetchToken) {
155-
if (FetchToken != LastProcessedToken) {
171+
if (FetchToken != LastProcessedToken && WriterInflight.size() < MAX_WRITER_INFLIGHT) {
156172
SendFetchScriptResultRequest();
157173
}
158174
} else if (WriterInflight.empty()) {
159175
SendReplyAndPassAway();
160176
}
161177
}
162178

179+
void TryStartResultWriters() {
180+
auto writeResultCounters = Counters.GetCounters(ERequestType::RT_WRITE_RESULT_SET);
181+
while (!ResultChunks.empty() && WriterInflight.size() < MAX_WRITER_INFLIGHT) {
182+
auto chunk = std::move(ResultChunks.front());
183+
ResultChunks.pop();
184+
185+
WriterInflight[Cookie] = {static_cast<int64_t>(chunk.offset()), TInstant::Now()};
186+
writeResultCounters->InFly->Inc();
187+
Send(NFq::MakeInternalServiceActorId(), new NFq::TEvInternalService::TEvWriteResultRequest(std::move(chunk)), 0, Cookie++);
188+
}
189+
}
190+
163191
void SendFetchScriptResultRequest() {
164192
LastProcessedToken = FetchToken;
165193
Register(new TRetryActor<TEvYdbCompute::TEvFetchScriptResultRequest, TEvYdbCompute::TEvFetchScriptResultResponse, NKikimr::NOperationId::TOperationId, int64_t, TString>(Counters.GetCounters(ERequestType::RT_FETCH_SCRIPT_RESULT), SelfId(), Connector, OperationId, ResultSetId, FetchToken));
@@ -194,6 +222,10 @@ class TResultSetWriterActor : public TBaseComputeActor<TResultSetWriterActor> {
194222
bool Truncated = false;
195223
TString FetchToken;
196224
TString LastProcessedToken;
225+
const size_t ProtoMessageLimit = 10_MB;
226+
const size_t MaxRowsCountPerChunk = 100'000;
227+
const size_t BaseProtoByteSize = CreateProtoRequestWithoutResultSet(0).ByteSizeLong();
228+
std::queue<Fq::Private::WriteTaskResultRequest> ResultChunks;
197229
};
198230

199231
class TResultWriterActor : public TBaseComputeActor<TResultWriterActor> {

ydb/core/kqp/proxy_service/kqp_script_executions.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -950,7 +950,7 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
950950
return;
951951
}
952952

953-
if (TInstant::Now() + TDuration::Seconds(1) + GetAverageTime() >= Deadline) {
953+
if (TInstant::Now() + 2 * GetAverageTime() >= Deadline) {
954954
Finish(Ydb::StatusIds::TIMEOUT, ForgetOperationTimeoutIssues());
955955
return;
956956
}

0 commit comments

Comments
 (0)