Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ ydb/services/ydb/ut YdbLogStore.AlterLogTable
ydb/core/mind/hive/ut THiveTest.DrainWithHiveRestart
ydb/core/persqueue/ut [*/*] chunk chunk
ydb/core/quoter/ut QuoterWithKesusTest.PrefetchCoefficient
ydb/core/tx/columnshard/ut_rw Normalizers.CleanEmptyPortionsNormalizer
ydb/core/tx/schemeshard/ut_move_reboots TSchemeShardMoveRebootsTest.WithData
ydb/core/tx/schemeshard/ut_move_reboots TSchemeShardMoveRebootsTest.WithDataAndPersistentPartitionStats
ydb/core/tx/schemeshard/ut_pq_reboots TPqGroupTestReboots.AlterWithReboots-PQConfigTransactionsAtSchemeShard-false
Expand Down
67 changes: 53 additions & 14 deletions ydb/core/tx/columnshard/columnshard_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -901,57 +901,94 @@ struct Schema : NIceDb::Schema {
}

namespace NKikimr::NOlap {
class TPortionLoadContext {
private:
YDB_READONLY(ui64, PathId, 0);
YDB_READONLY(ui64, PortionId, 0);
YDB_READONLY_DEF(NKikimrTxColumnShard::TIndexPortionMeta, MetaProto);

public:
template <class TSource>
TPortionLoadContext(const TSource& rowset) {
PathId = rowset.template GetValue<NColumnShard::Schema::IndexPortions::PathId>();
PortionId = rowset.template GetValue<NColumnShard::Schema::IndexPortions::PortionId>();
const TString metadata = rowset.template GetValue<NColumnShard::Schema::IndexPortions::Metadata>();
AFL_VERIFY(MetaProto.ParseFromArray(metadata.data(), metadata.size()))("event", "cannot parse metadata as protobuf");
}
};

class TColumnChunkLoadContext {
private:
YDB_READONLY_DEF(TBlobRange, BlobRange);
TChunkAddress Address;
YDB_READONLY(ui64, PathId, 0);
YDB_READONLY(ui64, PortionId, 0);
YDB_READONLY_DEF(NKikimrTxColumnShard::TIndexColumnMeta, MetaProto);
YDB_READONLY(TSnapshot, RemoveSnapshot, TSnapshot::Zero());
YDB_READONLY(TSnapshot, MinSnapshotDeprecated, TSnapshot::Zero());

public:
const TChunkAddress& GetAddress() const {
return Address;
}

TColumnChunkLoadContext(const TChunkAddress& address, const TBlobRange& bRange, const NKikimrTxColumnShard::TIndexColumnMeta& metaProto)
TColumnChunkLoadContext(const ui64 pathId, const ui64 portionId, const TChunkAddress& address, const TBlobRange& bRange,
const NKikimrTxColumnShard::TIndexColumnMeta& metaProto)
: BlobRange(bRange)
, Address(address)
, MetaProto(metaProto)
{

, PathId(pathId)
, PortionId(portionId)
, MetaProto(metaProto) {
}

template <class TSource>
TColumnChunkLoadContext(const TSource& rowset, const IBlobGroupSelector* dsGroupSelector)
: Address(rowset.template GetValue<NColumnShard::Schema::IndexColumns::ColumnIdx>(), rowset.template GetValue<NColumnShard::Schema::IndexColumns::Chunk>()) {
: Address(rowset.template GetValue<NColumnShard::Schema::IndexColumns::ColumnIdx>(),
rowset.template GetValue<NColumnShard::Schema::IndexColumns::Chunk>())
, RemoveSnapshot(rowset.template GetValue<NColumnShard::Schema::IndexColumns::XPlanStep>(),
rowset.template GetValue<NColumnShard::Schema::IndexColumns::XTxId>())
, MinSnapshotDeprecated(rowset.template GetValue<NColumnShard::Schema::IndexColumns::PlanStep>(),
rowset.template GetValue<NColumnShard::Schema::IndexColumns::TxId>())
{
AFL_VERIFY(Address.GetColumnId())("event", "incorrect address")("address", Address.DebugString());
TString strBlobId = rowset.template GetValue<NColumnShard::Schema::IndexColumns::Blob>();
Y_ABORT_UNLESS(strBlobId.size() == sizeof(TLogoBlobID), "Size %" PRISZT " doesn't match TLogoBlobID", strBlobId.size());
TLogoBlobID logoBlobId((const ui64*)strBlobId.data());
BlobRange.BlobId = NOlap::TUnifiedBlobId(dsGroupSelector->GetGroup(logoBlobId), logoBlobId);
BlobRange.Offset = rowset.template GetValue<NColumnShard::Schema::IndexColumns::Offset>();
BlobRange.Size = rowset.template GetValue<NColumnShard::Schema::IndexColumns::Size>();
PathId = rowset.template GetValue<NColumnShard::Schema::IndexColumns::PathId>();
PortionId = rowset.template GetValue<NColumnShard::Schema::IndexColumns::Portion>();
AFL_VERIFY(BlobRange.BlobId.IsValid() && BlobRange.Size)("event", "incorrect blob")("blob", BlobRange.ToString());

const TString metadata = rowset.template GetValue<NColumnShard::Schema::IndexColumns::Metadata>();
AFL_VERIFY(MetaProto.ParseFromArray(metadata.data(), metadata.size()))("event", "cannot parse metadata as protobuf");
}

const NKikimrTxColumnShard::TIndexPortionMeta* GetPortionMeta() const {
if (MetaProto.HasPortionMeta()) {
return &MetaProto.GetPortionMeta();
} else {
return nullptr;
}
}
};

class TIndexChunkLoadContext {
private:
YDB_READONLY_DEF(std::optional<TBlobRange>, BlobRange);
YDB_READONLY_DEF(std::optional<TString>, BlobData);
YDB_READONLY(ui64, PathId, 0);
YDB_READONLY(ui64, PortionId, 0);
TChunkAddress Address;
const ui32 RecordsCount;
const ui32 RawBytes;
public:
ui32 GetRawBytes() const {
return RawBytes;
}

ui32 GetDataSize() const {
if (BlobRange) {
return BlobRange->GetSize();
} else {
AFL_VERIFY(!!BlobData);
return BlobData->size();
}
}

TIndexChunk BuildIndexChunk(const TBlobRangeLink16::TLinkId blobLinkId) const {
AFL_VERIFY(BlobRange);
return TIndexChunk(Address.GetColumnId(), Address.GetChunkIdx(), RecordsCount, RawBytes, BlobRange->BuildLink(blobLinkId));
Expand All @@ -964,7 +1001,9 @@ class TIndexChunkLoadContext {

template <class TSource>
TIndexChunkLoadContext(const TSource& rowset, const IBlobGroupSelector* dsGroupSelector)
: Address(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::IndexId>(), rowset.template GetValue<NColumnShard::Schema::IndexIndexes::ChunkIdx>())
: PathId(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::PathId>())
, PortionId(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::PortionId>())
, Address(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::IndexId>(), rowset.template GetValue<NColumnShard::Schema::IndexIndexes::ChunkIdx>())
, RecordsCount(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::RecordsCount>())
, RawBytes(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::RawBytes>())
{
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/tx/columnshard/common/blob.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,14 @@ struct TBlobRange {
ui32 Offset;
ui32 Size;

ui32 GetSize() const {
return Size;
}

ui32 GetOffset() const {
return Offset;
}

TString GetData(const TString& blobData) const;

bool operator<(const TBlobRange& br) const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ NKikimr::TConclusionStatus TDestinationSession::DataReceived(
AFL_VERIFY(it != PathIds.end())("path_id_undefined", i.first);
for (auto&& portion : i.second.DetachPortions()) {
portion.MutablePortionInfo().SetPathId(it->second);
index.AppendPortion(portion.GetPortionInfo());
index.AppendPortion(portion.MutablePortionInfoPtr());
}
}
return TConclusionStatus::Success();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ void TChangesWithAppend::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self
context.EngineLogs.AddCleanupPortion(i);
}
for (auto& portionBuilder : AppendedPortions) {
context.EngineLogs.AppendPortion(portionBuilder.GetPortionResult().GetPortionInfo());
context.EngineLogs.AppendPortion(portionBuilder.GetPortionResult().MutablePortionInfoPtr());
}
}
}
Expand Down
Loading
Loading