Skip to content

Commit 2fcb3da

Browse files
authored
Merge 6dc86b6 into bd79c91
2 parents bd79c91 + 6dc86b6 commit 2fcb3da

File tree

2 files changed

+119
-51
lines changed

2 files changed

+119
-51
lines changed

ydb/core/fq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp

Lines changed: 117 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -59,15 +59,18 @@ struct TCheckpointContext : public TThrRefBase {
5959
TGenerationContextPtr GenerationContext;
6060
TCheckpointGraphDescriptionContextPtr CheckpointGraphDescriptionContext;
6161
IEntityIdGenerator::TPtr EntityIdGenerator;
62+
TExecDataQuerySettings Settings;
6263

6364
TCheckpointContext(const TCheckpointId& id,
6465
ECheckpointStatus status,
6566
ECheckpointStatus expected,
66-
ui64 stateSizeBytes)
67+
ui64 stateSizeBytes,
68+
TExecDataQuerySettings settings)
6769
: CheckpointId(id)
6870
, Status(status)
6971
, ExpectedStatus(expected)
7072
, StateSizeBytes(stateSizeBytes)
73+
, Settings(settings)
7174
{
7275
}
7376
};
@@ -220,7 +223,7 @@ TFuture<TStatus> CreateCheckpoint(const TCheckpointContextPtr& context) {
220223
}
221224

222225
auto ttxControl = TTxControl::Tx(*generationContext->Transaction).CommitTx();
223-
return generationContext->Session.ExecuteDataQuery(query, ttxControl, params.Build()).Apply(
226+
return generationContext->Session.ExecuteDataQuery(query, ttxControl, params.Build(), context->Settings).Apply(
224227
[] (const TFuture<TDataQueryResult>& future) {
225228
TStatus status = future.GetValue();
226229
return status;
@@ -236,21 +239,41 @@ TFuture<TStatus> UpdateCheckpoint(const TCheckpointContextPtr& context) {
236239
auto query = Sprintf(R"(
237240
--!syntax_v1
238241
PRAGMA TablePathPrefix("%s");
239-
$ts = cast(%lu as Timestamp);
242+
DECLARE $graph_id AS String;
243+
DECLARE $coordinator_generation AS Uint64;
244+
DECLARE $seq_no AS Uint64;
245+
DECLARE $status AS Uint8;
246+
DECLARE $state_size AS Uint64;
247+
DECLARE $ts AS Timestamp;
240248
241249
UPSERT INTO %s (graph_id, coordinator_generation, seq_no, status, state_size, modified_by) VALUES
242-
("%s", %lu, %lu, %u, %lu, $ts);
250+
($graph_id, $coordinator_generation, $seq_no, $status, $state_size, $ts);
243251
)", generationContext->TablePathPrefix.c_str(),
244-
TInstant::Now().MicroSeconds(),
245-
CheckpointsMetadataTable,
246-
generationContext->PrimaryKey.c_str(),
247-
context->CheckpointId.CoordinatorGeneration,
248-
context->CheckpointId.SeqNo,
249-
(ui32)context->Status,
250-
context->StateSizeBytes);
252+
CheckpointsMetadataTable);
253+
254+
NYdb::TParamsBuilder params;
255+
params
256+
.AddParam("$graph_id")
257+
.String(generationContext->PrimaryKey)
258+
.Build()
259+
.AddParam("$coordinator_generation")
260+
.Uint64(context->CheckpointId.CoordinatorGeneration)
261+
.Build()
262+
.AddParam("$seq_no")
263+
.Uint64(context->CheckpointId.SeqNo)
264+
.Build()
265+
.AddParam("$status")
266+
.Uint8((ui8)context->Status)
267+
.Build()
268+
.AddParam("$state_size")
269+
.Uint64(context->StateSizeBytes)
270+
.Build()
271+
.AddParam("$ts")
272+
.Timestamp(TInstant::Now())
273+
.Build();
251274

252275
auto ttxControl = TTxControl::Tx(*generationContext->Transaction).CommitTx();
253-
return generationContext->Session.ExecuteDataQuery(query, ttxControl).Apply(
276+
return generationContext->Session.ExecuteDataQuery(query, ttxControl, params.Build(), context->Settings).Apply(
254277
[] (const TFuture<TDataQueryResult>& future) {
255278
TStatus status = future.GetValue();
256279
return status;
@@ -264,15 +287,20 @@ TFuture<TDataQueryResult> SelectGraphDescId(const TCheckpointContextPtr& context
264287
auto query = Sprintf(R"(
265288
--!syntax_v1
266289
PRAGMA TablePathPrefix("%s");
290+
DECLARE $graph_desc_id AS String;
267291
268292
SELECT ref_count
269293
FROM %s
270-
WHERE id = "%s";
294+
WHERE id = $graph_desc_id;
271295
)", generationContext->TablePathPrefix.c_str(),
272-
CheckpointsGraphsDescriptionTable,
273-
graphDescContext->GraphDescId.c_str());
296+
CheckpointsGraphsDescriptionTable);
297+
NYdb::TParamsBuilder params;
298+
params
299+
.AddParam("$graph_desc_id")
300+
.String(graphDescContext->GraphDescId)
301+
.Build();
274302

275-
return generationContext->Session.ExecuteDataQuery(query, TTxControl::Tx(*generationContext->Transaction));
303+
return generationContext->Session.ExecuteDataQuery(query, TTxControl::Tx(*generationContext->Transaction), params.Build(), context->Settings);
276304
}
277305

278306
bool GraphDescIdExists(const TFuture<TDataQueryResult>& result) {
@@ -292,6 +320,7 @@ TFuture<TStatus> GenerateGraphDescId(const TCheckpointContextPtr& context) {
292320
if (!result.GetValue().IsSuccess()) {
293321
return MakeFuture<TStatus>(result.GetValue());
294322
}
323+
// TODO racing!
295324
if (!GraphDescIdExists(result)) {
296325
return MakeFuture(TStatus(EStatus::SUCCESS, NYdb::NIssue::TIssues()));
297326
} else {
@@ -443,19 +472,33 @@ TFuture<TDataQueryResult> SelectCheckpoint(const TCheckpointContextPtr& context)
443472
auto query = Sprintf(R"(
444473
--!syntax_v1
445474
PRAGMA TablePathPrefix("%s");
475+
DECLARE $graph_id AS String;
476+
DECLARE $coordinator_generation AS Uint64;
477+
DECLARE $seq_no AS Uint64;
446478
447479
SELECT status
448480
FROM %s
449-
WHERE graph_id = "%s" AND coordinator_generation = %lu AND seq_no = %lu;
481+
WHERE graph_id = $graph_id AND coordinator_generation = $coordinator_generation AND seq_no = $seq_no;
450482
)", generationContext->TablePathPrefix.c_str(),
451-
CheckpointsMetadataTable,
452-
generationContext->PrimaryKey.c_str(),
453-
context->CheckpointId.CoordinatorGeneration,
454-
context->CheckpointId.SeqNo);
483+
CheckpointsMetadataTable);
484+
485+
NYdb::TParamsBuilder params;
486+
params
487+
.AddParam("$graph_id")
488+
.String(generationContext->PrimaryKey)
489+
.Build()
490+
.AddParam("$coordinator_generation")
491+
.Uint64(context->CheckpointId.CoordinatorGeneration)
492+
.Build()
493+
.AddParam("$seq_no")
494+
.Uint64(context->CheckpointId.SeqNo)
495+
.Build();
455496

456497
return generationContext->Session.ExecuteDataQuery(
457498
query,
458-
TTxControl::Tx(*generationContext->Transaction));
499+
TTxControl::Tx(*generationContext->Transaction),
500+
params.Build(),
501+
context->Settings);
459502
}
460503

461504
TFuture<TStatus> CheckCheckpoint(
@@ -768,7 +811,7 @@ TFuture<ICheckpointStorage::TCreateCheckpointResult> TCheckpointStorage::CreateC
768811
ECheckpointStatus status)
769812
{
770813
Y_ABORT_UNLESS(graphDescId);
771-
auto checkpointContext = MakeIntrusive<TCheckpointContext>(checkpointId, status, ECheckpointStatus::Pending, 0ul);
814+
auto checkpointContext = MakeIntrusive<TCheckpointContext>(checkpointId, status, ECheckpointStatus::Pending, 0ul, DefaultExecDataQuerySettings());
772815
checkpointContext->CheckpointGraphDescriptionContext = MakeIntrusive<TCheckpointGraphDescriptionContext>(graphDescId);
773816
return CreateCheckpointImpl(coordinator, checkpointContext);
774817
}
@@ -779,7 +822,7 @@ TFuture<ICheckpointStorage::TCreateCheckpointResult> TCheckpointStorage::CreateC
779822
const NProto::TCheckpointGraphDescription& graphDesc,
780823
ECheckpointStatus status)
781824
{
782-
auto checkpointContext = MakeIntrusive<TCheckpointContext>(checkpointId, status, ECheckpointStatus::Pending, 0ul);
825+
auto checkpointContext = MakeIntrusive<TCheckpointContext>(checkpointId, status, ECheckpointStatus::Pending, 0ul, DefaultExecDataQuerySettings());
783826
checkpointContext->CheckpointGraphDescriptionContext = MakeIntrusive<TCheckpointGraphDescriptionContext>(graphDesc);
784827
checkpointContext->EntityIdGenerator = EntityIdGenerator;
785828
return CreateCheckpointImpl(coordinator, checkpointContext);
@@ -820,7 +863,7 @@ TFuture<TIssues> TCheckpointStorage::UpdateCheckpointStatus(
820863
ECheckpointStatus prevStatus,
821864
ui64 stateSizeBytes)
822865
{
823-
auto checkpointContext = MakeIntrusive<TCheckpointContext>(checkpointId, newStatus, prevStatus, stateSizeBytes);
866+
auto checkpointContext = MakeIntrusive<TCheckpointContext>(checkpointId, newStatus, prevStatus, stateSizeBytes, DefaultExecDataQuerySettings());
824867
auto future = YdbConnection->TableClient.RetryOperation(
825868
[prefix = YdbConnection->TablePathPrefix, coordinator, checkpointContext] (TSession session) {
826869
auto generationContext = MakeIntrusive<TGenerationContext>(
@@ -846,7 +889,7 @@ TFuture<TIssues> TCheckpointStorage::AbortCheckpoint(
846889
const TCoordinatorId& coordinator,
847890
const TCheckpointId& checkpointId)
848891
{
849-
auto checkpointContext = MakeIntrusive<TCheckpointContext>(checkpointId, ECheckpointStatus::Aborted, ECheckpointStatus::Pending, 0ul);
892+
auto checkpointContext = MakeIntrusive<TCheckpointContext>(checkpointId, ECheckpointStatus::Aborted, ECheckpointStatus::Pending, 0ul, DefaultExecDataQuerySettings());
850893
auto future = YdbConnection->TableClient.RetryOperation(
851894
[prefix = YdbConnection->TablePathPrefix, coordinator, checkpointContext] (TSession session) {
852895
auto generationContext = MakeIntrusive<TGenerationContext>(
@@ -905,28 +948,35 @@ TFuture<ICheckpointStorage::TGetCheckpointsResult> TCheckpointStorage::GetCheckp
905948

906949
TFuture<TIssues> TCheckpointStorage::DeleteGraph(const TString& graphId) {
907950
auto future = YdbConnection->TableClient.RetryOperation(
908-
[prefix = YdbConnection->TablePathPrefix, graphId] (TSession session) {
951+
[prefix = YdbConnection->TablePathPrefix, graphId, settings = DefaultExecDataQuerySettings()] (TSession session) {
909952
// TODO: use prepared queries
910953
auto query = Sprintf(R"(
911954
--!syntax_v1
912955
PRAGMA TablePathPrefix("%s");
956+
DECLARE $graph_id AS String;
913957
914958
DELETE
915959
FROM %s
916-
WHERE graph_id = "%s";
960+
WHERE graph_id = $graph_id;
917961
918962
DELETE
919963
FROM %s
920-
WHERE graph_id = "%s";
964+
WHERE graph_id = $graph_id;
921965
)", prefix.c_str(),
922966
CoordinatorsSyncTable,
923-
graphId.c_str(),
924-
CheckpointsMetadataTable,
925-
graphId.c_str());
967+
CheckpointsMetadataTable);
968+
969+
NYdb::TParamsBuilder params;
970+
params
971+
.AddParam("$graph_id")
972+
.String(graphId)
973+
.Build();
926974

927975
auto future = session.ExecuteDataQuery(
928976
query,
929-
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx());
977+
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
978+
params.Build(),
979+
settings);
930980

931981
return future.Apply(
932982
[] (const TFuture<TDataQueryResult>& future) {
@@ -943,30 +993,48 @@ TFuture<TIssues> TCheckpointStorage::MarkCheckpointsGC(
943993
const TCheckpointId& checkpointUpperBound)
944994
{
945995
auto future = YdbConnection->TableClient.RetryOperation(
946-
[prefix = YdbConnection->TablePathPrefix, graphId, checkpointUpperBound] (TSession session) {
996+
[prefix = YdbConnection->TablePathPrefix, graphId, checkpointUpperBound, thisPtr = TIntrusivePtr(this)] (TSession session) {
947997
// TODO: use prepared queries
948998
auto query = Sprintf(R"(
949999
--!syntax_v1
9501000
PRAGMA TablePathPrefix("%s");
951-
$ts = cast(%lu as Timestamp);
1001+
DECLARE $ts AS Timestamp;
1002+
DECLARE $status AS Uint8;
1003+
DECLARE $graph_id AS String;
1004+
DECLARE $coordinator_generation AS Uint64;
1005+
DECLARE $seq_no AS Uint64;
9521006
9531007
UPDATE %s
954-
SET status = %u, modified_by = $ts
955-
WHERE graph_id = "%s" AND
956-
(coordinator_generation < %lu OR
957-
(coordinator_generation = %lu AND seq_no < %lu));
1008+
SET status = $status, modified_by = $ts
1009+
WHERE graph_id = $graph_id AND
1010+
(coordinator_generation < $coordinator_generation OR
1011+
(coordinator_generation = $coordinator_generation AND seq_no < $seq_no));
9581012
)", prefix.c_str(),
959-
TInstant::Now().MicroSeconds(),
960-
CheckpointsMetadataTable,
961-
(ui32)ECheckpointStatus::GC,
962-
graphId.c_str(),
963-
checkpointUpperBound.CoordinatorGeneration,
964-
checkpointUpperBound.CoordinatorGeneration,
965-
checkpointUpperBound.SeqNo);
1013+
CheckpointsMetadataTable);
1014+
1015+
NYdb::TParamsBuilder params;
1016+
params
1017+
.AddParam("$graph_id")
1018+
.String(graphId)
1019+
.Build()
1020+
.AddParam("$coordinator_generation")
1021+
.Uint64(checkpointUpperBound.CoordinatorGeneration)
1022+
.Build()
1023+
.AddParam("$seq_no")
1024+
.Uint64(checkpointUpperBound.SeqNo)
1025+
.Build()
1026+
.AddParam("$status")
1027+
.Uint8((ui8)ECheckpointStatus::GC)
1028+
.Build()
1029+
.AddParam("$ts")
1030+
.Timestamp(TInstant::Now())
1031+
.Build();
9661032

9671033
auto future = session.ExecuteDataQuery(
9681034
query,
969-
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx());
1035+
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
1036+
params.Build(),
1037+
thisPtr->DefaultExecDataQuerySettings());
9701038

9711039
return future.Apply(
9721040
[] (const TFuture<TDataQueryResult>& future) {
@@ -983,7 +1051,7 @@ TFuture<TIssues> TCheckpointStorage::DeleteMarkedCheckpoints(
9831051
const TCheckpointId& checkpointUpperBound)
9841052
{
9851053
auto future = YdbConnection->TableClient.RetryOperation(
986-
[prefix = YdbConnection->TablePathPrefix, graphId, checkpointUpperBound] (TSession session) {
1054+
[prefix = YdbConnection->TablePathPrefix, graphId, checkpointUpperBound, settings = DefaultExecDataQuerySettings()] (TSession session) {
9871055
// TODO: use prepared queries
9881056
using namespace fmt::literals;
9891057
const TString query = fmt::format(R"sql(
@@ -1042,7 +1110,7 @@ TFuture<TIssues> TCheckpointStorage::DeleteMarkedCheckpoints(
10421110

10431111
auto future = session.ExecuteDataQuery(
10441112
query,
1045-
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), params.Build());
1113+
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), params.Build(), settings);
10461114

10471115
return future.Apply(
10481116
[] (const TFuture<TDataQueryResult>& future) {

ydb/core/fq/libs/checkpoint_storage/ydb_state_storage.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -726,8 +726,8 @@ TFuture<TIssues> TStateStorage::DeleteGraph(const TString& graphId) {
726726
727727
DELETE
728728
FROM %s
729-
WHERE graph_id = "%s";
730-
)", prefix.c_str(), StatesTable, graphId.c_str());
729+
WHERE graph_id = $graph_id;
730+
)", prefix.c_str(), StatesTable);
731731

732732
auto future = session.ExecuteDataQuery(
733733
query,

0 commit comments

Comments
 (0)