Skip to content

Commit ecdc254

Browse files
Merge 9522b76 into d66a5bd
2 parents d66a5bd + 9522b76 commit ecdc254

29 files changed

+674
-206
lines changed

ydb/core/tx/columnshard/columnshard_schema.h

Lines changed: 53 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -901,57 +901,94 @@ struct Schema : NIceDb::Schema {
901901
}
902902

903903
namespace NKikimr::NOlap {
904+
class TPortionLoadContext {
905+
private:
906+
YDB_READONLY(ui64, PathId, 0);
907+
YDB_READONLY(ui64, PortionId, 0);
908+
YDB_READONLY_DEF(NKikimrTxColumnShard::TIndexPortionMeta, MetaProto);
909+
910+
public:
911+
template <class TSource>
912+
TPortionLoadContext(const TSource& rowset) {
913+
PathId = rowset.template GetValue<NColumnShard::Schema::IndexPortions::PathId>();
914+
PortionId = rowset.template GetValue<NColumnShard::Schema::IndexPortions::PortionId>();
915+
const TString metadata = rowset.template GetValue<NColumnShard::Schema::IndexPortions::Metadata>();
916+
AFL_VERIFY(MetaProto.ParseFromArray(metadata.data(), metadata.size()))("event", "cannot parse metadata as protobuf");
917+
}
918+
};
919+
904920
class TColumnChunkLoadContext {
905921
private:
906922
YDB_READONLY_DEF(TBlobRange, BlobRange);
907923
TChunkAddress Address;
924+
YDB_READONLY(ui64, PathId, 0);
925+
YDB_READONLY(ui64, PortionId, 0);
908926
YDB_READONLY_DEF(NKikimrTxColumnShard::TIndexColumnMeta, MetaProto);
927+
YDB_READONLY(TSnapshot, RemoveSnapshot, TSnapshot::Zero());
928+
YDB_READONLY(TSnapshot, MinSnapshotDeprecated, TSnapshot::Zero());
929+
909930
public:
910931
const TChunkAddress& GetAddress() const {
911932
return Address;
912933
}
913934

914-
TColumnChunkLoadContext(const TChunkAddress& address, const TBlobRange& bRange, const NKikimrTxColumnShard::TIndexColumnMeta& metaProto)
935+
TColumnChunkLoadContext(const ui64 pathId, const ui64 portionId, const TChunkAddress& address, const TBlobRange& bRange,
936+
const NKikimrTxColumnShard::TIndexColumnMeta& metaProto)
915937
: BlobRange(bRange)
916938
, Address(address)
917-
, MetaProto(metaProto)
918-
{
919-
939+
, PathId(pathId)
940+
, PortionId(portionId)
941+
, MetaProto(metaProto) {
920942
}
921943

922944
template <class TSource>
923945
TColumnChunkLoadContext(const TSource& rowset, const IBlobGroupSelector* dsGroupSelector)
924-
: Address(rowset.template GetValue<NColumnShard::Schema::IndexColumns::ColumnIdx>(), rowset.template GetValue<NColumnShard::Schema::IndexColumns::Chunk>()) {
946+
: Address(rowset.template GetValue<NColumnShard::Schema::IndexColumns::ColumnIdx>(),
947+
rowset.template GetValue<NColumnShard::Schema::IndexColumns::Chunk>())
948+
, RemoveSnapshot(rowset.template GetValue<NColumnShard::Schema::IndexColumns::XPlanStep>(),
949+
rowset.template GetValue<NColumnShard::Schema::IndexColumns::XTxId>())
950+
, MinSnapshotDeprecated(rowset.template GetValue<NColumnShard::Schema::IndexColumns::PlanStep>(),
951+
rowset.template GetValue<NColumnShard::Schema::IndexColumns::TxId>())
952+
{
925953
AFL_VERIFY(Address.GetColumnId())("event", "incorrect address")("address", Address.DebugString());
926954
TString strBlobId = rowset.template GetValue<NColumnShard::Schema::IndexColumns::Blob>();
927955
Y_ABORT_UNLESS(strBlobId.size() == sizeof(TLogoBlobID), "Size %" PRISZT " doesn't match TLogoBlobID", strBlobId.size());
928956
TLogoBlobID logoBlobId((const ui64*)strBlobId.data());
929957
BlobRange.BlobId = NOlap::TUnifiedBlobId(dsGroupSelector->GetGroup(logoBlobId), logoBlobId);
930958
BlobRange.Offset = rowset.template GetValue<NColumnShard::Schema::IndexColumns::Offset>();
931959
BlobRange.Size = rowset.template GetValue<NColumnShard::Schema::IndexColumns::Size>();
960+
PathId = rowset.template GetValue<NColumnShard::Schema::IndexColumns::PathId>();
961+
PortionId = rowset.template GetValue<NColumnShard::Schema::IndexColumns::Portion>();
932962
AFL_VERIFY(BlobRange.BlobId.IsValid() && BlobRange.Size)("event", "incorrect blob")("blob", BlobRange.ToString());
933963

934964
const TString metadata = rowset.template GetValue<NColumnShard::Schema::IndexColumns::Metadata>();
935965
AFL_VERIFY(MetaProto.ParseFromArray(metadata.data(), metadata.size()))("event", "cannot parse metadata as protobuf");
936966
}
937-
938-
const NKikimrTxColumnShard::TIndexPortionMeta* GetPortionMeta() const {
939-
if (MetaProto.HasPortionMeta()) {
940-
return &MetaProto.GetPortionMeta();
941-
} else {
942-
return nullptr;
943-
}
944-
}
945967
};
946968

947969
class TIndexChunkLoadContext {
948970
private:
949971
YDB_READONLY_DEF(std::optional<TBlobRange>, BlobRange);
950972
YDB_READONLY_DEF(std::optional<TString>, BlobData);
973+
YDB_READONLY(ui64, PathId, 0);
974+
YDB_READONLY(ui64, PortionId, 0);
951975
TChunkAddress Address;
952976
const ui32 RecordsCount;
953977
const ui32 RawBytes;
954978
public:
979+
ui32 GetRawBytes() const {
980+
return RawBytes;
981+
}
982+
983+
ui32 GetDataSize() const {
984+
if (BlobRange) {
985+
return BlobRange->GetSize();
986+
} else {
987+
AFL_VERIFY(!!BlobData);
988+
return BlobData->size();
989+
}
990+
}
991+
955992
TIndexChunk BuildIndexChunk(const TBlobRangeLink16::TLinkId blobLinkId) const {
956993
AFL_VERIFY(BlobRange);
957994
return TIndexChunk(Address.GetColumnId(), Address.GetChunkIdx(), RecordsCount, RawBytes, BlobRange->BuildLink(blobLinkId));
@@ -964,7 +1001,9 @@ class TIndexChunkLoadContext {
9641001

9651002
template <class TSource>
9661003
TIndexChunkLoadContext(const TSource& rowset, const IBlobGroupSelector* dsGroupSelector)
967-
: Address(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::IndexId>(), rowset.template GetValue<NColumnShard::Schema::IndexIndexes::ChunkIdx>())
1004+
: PathId(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::PathId>())
1005+
, PortionId(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::PortionId>())
1006+
, Address(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::IndexId>(), rowset.template GetValue<NColumnShard::Schema::IndexIndexes::ChunkIdx>())
9681007
, RecordsCount(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::RecordsCount>())
9691008
, RawBytes(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::RawBytes>())
9701009
{

ydb/core/tx/columnshard/common/blob.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,14 @@ struct TBlobRange {
189189
ui32 Offset;
190190
ui32 Size;
191191

192+
ui32 GetSize() const {
193+
return Size;
194+
}
195+
196+
ui32 GetOffset() const {
197+
return Offset;
198+
}
199+
192200
TString GetData(const TString& blobData) const;
193201

194202
bool operator<(const TBlobRange& br) const {

ydb/core/tx/columnshard/engines/changes/with_appended.cpp

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,14 @@ namespace NKikimr::NOlap {
1212

1313
void TChangesWithAppend::DoWriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context) {
1414
THashSet<ui64> usedPortionIds;
15-
auto schemaPtr = context.EngineLogs.GetVersionedIndex().GetLastSchema();
16-
1715
for (auto&& [_, i] : PortionsToRemove) {
1816
Y_ABORT_UNLESS(!i->HasRemoveSnapshot());
1917
AFL_VERIFY(usedPortionIds.emplace(i->GetPortionId()).second)("portion_info", i->DebugString(true));
2018
const auto pred = [&](TPortionInfo& portionCopy) {
2119
portionCopy.SetRemoveSnapshot(context.Snapshot);
2220
};
2321
context.EngineLogs.GetGranuleVerified(i->GetPathId())
24-
.ModifyPortionOnExecute(context.DBWrapper, i, pred, schemaPtr->GetIndexInfo().GetPKFirstColumnId());
22+
.ModifyPortionOnExecute(context.DBWrapper, i, pred);
2523
}
2624

2725
const auto predRemoveDroppedTable = [self](const TWritePortionInfoWithBlobsResult& item) {
@@ -38,14 +36,14 @@ void TChangesWithAppend::DoWriteIndexOnExecute(NColumnShard::TColumnShard* self,
3836
for (auto& portionInfoWithBlobs : AppendedPortions) {
3937
const auto& portionInfo = portionInfoWithBlobs.GetPortionResult().GetPortionInfoPtr();
4038
AFL_VERIFY(usedPortionIds.emplace(portionInfo->GetPortionId()).second)("portion_info", portionInfo->DebugString(true));
41-
portionInfoWithBlobs.GetPortionResult().SaveToDatabase(context.DBWrapper, schemaPtr->GetIndexInfo().GetPKFirstColumnId(), false);
39+
portionInfoWithBlobs.GetPortionResult().SaveToDatabase(context.DBWrapper, false);
4240
}
4341
for (auto&& [_, i] : PortionsToMove) {
4442
const auto pred = [&](TPortionInfo& portionCopy) {
4543
portionCopy.MutableMeta().ResetCompactionLevel(TargetCompactionLevel.value_or(0));
4644
};
4745
context.EngineLogs.GetGranuleVerified(i->GetPathId())
48-
.ModifyPortionOnExecute(context.DBWrapper, i, pred, schemaPtr->GetIndexInfo().GetPKFirstColumnId());
46+
.ModifyPortionOnExecute(context.DBWrapper, i, pred);
4947
}
5048
}
5149

0 commit comments

Comments
 (0)