Skip to content

Commit e15ef51

Browse files
authored
Merge c475c28 into cd63894
2 parents cd63894 + c475c28 commit e15ef51

39 files changed

+1545
-17
lines changed

ydb/core/kqp/ut/pg/kqp_pg_ut.cpp

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1698,6 +1698,136 @@ Y_UNIT_TEST_SUITE(KqpPg) {
16981698
}
16991699
}
17001700

1701+
Y_UNIT_TEST(CopyTableSerialColumns) {
1702+
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false).SetEnableNotNullDataColumns(true));
1703+
auto client = kikimr.GetTableClient();
1704+
auto session = client.CreateSession().GetValueSync().GetSession();
1705+
{
1706+
const auto query = Q_(R"(
1707+
--!syntax_pg
1708+
CREATE TABLE PgSerial (
1709+
key serial PRIMARY KEY,
1710+
value int2
1711+
))");
1712+
1713+
auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
1714+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
1715+
}
1716+
1717+
{
1718+
const auto query = Q_(R"(
1719+
--!syntax_pg
1720+
INSERT INTO PgSerial (value) values (1);
1721+
)");
1722+
1723+
auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
1724+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
1725+
}
1726+
1727+
{
1728+
const auto result = session.CopyTable("/Root/PgSerial", "/Root/copy").GetValueSync();
1729+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
1730+
1731+
auto desc = session.DescribeTable("/Root/copy").ExtractValueSync();
1732+
UNIT_ASSERT_C(desc.IsSuccess(), desc.GetIssues().ToString());
1733+
}
1734+
1735+
{
1736+
const auto query = Q_(R"(
1737+
--!syntax_pg
1738+
SELECT * FROM copy;
1739+
)");
1740+
1741+
auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
1742+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
1743+
1744+
UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty");
1745+
CompareYson(R"(
1746+
[["1";"1"]]
1747+
)", FormatResultSetYson(result.GetResultSet(0)));
1748+
}
1749+
1750+
{
1751+
const auto query = Q_(R"(
1752+
--!syntax_pg
1753+
INSERT INTO copy (value) values (1);
1754+
)");
1755+
1756+
auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
1757+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
1758+
}
1759+
1760+
{
1761+
const auto query = Q_(R"(
1762+
--!syntax_pg
1763+
SELECT * FROM copy;
1764+
)");
1765+
1766+
auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
1767+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
1768+
1769+
UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty");
1770+
CompareYson(R"(
1771+
[["1";"1"];["2";"1"]]
1772+
)", FormatResultSetYson(result.GetResultSet(0)));
1773+
}
1774+
1775+
{
1776+
const auto query = Q_(R"(
1777+
--!syntax_pg
1778+
SELECT * FROM PgSerial;
1779+
)");
1780+
1781+
auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
1782+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
1783+
1784+
UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty");
1785+
CompareYson(R"(
1786+
[["1";"1"]]
1787+
)", FormatResultSetYson(result.GetResultSet(0)));
1788+
}
1789+
1790+
{
1791+
const auto query = Q_(R"(
1792+
--!syntax_pg
1793+
INSERT INTO PgSerial (value) values (1);
1794+
)");
1795+
1796+
auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
1797+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
1798+
}
1799+
1800+
{
1801+
const auto query = Q_(R"(
1802+
--!syntax_pg
1803+
SELECT * FROM copy;
1804+
)");
1805+
1806+
auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
1807+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
1808+
1809+
UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty");
1810+
CompareYson(R"(
1811+
[["1";"1"];["2";"1"]]
1812+
)", FormatResultSetYson(result.GetResultSet(0)));
1813+
}
1814+
1815+
{
1816+
const auto query = Q_(R"(
1817+
--!syntax_pg
1818+
SELECT * FROM PgSerial;
1819+
)");
1820+
1821+
auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
1822+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
1823+
1824+
UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty");
1825+
CompareYson(R"(
1826+
[["1";"1"];["2";"1"]]
1827+
)", FormatResultSetYson(result.GetResultSet(0)));
1828+
}
1829+
}
1830+
17011831
Y_UNIT_TEST(CreateIndex) {
17021832
NKikimrConfig::TAppConfig appConfig;
17031833
appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);;

ydb/core/protos/counters_schemeshard.proto

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,8 @@ enum ESimpleCounters {
195195
COUNTER_IN_FLIGHT_OPS_TxDropView = 157 [(CounterOpts) = {Name: "InFlightOps/DropView"}];
196196

197197
COUNTER_GRAPHSHARD_COUNT = 158 [(CounterOpts) = {Name: "GraphShards"}];
198+
199+
COUNTER_IN_FLIGHT_OPS_TxCopySequence = 159 [(CounterOpts) = {Name: "InFlightOps/CopySequence"}];
198200
}
199201

200202
enum ECumulativeCounters {
@@ -315,6 +317,7 @@ enum ECumulativeCounters {
315317
COUNTER_FINISHED_OPS_TxCreateView = 95 [(CounterOpts) = {Name: "FinishedOps/CreateView"}];
316318
COUNTER_FINISHED_OPS_TxDropView = 96 [(CounterOpts) = {Name: "FinishedOps/DropView"}];
317319
COUNTER_FINISHED_OPS_TxAlterView = 97 [(CounterOpts) = {Name: "FinishedOps/AlterView"}];
320+
COUNTER_FINISHED_OPS_TxCopySequence = 98 [(CounterOpts) = {Name: "FinishedOps/TxCopySequence"}];
318321
}
319322

320323
enum EPercentileCounters {
@@ -536,4 +539,6 @@ enum ETxTypes {
536539
TXTYPE_PERSQUEUE_PROPOSE_RESULT = 83 [(TxTypeOpts) = {Name: "TxPersQueueProposeResult"}];
537540
TXTYPE_PERSQUEUE_PROPOSE_ATTACH_RESULT = 84 [(TxTypeOpts) = {Name: "TxProposeTransactionAttachResult"}];
538541
TXTYPE_UPDATE_DOMAIN_REPLY = 85 [(TxTypeOpts) = {Name: "TxUpdateDomainReply"}];
542+
543+
TXTYPE_SEQUENCESHARD_GET_SEQUENCE_RESULT = 86 [(TxTypeOpts) = {Name: "TxSequenceShardGetSequenceResult"}];
539544
}

ydb/core/protos/counters_sequenceshard.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,5 @@ enum ETxTypes {
3434
TXTYPE_FREEZE_SEQUENCE = 7 [(TxTypeOpts) = {Name: "TxFreezeSequence"}];
3535
TXTYPE_RESTORE_SEQUENCE = 8 [(TxTypeOpts) = {Name: "TxRestoreSequence"}];
3636
TXTYPE_REDIRECT_SEQUENCE = 9 [(TxTypeOpts) = {Name: "TxRedirectSequence"}];
37+
TXTYPE_GET_SEQUENCE = 10 [(TxTypeOpts) = {Name: "TxGetSequence"}];
3738
}

ydb/core/protos/flat_scheme_op.proto

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1305,6 +1305,7 @@ message TSequenceDescription {
13051305
optional uint64 Cache = 8; // number of items to cache, defaults to 1
13061306
optional sint64 Increment = 9; // increment at each call, defaults to 1
13071307
optional bool Cycle = 10; // true when cycle on overflow is allowed
1308+
optional bool Overflowed = 11; // true when sequence is overflowed
13081309
}
13091310

13101311
message TSequenceSharding {
@@ -1590,6 +1591,12 @@ message TModifyScheme {
15901591
optional TViewDescription CreateView = 64;
15911592

15921593
optional NActorsProto.TActorId TempTableOwnerActorId = 65;
1594+
1595+
optional TCopySequence CopySequence = 66;
1596+
}
1597+
1598+
message TCopySequence {
1599+
optional string CopyFrom = 1;
15931600
}
15941601

15951602
// "Script", used by client to parse text files with multiple DDL commands

ydb/core/protos/out/out_sequenceshard.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,3 +29,7 @@ Y_DECLARE_OUT_SPEC(, NKikimrTxSequenceShard::TEvRestoreSequenceResult::EStatus,
2929
Y_DECLARE_OUT_SPEC(, NKikimrTxSequenceShard::TEvRedirectSequenceResult::EStatus, stream, value) {
3030
stream << NKikimrTxSequenceShard::TEvRedirectSequenceResult::EStatus_Name(value);
3131
}
32+
33+
Y_DECLARE_OUT_SPEC(, NKikimrTxSequenceShard::TEvGetSequenceResult::EStatus, stream, value) {
34+
stream << NKikimrTxSequenceShard::TEvGetSequenceResult::EStatus_Name(value);
35+
}

ydb/core/protos/tx_sequenceshard.proto

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ message TEvCreateSequence {
3939
oneof OptionalCycle {
4040
bool Cycle = 9;
4141
}
42+
bool Frozen = 10; // defaults to false
43+
bool Overflowed = 11; // defaults to false
4244
}
4345

4446
message TEvCreateSequenceResult {
@@ -224,3 +226,33 @@ message TEvRedirectSequenceResult {
224226
uint64 TxId = 3;
225227
uint64 TxPartId = 4;
226228
}
229+
230+
message TEvGetSequence {
231+
NKikimrProto.TPathID PathId = 1;
232+
uint64 TxId = 2;
233+
uint64 TxPartId = 3;
234+
}
235+
236+
message TEvGetSequenceResult {
237+
enum EStatus {
238+
UNKNOWN = 0;
239+
SUCCESS = 1;
240+
PIPE_OUTDATED = 2;
241+
SEQUENCE_NOT_FOUND = 3;
242+
SEQUENCE_MOVED = 4;
243+
}
244+
245+
EStatus Status = 1;
246+
uint64 Origin = 2;
247+
uint64 TxId = 3;
248+
uint64 TxPartId = 4;
249+
uint64 MovedTo = 5; // moved to a different sequence shard
250+
sint64 MinValue = 6;
251+
sint64 MaxValue = 7;
252+
sint64 StartValue = 8;
253+
sint64 NextValue = 9;
254+
bool NextUsed = 10;
255+
uint64 Cache = 11;
256+
sint64 Increment = 12;
257+
bool Cycle = 13;
258+
}

ydb/core/tx/schemeshard/schemeshard__operation.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1033,6 +1033,8 @@ ISubOperation::TPtr TOperation::RestorePart(TTxState::ETxType txType, TTxState::
10331033
Y_ABORT("TODO: implement");
10341034
case TTxState::ETxType::TxDropSequence:
10351035
return CreateDropSequence(NextPartId(), txState);
1036+
case TTxState::ETxType::TxCopySequence:
1037+
return CreateCopySequence(NextPartId(), txState);
10361038

10371039
case TTxState::ETxType::TxFillIndex:
10381040
Y_ABORT("deprecated");

ydb/core/tx/schemeshard/schemeshard__operation_common.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ void UpdatePartitioningForCopyTable(TOperationId operationId, TTxState& txState,
4343
class TProposedWaitParts: public TSubOperationState {
4444
private:
4545
TOperationId OperationId;
46+
const TTxState::ETxState NextState;
4647

4748
TString DebugHint() const override {
4849
return TStringBuilder()
@@ -51,8 +52,9 @@ class TProposedWaitParts: public TSubOperationState {
5152
}
5253

5354
public:
54-
TProposedWaitParts(TOperationId id)
55+
TProposedWaitParts(TOperationId id, TTxState::ETxState nextState = TTxState::Done)
5556
: OperationId(id)
57+
, NextState(nextState)
5658
{
5759
IgnoreMessages(DebugHint(),
5860
{ TEvHive::TEvCreateTabletReply::EventType
@@ -124,7 +126,7 @@ class TProposedWaitParts: public TSubOperationState {
124126
// Got notifications from all datashards?
125127
if (txState->ShardsInProgress.empty()) {
126128
NTableState::AckAllSchemaChanges(OperationId, *txState, context);
127-
context.SS->ChangeTxState(db, OperationId, TTxState::Done);
129+
context.SS->ChangeTxState(db, OperationId, NextState);
128130
return true;
129131
}
130132

@@ -968,7 +970,7 @@ class TPropose: public TSubOperationState {
968970
TTxState* txState = context.SS->FindTx(OperationId);
969971
Y_ABORT_UNLESS(txState);
970972
Y_ABORT_UNLESS(txState->TxType == TTxState::TxCreatePQGroup || txState->TxType == TTxState::TxAlterPQGroup || txState->TxType == TTxState::TxAllocatePQ);
971-
973+
972974
TPathId pathId = txState->TargetPathId;
973975
TPathElement::TPtr path = context.SS->PathsById.at(pathId);
974976

ydb/core/tx/schemeshard/schemeshard__operation_consistent_copy_tables.cpp

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -137,10 +137,6 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con
137137
result.push_back(CreateCopyTable(NextPartId(nextId, result),
138138
CopyTableTask(srcPath, dstPath, descr.GetOmitFollowers(), descr.GetIsBackup()), sequences));
139139

140-
if (descr.GetOmitIndexes()) {
141-
continue;
142-
}
143-
144140
TVector<NKikimrSchemeOp::TSequenceDescription> sequenceDescriptions;
145141
for (const auto& child: srcPath.Base()->GetChildren()) {
146142
const auto& name = child.first;
@@ -160,6 +156,10 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con
160156
continue;
161157
}
162158

159+
if (descr.GetOmitIndexes()) {
160+
continue;
161+
}
162+
163163
if (!srcIndexPath.IsTableIndex()) {
164164
continue;
165165
}
@@ -185,9 +185,11 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con
185185
NKikimrSchemeOp::EOperationType::ESchemeOpCreateSequence);
186186
scheme.SetFailOnExist(true);
187187

188+
auto* copySequence = scheme.MutableCopySequence();
189+
copySequence->SetCopyFrom(srcPath.PathString() + "/" + sequenceDescription.GetName());
188190
*scheme.MutableSequence() = std::move(sequenceDescription);
189191

190-
result.push_back(CreateNewSequence(NextPartId(nextId, result), scheme));
192+
result.push_back(CreateCopySequence(NextPartId(nextId, result), scheme));
191193
}
192194
}
193195

0 commit comments

Comments
 (0)