Skip to content

Fixed forget operation failure #781

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/grpc_services/rpc_forget_operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class TForgetOperationRPC: public TRpcOperationRequestActor<TForgetOperationRPC,
}

void SendForgetScriptExecutionOperation() {
Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvForgetScriptExecutionOperation(DatabaseName, OperationId));
Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvForgetScriptExecutionOperation(DatabaseName, OperationId, Request->GetDeadline()));
}

public:
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/kqp/common/events/script_executions.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@ enum EFinalizationStatus : i32 {
};

struct TEvForgetScriptExecutionOperation : public NActors::TEventLocal<TEvForgetScriptExecutionOperation, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperation> {
explicit TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
explicit TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id, TInstant deadline)
: Database(database)
, OperationId(id)
, Deadline(deadline)
{
}

TString Database;
NOperationId::TOperationId OperationId;
TInstant Deadline;
};

struct TEvForgetScriptExecutionOperationResponse : public NActors::TEventLocal<TEvForgetScriptExecutionOperationResponse, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperationResponse> {
Expand Down
60 changes: 46 additions & 14 deletions ydb/core/kqp/proxy_service/kqp_script_executions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ namespace {
constexpr TDuration LEASE_DURATION = TDuration::Seconds(30);
constexpr TDuration DEADLINE_OFFSET = TDuration::Minutes(20);
constexpr TDuration BRO_RUN_INTERVAL = TDuration::Minutes(60);
constexpr ui64 MAX_NUMBER_ROWS_IN_BATCH = 10000;

TString SerializeIssues(const NYql::TIssues& issues) {
NYql::TIssue root;
Expand Down Expand Up @@ -884,18 +883,26 @@ class TCheckLeaseStatusActor : public TCheckLeaseStatusActorBase {
};

class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
static constexpr i64 MAX_NUMBER_ROWS_IN_BATCH = 100000;
static constexpr TDuration MINIMAL_DEADLINE_TIME = TDuration::Seconds(1);

struct TResultSetDescription {
ui64 NumberRows;
i64 MaxRowId;
i32 ResultSetId;
};

public:
TForgetScriptExecutionOperationQueryActor(const TString& executionId, const TString& database)
TForgetScriptExecutionOperationQueryActor(const TString& executionId, const TString& database, TInstant operationDeadline)
: ExecutionId(executionId)
, Database(database)
, Deadline(operationDeadline - MINIMAL_DEADLINE_TIME)
{}

void OnRunQuery() override {
if (!CheckDeadline()) {
return;
}

TString sql = R"(
-- TForgetScriptExecutionOperationQueryActor::OnRunQuery
DECLARE $database AS Text;
Expand All @@ -905,7 +912,7 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
FROM `.metadata/script_executions`
WHERE database = $database AND execution_id = $execution_id;

SELECT result_set_id, COUNT(row_id) AS number_rows
SELECT result_set_id, MAX(row_id) AS max_row_id
FROM `.metadata/result_sets`
WHERE database = $database AND execution_id = $execution_id
GROUP BY result_set_id;
Expand Down Expand Up @@ -944,19 +951,24 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
while (result.TryNextRow()) {
TMaybe<i32> resultSetId = result.ColumnParser("result_set_id").GetOptionalInt32();
if (!resultSetId) {
continue;
Finish(Ydb::StatusIds::INTERNAL_ERROR, "Result set id is not specified");
return;
}

ui64 numberRows = result.ColumnParser("number_rows").GetUint64();
TMaybe<i64> maxRowId = result.ColumnParser("max_row_id").GetOptionalInt64();
if (!maxRowId) {
Finish(Ydb::StatusIds::INTERNAL_ERROR, "Result set row id is not specified");
return;
}

ResultSetsDescription.emplace_back(TResultSetDescription{numberRows, *resultSetId});
ResultSetsDescription.emplace_back(TResultSetDescription{*maxRowId, *resultSetId});
}

DeleteScriptResults();
}

void DeleteScriptResults() {
while (!ResultSetsDescription.empty() && ResultSetsDescription.back().NumberRows == 0) {
while (!ResultSetsDescription.empty() && ResultSetsDescription.back().MaxRowId < 0) {
ResultSetsDescription.pop_back();
}

Expand All @@ -965,22 +977,26 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
return;
}

if (!CheckDeadline()) {
return;
}

TResultSetDescription& resultSet = ResultSetsDescription.back();
resultSet.NumberRows -= std::min(MAX_NUMBER_ROWS_IN_BATCH, resultSet.NumberRows);
resultSet.MaxRowId -= MAX_NUMBER_ROWS_IN_BATCH;

TString sql = R"(
-- TForgetScriptExecutionOperationQueryActor::DeleteScriptResults
DECLARE $database AS Text;
DECLARE $execution_id AS Text;
DECLARE $result_set_id AS Int32;
DECLARE $last_row_id AS Uint64;
DECLARE $max_row_id AS Int64;

DELETE
FROM `.metadata/result_sets`
WHERE database = $database
AND execution_id = $execution_id
AND result_set_id = $result_set_id
AND row_id >= $last_row_id;
AND row_id > $max_row_id;
)";

NYdb::TParamsBuilder params;
Expand All @@ -994,8 +1010,8 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
.AddParam("$result_set_id")
.Int32(resultSet.ResultSetId)
.Build()
.AddParam("$last_row_id")
.Uint64(resultSet.NumberRows)
.AddParam("$max_row_id")
.Int64(resultSet.MaxRowId)
.Build();

RunDataQuery(sql, &params);
Expand All @@ -1006,9 +1022,19 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
Send(Owner, new TEvForgetScriptExecutionOperationResponse(status, std::move(issues)));
}

private:
bool CheckDeadline() {
if (TInstant::Now() >= Deadline) {
Finish(Ydb::StatusIds::TIMEOUT, "Forget script execution operation timeout");
return false;
}
return true;
}

private:
TString ExecutionId;
TString Database;
TInstant Deadline;
std::vector<TResultSetDescription> ResultSetsDescription;
};

Expand Down Expand Up @@ -1049,7 +1075,7 @@ class TForgetScriptExecutionOperationActor : public TActorBootstrapped<TForgetSc
}
}

Register(new TForgetScriptExecutionOperationQueryActor(ExecutionId, Request->Get()->Database));
Register(new TForgetScriptExecutionOperationQueryActor(ExecutionId, Request->Get()->Database, Request->Get()->Deadline));
}

void Handle(TEvForgetScriptExecutionOperationResponse::TPtr& ev) {
Expand Down Expand Up @@ -1847,6 +1873,7 @@ class TSaveScriptExecutionResultQuery : public TQueryBase {
};

class TSaveScriptExecutionResultActor : public TActorBootstrapped<TSaveScriptExecutionResultActor> {
static constexpr ui64 MAX_NUMBER_ROWS_IN_BATCH = 10000;
static constexpr ui64 PROGRAM_SIZE_LIMIT = 10_MB;
static constexpr ui64 PROGRAM_BASE_SIZE = 1_MB; // Depends on MAX_NUMBER_ROWS_IN_BATCH

Expand Down Expand Up @@ -2069,6 +2096,11 @@ class TGetScriptExecutionResultQuery : public TQueryBase {
const TMaybe<TString> serializedRow = result.ColumnParser("result_set").GetOptionalString();

if (!serializedRow) {
Finish(Ydb::StatusIds::INTERNAL_ERROR, "Result set row is null");
return;
}

if (serializedRow->Empty()) {
Finish(Ydb::StatusIds::INTERNAL_ERROR, "Result set row is empty");
return;
}
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,13 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
}

void Handle(TEvCheckAliveRequest::TPtr& ev) {
LOG_W("Lease was expired in database"
<< ", execution id: " << ExecutionId
<< ", saved final status: " << FinalStatusIsSaved
<< ", wait finalization request: " << WaitFinalizationRequest
<< ", is executing: " << IsExecuting()
<< ", current status: " << Status);

Send(ev->Sender, new TEvCheckAliveResponse());
}

Expand Down
3 changes: 3 additions & 0 deletions ydb/tests/tools/kqprun/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
sync_dir
*.log
*.sql
101 changes: 101 additions & 0 deletions ydb/tests/tools/kqprun/configuration/app_config.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
FeatureFlags {
EnableExternalDataSources: true
EnableScriptExecutionOperations: true
}

KQPConfig {
Settings {
Name: "HashJoinMode"
Value: "grace"
}
Settings {
Name: "_KqpExprNodesAllocationLimit"
Value: "3000000"
}
Settings {
Name: "_KqpExprStringsAllocationLimit"
Value: "100000000"
}
}

LogConfig {
DefaultLevel: 2
}

QueryServiceConfig {
MdbTransformHost: false
ScriptResultRowsLimit: 0
ScriptResultSizeLimit: 10485760

Generic {
MdbGateway: "https://mdb.api.cloud.yandex.net:443"

Connector {
UseSsl: false

Endpoint {
host: "localhost"
port: 50051
}
}
}

HttpGateway {
BuffersSizePerStream: 5000000
ConnectionTimeoutSeconds: 15
LowSpeedBytesLimit: 1024
LowSpeedTimeSeconds: 20
MaxInFlightCount: 2000
MaxSimulatenousDownloadsSize: 2000000000
RequestTimeoutSeconds: 0
}

S3 {
AllowConcurrentListings: true
FileSizeLimit: 100000000000
GeneratorPathsLimit: 50000
ListingCallbackPerThreadQueueSize: 100
ListingCallbackThreadCount: 1
MaxDirectoriesAndFilesPerQuery: 500000
MaxDiscoveryFilesPerQuery: 1000
MaxFilesPerQuery: 500000
MaxInflightListsPerQuery: 100
MaxReadSizePerQuery: 1000000000000
MinDesiredDirectoriesOfFilesPerQuery: 1000
RegexpCacheSize: 100

DefaultSettings {
Name: "UseBlocksSource"
Value: "true"
}
}
}

ResourceBrokerConfig {
Queues {
Name: "queue_kqp_resource_manager"
Weight: 30

Limit {
Memory: 6442450944
}
}

ResourceLimit {
Memory: 6442450944
}
}

TableServiceConfig {
BindingsMode: BM_DROP
CompileTimeoutMs: 600000
SessionsLimitPerNode: 1000

QueryLimits {
DataQueryTimeoutMs: 3600000
}

ResourceManager {
QueryMemoryLimit: 64424509440
}
}