Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ydb/core/protos/table_stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,8 @@ message TTableStats {
repeated TChannelStats Channels = 30;

optional TStoragePoolsStats StoragePools = 31;

// denotes that datashard should be background compacted
// even if it is single parted
optional bool HasSchemaChanges = 33;
}
2 changes: 1 addition & 1 deletion ydb/core/tablet_flat/flat_comp.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ namespace NTable {
/**
* Returns row schema of the specified table
*/
virtual TIntrusiveConstPtr<TRowScheme> RowScheme(ui32 table) = 0;
virtual TIntrusiveConstPtr<TRowScheme> RowScheme(ui32 table) const = 0;

/**
* Returns schema of the specified table
Expand Down
100 changes: 83 additions & 17 deletions ydb/core/tablet_flat/flat_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4303,7 +4303,7 @@ const NTable::TScheme& TExecutor::DatabaseScheme()
return Scheme();
}

TIntrusiveConstPtr<NTable::TRowScheme> TExecutor::RowScheme(ui32 table)
TIntrusiveConstPtr<NTable::TRowScheme> TExecutor::RowScheme(ui32 table) const
{
return Database->GetRowScheme(table);
}
Expand Down Expand Up @@ -4344,6 +4344,80 @@ const NTable::TRowVersionRanges& TExecutor::TableRemovedRowVersions(ui32 table)
return Database->GetRemovedRowVersions(table);
}

bool TExecutor::HasSchemaChanges(ui32 table) const {
auto *tableInfo = Scheme().GetTableInfo(table);
auto rowScheme = RowScheme(table);
if (!tableInfo || !rowScheme) {
return false;
}

auto subset = Database->Subset(table, NTable::TEpoch::Max(), { } , { });
for (const auto& partView : subset->Flatten) {
if (HasSchemaChanges(partView, *tableInfo, *rowScheme)) {
return true;
}
}

return false;
}

bool TExecutor::HasSchemaChanges(const NTable::TPartView& partView, const NTable::TScheme::TTableInfo& tableInfo, const NTable::TRowScheme& rowScheme) const {
if (partView.Part->Stat.Rows == 0) {
return false;
}

{ // Check by key filter existence
bool partByKeyFilter = bool(partView->ByKey);
bool schemeByKeyFilter = tableInfo.ByKeyFilter;
if (partByKeyFilter != schemeByKeyFilter) {
return true;
}
}

{ // Check B-Tree index existence
if (AppData()->FeatureFlags.GetEnableLocalDBBtreeIndex() && !partView->IndexPages.HasBTree()) {
return true;
}
}

{ // Check families
size_t partFamiliesCount = partView->GroupsCount;
size_t schemeFamiliesCount = rowScheme.Families.size();
if (partFamiliesCount != schemeFamiliesCount) {
return true;
}

for (size_t index : xrange(rowScheme.Families.size())) {
auto familyId = rowScheme.Families[index];
static const NTable::TScheme::TFamily defaultFamilySettings;
const auto& family = tableInfo.Families.ValueRef(familyId, defaultFamilySettings); // Workaround for KIKIMR-17222

const auto* schemeGroupRoom = tableInfo.Rooms.FindPtr(family.Room);
Y_ABORT_UNLESS(schemeGroupRoom, "Cannot find room %" PRIu32 " in table %" PRIu32, family.Room, tableInfo.Id);

ui32 partGroupChannel = partView.Part->GetGroupChannel(NTable::NPage::TGroupId(index));
if (partGroupChannel != schemeGroupRoom->Main) {
return true;
}
}
}

{ // Check columns
THashMap<NTable::TTag, ui32> partColumnGroups, schemeColumnGroups;
for (const auto& column : partView->Scheme->AllColumns) {
partColumnGroups[column.Tag] = column.Group;
}
for (const auto& col : rowScheme.Cols) {
schemeColumnGroups[col.Tag] = col.Group;
}
if (partColumnGroups != schemeColumnGroups) {
return true;
}
}

return false;
}

ui64 TExecutor::BeginCompaction(THolder<NTable::TCompactionParams> params)
{
if (auto logl = Logger->Log(ELnLev::Info))
Expand Down Expand Up @@ -4383,37 +4457,29 @@ ui64 TExecutor::BeginCompaction(THolder<NTable::TCompactionParams> params)

for (size_t group : xrange(rowScheme->Families.size())) {
auto familyId = rowScheme->Families[group];
const auto* family = tableInfo->Families.FindPtr(familyId);
if (Y_UNLIKELY(!family)) {
// FIXME: workaround for KIKIMR-17222
// Column families with default settings may be missing in schema,
// so we have to use a static variable as a substitute
static const NTable::TScheme::TFamily defaultFamilySettings;
family = &defaultFamilySettings;
}
Y_ABORT_UNLESS(family, "Cannot find family %" PRIu32 " in table %" PRIu32, familyId, table);
static const NTable::TScheme::TFamily defaultFamilySettings;
const auto& family = tableInfo->Families.ValueRef(familyId, defaultFamilySettings); // Workaround for KIKIMR-17222

auto roomId = family->Room;
auto* room = tableInfo->Rooms.FindPtr(roomId);
Y_ABORT_UNLESS(room, "Cannot find room %" PRIu32 " in table %" PRIu32, roomId, table);
auto* room = tableInfo->Rooms.FindPtr(family.Room);
Y_ABORT_UNLESS(room, "Cannot find room %" PRIu32 " in table %" PRIu32, family.Room, table);

auto& pageGroup = comp->Layout.Groups.at(group);
auto& writeGroup = comp->Writer.Groups.at(group);

pageGroup.Codec = family->Codec;
pageGroup.Codec = family.Codec;
pageGroup.PageSize = policy->MinDataPageSize;
pageGroup.BTreeIndexNodeTargetSize = policy->MinBTreeIndexNodeSize;
pageGroup.BTreeIndexNodeKeysMin = policy->MinBTreeIndexNodeKeys;

writeGroup.Cache = Max(family->Cache, cache);
writeGroup.Cache = Max(family.Cache, cache);
writeGroup.MaxBlobSize = NBlockIO::BlockSize;
writeGroup.Channel = room->Main;
addChannel(room->Main);

if (group == 0) {
// Small/Large edges are taken from the leader family
comp->Layout.SmallEdge = family->Small;
comp->Layout.LargeEdge = family->Large;
comp->Layout.SmallEdge = family.Small;
comp->Layout.LargeEdge = family.Large;

// Small/Large channels are taken from the leader family
comp->Writer.BlobsChannel = room->Blobs;
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tablet_flat/flat_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ class TExecutor

ui64 OwnerTabletId() const override;
const NTable::TScheme& DatabaseScheme() override;
TIntrusiveConstPtr<NTable::TRowScheme> RowScheme(ui32 table) override;
TIntrusiveConstPtr<NTable::TRowScheme> RowScheme(ui32 table) const override;
const NTable::TScheme::TTableInfo* TableScheme(ui32 table) override;
ui64 TableMemSize(ui32 table, NTable::TEpoch epoch) override;
NTable::TPartView TablePart(ui32 table, const TLogoBlobID& label) override;
Expand Down Expand Up @@ -651,6 +651,8 @@ class TExecutor
bool CancelScan(ui32 tableId, ui64 taskId) override;

TFinishedCompactionInfo GetFinishedCompactionInfo(ui32 tableId) const override;
bool HasSchemaChanges(ui32 table) const override;
bool HasSchemaChanges(const NTable::TPartView& partView, const NTable::TScheme::TTableInfo& tableInfo, const NTable::TRowScheme& rowScheme) const;
ui64 CompactBorrowed(ui32 tableId) override;
ui64 CompactMemTable(ui32 tableId) override;
ui64 CompactTable(ui32 tableId) override;
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/tablet_flat/flat_stat_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ bool BuildStats(const TSubset& subset, TStats& stats, ui64 rowCountResolution, u
}

void GetPartOwners(const TSubset& subset, THashSet<ui64>& partOwners) {
for (auto& pi : subset.Flatten) {
partOwners.insert(pi->Label.TabletID());
for (const auto& partView : subset.Flatten) {
partOwners.insert(partView->Label.TabletID());
}
for (auto& pi : subset.ColdParts) {
partOwners.insert(pi->Label.TabletID());
for (const auto& coldPart : subset.ColdParts) {
partOwners.insert(coldPart->Label.TabletID());
}
for (auto& pi : subset.TxStatus) {
partOwners.insert(pi->Label.TabletID());
for (const auto& txStatus : subset.TxStatus) {
partOwners.insert(txStatus->Label.TabletID());
}
}

Expand Down
13 changes: 6 additions & 7 deletions ydb/core/tablet_flat/flat_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1398,13 +1398,12 @@ bool TTable::RemoveRowVersions(const TRowVersion& lower, const TRowVersion& uppe

TCompactionStats TTable::GetCompactionStats() const
{
TCompactionStats stats;
stats.MemRowCount = GetMemRowCount();
stats.MemDataSize = GetMemSize();
stats.MemDataWaste = GetMemWaste();
stats.PartCount = Flatten.size() + ColdParts.size();

return stats;
return {
.PartCount = Flatten.size() + ColdParts.size(),
.MemRowCount = GetMemRowCount(),
.MemDataSize = GetMemSize(),
.MemDataWaste = GetMemWaste(),
};
}

void TTable::SetTableObserver(TIntrusivePtr<ITableObserver> ptr) noexcept
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tablet_flat/tablet_flat_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,7 @@ namespace NFlatExecutorSetup {

// edge and ts of last full compaction
virtual TFinishedCompactionInfo GetFinishedCompactionInfo(ui32 tableId) const = 0;
virtual bool HasSchemaChanges(ui32 table) const = 0;

// Forces full compaction of the specified table in the near future
// Returns 0 if can't compact, otherwise compaction ID
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tablet_flat/ut/flat_comp_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class TSimpleBackend : public ICompactionBackend {
return DB.GetScheme();
}

TIntrusiveConstPtr<NKikimr::NTable::TRowScheme> RowScheme(ui32 table) override {
TIntrusiveConstPtr<NKikimr::NTable::TRowScheme> RowScheme(ui32 table) const override {
return DB.GetRowScheme(table);
}

Expand Down
8 changes: 4 additions & 4 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,9 @@ TDataShard::TDataShard(const TActorId &tablet, TTabletStorageInfo *info)
, MaxTxLagMilliseconds(5*60*1000, 0, 30*24*3600*1000ll)
, CanCancelROWithReadSets(0, 0, 1)
, PerShardReadSizeLimit(5368709120, 0, 107374182400)
, CpuUsageReportThreshlodPercent(60, -1, 146)
, CpuUsageReportThresholdPercent(60, -1, 146)
, CpuUsageReportIntervalSeconds(60, 0, 365*86400)
, HighDataSizeReportThreshlodBytes(10ull<<30, -1, Max<i64>())
, HighDataSizeReportThresholdBytes(10ull<<30, -1, Max<i64>())
, HighDataSizeReportIntervalSeconds(60, 0, 365*86400)
, DataTxProfileLogThresholdMs(0, 0, 86400000)
, DataTxProfileBufferThresholdMs(0, 0, 86400000)
Expand Down Expand Up @@ -305,9 +305,9 @@ void TDataShard::IcbRegister() {

appData->Icb->RegisterSharedControl(CanCancelROWithReadSets, "DataShardControls.CanCancelROWithReadSets");
appData->Icb->RegisterSharedControl(PerShardReadSizeLimit, "TxLimitControls.PerShardReadSizeLimit");
appData->Icb->RegisterSharedControl(CpuUsageReportThreshlodPercent, "DataShardControls.CpuUsageReportThreshlodPercent");
appData->Icb->RegisterSharedControl(CpuUsageReportThresholdPercent, "DataShardControls.CpuUsageReportThreshlodPercent");
appData->Icb->RegisterSharedControl(CpuUsageReportIntervalSeconds, "DataShardControls.CpuUsageReportIntervalSeconds");
appData->Icb->RegisterSharedControl(HighDataSizeReportThreshlodBytes, "DataShardControls.HighDataSizeReportThreshlodBytes");
appData->Icb->RegisterSharedControl(HighDataSizeReportThresholdBytes, "DataShardControls.HighDataSizeReportThreshlodBytes");
appData->Icb->RegisterSharedControl(HighDataSizeReportIntervalSeconds, "DataShardControls.HighDataSizeReportIntervalSeconds");

appData->Icb->RegisterSharedControl(BackupReadAheadLo, "DataShardControls.BackupReadAheadLo");
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/datashard/datashard__compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ class TDataShard::TTxCompactTable : public NTabletFlatExecutor::TTransactionBase
auto stats = txc.DB.GetCompactionStats(localTid);
bool isEmpty = stats.PartCount == 0 && stats.MemDataSize == 0;
bool isSingleParted = stats.PartCount == 1 && stats.MemDataSize == 0;
if (isEmpty || isSingleParted && !hasBorrowed && !record.HasCompactSinglePartedShards()) {
bool hasSchemaChanges = Self->Executor()->HasSchemaChanges(tableInfo.LocalTid);
if (isEmpty || isSingleParted && !hasBorrowed && !hasSchemaChanges && !record.GetCompactSinglePartedShards()) {
// nothing to compact
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
"Background compaction of tablet# " << Self->TabletID()
Expand Down
Loading
Loading