Skip to content

24-3-13-enterprise: KIKIMR-22120: Block table splits and merges during import from S3 (#13918) (#13918) #19486

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ydb/core/protos/counters_schemeshard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -574,4 +574,6 @@ enum ETxTypes {
TXTYPE_UPDATE_DOMAIN_REPLY = 85 [(TxTypeOpts) = {Name: "TxUpdateDomainReply"}];

TXTYPE_SEQUENCESHARD_GET_SEQUENCE_RESULT = 86 [(TxTypeOpts) = {Name: "TxSequenceShardGetSequenceResult"}];

TXTYPE_UNMARK_RESTORE_TABLES = 87 [(TxTypeOpts) = {Name: "TxUnmarkRestoreTables"}];
}
1 change: 1 addition & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ message TTableDescription {

optional bool OmitFollowers = 36 [default = false]; // used with CopyFromTable
optional bool IsBackup = 37 [default = false];
optional bool IsRestore = 45 [default = false];

repeated TCdcStreamDescription CdcStreams = 38;
repeated TSequenceDescription Sequences = 39;
Expand Down
13 changes: 11 additions & 2 deletions ydb/core/tx/schemeshard/schemeshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
TVector<ui64> ExportsToResume;
TVector<ui64> ImportsToResume;
THashMap<TPathId, TVector<TPathId>> CdcStreamScansToResume;
TVector<TPathId> RestoreTablesToUnmark;
bool Broken = false;

explicit TTxInit(TSelf *self)
Expand Down Expand Up @@ -307,7 +308,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
return true;
}

typedef std::tuple<TPathId, ui32, ui64, TString, TString, TString, ui64, TString, bool, TString, bool, TString> TTableRec;
typedef std::tuple<TPathId, ui32, ui64, TString, TString, TString, ui64, TString, bool, TString, bool, TString, bool> TTableRec;
typedef TDeque<TTableRec> TTableRows;

template <typename SchemaTable, typename TRowSet>
Expand All @@ -323,7 +324,8 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
rowSet.template GetValueOrDefault<typename SchemaTable::IsBackup>(false),
rowSet.template GetValueOrDefault<typename SchemaTable::ReplicationConfig>(),
rowSet.template GetValueOrDefault<typename SchemaTable::IsTemporary>(false),
rowSet.template GetValueOrDefault<typename SchemaTable::OwnerActorId>("")
rowSet.template GetValueOrDefault<typename SchemaTable::OwnerActorId>(""),
rowSet.template GetValueOrDefault<typename SchemaTable::IsRestore>(false)
);
}

Expand Down Expand Up @@ -1831,6 +1833,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
}

tableInfo->IsBackup = std::get<8>(rec);
tableInfo->IsRestore = std::get<12>(rec);

Self->Tables[pathId] = tableInfo;
Self->IncrementPathDbRefCount(pathId);
Expand Down Expand Up @@ -3883,6 +3886,8 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
<< ", read records: " << history.size()
<< ", at schemeshard: " << Self->TabletID());

RestoreTablesToUnmark.clear();

for (auto& rec: history) {
auto pathId = std::get<0>(rec);
auto txId = std::get<1>(rec);
Expand Down Expand Up @@ -3929,6 +3934,9 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
break;
case TTableInfo::TBackupRestoreResult::EKind::Restore:
tableInfo->RestoreHistory[txId] = std::move(info);
if (tableInfo->IsRestore) {
RestoreTablesToUnmark.push_back(pathId);
}
break;
}

Expand Down Expand Up @@ -4928,6 +4936,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
.CdcStreamScans = std::move(cdcStreamScansToResume),
.TablesToClean = std::move(TablesToClean),
.BlockStoreVolumesToClean = std::move(BlockStoreVolumesToClean),
.RestoreTablesToUnmark = std::move(RestoreTablesToUnmark),
});
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,18 @@ TTableInfo::TAlterDataPtr ParseParams(const TPath& path, TTableInfo::TPtr table,
copyAlter.ColumnsSize() != 0 ||
copyAlter.DropColumnsSize() != 0);

if (copyAlter.HasIsBackup() && copyAlter.GetIsBackup() != table->IsBackup) {
if (copyAlter.HasIsBackup() && copyAlter.GetIsBackup() != table->IsBackup) {
errStr = Sprintf("Cannot add/remove 'IsBackup' property");
status = NKikimrScheme::StatusInvalidParameter;
return nullptr;
}

if (copyAlter.HasIsRestore() && copyAlter.GetIsRestore() != table->IsRestore) {
errStr = Sprintf("Cannot add/remove 'IsRestore' property");
status = NKikimrScheme::StatusInvalidParameter;
return nullptr;
}

if (!hasSchemaChanges
&& !copyAlter.HasPartitionConfig()
&& !copyAlter.HasTTLSettings()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ class TProposedWaitParts: public TSubOperationState {
return false;
}

TKind::FinishStats(OperationId, txState, context);
TKind::Finish(OperationId, txState, context);
return true;
}

Expand Down Expand Up @@ -285,7 +285,7 @@ class TProposedWaitParts: public TSubOperationState {
if (txState->ShardsInProgress.empty()) {
NTableState::AckAllSchemaChanges(OperationId, *txState, context);
context.SS->ChangeTxState(db, OperationId, TTxState::Done);
TKind::FinishStats(OperationId, *txState, context);
TKind::Finish(OperationId, *txState, context);
return true;
}

Expand Down Expand Up @@ -361,7 +361,7 @@ class TAborting: public TProposedWaitParts<TKind> {
if (txState->ShardsInProgress.empty()) {
NTableState::AckAllSchemaChanges(OperationId, *txState, context);
context.SS->ChangeTxState(db, OperationId, TTxState::Done);
TKind::FinishStats(OperationId, *txState, context);
TKind::Finish(OperationId, *txState, context);
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ struct TBackup {
return TRUCalculator::ReadTable(bytes);
}

static void FinishStats(const TOperationId& opId, TTxState& txState, TOperationContext& context) {
static void Finish(const TOperationId& opId, TTxState& txState, TOperationContext& context) {
if (txState.TxType != TTxState::TxBackup) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ struct TRestore {
return TRUCalculator::BulkUpsert(bytes, rows);
}

static void FinishStats(const TOperationId& opId, TTxState& txState, TOperationContext& context) {
static void Finish(const TOperationId& opId, TTxState& txState, TOperationContext& context) {
if (txState.TxType != TTxState::TxRestore) {
return;
}
Expand All @@ -77,6 +77,9 @@ struct TRestore {

NIceDb::TNiceDb db(context.GetDB());
context.SS->PersistCompletedRestore(db, opId.GetTxId(), txState, restoreInfo);

table->IsRestore = false;
context.SS->PersistTableIsRestore(db, txState.TargetPathId, table);
}

static void PersistTask(const TPathId& pathId, const TTxTransaction& tx, TOperationContext& context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,13 @@ class TSplitMerge: public TSubOperation {
return result;
}

if (tableInfo->IsRestore) {
TString errMsg = TStringBuilder()
<< "cannot split/merge restore table " << info.GetTablePath();
result->SetError(NKikimrScheme::StatusInvalidParameter, errMsg);
return result;
}

const THashMap<TShardIdx, ui64>& shardIdx2partition = tableInfo->GetShard2PartitionIdx();

TVector<ui64> srcPartitionIdxs;
Expand Down
62 changes: 62 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard__unmark_restore_tables.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#include "schemeshard_impl.h"

namespace NKikimr::NSchemeShard {

using namespace NTabletFlatExecutor;

struct TSchemeShard::TTxUnmarkRestoreTables : public TTransactionBase<TSchemeShard> {
static const ui32 BucketSize = 100;
TVector<TPathId> RestoreTablesToUnmark;
ui32 UnmarkedCount;

TTxUnmarkRestoreTables(TSelf* self, TVector<TPathId> tablesToClean)
: TTransactionBase<TSchemeShard>(self)
, RestoreTablesToUnmark(std::move(tablesToClean))
, UnmarkedCount(0)
{}

TTxType GetTxType() const override {
return TXTYPE_UNMARK_RESTORE_TABLES;
}

bool Execute(TTransactionContext &txc, const TActorContext&) override {
NIceDb::TNiceDb db(txc.DB);

UnmarkedCount = 0;
while (UnmarkedCount < BucketSize && RestoreTablesToUnmark) {
TPathId tableId = RestoreTablesToUnmark.back();
if (Self->Tables.contains(tableId)) {
auto table = Self->Tables[tableId];
table->IsRestore = false;
Self->PersistTableIsRestore(db, tableId, table);
}

++UnmarkedCount;
RestoreTablesToUnmark.pop_back();
}

return true;
}

void Complete(const TActorContext &ctx) override {
if (UnmarkedCount) {
LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"TTxUnmarkRestoreTables Complete"
<< ", done for " << UnmarkedCount << " tables"
<< ", left " << RestoreTablesToUnmark.size()
<< ", at schemeshard: "<< Self->TabletID()
);
}

if (RestoreTablesToUnmark) {
Self->Execute(Self->CreateTxUnmarkRestoreTables(std::move(RestoreTablesToUnmark)), ctx);
}
}
};

NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxUnmarkRestoreTables(TVector<TPathId>&& tablesToUnmark) {
return new TTxUnmarkRestoreTables(this, std::move(tablesToUnmark));
}

} // NKikimr::NSchemeShard

20 changes: 20 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ void TSchemeShard::ActivateAfterInitialization(const TActorContext& ctx, TActiva
Execute(CreateTxCleanBlockStoreVolumes(std::move(opts.BlockStoreVolumesToClean)), ctx);
}

if (opts.RestoreTablesToUnmark) {
Execute(CreateTxUnmarkRestoreTables(std::move(opts.RestoreTablesToUnmark)), ctx);
}

if (IsDomainSchemeShard) {
InitializeTabletMigrations();
}
Expand Down Expand Up @@ -2621,6 +2625,16 @@ void TSchemeShard::PersistTableFinishColumnBuilding(NIceDb::TNiceDb& db, const T
}
}

void TSchemeShard::PersistTableIsRestore(NIceDb::TNiceDb& db, const TPathId pathId, const TTableInfo::TPtr tableInfo) {
if (pathId.OwnerId == TabletID()) {
db.Table<Schema::Tables>().Key(pathId.LocalPathId).Update(
NIceDb::TUpdate<Schema::Tables::IsRestore>(tableInfo->IsRestore));
} else {
db.Table<Schema::MigratedTables>().Key(pathId.OwnerId, pathId.LocalPathId).Update(
NIceDb::TUpdate<Schema::MigratedTables::IsRestore>(tableInfo->IsRestore));
}
}

void TSchemeShard::PersistTableAltered(NIceDb::TNiceDb& db, const TPathId pathId, const TTableInfo::TPtr tableInfo) {
TString partitionConfig;
Y_PROTOBUF_SUPPRESS_NODISCARD tableInfo->PartitionConfig().SerializeToString(&partitionConfig);
Expand All @@ -2644,6 +2658,7 @@ void TSchemeShard::PersistTableAltered(NIceDb::TNiceDb& db, const TPathId pathId
NIceDb::TUpdate<Schema::Tables::AlterTableFull>(TString()),
NIceDb::TUpdate<Schema::Tables::TTLSettings>(ttlSettings),
NIceDb::TUpdate<Schema::Tables::IsBackup>(tableInfo->IsBackup),
NIceDb::TUpdate<Schema::Tables::IsRestore>(tableInfo->IsRestore),
NIceDb::TUpdate<Schema::Tables::ReplicationConfig>(replicationConfig),
NIceDb::TUpdate<Schema::Tables::IsTemporary>(tableInfo->IsTemporary),
NIceDb::TUpdate<Schema::Tables::OwnerActorId>(tableInfo->OwnerActorId.ToString()));
Expand All @@ -2656,6 +2671,7 @@ void TSchemeShard::PersistTableAltered(NIceDb::TNiceDb& db, const TPathId pathId
NIceDb::TUpdate<Schema::MigratedTables::AlterTableFull>(TString()),
NIceDb::TUpdate<Schema::MigratedTables::TTLSettings>(ttlSettings),
NIceDb::TUpdate<Schema::MigratedTables::IsBackup>(tableInfo->IsBackup),
NIceDb::TUpdate<Schema::MigratedTables::IsRestore>(tableInfo->IsRestore),
NIceDb::TUpdate<Schema::MigratedTables::ReplicationConfig>(replicationConfig),
NIceDb::TUpdate<Schema::MigratedTables::IsTemporary>(tableInfo->IsTemporary),
NIceDb::TUpdate<Schema::MigratedTables::OwnerActorId>(tableInfo->OwnerActorId.ToString()));
Expand Down Expand Up @@ -6666,6 +6682,10 @@ void TSchemeShard::FillTableDescriptionForShardIdx(
tableDescr->SetIsBackup(true);
}

if (tinfo->IsRestore) {
tableDescr->SetIsRestore(true);
}

if (tinfo->HasReplicationConfig()) {
tableDescr->MutableReplicationConfig()->CopyFrom(tinfo->ReplicationConfig());
}
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,7 @@ class TSchemeShard
void PersistTableCreated(NIceDb::TNiceDb& db, const TPathId tableId);
void PersistTableAlterVersion(NIceDb::TNiceDb &db, const TPathId pathId, const TTableInfo::TPtr tableInfo);
void PersistTableFinishColumnBuilding(NIceDb::TNiceDb& db, const TPathId pathId, const TTableInfo::TPtr tableInfo, ui64 colId);
void PersistTableIsRestore(NIceDb::TNiceDb &db, const TPathId pathId, const TTableInfo::TPtr tableInfo);
void PersistTableAltered(NIceDb::TNiceDb &db, const TPathId pathId, const TTableInfo::TPtr tableInfo);
void PersistAddAlterTable(NIceDb::TNiceDb& db, TPathId pathId, const TTableInfo::TAlterDataPtr alter);
void PersistPersQueueGroup(NIceDb::TNiceDb &db, TPathId pathId, const TTopicInfo::TPtr);
Expand Down Expand Up @@ -857,6 +858,7 @@ class TSchemeShard
TVector<TPathId> CdcStreamScans;
TVector<TPathId> TablesToClean;
TDeque<TPathId> BlockStoreVolumesToClean;
TVector<TPathId> RestoreTablesToUnmark;
};

void SubscribeToTempTableOwners();
Expand All @@ -883,6 +885,9 @@ class TSchemeShard

void ScheduleCleanDroppedPaths();
void Handle(TEvPrivate::TEvCleanDroppedPaths::TPtr& ev, const TActorContext& ctx);

struct TTxUnmarkRestoreTables;
NTabletFlatExecutor::ITransaction* CreateTxUnmarkRestoreTables(TVector<TPathId>&& tablesToUnmark);

void EnqueueBackgroundCompaction(const TShardIdx& shardIdx, const TPartitionStats& stats);
void UpdateBackgroundCompaction(const TShardIdx& shardIdx, const TPartitionStats& stats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateTablePropose(
auto* indexedTable = modifyScheme.MutableCreateIndexedTable();
auto& tableDesc = *(indexedTable->MutableTableDescription());
tableDesc.SetName(wdAndPath.second);
tableDesc.SetIsRestore(true);

Y_ABORT_UNLESS(ss->TableProfilesLoaded);
Ydb::StatusIds::StatusCode status;
Expand Down
10 changes: 10 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_info_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,7 @@ TTableInfo::TAlterDataPtr TTableInfo::CreateAlterData(
}

alterData->IsBackup = op.GetIsBackup();
alterData->IsRestore = op.GetIsRestore();

if (source && op.KeyColumnNamesSize() == 0)
return alterData;
Expand Down Expand Up @@ -1890,6 +1891,11 @@ bool TTableInfo::CheckCanMergePartitions(const TSplitSettings& splitSettings,
return false;
}

// Don't split/merge restore tables
if (IsRestore) {
return false;
}

// Ignore stats from unknown datashard (it could have been split)
if (!Stats.PartitionStats.contains(shardIdx)) {
return false;
Expand Down Expand Up @@ -1941,6 +1947,10 @@ bool TTableInfo::CheckSplitByLoad(
if (IsBackup)
return false;

// Don't split/merge restore tables
if (IsRestore)
return false;

if (!splitSettings.SplitByLoadEnabled)
return false;

Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_info_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ struct TTableInfo : public TSimpleRefCount<TTableInfo> {
THashMap<ui32, TColumn> Columns;
TVector<ui32> KeyColumnIds;
bool IsBackup = false;
bool IsRestore = false;

NKikimrSchemeOp::TTableDescription TableDescriptionDiff;
TMaybeFail<NKikimrSchemeOp::TTableDescription> TableDescriptionFull;
Expand Down Expand Up @@ -437,6 +438,7 @@ struct TTableInfo : public TSimpleRefCount<TTableInfo> {
THashMap<ui32, TColumn> Columns;
TVector<ui32> KeyColumnIds;
bool IsBackup = false;
bool IsRestore = false;
bool IsTemporary = false;
TActorId OwnerActorId;

Expand Down Expand Up @@ -549,6 +551,7 @@ struct TTableInfo : public TSimpleRefCount<TTableInfo> {
, Columns(std::move(alterData.Columns))
, KeyColumnIds(std::move(alterData.KeyColumnIds))
, IsBackup(alterData.IsBackup)
, IsRestore(alterData.IsRestore)
{
TableDescription.Swap(alterData.TableDescriptionFull.Get());
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1269,6 +1269,7 @@ void TSchemeShard::DescribeTable(
}

entry->SetIsBackup(tableInfo->IsBackup);
entry->SetIsRestore(tableInfo->IsRestore);
}

void TSchemeShard::DescribeTableIndex(const TPathId& pathId, const TString& name,
Expand Down
Loading
Loading