Skip to content

Commit 636f707

Browse files
committed
WIP
1 parent c2e9bc8 commit 636f707

18 files changed

+1541
-31
lines changed

ydb/core/kqp/host/kqp_gateway_proxy.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1368,7 +1368,8 @@ class TKqpGatewayProxy : public IKikimrGateway {
13681368
op.SetPrefix(settings.Prefix);
13691369

13701370
if (settings.Settings.IncrementalBackupEnabled) {
1371-
op.MutableIncrementalBackupConfig();
1371+
auto* config = op.MutableIncrementalBackupConfig();
1372+
config->SetOmitIndexes(settings.Settings.OmitIndexes);
13721373
}
13731374

13741375
auto errOpt = std::visit(

ydb/core/kqp/provider/yql_kikimr_exec.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1021,6 +1021,13 @@ namespace {
10211021
"INCREMENTAL_BACKUP_ENABLED must be true or false"));
10221022
return false;
10231023
}
1024+
} else if (name == "omit_indexes") {
1025+
auto value = ToString(setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value());
1026+
if (!TryFromString(value, dstSettings.OmitIndexes)) {
1027+
ctx.AddError(TIssue(ctx.GetPosition(pos),
1028+
"OMIT_INDEXES must be true or false"));
1029+
return false;
1030+
}
10241031
} else if (name == "storage") {
10251032
auto value = ToString(setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value());
10261033
if (to_lower(value) != "cluster") {

ydb/core/kqp/provider/yql_kikimr_gateway.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1052,7 +1052,8 @@ struct TAnalyzeSettings {
10521052
};
10531053

10541054
struct TBackupCollectionSettings {
1055-
bool IncrementalBackupEnabled;
1055+
bool IncrementalBackupEnabled = false;
1056+
bool OmitIndexes = false;
10561057
};
10571058

10581059
struct TCreateBackupCollectionSettings {

ydb/core/kqp/provider/yql_kikimr_type_ann.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2448,6 +2448,7 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over
24482448
const THashSet<TString> supportedSettings = {
24492449
"incremental_backup_enabled",
24502450
"storage",
2451+
"omit_indexes",
24512452
};
24522453

24532454
if (!CheckBackupCollectionSettings(node.BackupCollectionSettings(), supportedSettings, ctx)) {

ydb/core/protos/flat_scheme_op.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2312,7 +2312,7 @@ message TBackupCollectionDescription {
23122312
}
23132313

23142314
message TIncrementalBackupConfig {
2315-
2315+
optional bool OmitIndexes = 1 [default = false];
23162316
}
23172317

23182318
oneof Entries {

ydb/core/tx/datashard/change_collector_cdc_stream.cpp

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,15 @@ bool TCdcStreamChangeCollector::NeedToReadKeys() const {
123123
}
124124

125125
bool value = false;
126-
for (const auto& [_, tableInfo] : Self->GetUserTables()) {
127-
for (const auto& [_, streamInfo] : tableInfo->CdcStreams) {
126+
const auto& userTables = Self->GetUserTables();
127+
Cerr << "CDC_DEBUG: NeedToReadKeys checking " << userTables.size() << " user tables" << Endl;
128+
for (const auto& [tableLocalPathId, tableInfo] : userTables) {
129+
Cerr << "CDC_DEBUG: Table LocalPathId=" << tableLocalPathId
130+
<< " has " << tableInfo->CdcStreams.size() << " CDC streams" << Endl;
131+
for (const auto& [streamPathId, streamInfo] : tableInfo->CdcStreams) {
132+
Cerr << "CDC_DEBUG: Stream PathId=" << streamPathId
133+
<< " State=" << static_cast<ui32>(streamInfo.State)
134+
<< " Mode=" << static_cast<ui32>(streamInfo.Mode) << Endl;
128135
if (streamInfo.State == NKikimrSchemeOp::ECdcStreamStateDisabled) {
129136
continue;
130137
}
@@ -145,15 +152,29 @@ bool TCdcStreamChangeCollector::NeedToReadKeys() const {
145152
}
146153

147154
CachedNeedToReadKeys = value;
155+
Cerr << "CDC_DEBUG: NeedToReadKeys returning " << value << Endl;
148156
return *CachedNeedToReadKeys;
149157
}
150158

151159
bool TCdcStreamChangeCollector::Collect(const TTableId& tableId, ERowOp rop,
152160
TArrayRef<const TRawTypeValue> key, TArrayRef<const TUpdateOp> updates)
153161
{
154-
Y_ENSURE(Self->IsUserTable(tableId), "Unknown table: " << tableId);
162+
Cerr << "CDC_DEBUG: Collect called for TableId OwnerId=" << tableId.PathId.OwnerId
163+
<< " LocalPathId=" << tableId.PathId.LocalPathId
164+
<< " RowOp=" << static_cast<ui32>(rop) << Endl;
165+
166+
if (!Self->IsUserTable(tableId)) {
167+
const auto& userTables = Self->GetUserTables();
168+
Cerr << "CDC_DEBUG: IsUserTable returned FALSE! TableId not in UserTables map." << Endl;
169+
Cerr << "CDC_DEBUG: UserTables contains " << userTables.size() << " tables:" << Endl;
170+
for (const auto& [localPathId, _] : userTables) {
171+
Cerr << "CDC_DEBUG: LocalPathId=" << localPathId << Endl;
172+
}
173+
Y_ENSURE(false, "Unknown table: " << tableId);
174+
}
155175

156176
auto userTable = Self->GetUserTables().at(tableId.PathId.LocalPathId);
177+
Cerr << "CDC_DEBUG: Found user table with " << userTable->CdcStreams.size() << " CDC streams" << Endl;
157178
const auto& keyTags = userTable->KeyColumnIds;
158179
const auto& keyTypes = userTable->KeyColumnTypes;
159180
const auto valueTags = MakeValueTags(userTable->Columns);
@@ -173,6 +194,9 @@ bool TCdcStreamChangeCollector::Collect(const TTableId& tableId, ERowOp rop,
173194
}
174195

175196
for (const auto& [pathId, stream] : userTable->CdcStreams) {
197+
Cerr << "CDC_DEBUG: Processing CDC stream PathId=" << pathId
198+
<< " State=" << static_cast<ui32>(stream.State)
199+
<< " Mode=" << static_cast<ui32>(stream.Mode) << Endl;
176200
TMaybe<TRowState> initialState;
177201
TMaybe<TRowVersion> snapshotVersion;
178202

ydb/core/tx/datashard/create_cdc_stream_unit.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,17 @@ class TCreateCdcStreamUnit : public TExecutionUnit {
4343
const auto version = params.GetTableSchemaVersion();
4444
Y_ENSURE(version);
4545

46+
Cerr << "CDC_DEBUG: CreateCdcStreamUnit creating CDC stream on table PathId=" << pathId
47+
<< " (OwnerId=" << pathId.OwnerId << " LocalPathId=" << pathId.LocalPathId << ")"
48+
<< " streamPathId=" << streamPathId
49+
<< " State=" << static_cast<ui32>(streamDesc.GetState())
50+
<< " Mode=" << static_cast<ui32>(streamDesc.GetMode()) << Endl;
51+
4652
auto tableInfo = DataShard.AlterTableAddCdcStream(ctx, txc, pathId, version, streamDesc);
4753
TDataShardLocksDb locksDb(DataShard, txc);
4854
DataShard.AddUserTable(pathId, tableInfo, &locksDb);
55+
56+
Cerr << "CDC_DEBUG: Added table to TableInfos with " << tableInfo->CdcStreams.size() << " CDC streams" << Endl;
4957

5058
if (tableInfo->NeedSchemaSnapshots()) {
5159
DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);

ydb/core/tx/datashard/datashard_impl.h

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1700,8 +1700,16 @@ class TDataShard
17001700
}
17011701

17021702
bool IsUserTable(const TTableId& tableId) const {
1703-
return (TableInfos.find(tableId.PathId.LocalPathId) != TableInfos.end())
1704-
&& !TSysTables::IsSystemTable(tableId);
1703+
bool inTableInfos = TableInfos.find(tableId.PathId.LocalPathId) != TableInfos.end();
1704+
bool isSystemTable = TSysTables::IsSystemTable(tableId);
1705+
bool result = inTableInfos && !isSystemTable;
1706+
if (!result) {
1707+
Cerr << "CDC_DEBUG: IsUserTable returning FALSE for TableId OwnerId=" << tableId.PathId.OwnerId
1708+
<< " LocalPathId=" << tableId.PathId.LocalPathId
1709+
<< " inTableInfos=" << inTableInfos
1710+
<< " isSystemTable=" << isSystemTable << Endl;
1711+
}
1712+
return result;
17051713
}
17061714

17071715
const THashMap<ui64, TUserTable::TCPtr> &GetUserTables() const { return TableInfos; }

0 commit comments

Comments
 (0)