Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/blob.h
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,10 @@ struct TBlobRange {
return BlobId;
}

// Reports whether this range can be safely used to address data in its blob:
// the blob id itself must be valid, the range must be non-empty, and the
// slice [Offset, Offset + Size) must lie entirely inside the blob.
//
// The bounds test is written as a subtraction instead of `Offset + Size` so
// that a large Offset cannot wrap the sum around and make an out-of-range
// slice look valid.
// NOTE(review): assumes Offset, Size and BlobId.BlobSize() are unsigned
// integers — confirm against the member declarations.
bool IsValid() const {
    if (!BlobId.IsValid() || !Size) {
        return false;
    }
    const auto blobSize = BlobId.BlobSize();
    // Size <= blobSize guarantees that blobSize - Size does not underflow.
    return Size <= blobSize && Offset <= blobSize - Size;
}

// Returns the Size member of this range.
// NOTE(review): despite the name, this is the size of the range itself, not
// the size of the underlying blob (that is BlobId.BlobSize()) — confirm that
// callers expect the range length.
ui32 GetBlobSize() const {
return Size;
}
Expand Down
25 changes: 0 additions & 25 deletions ydb/core/tx/columnshard/columnshard_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,6 @@

namespace NKikimr::NColumnShard {

// Walks the IndexColumns table (key prefix 0) and reports every row to `callback`.
//
// For each row an empty TPortionInfo is filled with the row's path id, min
// snapshot, portion id, deprecated granule id and remove snapshot, and is
// passed to `callback` together with a TColumnChunkLoadContext constructed
// from the same row and `dsGroupSelector`.
//
// Returns false if the rowset is not ready or advancing to the next row
// fails; returns true once the end of the set has been reached.
bool Schema::IndexColumns_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsGroupSelector, const std::function<void(const NOlap::TPortionInfo&, const NOlap::TColumnChunkLoadContext&)>& callback) {
    auto rows = db.Table<IndexColumns>().Prefix(0).Select();
    if (!rows.IsReady()) {
        return false;
    }

    while (!rows.EndOfSet()) {
        NOlap::TPortionInfo portionInfo = NOlap::TPortionInfo::BuildEmpty();

        // Identity of the portion this column chunk belongs to.
        portionInfo.SetPathId(rows.GetValue<IndexColumns::PathId>());
        const auto minPlanStep = rows.GetValue<IndexColumns::PlanStep>();
        const auto minTxId = rows.GetValue<IndexColumns::TxId>();
        portionInfo.SetMinSnapshot(minPlanStep, minTxId);
        portionInfo.SetPortion(rows.GetValue<IndexColumns::Portion>());
        portionInfo.SetDeprecatedGranuleId(rows.GetValue<IndexColumns::Granule>());

        // Per-chunk data is read from the current row by the load context.
        NOlap::TColumnChunkLoadContext loadContext(rows, dsGroupSelector);

        const auto removePlanStep = rows.GetValue<IndexColumns::XPlanStep>();
        const auto removeTxId = rows.GetValue<IndexColumns::XTxId>();
        portionInfo.SetRemoveSnapshot(removePlanStep, removeTxId);

        callback(portionInfo, loadContext);

        if (!rows.Next()) {
            return false;
        }
    }

    return true;
}

bool Schema::InsertTable_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsGroupSelector, NOlap::TInsertTableAccessor& insertTable, const TInstant& /*loadTime*/) {
auto rowset = db.Table<InsertTable>().GreaterOrEqual(0, 0, 0, 0, "").Select();
if (!rowset.IsReady()) {
Expand Down
28 changes: 0 additions & 28 deletions ydb/core/tx/columnshard/columnshard_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -508,34 +508,6 @@ struct Schema : NIceDb::Schema {
NOlap::TInsertTableAccessor& insertTable,
const TInstant& loadTime);

// IndexColumns activities

// Writes (updates) the IndexColumns row describing one column chunk of a portion.
//
// The row key is (0, deprecated granule id, column id, min-snapshot plan step,
// min-snapshot tx id, portion id, chunk index). The stored values are the
// portion's remove snapshot, the serialized blob id, the blob range
// offset/size, the path id, and the serialized chunk metadata. When the
// portion meta produces a proto for this (column, chunk) pair it is embedded
// into the chunk metadata before serialization.
static void IndexColumns_Write(NIceDb::TNiceDb& db, const NOlap::TPortionInfo& portion, const TColumnRecord& row) {
    auto portionMetaProto = portion.GetMeta().SerializeToProto(row.ColumnId, row.Chunk);
    auto recordProto = row.GetMeta().SerializeToProto();
    if (portionMetaProto) {
        *recordProto.MutablePortionMeta() = std::move(*portionMetaProto);
    }

    const auto& minSnapshot = portion.GetMinSnapshot();
    const auto& removeSnapshot = portion.GetRemoveSnapshot();

    db.Table<IndexColumns>()
        .Key(0, portion.GetDeprecatedGranuleId(), row.ColumnId, minSnapshot.GetPlanStep(), minSnapshot.GetTxId(), portion.GetPortion(), row.Chunk)
        .Update(
            NIceDb::TUpdate<IndexColumns::XPlanStep>(removeSnapshot.GetPlanStep()),
            NIceDb::TUpdate<IndexColumns::XTxId>(removeSnapshot.GetTxId()),
            NIceDb::TUpdate<IndexColumns::Blob>(row.SerializedBlobId()),
            NIceDb::TUpdate<IndexColumns::Metadata>(recordProto.SerializeAsString()),
            NIceDb::TUpdate<IndexColumns::Offset>(row.BlobRange.Offset),
            NIceDb::TUpdate<IndexColumns::Size>(row.BlobRange.Size),
            NIceDb::TUpdate<IndexColumns::PathId>(portion.GetPathId()));
}

// Deletes the IndexColumns row of one column chunk of a portion.
// The row is addressed by the same key that IndexColumns_Write uses:
// (0, deprecated granule id, column id, min-snapshot plan step,
// min-snapshot tx id, portion id, chunk index).
static void IndexColumns_Erase(NIceDb::TNiceDb& db, const NOlap::TPortionInfo& portion, const TColumnRecord& row) {
    const auto& minSnapshot = portion.GetMinSnapshot();
    db.Table<IndexColumns>()
        .Key(0, portion.GetDeprecatedGranuleId(), row.ColumnId, minSnapshot.GetPlanStep(), minSnapshot.GetTxId(), portion.GetPortion(), row.Chunk)
        .Delete();
}

// Iterates over the IndexColumns rows with key prefix 0 and invokes `callback`
// once per row with the portion description built from that row and a chunk
// load context created from the row and `dsGroupSelector`.
// Returns false when the rowset is not ready or moving to the next row fails
// (the callback may already have been invoked for earlier rows in the latter
// case); returns true after the whole set has been visited.
static bool IndexColumns_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsGroupSelector,
const std::function<void(const NOlap::TPortionInfo&, const NOlap::TColumnChunkLoadContext&)>& callback);

// IndexCounters

static void IndexCounters_Write(NIceDb::TNiceDb& db, ui32 counterId, ui64 value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
#include "settings.h"
#include <ydb/core/tx/columnshard/blobs_action/abstract/action.h>
#include <ydb/core/tx/columnshard/counters/indexation.h>
#include <ydb/core/tx/columnshard/engines/columns_table.h>
#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
#include <ydb/core/tx/columnshard/engines/portions/with_blobs.h>
#include <ydb/core/tx/columnshard/resource_subscriber/task.h>
Expand Down
4 changes: 1 addition & 3 deletions ydb/core/tx/columnshard/engines/changes/cleanup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ bool TCleanupColumnEngineChanges::DoApplyChanges(TColumnEngineForLogs& self, TAp
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "Cannot erase portion")("portion", portionInfo.DebugString());
continue;
}
for (auto& record : portionInfo.Records) {
context.DB.EraseColumn(portionInfo, record);
}
portionInfo.RemoveFromDatabase(context.DB);
}

return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ bool TPortionColumnCursor::NextChunk() {
ChunkRecordIndexStartPosition += CurrentChunkRecordsCount;
CurrentBlobChunk = BlobChunks[ChunkIdx];
CurrentColumnChunk = ColumnChunks[ChunkIdx];
CurrentChunkRecordsCount = CurrentBlobChunk->GetRecordsCount();
CurrentChunkRecordsCount = CurrentBlobChunk->GetRecordsCountVerified();
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ namespace NKikimr::NOlap::NCompaction {

class TPortionColumnCursor {
private:
std::vector<IPortionColumnChunk::TPtr> BlobChunks;
std::vector<std::shared_ptr<IPortionDataChunk>> BlobChunks;
std::vector<const TColumnRecord*> ColumnChunks;
std::optional<ui32> RecordIndexStart;
YDB_READONLY(ui32, RecordIndexFinish, 0);
ui32 ChunkRecordIndexStartPosition = 0;
ui32 ChunkIdx = 0;
IPortionColumnChunk::TPtr CurrentBlobChunk;
std::shared_ptr<IPortionDataChunk> CurrentBlobChunk;
const TColumnRecord* CurrentColumnChunk = nullptr;
ui32 CurrentChunkRecordsCount = 0;
std::shared_ptr<arrow::Array> CurrentArray;
Expand All @@ -38,19 +38,18 @@ class TPortionColumnCursor {

bool Fetch(TMergedColumn& column);

TPortionColumnCursor(const std::vector<IPortionColumnChunk::TPtr>& columnChunks, const std::vector<const TColumnRecord*>& records, const std::shared_ptr<TColumnLoader>& loader, const ui64 portionId)
TPortionColumnCursor(const std::vector<std::shared_ptr<IPortionDataChunk>>& columnChunks, const std::vector<const TColumnRecord*>& records, const std::shared_ptr<TColumnLoader>& loader, const ui64 portionId)
: BlobChunks(columnChunks)
, ColumnChunks(records)
, ColumnLoader(loader)
, PortionId(portionId)
{
, PortionId(portionId) {
AFL_VERIFY(ColumnLoader);
Y_UNUSED(PortionId);
Y_ABORT_UNLESS(BlobChunks.size());
Y_ABORT_UNLESS(ColumnChunks.size() == BlobChunks.size());
CurrentBlobChunk = BlobChunks.front();
CurrentColumnChunk = ColumnChunks.front();
CurrentChunkRecordsCount = CurrentBlobChunk->GetRecordsCount();
CurrentChunkRecordsCount = CurrentBlobChunk->GetRecordsCountVerified();
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,15 @@

namespace NKikimr::NOlap::NCompaction {

std::vector<NKikimr::NOlap::IPortionColumnChunk::TPtr> TChunkPreparation::DoInternalSplit(const TColumnSaver& saver, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const std::vector<ui64>& splitSizes) const {
std::vector<std::shared_ptr<IPortionDataChunk>> TChunkPreparation::DoInternalSplitImpl(const TColumnSaver& saver, const std::shared_ptr<NColumnShard::TSplitterCounters>& counters, const std::vector<ui64>& splitSizes) const {
auto loader = SchemaInfo->GetColumnLoaderVerified(Record.ColumnId);
auto rb = NArrow::TStatusValidator::GetValid(loader->Apply(Data));

auto chunks = TSimpleSplitter(saver, counters).SplitBySizes(rb, Data, splitSizes);
std::vector<IPortionColumnChunk::TPtr> newChunks;
std::vector<std::shared_ptr<IPortionDataChunk>> newChunks;
for (auto&& i : chunks) {
Y_ABORT_UNLESS(i.GetSlicedBatch()->num_columns() == 1);
newChunks.emplace_back(std::make_shared<TChunkPreparation>(
saver.Apply(i.GetSlicedBatch()), i.GetSlicedBatch()->column(0), ColumnId, SchemaInfo));
newChunks.emplace_back(std::make_shared<TChunkPreparation>(saver.Apply(i.GetSlicedBatch()), i.GetSlicedBatch()->column(0), GetColumnId(), SchemaInfo));
}
return newChunks;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ class TChunkPreparation: public IPortionColumnChunk {
std::shared_ptr<arrow::Scalar> First;
std::shared_ptr<arrow::Scalar> Last;
protected:
virtual std::vector<IPortionColumnChunk::TPtr> DoInternalSplit(const TColumnSaver& saver, std::shared_ptr<NColumnShard::TSplitterCounters> counters, const std::vector<ui64>& splitSizes) const override;
virtual std::vector<std::shared_ptr<IPortionDataChunk>> DoInternalSplitImpl(const TColumnSaver& saver, const std::shared_ptr<NColumnShard::TSplitterCounters>& counters, const std::vector<ui64>& splitSizes) const override;
virtual const TString& DoGetData() const override {
return Data;
}
virtual ui32 DoGetRecordsCount() const override {
virtual ui32 DoGetRecordsCountImpl() const override {
return Record.GetMeta().GetNumRowsVerified();
}
virtual TString DoDebugString() const override {
Expand Down Expand Up @@ -70,14 +70,15 @@ class TNullChunkPreparation: public IPortionColumnChunk {
const ui32 RecordsCount;
TString Data;
protected:
virtual std::vector<IPortionColumnChunk::TPtr> DoInternalSplit(const TColumnSaver& /*saver*/, std::shared_ptr<NColumnShard::TSplitterCounters> /*counters*/, const std::vector<ui64>& /*splitSizes*/) const override {
virtual std::vector<std::shared_ptr<IPortionDataChunk>> DoInternalSplitImpl(const TColumnSaver& /*saver*/, const std::shared_ptr<NColumnShard::TSplitterCounters>& /*counters*/,
const std::vector<ui64>& /*splitSizes*/) const override {
AFL_VERIFY(false);
return {};
}
virtual const TString& DoGetData() const override {
return Data;
}
virtual ui32 DoGetRecordsCount() const override {
virtual ui32 DoGetRecordsCountImpl() const override {
return RecordsCount;
}
virtual TString DoDebugString() const override {
Expand Down Expand Up @@ -107,7 +108,7 @@ class TNullChunkPreparation: public IPortionColumnChunk {

class TColumnPortionResult {
protected:
std::vector<std::shared_ptr<IPortionColumnChunk>> Chunks;
std::vector<std::shared_ptr<IPortionDataChunk>> Chunks;
ui64 CurrentPortionRecords = 0;
const ui32 ColumnId;
ui64 PackedSize = 0;
Expand All @@ -121,7 +122,7 @@ class TColumnPortionResult {

}

const std::vector<std::shared_ptr<IPortionColumnChunk>>& GetChunks() const {
const std::vector<std::shared_ptr<IPortionDataChunk>>& GetChunks() const {
return Chunks;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ void TMergedColumn::AppendSlice(const std::shared_ptr<arrow::Array>& data, const
}
}

std::vector<NKikimr::NOlap::NCompaction::TColumnPortionResult> TMergedColumn::BuildResult() {
std::vector<TColumnPortionResult> TMergedColumn::BuildResult() {
std::vector<TColumnPortionResult> result;
if (Portions.size()) {
Portions.back().FlushBuffer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks(TConstruc
auto dataSchema = context.SchemaVersions.GetSchema(p.GetPortionInfo().GetMinSnapshot());
auto loader = dataSchema->GetColumnLoaderOptional(columnId);
std::vector<const TColumnRecord*> records;
std::vector<IPortionColumnChunk::TPtr> chunks;
std::vector<std::shared_ptr<IPortionDataChunk>> chunks;
if (!p.ExtractColumnChunks(columnId, records, chunks)) {
AFL_VERIFY(!loader);
records = {nullptr};
Expand Down Expand Up @@ -177,7 +177,7 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks(TConstruc
std::shared_ptr<TDefaultSchemaDetails> schemaDetails(new TDefaultSchemaDetails(resultSchema, SaverContext, stats));

for (ui32 i = 0; i < columnChunks.begin()->second.size(); ++i) {
std::map<ui32, std::vector<IPortionColumnChunk::TPtr>> portionColumns;
std::map<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>> portionColumns;
for (auto&& p : columnChunks) {
portionColumns.emplace(p.first, p.second[i].GetChunks());
}
Expand All @@ -190,7 +190,7 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks(TConstruc
for (auto&& i : packs) {
TGeneralSerializedSlice slice(std::move(i));
auto b = batchResult->Slice(recordIdx, slice.GetRecordsCount());
std::vector<std::vector<IPortionColumnChunk::TPtr>> chunksByBlobs = slice.GroupChunksByBlobs();
std::vector<std::vector<std::shared_ptr<IPortionDataChunk>>> chunksByBlobs = slice.GroupChunksByBlobs();
AppendedPortions.emplace_back(TPortionInfoWithBlobs::BuildByBlobs(chunksByBlobs, nullptr, GranuleMeta->GetPathId(), resultSchema->GetSnapshot(), SaverContext.GetStorageOperator()));
NArrow::TFirstLastSpecialKeys primaryKeys(slice.GetFirstLastPKBatch(resultSchema->GetIndexInfo().GetReplaceKey()));
NArrow::TMinMaxSpecialKeys snapshotKeys(b, TIndexInfo::ArrowSchemaSnapshot());
Expand Down
10 changes: 3 additions & 7 deletions ydb/core/tx/columnshard/engines/changes/with_appended.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,14 @@ bool TChangesWithAppend::DoApplyChanges(TColumnEngineForLogs& self, TApplyChange
AFL_VERIFY(usedPortionIds.emplace(portionInfo.GetPortionId()).second)("portion_info", portionInfo.DebugString(true));
self.UpsertPortion(portionInfo, &oldInfo);

for (auto& record : portionInfo.Records) {
context.DB.WriteColumn(portionInfo, record);
}
portionInfo.SaveToDatabase(context.DB);
}
for (auto& portionInfoWithBlobs : AppendedPortions) {
auto& portionInfo = portionInfoWithBlobs.GetPortionInfo();
Y_ABORT_UNLESS(!portionInfo.Empty());
AFL_VERIFY(usedPortionIds.emplace(portionInfo.GetPortionId()).second)("portion_info", portionInfo.DebugString(true));
self.UpsertPortion(portionInfo);
for (auto& record : portionInfo.Records) {
context.DB.WriteColumn(portionInfo, record);
}
portionInfo.SaveToDatabase(context.DB);
}
}

Expand Down Expand Up @@ -115,7 +111,7 @@ std::vector<TPortionInfoWithBlobs> TChangesWithAppend::MakeAppendedPortions(cons
auto schema = std::make_shared<TDefaultSchemaDetails>(resultSchema, SaverContext, stats);
TRBSplitLimiter limiter(context.Counters.SplitterCounters, schema, batch, SplitSettings);

std::vector<std::vector<IPortionColumnChunk::TPtr>> chunkByBlobs;
std::vector<std::vector<std::shared_ptr<IPortionDataChunk>>> chunkByBlobs;
std::shared_ptr<arrow::RecordBatch> portionBatch;
while (limiter.Next(chunkByBlobs, portionBatch)) {
TPortionInfoWithBlobs infoWithBlob = TPortionInfoWithBlobs::BuildByBlobs(chunkByBlobs, nullptr, granule, snapshot, SaverContext.GetStorageOperator());
Expand Down
9 changes: 6 additions & 3 deletions ydb/core/tx/columnshard/engines/column_engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db) {
bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db) {
TSnapshot lastSnapshot(0, 0);
const TIndexInfo* currentIndexInfo = nullptr;
auto result = db.LoadColumns([&](const TPortionInfo& portion, const TColumnChunkLoadContext& loadContext) {
if (!db.LoadColumns([&](const TPortionInfo& portion, const TColumnChunkLoadContext& loadContext) {
if (!currentIndexInfo || lastSnapshot != portion.GetMinSnapshot()) {
currentIndexInfo = &VersionedIndex.GetSchema(portion.GetMinSnapshot())->GetIndexInfo();
lastSnapshot = portion.GetMinSnapshot();
Expand All @@ -178,11 +178,14 @@ bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db) {
// Locate granule and append the record.
TColumnRecord rec(loadContext, *currentIndexInfo);
GetGranulePtrVerified(portion.GetPathId())->AddColumnRecord(*currentIndexInfo, portion, rec, loadContext.GetPortionMeta());
});
})) {
return false;
}

for (auto&& i : Tables) {
i.second->OnAfterPortionsLoad();
}
return result;
return true;
}

bool TColumnEngineForLogs::LoadCounters(IDbWrapper& db) {
Expand Down
45 changes: 42 additions & 3 deletions ydb/core/tx/columnshard/engines/db_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,56 @@ bool TDbWrapper::Load(TInsertTableAccessor& insertTable,

void TDbWrapper::WriteColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row) {
NIceDb::TNiceDb db(Database);
NColumnShard::Schema::IndexColumns_Write(db, portion, row);
auto proto = portion.GetMeta().SerializeToProto(row.ColumnId, row.Chunk);
auto rowProto = row.GetMeta().SerializeToProto();
if (proto) {
*rowProto.MutablePortionMeta() = std::move(*proto);
}
using IndexColumns = NColumnShard::Schema::IndexColumns;
db.Table<IndexColumns>().Key(0, portion.GetDeprecatedGranuleId(), row.ColumnId,
portion.GetMinSnapshot().GetPlanStep(), portion.GetMinSnapshot().GetTxId(), portion.GetPortion(), row.Chunk).Update(
NIceDb::TUpdate<IndexColumns::XPlanStep>(portion.GetRemoveSnapshot().GetPlanStep()),
NIceDb::TUpdate<IndexColumns::XTxId>(portion.GetRemoveSnapshot().GetTxId()),
NIceDb::TUpdate<IndexColumns::Blob>(row.SerializedBlobId()),
NIceDb::TUpdate<IndexColumns::Metadata>(rowProto.SerializeAsString()),
NIceDb::TUpdate<IndexColumns::Offset>(row.BlobRange.Offset),
NIceDb::TUpdate<IndexColumns::Size>(row.BlobRange.Size),
NIceDb::TUpdate<IndexColumns::PathId>(portion.GetPathId())
);
}

void TDbWrapper::EraseColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row) {
NIceDb::TNiceDb db(Database);
NColumnShard::Schema::IndexColumns_Erase(db, portion, row);
using IndexColumns = NColumnShard::Schema::IndexColumns;
db.Table<IndexColumns>().Key(0, portion.GetDeprecatedGranuleId(), row.ColumnId,
portion.GetMinSnapshot().GetPlanStep(), portion.GetMinSnapshot().GetTxId(), portion.GetPortion(), row.Chunk).Delete();
}

bool TDbWrapper::LoadColumns(const std::function<void(const NOlap::TPortionInfo&, const TColumnChunkLoadContext&)>& callback) {
NIceDb::TNiceDb db(Database);
return NColumnShard::Schema::IndexColumns_Load(db, DsGroupSelector, callback);
using IndexColumns = NColumnShard::Schema::IndexColumns;
auto rowset = db.Table<IndexColumns>().Prefix(0).Select();
if (!rowset.IsReady()) {
return false;
}

while (!rowset.EndOfSet()) {
NOlap::TPortionInfo portion = NOlap::TPortionInfo::BuildEmpty();
portion.SetPathId(rowset.GetValue<IndexColumns::PathId>());
portion.SetMinSnapshot(rowset.GetValue<IndexColumns::PlanStep>(), rowset.GetValue<IndexColumns::TxId>());
portion.SetPortion(rowset.GetValue<IndexColumns::Portion>());
portion.SetDeprecatedGranuleId(rowset.GetValue<IndexColumns::Granule>());

NOlap::TColumnChunkLoadContext chunkLoadContext(rowset, DsGroupSelector);

portion.SetRemoveSnapshot(rowset.GetValue<IndexColumns::XPlanStep>(), rowset.GetValue<IndexColumns::XTxId>());

callback(portion, chunkLoadContext);

if (!rowset.Next())
return false;
}
return true;
}

void TDbWrapper::WriteCounter(ui32 counterId, ui64 value) {
Expand Down
Loading