Skip to content

YQ-3342 fix script results writing #5851

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 97 additions & 61 deletions ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ namespace NKikimr::NKqp {
namespace {

constexpr ui32 LEASE_UPDATE_FREQUENCY = 2;
constexpr ui32 MAX_SAVE_RESULT_IN_FLIGHT = 1;

constexpr ui64 MIN_SAVE_RESULT_BATCH_SIZE = 5_MB;
constexpr i32 MIN_SAVE_RESULT_BATCH_ROWS = 5000;
constexpr ui64 RUN_SCRIPT_ACTOR_BUFFER_SIZE = 40_MB;

class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
enum class ERunState {
Expand All @@ -48,14 +51,20 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
UpdateLeaseEvent,
};

struct TPendingSaveResult {
ui32 ResultSetIndex;
ui64 FirstRow;
ui64 AccumulatedSize;
Ydb::ResultSet ResultSet;
struct TResultSetInfo {
bool Truncated = false;
ui64 RowCount = 0;
ui64 ByteCount = 0;
NJson::TJsonValue* Meta;

ui64 FirstRowId = 0;
ui64 AccumulatedSize = 0;
Ydb::ResultSet PendingResult;
};

struct TPendingAck {
TActorId ReplyActorId;
THolder<TEvKqpExecuter::TEvStreamDataAck> SaveResultResponse;
THolder<TEvKqpExecuter::TEvStreamDataAck> AckEvent;
};

public:
Expand Down Expand Up @@ -255,29 +264,55 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
PassAway();
}

void SendStreamDataResponse(TActorId replyActorId, THolder<TEvKqpExecuter::TEvStreamDataAck> saveResultResponse) const {
LOG_D("Send stream data ack"
<< ", seqNo: " << saveResultResponse->Record.GetSeqNo()
<< ", to: " << replyActorId);
void SendStreamDataResponse() {
if (PendingAcks.empty()) {
return;
}

if (PendingResultSetsSize > RUN_SCRIPT_ACTOR_BUFFER_SIZE) {
// Try to save any pending result
SaveResult();
}

Send(replyActorId, saveResultResponse.Release());
if (PendingResultSetsSize <= RUN_SCRIPT_ACTOR_BUFFER_SIZE) {
while (!PendingAcks.empty()) {
auto response = std::move(PendingAcks.front());
PendingAcks.pop();

LOG_D("Send stream data ack"
<< ", seqNo: " << response.AckEvent->Record.GetSeqNo()
<< ", to: " << response.ReplyActorId);

Send(response.ReplyActorId, response.AckEvent.Release());
}
}
}

void SaveResult() {
if (SaveResultInflight >= MAX_SAVE_RESULT_IN_FLIGHT || PendingSaveResults.empty()) {
void SaveResult(size_t resultSetId) {
if (SaveResultInflight) {
return;
}

if (!ExpireAt && ResultsTtl > TDuration::Zero()) {
ExpireAt = TInstant::Now() + ResultsTtl;
}

TPendingSaveResult& result = PendingSaveResults.back();
Register(CreateSaveScriptExecutionResultActor(SelfId(), Database, ExecutionId, result.ResultSetIndex, ExpireAt, result.FirstRow, result.AccumulatedSize, std::move(result.ResultSet)));
SendStreamDataResponse(result.ReplyActorId, std::move(result.SaveResultResponse));

PendingSaveResults.pop_back();
auto& resultSetInfo = ResultSetInfos[resultSetId];
Register(CreateSaveScriptExecutionResultActor(SelfId(), Database, ExecutionId, resultSetId, ExpireAt, resultSetInfo.FirstRowId, resultSetInfo.AccumulatedSize, std::move(resultSetInfo.PendingResult)));
SaveResultInflight++;
PendingResultSetsSize -= resultSetInfo.ByteCount - resultSetInfo.AccumulatedSize;
resultSetInfo.FirstRowId = resultSetInfo.RowCount;
resultSetInfo.AccumulatedSize = resultSetInfo.ByteCount;
resultSetInfo.PendingResult = Ydb::ResultSet();
}

void SaveResult() {
for (size_t resultSetId = 0; resultSetId < ResultSetInfos.size(); ++resultSetId) {
if (ResultSetInfos[resultSetId].PendingResult.rows_size()) {
SaveResult(resultSetId);
break;
}
}
}

void Handle(TEvKqpExecuter::TEvStreamData::TPtr& ev) {
Expand All @@ -299,57 +334,52 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {

auto resultSetIndex = ev->Get()->Record.GetQueryResultIndex();

if (resultSetIndex >= ResultSetMetaArray.size()) {
if (resultSetIndex >= ResultSetInfos.size()) {
// we don't know result set count, so just accept all of them
// it's possible to have several result sets per script
// they can arrive in any order and may be missed for some indices
ResultSetRowCount.resize(resultSetIndex + 1);
ResultSetByteCount.resize(resultSetIndex + 1);
Truncated.resize(resultSetIndex + 1);
ResultSetMetaArray.resize(resultSetIndex + 1, nullptr);
ResultSetInfos.resize(resultSetIndex + 1);
}

bool saveResultRequired = false;
if (IsExecuting() && !Truncated[resultSetIndex]) {
auto& rowCount = ResultSetRowCount[resultSetIndex];
auto& byteCount = ResultSetByteCount[resultSetIndex];
auto firstRow = rowCount;
auto accumulatedSize = byteCount;
auto& resultSetInfo = ResultSetInfos[resultSetIndex];
if (IsExecuting() && !resultSetInfo.Truncated) {
auto& rowCount = resultSetInfo.RowCount;
auto& byteCount = resultSetInfo.ByteCount;

Ydb::ResultSet resultSet;
for (auto& row : *ev->Get()->Record.MutableResultSet()->mutable_rows()) {
if (QueryServiceConfig.GetScriptResultRowsLimit() && rowCount + 1 > QueryServiceConfig.GetScriptResultRowsLimit()) {
Truncated[resultSetIndex] = true;
resultSetInfo.Truncated = true;
break;
}

auto serializedSize = row.ByteSizeLong();
if (QueryServiceConfig.GetScriptResultSizeLimit() && byteCount + serializedSize > QueryServiceConfig.GetScriptResultSizeLimit()) {
Truncated[resultSetIndex] = true;
resultSetInfo.Truncated = true;
break;
}

rowCount++;
byteCount += serializedSize;
*resultSet.add_rows() = std::move(row);
PendingResultSetsSize += serializedSize;
*resultSetInfo.PendingResult.add_rows() = std::move(row);
}

bool newResultSet = ResultSetMetaArray[resultSetIndex] == nullptr;
if (newResultSet || Truncated[resultSetIndex]) {
bool newResultSet = resultSetInfo.Meta == nullptr;
if (newResultSet || resultSetInfo.Truncated) {
Ydb::Query::Internal::ResultSetMeta meta;
if (newResultSet) {
*meta.mutable_columns() = ev->Get()->Record.GetResultSet().columns();
}
if (Truncated[resultSetIndex]) {
if (resultSetInfo.Truncated) {
meta.set_truncated(true);
}

NJson::TJsonValue* value;
if (newResultSet) {
value = &ResultSetMetas[resultSetIndex];
ResultSetMetaArray[resultSetIndex] = value;
resultSetInfo.Meta = value;
} else {
value = ResultSetMetaArray[resultSetIndex];
value = resultSetInfo.Meta;
}
NProtobufJson::Proto2Json(meta, *value, NProtobufJson::TProto2JsonConfig());

Expand All @@ -362,24 +392,13 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
}
}

if (resultSet.rows_size() > 0) {
saveResultRequired = true;
PendingSaveResults.push_back({
resultSetIndex,
firstRow,
accumulatedSize,
std::move(resultSet),
ev->Sender,
std::move(resp)
});
if (ShouldSaveResult(resultSetInfo)) {
SaveResult(resultSetIndex);
}
}

if (saveResultRequired) {
SaveResult();
} else {
SendStreamDataResponse(ev->Sender, std::move(resp));
}
PendingAcks.push({.ReplyActorId = ev->Sender, .AckEvent = std::move(resp)});
SendStreamDataResponse();
}

void SaveResultMeta() {
Expand Down Expand Up @@ -504,9 +523,15 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
Status = ev->Get()->Status;
Issues.AddIssues(ev->Get()->Issues);
} else {
SaveResult();
for (size_t resultSetId = 0; resultSetId < ResultSetInfos.size(); ++resultSetId) {
if (ShouldSaveResult(ResultSetInfos[resultSetId])) {
SaveResult(resultSetId);
break;
}
}
}
}
SendStreamDataResponse();
CheckInflight();
}

Expand All @@ -533,6 +558,12 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
return;
}

if (PendingResultSetsSize) {
// Complete results saving
SaveResult();
return;
}

if (!LeaseUpdateQueryRunning) {
RunScriptExecutionFinisher();
} else {
Expand All @@ -545,7 +576,7 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
Status = status;

// if query has no results, save empty json array
if (ResultSetMetaArray.empty()) {
if (ResultSetInfos.empty()) {
ResultSetMetas.SetType(NJson::JSON_ARRAY);
SaveResultMeta();
SaveResultMetaInflight++;
Expand All @@ -566,6 +597,13 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
&& RunState != ERunState::Cancelling;
}

static bool ShouldSaveResult(TResultSetInfo& resultInfo) {
if (!resultInfo.PendingResult.rows_size()) {
return false;
}
return resultInfo.Truncated || resultInfo.PendingResult.rows_size() >= MIN_SAVE_RESULT_BATCH_ROWS || resultInfo.ByteCount - resultInfo.AccumulatedSize >= MIN_SAVE_RESULT_BATCH_SIZE;
}

private:
const TString ExecutionId;
NKikimrKqp::TEvQueryRequest Request;
Expand All @@ -589,16 +627,14 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
Ydb::StatusIds::StatusCode Status = Ydb::StatusIds::STATUS_CODE_UNSPECIFIED;

// Result
std::vector<TPendingSaveResult> PendingSaveResults;
std::vector<ui64> ResultSetRowCount;
std::vector<ui64> ResultSetByteCount;
std::vector<bool> Truncated;
std::vector<NJson::TJsonValue*> ResultSetMetaArray;
std::vector<TResultSetInfo> ResultSetInfos;
std::queue<TPendingAck> PendingAcks;
TMaybe<TInstant> ExpireAt;
NJson::TJsonValue ResultSetMetas;
ui32 SaveResultInflight = 0;
ui32 SaveResultMetaInflight = 0;
bool PendingResultMeta = false;
ui64 PendingResultSetsSize = 0;
std::optional<TString> QueryPlan;
std::optional<TString> QueryAst;
std::optional<NKqpProto::TKqpStatsQuery> QueryStats;
Expand Down
Loading