Skip to content

cleanup portions store #971

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/columnshard_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

namespace NKikimr::NColumnShard {

bool Schema::IndexColumns_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsGroupSelector, ui32 index, const std::function<void(const NOlap::TPortionInfo&, const NOlap::TColumnChunkLoadContext&)>& callback) {
auto rowset = db.Table<IndexColumns>().Prefix(index).Select();
bool Schema::IndexColumns_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsGroupSelector, const std::function<void(const NOlap::TPortionInfo&, const NOlap::TColumnChunkLoadContext&)>& callback) {
auto rowset = db.Table<IndexColumns>().Prefix(0).Select();
if (!rowset.IsReady()) {
return false;
}
Expand Down
18 changes: 9 additions & 9 deletions ydb/core/tx/columnshard/columnshard_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -510,13 +510,13 @@ struct Schema : NIceDb::Schema {

// IndexColumns activities

static void IndexColumns_Write(NIceDb::TNiceDb& db, ui32 index, const NOlap::TPortionInfo& portion, const TColumnRecord& row) {
static void IndexColumns_Write(NIceDb::TNiceDb& db, const NOlap::TPortionInfo& portion, const TColumnRecord& row) {
auto proto = portion.GetMeta().SerializeToProto(row.ColumnId, row.Chunk);
auto rowProto = row.GetMeta().SerializeToProto();
if (proto) {
*rowProto.MutablePortionMeta() = std::move(*proto);
}
db.Table<IndexColumns>().Key(index, portion.GetDeprecatedGranuleId(), row.ColumnId,
db.Table<IndexColumns>().Key(0, portion.GetDeprecatedGranuleId(), row.ColumnId,
portion.GetMinSnapshot().GetPlanStep(), portion.GetMinSnapshot().GetTxId(), portion.GetPortion(), row.Chunk).Update(
NIceDb::TUpdate<IndexColumns::XPlanStep>(portion.GetRemoveSnapshot().GetPlanStep()),
NIceDb::TUpdate<IndexColumns::XTxId>(portion.GetRemoveSnapshot().GetTxId()),
Expand All @@ -528,24 +528,24 @@ struct Schema : NIceDb::Schema {
);
}

static void IndexColumns_Erase(NIceDb::TNiceDb& db, ui32 index, const NOlap::TPortionInfo& portion, const TColumnRecord& row) {
db.Table<IndexColumns>().Key(index, portion.GetDeprecatedGranuleId(), row.ColumnId,
static void IndexColumns_Erase(NIceDb::TNiceDb& db, const NOlap::TPortionInfo& portion, const TColumnRecord& row) {
db.Table<IndexColumns>().Key(0, portion.GetDeprecatedGranuleId(), row.ColumnId,
portion.GetMinSnapshot().GetPlanStep(), portion.GetMinSnapshot().GetTxId(), portion.GetPortion(), row.Chunk).Delete();
}

static bool IndexColumns_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsGroupSelector, ui32 index,
static bool IndexColumns_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsGroupSelector,
const std::function<void(const NOlap::TPortionInfo&, const NOlap::TColumnChunkLoadContext&)>& callback);

// IndexCounters

static void IndexCounters_Write(NIceDb::TNiceDb& db, ui32 index, ui32 counterId, ui64 value) {
db.Table<IndexCounters>().Key(index, counterId).Update(
static void IndexCounters_Write(NIceDb::TNiceDb& db, ui32 counterId, ui64 value) {
db.Table<IndexCounters>().Key(0, counterId).Update(
NIceDb::TUpdate<IndexCounters::ValueUI64>(value)
);
}

static bool IndexCounters_Load(NIceDb::TNiceDb& db, ui32 index, const std::function<void(ui32 id, ui64 value)>& callback) {
auto rowset = db.Table<IndexCounters>().Prefix(index).Select();
static bool IndexCounters_Load(NIceDb::TNiceDb& db, const std::function<void(ui32 id, ui64 value)>& callback) {
auto rowset = db.Table<IndexCounters>().Prefix(0).Select();
if (!rowset.IsReady())
return false;

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/changes/cleanup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ bool TCleanupColumnEngineChanges::DoApplyChanges(TColumnEngineForLogs& self, TAp
continue;
}
for (auto& record : portionInfo.Records) {
self.ColumnsTable->Erase(context.DB, portionInfo, record);
context.DB.EraseColumn(portionInfo, record);
}
}

Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/engines/changes/with_appended.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ bool TChangesWithAppend::DoApplyChanges(TColumnEngineForLogs& self, TApplyChange
self.UpsertPortion(portionInfo, &oldInfo);

for (auto& record : portionInfo.Records) {
self.ColumnsTable->Write(context.DB, portionInfo, record);
context.DB.WriteColumn(portionInfo, record);
}
}
for (auto& portionInfoWithBlobs : AppendedPortions) {
Expand All @@ -77,7 +77,7 @@ bool TChangesWithAppend::DoApplyChanges(TColumnEngineForLogs& self, TApplyChange
AFL_VERIFY(usedPortionIds.emplace(portionInfo.GetPortionId()).second)("portion_info", portionInfo.DebugString(true));
self.UpsertPortion(portionInfo);
for (auto& record : portionInfo.Records) {
self.ColumnsTable->Write(context.DB, portionInfo, record);
context.DB.WriteColumn(portionInfo, record);
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/engines/column_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,10 @@ class TVersionedIndex {
return Snapshots.rbegin()->second;
}

bool IsEmpty() const {
return Snapshots.empty();
}

const std::shared_ptr<arrow::Schema>& GetPrimaryKey() const noexcept {
return PrimaryKey;
}
Expand Down
18 changes: 7 additions & 11 deletions ydb/core/tx/columnshard/engines/column_engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,7 @@ void TColumnEngineForLogs::UpdatePortionStats(TColumnEngineStats& engineStats, c
}

void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& indexInfo) {
if (!ColumnsTable) {
ui32 indexId = indexInfo.GetId();
ColumnsTable = std::make_shared<TColumnsTable>(indexId);
CountersTable = std::make_shared<TCountersTable>(indexId);
} else {
if (!VersionedIndex.IsEmpty()) {
const NOlap::TIndexInfo& lastIndexInfo = VersionedIndex.GetLastSchema()->GetIndexInfo();
Y_ABORT_UNLESS(lastIndexInfo.CheckCompatible(indexInfo));
}
Expand Down Expand Up @@ -173,7 +169,7 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db) {
bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db) {
TSnapshot lastSnapshot(0, 0);
const TIndexInfo* currentIndexInfo = nullptr;
auto result = ColumnsTable->Load(db, [&](const TPortionInfo& portion, const TColumnChunkLoadContext& loadContext) {
auto result = db.LoadColumns([&](const TPortionInfo& portion, const TColumnChunkLoadContext& loadContext) {
if (!currentIndexInfo || lastSnapshot != portion.GetMinSnapshot()) {
currentIndexInfo = &VersionedIndex.GetSchema(portion.GetMinSnapshot())->GetIndexInfo();
lastSnapshot = portion.GetMinSnapshot();
Expand Down Expand Up @@ -207,7 +203,7 @@ bool TColumnEngineForLogs::LoadCounters(IDbWrapper& db) {
}
};

return CountersTable->Load(db, callback);
return db.LoadCounters(callback);
}

std::shared_ptr<TInsertColumnEngineChanges> TColumnEngineForLogs::StartInsert(std::vector<TInsertedData>&& dataToIndex) noexcept {
Expand Down Expand Up @@ -476,13 +472,13 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnE
TApplyChangesContext context(db, snapshot);
Y_ABORT_UNLESS(indexChanges->ApplyChanges(*this, context));
}
CountersTable->Write(db, LAST_PORTION, LastPortion);
CountersTable->Write(db, LAST_GRANULE, LastGranule);
db.WriteCounter(LAST_PORTION, LastPortion);
db.WriteCounter(LAST_GRANULE, LastGranule);

if (LastSnapshot < snapshot) {
LastSnapshot = snapshot;
CountersTable->Write(db, LAST_PLAN_STEP, LastSnapshot.GetPlanStep());
CountersTable->Write(db, LAST_TX_ID, LastSnapshot.GetTxId());
db.WriteCounter(LAST_PLAN_STEP, LastSnapshot.GetPlanStep());
db.WriteCounter(LAST_TX_ID, LastSnapshot.GetTxId());
}
return true;
}
Expand Down
40 changes: 0 additions & 40 deletions ydb/core/tx/columnshard/engines/columns_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,44 +7,4 @@

namespace NKikimr::NOlap {

class TColumnsTable {
public:
TColumnsTable(ui32 indexId)
: IndexId(indexId)
{}

void Write(IDbWrapper& db, const TPortionInfo& portion, const TColumnRecord& row) {
db.WriteColumn(IndexId, portion, row);
}

void Erase(IDbWrapper& db, const TPortionInfo& portion, const TColumnRecord& row) {
db.EraseColumn(IndexId, portion, row);
}

bool Load(IDbWrapper& db, std::function<void(const TPortionInfo&, const TColumnChunkLoadContext&)> callback) {
return db.LoadColumns(IndexId, callback);
}

private:
ui32 IndexId;
};

class TCountersTable {
public:
TCountersTable(ui32 indexId)
: IndexId(indexId)
{}

void Write(IDbWrapper& db, ui32 counterId, ui64 value) {
db.WriteCounter(IndexId, counterId, value);
}

bool Load(IDbWrapper& db, std::function<void(ui32 id, ui64 value)> callback) {
return db.LoadCounters(IndexId, callback);
}

private:
ui32 IndexId;
};

}
20 changes: 10 additions & 10 deletions ydb/core/tx/columnshard/engines/db_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,29 +40,29 @@ bool TDbWrapper::Load(TInsertTableAccessor& insertTable,
return NColumnShard::Schema::InsertTable_Load(db, DsGroupSelector, insertTable, loadTime);
}

void TDbWrapper::WriteColumn(ui32 index, const NOlap::TPortionInfo& portion, const TColumnRecord& row) {
void TDbWrapper::WriteColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row) {
NIceDb::TNiceDb db(Database);
NColumnShard::Schema::IndexColumns_Write(db, index, portion, row);
NColumnShard::Schema::IndexColumns_Write(db, portion, row);
}

void TDbWrapper::EraseColumn(ui32 index, const NOlap::TPortionInfo& portion, const TColumnRecord& row) {
void TDbWrapper::EraseColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row) {
NIceDb::TNiceDb db(Database);
NColumnShard::Schema::IndexColumns_Erase(db, index, portion, row);
NColumnShard::Schema::IndexColumns_Erase(db, portion, row);
}

bool TDbWrapper::LoadColumns(ui32 index, const std::function<void(const NOlap::TPortionInfo&, const TColumnChunkLoadContext&)>& callback) {
bool TDbWrapper::LoadColumns(const std::function<void(const NOlap::TPortionInfo&, const TColumnChunkLoadContext&)>& callback) {
NIceDb::TNiceDb db(Database);
return NColumnShard::Schema::IndexColumns_Load(db, DsGroupSelector, index, callback);
return NColumnShard::Schema::IndexColumns_Load(db, DsGroupSelector, callback);
}

void TDbWrapper::WriteCounter(ui32 index, ui32 counterId, ui64 value) {
void TDbWrapper::WriteCounter(ui32 counterId, ui64 value) {
NIceDb::TNiceDb db(Database);
return NColumnShard::Schema::IndexCounters_Write(db, index, counterId, value);
return NColumnShard::Schema::IndexCounters_Write(db, counterId, value);
}

bool TDbWrapper::LoadCounters(ui32 index, const std::function<void(ui32 id, ui64 value)>& callback) {
bool TDbWrapper::LoadCounters(const std::function<void(ui32 id, ui64 value)>& callback) {
NIceDb::TNiceDb db(Database);
return NColumnShard::Schema::IndexCounters_Load(db, index, callback);
return NColumnShard::Schema::IndexCounters_Load(db, callback);
}

}
20 changes: 10 additions & 10 deletions ydb/core/tx/columnshard/engines/db_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ class IDbWrapper {
virtual bool Load(TInsertTableAccessor& insertTable,
const TInstant& loadTime) = 0;

virtual void WriteColumn(ui32 index, const TPortionInfo& portion, const TColumnRecord& row) = 0;
virtual void EraseColumn(ui32 index, const TPortionInfo& portion, const TColumnRecord& row) = 0;
virtual bool LoadColumns(ui32 index, const std::function<void(const TPortionInfo&, const TColumnChunkLoadContext&)>& callback) = 0;
virtual void WriteColumn(const TPortionInfo& portion, const TColumnRecord& row) = 0;
virtual void EraseColumn(const TPortionInfo& portion, const TColumnRecord& row) = 0;
virtual bool LoadColumns(const std::function<void(const TPortionInfo&, const TColumnChunkLoadContext&)>& callback) = 0;

virtual void WriteCounter(ui32 index, ui32 counterId, ui64 value) = 0;
virtual bool LoadCounters(ui32 index, const std::function<void(ui32 id, ui64 value)>& callback) = 0;
virtual void WriteCounter(ui32 counterId, ui64 value) = 0;
virtual bool LoadCounters(const std::function<void(ui32 id, ui64 value)>& callback) = 0;
};

class TDbWrapper : public IDbWrapper {
Expand All @@ -53,12 +53,12 @@ class TDbWrapper : public IDbWrapper {

bool Load(TInsertTableAccessor& insertTable, const TInstant& loadTime) override;

void WriteColumn(ui32 index, const NOlap::TPortionInfo& portion, const TColumnRecord& row) override;
void EraseColumn(ui32 index, const NOlap::TPortionInfo& portion, const TColumnRecord& row) override;
bool LoadColumns(ui32 index, const std::function<void(const NOlap::TPortionInfo&, const TColumnChunkLoadContext&)>& callback) override;
void WriteColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row) override;
void EraseColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row) override;
bool LoadColumns(const std::function<void(const NOlap::TPortionInfo&, const TColumnChunkLoadContext&)>& callback) override;

void WriteCounter(ui32 index, ui32 counterId, ui64 value) override;
bool LoadCounters(ui32 index, const std::function<void(ui32 id, ui64 value)>& callback) override;
void WriteCounter(ui32 counterId, ui64 value) override;
bool LoadCounters(const std::function<void(ui32 id, ui64 value)>& callback) override;

private:
NTable::TDatabase& Database;
Expand Down