@@ -25,6 +25,7 @@ using TEvFetchScriptResultsRequest = TGrpcRequestNoOperationCall<Ydb::Query::Fet
25
25
Ydb::Query::FetchScriptResultsResponse>;
26
26
27
27
constexpr i64 MAX_ROWS_LIMIT = 1000 ;
28
+ constexpr i64 SIZE_LIMIT = 20_MB;
28
29
29
30
class TFetchScriptResultsRPC : public TRpcRequestActor <TFetchScriptResultsRPC, TEvFetchScriptResultsRequest, false > {
30
31
public:
@@ -45,7 +46,7 @@ class TFetchScriptResultsRPC : public TRpcRequestActor<TFetchScriptResultsRPC, T
45
46
return ;
46
47
}
47
48
48
- if (req->rows_limit () <= 0 ) {
49
+ if (req->rows_limit () < 0 ) {
49
50
Reply (Ydb::StatusIds::BAD_REQUEST, " Invalid rows limit" );
50
51
return ;
51
52
}
@@ -70,7 +71,7 @@ class TFetchScriptResultsRPC : public TRpcRequestActor<TFetchScriptResultsRPC, T
70
71
return ;
71
72
}
72
73
73
- Register (NKqp::CreateGetScriptExecutionResultActor (SelfId (), DatabaseName, ExecutionId, req->result_set_index (), RowsOffset, req->rows_limit () + 1 ));
74
+ Register (NKqp::CreateGetScriptExecutionResultActor (SelfId (), DatabaseName, ExecutionId, req->result_set_index (), RowsOffset, req->rows_limit (), SIZE_LIMIT, Request-> GetDeadline () ));
74
75
75
76
Become (&TFetchScriptResultsRPC::StateFunc);
76
77
}
@@ -80,18 +81,21 @@ class TFetchScriptResultsRPC : public TRpcRequestActor<TFetchScriptResultsRPC, T
80
81
hFunc (NKqp::TEvKqp::TEvFetchScriptResultsResponse, Handle);
81
82
)
82
83
83
- void Handle (NKqp::TEvKqp:: TEvFetchScriptResultsResponse::TPtr& ev) {
84
+ void Handle (NKqp::TEvFetchScriptResultsResponse::TPtr& ev) {
84
85
Ydb::Query::FetchScriptResultsResponse resp;
85
- resp.set_status (ev->Get ()->Record .GetStatus ());
86
- resp.mutable_issues ()->Swap (ev->Get ()->Record .MutableIssues ());
87
- resp.set_result_set_index (static_cast <i64 >(ev->Get ()->Record .GetResultSetIndex ()));
88
- if (ev->Get ()->Record .HasResultSet ()) {
89
- resp.mutable_result_set ()->Swap (ev->Get ()->Record .MutableResultSet ());
90
-
91
- const auto * userReq = GetProtoRequest ();
92
- if (resp.mutable_result_set ()->rows_size () == userReq->rows_limit () + 1 ) {
93
- resp.mutable_result_set ()->mutable_rows ()->DeleteSubrange (userReq->rows_limit (), 1 );
94
- resp.set_next_fetch_token (ToString (RowsOffset + userReq->rows_limit ()));
86
+ resp.set_status (ev->Get ()->Status );
87
+ resp.set_result_set_index (static_cast <i64 >(GetProtoRequest ()->result_set_index ()));
88
+ if (ev->Get ()->Issues ) {
89
+ NYql::TIssue root;
90
+ for (const NYql::TIssue& issue : ev->Get ()->Issues ) {
91
+ root.AddSubIssue (MakeIntrusive<NYql::TIssue>(issue));
92
+ }
93
+ NYql::IssueToMessage (root, resp.mutable_issues ());
94
+ }
95
+ if (ev->Get ()->ResultSet ) {
96
+ resp.mutable_result_set ()->Swap (&(*ev->Get ()->ResultSet ));
97
+ if (ev->Get ()->HasMoreResults ) {
98
+ resp.set_next_fetch_token (ToString (RowsOffset + resp.result_set ().rows_size ()));
95
99
}
96
100
}
97
101
Reply (resp.status (), std::move (resp));
0 commit comments