Skip to content

Commit e03355f

Browse files
authored
Merge aea4a05 into 50b9d76
2 parents 50b9d76 + aea4a05 commit e03355f

File tree

12 files changed

+333
-253
lines changed

12 files changed

+333
-253
lines changed

ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp

Lines changed: 43 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "base_compute_actor.h"
22

3+
#include <ydb/core/fq/libs/common/rows_proto_splitter.h>
34
#include <ydb/core/fq/libs/common/util.h>
45
#include <ydb/core/fq/libs/compute/common/metrics.h>
56
#include <ydb/core/fq/libs/compute/common/retry_actor.h>
@@ -22,6 +23,8 @@
2223
#include <ydb/library/actors/core/log.h>
2324
#include <library/cpp/protobuf/interop/cast.h>
2425

26+
#include <queue>
27+
2528
#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ResultWriter] QueryId: " << Params.QueryId << " OperationId: " << ProtoToString(OperationId) << " " << stream)
2629
#define LOG_W(stream) LOG_WARN_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ResultWriter] QueryId: " << Params.QueryId << " OperationId: " << ProtoToString(OperationId) << " " << stream)
2730
#define LOG_I(stream) LOG_INFO_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [ResultWriter] QueryId: " << Params.QueryId << " OperationId: " << ProtoToString(OperationId) << " " << stream)
@@ -100,25 +103,36 @@ class TResultSetWriterActor : public TBaseComputeActor<TResultSetWriterActor> {
100103
return;
101104
}
102105

103-
auto startTime = TInstant::Now();
106+
LOG_I("ResultSetId: " << ResultSetId << " FetchToken: " << FetchToken << " Successfully fetched " << response.ResultSet->RowsCount() << " rows");
104107
Truncated |= response.ResultSet->Truncated();
105108
FetchToken = response.NextFetchToken;
106109
auto emptyResultSet = response.ResultSet->RowsCount() == 0;
107-
const auto resultSetProto = NYdb::TProtoAccessor::GetProto(*response.ResultSet);
110+
auto resultSetProto = NYdb::TProtoAccessor::GetProto(*response.ResultSet);
108111

109112
if (!emptyResultSet) {
110-
auto chunk = CreateProtoRequestWithoutResultSet(Offset);
111-
WriterInflight[Cookie] = {Offset, startTime};
112-
Offset += response.ResultSet->RowsCount();
113-
*chunk.mutable_result_set() = resultSetProto;
114-
auto writeResultCounters = Counters.GetCounters(ERequestType::RT_WRITE_RESULT_SET);
115-
writeResultCounters->InFly->Inc();
116-
Send(NFq::MakeInternalServiceActorId(), new NFq::TEvInternalService::TEvWriteResultRequest(std::move(chunk)), 0, Cookie++);
113+
NFq::TRowsProtoSplitter rowsSplitter(std::move(resultSetProto), ProtoMessageLimit, BaseProtoByteSize, MaxRowsCountPerChunk);
114+
auto splittedResultSets = rowsSplitter.Split();
115+
116+
if (!splittedResultSets.Success) {
117+
LOG_E("ResultSetId: " << ResultSetId << " Can't split script result: " << splittedResultSets.Issues.ToOneLineString());
118+
Send(Parent, new TEvYdbCompute::TEvResultSetWriterResponse(ResultSetId, splittedResultSets.Issues, NYdb::EStatus::INTERNAL_ERROR));
119+
FailedAndPassAway();
120+
return;
121+
}
122+
123+
for (auto& resultSet : splittedResultSets.ResultSets) {
124+
auto protoReq = CreateProtoRequestWithoutResultSet(Offset);
125+
Offset += resultSet.rows().size();
126+
protoReq.mutable_result_set()->Swap(&resultSet);
127+
ResultChunks.emplace(std::move(protoReq));
128+
}
129+
130+
TryStartResultWriters();
117131
}
118132

119133
if (WriterInflight.empty()) {
120134
SendReplyAndPassAway();
121-
} else if (FetchToken && WriterInflight.size() < MAX_WRITER_INFLIGHT) {
135+
} else if (FetchToken && (WriterInflight.size() + ResultChunks.size()) < 2 * MAX_WRITER_INFLIGHT) {
122136
SendFetchScriptResultRequest();
123137
}
124138
}
@@ -149,17 +163,31 @@ class TResultSetWriterActor : public TBaseComputeActor<TResultSetWriterActor> {
149163
return;
150164
}
151165

166+
TryStartResultWriters();
167+
152168
writeResultCounters->Ok->Inc();
153169
LOG_I("ResultSetId: " << ResultSetId << " Cookie: " << cookie << " Result successfully written for offset " << meta.Offset);
154170
if (FetchToken) {
155-
if (FetchToken != LastProcessedToken) {
171+
if (FetchToken != LastProcessedToken && (WriterInflight.size() + ResultChunks.size()) < 2 * MAX_WRITER_INFLIGHT) {
156172
SendFetchScriptResultRequest();
157173
}
158174
} else if (WriterInflight.empty()) {
159175
SendReplyAndPassAway();
160176
}
161177
}
162178

179+
void TryStartResultWriters() {
180+
auto writeResultCounters = Counters.GetCounters(ERequestType::RT_WRITE_RESULT_SET);
181+
while (!ResultChunks.empty() && WriterInflight.size() < MAX_WRITER_INFLIGHT) {
182+
auto chunk = std::move(ResultChunks.front());
183+
ResultChunks.pop();
184+
185+
WriterInflight[Cookie] = {static_cast<int64_t>(chunk.offset()), TInstant::Now()};
186+
writeResultCounters->InFly->Inc();
187+
Send(NFq::MakeInternalServiceActorId(), new NFq::TEvInternalService::TEvWriteResultRequest(std::move(chunk)), 0, Cookie++);
188+
}
189+
}
190+
163191
void SendFetchScriptResultRequest() {
164192
LastProcessedToken = FetchToken;
165193
Register(new TRetryActor<TEvYdbCompute::TEvFetchScriptResultRequest, TEvYdbCompute::TEvFetchScriptResultResponse, NKikimr::NOperationId::TOperationId, int64_t, TString>(Counters.GetCounters(ERequestType::RT_FETCH_SCRIPT_RESULT), SelfId(), Connector, OperationId, ResultSetId, FetchToken));
@@ -194,6 +222,10 @@ class TResultSetWriterActor : public TBaseComputeActor<TResultSetWriterActor> {
194222
bool Truncated = false;
195223
TString FetchToken;
196224
TString LastProcessedToken;
225+
const size_t ProtoMessageLimit = 10_MB;
226+
const size_t MaxRowsCountPerChunk = 100'000;
227+
const size_t BaseProtoByteSize = CreateProtoRequestWithoutResultSet(0).ByteSizeLong();
228+
std::queue<Fq::Private::WriteTaskResultRequest> ResultChunks;
197229
};
198230

199231
class TResultWriterActor : public TBaseComputeActor<TResultWriterActor> {

ydb/core/grpc_services/query/rpc_fetch_script_results.cpp

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ using TEvFetchScriptResultsRequest = TGrpcRequestNoOperationCall<Ydb::Query::Fet
2525
Ydb::Query::FetchScriptResultsResponse>;
2626

2727
constexpr i64 MAX_ROWS_LIMIT = 1000;
28+
constexpr i64 MAX_SIZE_LIMIT = 60_MB;
2829

2930
class TFetchScriptResultsRPC : public TRpcRequestActor<TFetchScriptResultsRPC, TEvFetchScriptResultsRequest, false> {
3031
public:
@@ -45,7 +46,7 @@ class TFetchScriptResultsRPC : public TRpcRequestActor<TFetchScriptResultsRPC, T
4546
return;
4647
}
4748

48-
if (req->rows_limit() <= 0) {
49+
if (req->rows_limit() < 0) {
4950
Reply(Ydb::StatusIds::BAD_REQUEST, "Invalid rows limit");
5051
return;
5152
}
@@ -70,28 +71,27 @@ class TFetchScriptResultsRPC : public TRpcRequestActor<TFetchScriptResultsRPC, T
7071
return;
7172
}
7273

73-
Register(NKqp::CreateGetScriptExecutionResultActor(SelfId(), DatabaseName, ExecutionId, req->result_set_index(), RowsOffset, req->rows_limit() + 1));
74+
Register(NKqp::CreateGetScriptExecutionResultActor(SelfId(), DatabaseName, ExecutionId, req->result_set_index(), RowsOffset, req->rows_limit(), req->rows_limit() ? 0 : MAX_SIZE_LIMIT, Request->GetDeadline()));
7475

7576
Become(&TFetchScriptResultsRPC::StateFunc);
7677
}
7778

7879
private:
7980
STRICT_STFUNC(StateFunc,
80-
hFunc(NKqp::TEvKqp::TEvFetchScriptResultsResponse, Handle);
81+
hFunc(NKqp::TEvFetchScriptResultsResponse, Handle);
8182
)
8283

83-
void Handle(NKqp::TEvKqp::TEvFetchScriptResultsResponse::TPtr& ev) {
84+
void Handle(NKqp::TEvFetchScriptResultsResponse::TPtr& ev) {
8485
Ydb::Query::FetchScriptResultsResponse resp;
85-
resp.set_status(ev->Get()->Record.GetStatus());
86-
resp.mutable_issues()->Swap(ev->Get()->Record.MutableIssues());
87-
resp.set_result_set_index(static_cast<i64>(ev->Get()->Record.GetResultSetIndex()));
88-
if (ev->Get()->Record.HasResultSet()) {
89-
resp.mutable_result_set()->Swap(ev->Get()->Record.MutableResultSet());
90-
91-
const auto* userReq = GetProtoRequest();
92-
if (resp.mutable_result_set()->rows_size() == userReq->rows_limit() + 1) {
93-
resp.mutable_result_set()->mutable_rows()->DeleteSubrange(userReq->rows_limit(), 1);
94-
resp.set_next_fetch_token(ToString(RowsOffset + userReq->rows_limit()));
86+
resp.set_status(ev->Get()->Status);
87+
resp.set_result_set_index(static_cast<i64>(GetProtoRequest()->result_set_index()));
88+
if (ev->Get()->Issues) {
89+
NYql::IssuesToMessage(ev->Get()->Issues, resp.mutable_issues());
90+
}
91+
if (ev->Get()->ResultSet) {
92+
resp.mutable_result_set()->Swap(&(*ev->Get()->ResultSet));
93+
if (ev->Get()->HasMoreResults) {
94+
resp.set_next_fetch_token(ToString(RowsOffset + resp.result_set().rows_size()));
9595
}
9696
}
9797
Reply(resp.status(), std::move(resp));

ydb/core/kqp/common/events/events.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,6 @@ struct TEvKqp {
142142

143143
using TEvAbortExecution = NYql::NDq::TEvDq::TEvAbortExecution;
144144

145-
struct TEvFetchScriptResultsResponse : public TEventPB<TEvFetchScriptResultsResponse, NKikimrKqp::TEvFetchScriptResultsResponse, TKqpEvents::EvFetchScriptResultsResponse> {
146-
};
147-
148145
struct TEvCancelScriptExecutionRequest : public TEventPB<TEvCancelScriptExecutionRequest, NKikimrKqp::TEvCancelScriptExecutionRequest, TKqpEvents::EvCancelScriptExecutionRequest> {
149146
};
150147

ydb/core/kqp/common/events/script_executions.h

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -224,15 +224,18 @@ struct TEvSaveScriptResultFinished : public NActors::TEventLocal<TEvSaveScriptRe
224224
NYql::TIssues Issues;
225225
};
226226

227-
struct TEvFetchScriptResultsQueryResponse : public NActors::TEventLocal<TEvFetchScriptResultsQueryResponse, TKqpScriptExecutionEvents::EvFetchScriptResultsQueryResponse> {
228-
TEvFetchScriptResultsQueryResponse(bool truncated, NKikimrKqp::TEvFetchScriptResultsResponse&& results)
229-
: Truncated(truncated)
230-
, Results(std::move(results))
231-
{
232-
}
227+
struct TEvFetchScriptResultsResponse : public NActors::TEventLocal<TEvFetchScriptResultsResponse, TKqpScriptExecutionEvents::EvFetchScriptResultsResponse> {
228+
TEvFetchScriptResultsResponse(Ydb::StatusIds::StatusCode status, std::optional<Ydb::ResultSet>&& resultSet, bool hasMoreResults, NYql::TIssues issues)
229+
: Status(status)
230+
, ResultSet(std::move(resultSet))
231+
, HasMoreResults(hasMoreResults)
232+
, Issues(std::move(issues))
233+
{}
233234

234-
bool Truncated;
235-
NKikimrKqp::TEvFetchScriptResultsResponse Results;
235+
Ydb::StatusIds::StatusCode Status;
236+
std::optional<Ydb::ResultSet> ResultSet;
237+
bool HasMoreResults;
238+
NYql::TIssues Issues;
236239
};
237240

238241
struct TEvSaveScriptExternalEffectRequest : public NActors::TEventLocal<TEvSaveScriptExternalEffectRequest, TKqpScriptExecutionEvents::EvSaveScriptExternalEffectRequest> {

ydb/core/kqp/common/simple/kqp_event_ids.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ struct TKqpScriptExecutionEvents {
148148
EvSaveScriptResultFinished,
149149
EvCheckAliveRequest,
150150
EvCheckAliveResponse,
151-
EvFetchScriptResultsQueryResponse,
151+
EvFetchScriptResultsResponse,
152152
EvSaveScriptExternalEffectRequest,
153153
EvSaveScriptExternalEffectResponse,
154154
EvScriptFinalizeRequest,

0 commit comments

Comments
 (0)