Skip to content

Commit 8bff3db

Browse files
authored
Merge 71bcb4a into 82b202c
2 parents 82b202c + 71bcb4a commit 8bff3db

File tree

9 files changed

+236
-143
lines changed

9 files changed

+236
-143
lines changed

ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ class TYdbConnectorActor : public NActors::TActorBootstrapped<TYdbConnectorActor
130130
void Handle(const TEvYdbCompute::TEvFetchScriptResultRequest::TPtr& ev) {
131131
NYdb::NQuery::TFetchScriptResultsSettings settings;
132132
settings.FetchToken(ev->Get()->FetchToken);
133+
settings.RowsLimit(0);
133134
QueryClient
134135
->FetchScriptResults(ev->Get()->OperationId, ev->Get()->ResultSetId, settings)
135136
.Apply([actorSystem = NActors::TActivationContext::ActorSystem(), recipient = ev->Sender, cookie = ev->Cookie, database = ComputeConnection.database()](auto future) {

ydb/core/grpc_services/query/rpc_fetch_script_results.cpp

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ using TEvFetchScriptResultsRequest = TGrpcRequestNoOperationCall<Ydb::Query::Fet
2525
Ydb::Query::FetchScriptResultsResponse>;
2626

2727
constexpr i64 MAX_ROWS_LIMIT = 1000;
28+
constexpr i64 SIZE_LIMIT = 20_MB;
2829

2930
class TFetchScriptResultsRPC : public TRpcRequestActor<TFetchScriptResultsRPC, TEvFetchScriptResultsRequest, false> {
3031
public:
@@ -45,7 +46,7 @@ class TFetchScriptResultsRPC : public TRpcRequestActor<TFetchScriptResultsRPC, T
4546
return;
4647
}
4748

48-
if (req->rows_limit() <= 0) {
49+
if (req->rows_limit() < 0) {
4950
Reply(Ydb::StatusIds::BAD_REQUEST, "Invalid rows limit");
5051
return;
5152
}
@@ -70,28 +71,31 @@ class TFetchScriptResultsRPC : public TRpcRequestActor<TFetchScriptResultsRPC, T
7071
return;
7172
}
7273

73-
Register(NKqp::CreateGetScriptExecutionResultActor(SelfId(), DatabaseName, ExecutionId, req->result_set_index(), RowsOffset, req->rows_limit() + 1));
74+
Register(NKqp::CreateGetScriptExecutionResultActor(SelfId(), DatabaseName, ExecutionId, req->result_set_index(), RowsOffset, req->rows_limit(), SIZE_LIMIT, Request->GetDeadline()));
7475

7576
Become(&TFetchScriptResultsRPC::StateFunc);
7677
}
7778

7879
private:
7980
STRICT_STFUNC(StateFunc,
80-
hFunc(NKqp::TEvKqp::TEvFetchScriptResultsResponse, Handle);
81+
hFunc(NKqp::TEvFetchScriptResultsResponse, Handle);
8182
)
8283

83-
void Handle(NKqp::TEvKqp::TEvFetchScriptResultsResponse::TPtr& ev) {
84+
void Handle(NKqp::TEvFetchScriptResultsResponse::TPtr& ev) {
8485
Ydb::Query::FetchScriptResultsResponse resp;
85-
resp.set_status(ev->Get()->Record.GetStatus());
86-
resp.mutable_issues()->Swap(ev->Get()->Record.MutableIssues());
87-
resp.set_result_set_index(static_cast<i64>(ev->Get()->Record.GetResultSetIndex()));
88-
if (ev->Get()->Record.HasResultSet()) {
89-
resp.mutable_result_set()->Swap(ev->Get()->Record.MutableResultSet());
90-
91-
const auto* userReq = GetProtoRequest();
92-
if (resp.mutable_result_set()->rows_size() == userReq->rows_limit() + 1) {
93-
resp.mutable_result_set()->mutable_rows()->DeleteSubrange(userReq->rows_limit(), 1);
94-
resp.set_next_fetch_token(ToString(RowsOffset + userReq->rows_limit()));
86+
resp.set_status(ev->Get()->Status);
87+
resp.set_result_set_index(static_cast<i64>(GetProtoRequest()->result_set_index()));
88+
if (ev->Get()->Issues) {
89+
NYql::TIssue root;
90+
for (const NYql::TIssue& issue : ev->Get()->Issues) {
91+
root.AddSubIssue(MakeIntrusive<NYql::TIssue>(issue));
92+
}
93+
NYql::IssueToMessage(root, resp.mutable_issues());
94+
}
95+
if (ev->Get()->ResultSet) {
96+
resp.mutable_result_set()->Swap(&(*ev->Get()->ResultSet));
97+
if (ev->Get()->HasMoreResults) {
98+
resp.set_next_fetch_token(ToString(RowsOffset + resp.result_set().rows_size()));
9599
}
96100
}
97101
Reply(resp.status(), std::move(resp));

ydb/core/kqp/common/events/events.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,6 @@ struct TEvKqp {
142142

143143
using TEvAbortExecution = NYql::NDq::TEvDq::TEvAbortExecution;
144144

145-
struct TEvFetchScriptResultsResponse : public TEventPB<TEvFetchScriptResultsResponse, NKikimrKqp::TEvFetchScriptResultsResponse, TKqpEvents::EvFetchScriptResultsResponse> {
146-
};
147-
148145
struct TEvCancelScriptExecutionRequest : public TEventPB<TEvCancelScriptExecutionRequest, NKikimrKqp::TEvCancelScriptExecutionRequest, TKqpEvents::EvCancelScriptExecutionRequest> {
149146
};
150147

ydb/core/kqp/common/events/script_executions.h

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -224,15 +224,18 @@ struct TEvSaveScriptResultFinished : public NActors::TEventLocal<TEvSaveScriptRe
224224
NYql::TIssues Issues;
225225
};
226226

227-
struct TEvFetchScriptResultsQueryResponse : public NActors::TEventLocal<TEvFetchScriptResultsQueryResponse, TKqpScriptExecutionEvents::EvFetchScriptResultsQueryResponse> {
228-
TEvFetchScriptResultsQueryResponse(bool truncated, NKikimrKqp::TEvFetchScriptResultsResponse&& results)
229-
: Truncated(truncated)
230-
, Results(std::move(results))
231-
{
232-
}
227+
struct TEvFetchScriptResultsResponse : public NActors::TEventLocal<TEvFetchScriptResultsResponse, TKqpScriptExecutionEvents::EvFetchScriptResultsResponse> {
228+
TEvFetchScriptResultsResponse(Ydb::StatusIds::StatusCode status, std::optional<Ydb::ResultSet>&& resultSet, bool hasMoreResults, NYql::TIssues issues)
229+
: Status(status)
230+
, ResultSet(std::move(resultSet))
231+
, HasMoreResults(hasMoreResults)
232+
, Issues(std::move(issues))
233+
{}
233234

234-
bool Truncated;
235-
NKikimrKqp::TEvFetchScriptResultsResponse Results;
235+
Ydb::StatusIds::StatusCode Status;
236+
std::optional<Ydb::ResultSet> ResultSet;
237+
bool HasMoreResults;
238+
NYql::TIssues Issues;
236239
};
237240

238241
struct TEvSaveScriptExternalEffectRequest : public NActors::TEventLocal<TEvSaveScriptExternalEffectRequest, TKqpScriptExecutionEvents::EvSaveScriptExternalEffectRequest> {

ydb/core/kqp/common/simple/kqp_event_ids.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ struct TKqpScriptExecutionEvents {
148148
EvSaveScriptResultFinished,
149149
EvCheckAliveRequest,
150150
EvCheckAliveResponse,
151-
EvFetchScriptResultsQueryResponse,
151+
EvFetchScriptResultsResponse,
152152
EvSaveScriptExternalEffectRequest,
153153
EvSaveScriptExternalEffectResponse,
154154
EvScriptFinalizeRequest,

0 commit comments

Comments
 (0)