Skip to content

Support CopySequence in schemeshard #2693

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 25 commits into from
Mar 22, 2024
130 changes: 130 additions & 0 deletions ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1698,6 +1698,136 @@ Y_UNIT_TEST_SUITE(KqpPg) {
}
}

Y_UNIT_TEST(CopyTableSerialColumns) {
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false).SetEnableNotNullDataColumns(true));
auto client = kikimr.GetTableClient();
auto session = client.CreateSession().GetValueSync().GetSession();
{
const auto query = Q_(R"(
--!syntax_pg
CREATE TABLE PgSerial (
key serial PRIMARY KEY,
value int2
))");

auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

{
const auto query = Q_(R"(
--!syntax_pg
INSERT INTO PgSerial (value) values (1);
)");

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

{
const auto result = session.CopyTable("/Root/PgSerial", "/Root/copy").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

auto desc = session.DescribeTable("/Root/copy").ExtractValueSync();
UNIT_ASSERT_C(desc.IsSuccess(), desc.GetIssues().ToString());
}

{
const auto query = Q_(R"(
--!syntax_pg
SELECT * FROM copy;
)");

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty");
CompareYson(R"(
[["1";"1"]]
)", FormatResultSetYson(result.GetResultSet(0)));
}

{
const auto query = Q_(R"(
--!syntax_pg
INSERT INTO copy (value) values (1);
)");

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

{
const auto query = Q_(R"(
--!syntax_pg
SELECT * FROM copy;
)");

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty");
CompareYson(R"(
[["1";"1"];["2";"1"]]
)", FormatResultSetYson(result.GetResultSet(0)));
}

{
const auto query = Q_(R"(
--!syntax_pg
SELECT * FROM PgSerial;
)");

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty");
CompareYson(R"(
[["1";"1"]]
)", FormatResultSetYson(result.GetResultSet(0)));
}

{
const auto query = Q_(R"(
--!syntax_pg
INSERT INTO PgSerial (value) values (1);
)");

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

{
const auto query = Q_(R"(
--!syntax_pg
SELECT * FROM copy;
)");

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty");
CompareYson(R"(
[["1";"1"];["2";"1"]]
)", FormatResultSetYson(result.GetResultSet(0)));
}

{
const auto query = Q_(R"(
--!syntax_pg
SELECT * FROM PgSerial;
)");

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty");
CompareYson(R"(
[["1";"1"];["2";"1"]]
)", FormatResultSetYson(result.GetResultSet(0)));
}
}

Y_UNIT_TEST(CreateIndex) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);;
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/protos/counters_schemeshard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ enum ESimpleCounters {
COUNTER_IN_FLIGHT_OPS_TxDropView = 157 [(CounterOpts) = {Name: "InFlightOps/DropView"}];

COUNTER_GRAPHSHARD_COUNT = 158 [(CounterOpts) = {Name: "GraphShards"}];

COUNTER_IN_FLIGHT_OPS_TxCopySequence = 159 [(CounterOpts) = {Name: "InFlightOps/CopySequence"}];
}

enum ECumulativeCounters {
Expand Down Expand Up @@ -315,6 +317,7 @@ enum ECumulativeCounters {
COUNTER_FINISHED_OPS_TxCreateView = 95 [(CounterOpts) = {Name: "FinishedOps/CreateView"}];
COUNTER_FINISHED_OPS_TxDropView = 96 [(CounterOpts) = {Name: "FinishedOps/DropView"}];
COUNTER_FINISHED_OPS_TxAlterView = 97 [(CounterOpts) = {Name: "FinishedOps/AlterView"}];
COUNTER_FINISHED_OPS_TxCopySequence = 98 [(CounterOpts) = {Name: "FinishedOps/TxCopySequence"}];
}

enum EPercentileCounters {
Expand Down Expand Up @@ -536,4 +539,6 @@ enum ETxTypes {
TXTYPE_PERSQUEUE_PROPOSE_RESULT = 83 [(TxTypeOpts) = {Name: "TxPersQueueProposeResult"}];
TXTYPE_PERSQUEUE_PROPOSE_ATTACH_RESULT = 84 [(TxTypeOpts) = {Name: "TxProposeTransactionAttachResult"}];
TXTYPE_UPDATE_DOMAIN_REPLY = 85 [(TxTypeOpts) = {Name: "TxUpdateDomainReply"}];

TXTYPE_SEQUENCESHARD_GET_SEQUENCE_RESULT = 86 [(TxTypeOpts) = {Name: "TxSequenceShardGetSequenceResult"}];
}
1 change: 1 addition & 0 deletions ydb/core/protos/counters_sequenceshard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ enum ETxTypes {
TXTYPE_FREEZE_SEQUENCE = 7 [(TxTypeOpts) = {Name: "TxFreezeSequence"}];
TXTYPE_RESTORE_SEQUENCE = 8 [(TxTypeOpts) = {Name: "TxRestoreSequence"}];
TXTYPE_REDIRECT_SEQUENCE = 9 [(TxTypeOpts) = {Name: "TxRedirectSequence"}];
TXTYPE_GET_SEQUENCE = 10 [(TxTypeOpts) = {Name: "TxGetSequence"}];
}
6 changes: 6 additions & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1580,6 +1580,12 @@ message TModifyScheme {
optional TViewDescription CreateView = 64;

optional NActorsProto.TActorId TempTableOwnerActorId = 65;

optional TCopySequence CopySequence = 66;
}

message TCopySequence {
optional string CopyFrom = 1;
}

// "Script", used by client to parse text files with multiple DDL commands
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/protos/out/out_sequenceshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,7 @@ Y_DECLARE_OUT_SPEC(, NKikimrTxSequenceShard::TEvRestoreSequenceResult::EStatus,
Y_DECLARE_OUT_SPEC(, NKikimrTxSequenceShard::TEvRedirectSequenceResult::EStatus, stream, value) {
stream << NKikimrTxSequenceShard::TEvRedirectSequenceResult::EStatus_Name(value);
}

Y_DECLARE_OUT_SPEC(, NKikimrTxSequenceShard::TEvGetSequenceResult::EStatus, stream, value) {
stream << NKikimrTxSequenceShard::TEvGetSequenceResult::EStatus_Name(value);
}
31 changes: 31 additions & 0 deletions ydb/core/protos/tx_sequenceshard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ message TEvCreateSequence {
oneof OptionalCycle {
bool Cycle = 9;
}
bool Frozen = 10; // defaults to false
}

message TEvCreateSequenceResult {
Expand Down Expand Up @@ -224,3 +225,33 @@ message TEvRedirectSequenceResult {
uint64 TxId = 3;
uint64 TxPartId = 4;
}

message TEvGetSequence {
NKikimrProto.TPathID PathId = 1;
uint64 TxId = 2;
uint64 TxPartId = 3;
}

message TEvGetSequenceResult {
enum EStatus {
UNKNOWN = 0;
SUCCESS = 1;
PIPE_OUTDATED = 2;
SEQUENCE_NOT_FOUND = 3;
SEQUENCE_MOVED = 4;
}

EStatus Status = 1;
uint64 Origin = 2;
uint64 TxId = 3;
uint64 TxPartId = 4;
uint64 MovedTo = 5; // moved to a different sequence shard
sint64 MinValue = 6;
sint64 MaxValue = 7;
sint64 StartValue = 8;
sint64 NextValue = 9;
bool NextUsed = 10;
uint64 Cache = 11;
sint64 Increment = 12;
bool Cycle = 13;
}
2 changes: 2 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard__operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1033,6 +1033,8 @@ ISubOperation::TPtr TOperation::RestorePart(TTxState::ETxType txType, TTxState::
Y_ABORT("TODO: implement");
case TTxState::ETxType::TxDropSequence:
return CreateDropSequence(NextPartId(), txState);
case TTxState::ETxType::TxCopySequence:
return CreateCopySequence(NextPartId(), txState);

case TTxState::ETxType::TxFillIndex:
Y_ABORT("deprecated");
Expand Down
8 changes: 5 additions & 3 deletions ydb/core/tx/schemeshard/schemeshard__operation_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ void UpdatePartitioningForCopyTable(TOperationId operationId, TTxState& txState,
class TProposedWaitParts: public TSubOperationState {
private:
TOperationId OperationId;
const TTxState::ETxState NextState;

TString DebugHint() const override {
return TStringBuilder()
Expand All @@ -51,8 +52,9 @@ class TProposedWaitParts: public TSubOperationState {
}

public:
TProposedWaitParts(TOperationId id)
TProposedWaitParts(TOperationId id, TTxState::ETxState nextState = TTxState::Done)
: OperationId(id)
, NextState(nextState)
{
IgnoreMessages(DebugHint(),
{ TEvHive::TEvCreateTabletReply::EventType
Expand Down Expand Up @@ -124,7 +126,7 @@ class TProposedWaitParts: public TSubOperationState {
// Got notifications from all datashards?
if (txState->ShardsInProgress.empty()) {
NTableState::AckAllSchemaChanges(OperationId, *txState, context);
context.SS->ChangeTxState(db, OperationId, TTxState::Done);
context.SS->ChangeTxState(db, OperationId, NextState);
return true;
}

Expand Down Expand Up @@ -968,7 +970,7 @@ class TPropose: public TSubOperationState {
TTxState* txState = context.SS->FindTx(OperationId);
Y_ABORT_UNLESS(txState);
Y_ABORT_UNLESS(txState->TxType == TTxState::TxCreatePQGroup || txState->TxType == TTxState::TxAlterPQGroup || txState->TxType == TTxState::TxAllocatePQ);

TPathId pathId = txState->TargetPathId;
TPathElement::TPtr path = context.SS->PathsById.at(pathId);

Expand Down
Loading
Loading