Skip to content

Commit ec802df

Browse files
Merge f3a98fc into 8140e4b
2 parents 8140e4b + f3a98fc commit ec802df

35 files changed

+702
-244
lines changed

ydb/core/tx/columnshard/columnshard_schema.h

Lines changed: 53 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -901,57 +901,94 @@ struct Schema : NIceDb::Schema {
901901
}
902902

903903
namespace NKikimr::NOlap {
904+
class TPortionLoadContext {
905+
private:
906+
YDB_READONLY(ui64, PathId, 0);
907+
YDB_READONLY(ui64, PortionId, 0);
908+
YDB_READONLY_DEF(NKikimrTxColumnShard::TIndexPortionMeta, MetaProto);
909+
910+
public:
911+
template <class TSource>
912+
TPortionLoadContext(const TSource& rowset) {
913+
PathId = rowset.template GetValue<NColumnShard::Schema::IndexPortions::PathId>();
914+
PortionId = rowset.template GetValue<NColumnShard::Schema::IndexPortions::PortionId>();
915+
const TString metadata = rowset.template GetValue<NColumnShard::Schema::IndexPortions::Metadata>();
916+
AFL_VERIFY(MetaProto.ParseFromArray(metadata.data(), metadata.size()))("event", "cannot parse metadata as protobuf");
917+
}
918+
};
919+
904920
class TColumnChunkLoadContext {
905921
private:
906922
YDB_READONLY_DEF(TBlobRange, BlobRange);
907923
TChunkAddress Address;
924+
YDB_READONLY(ui64, PathId, 0);
925+
YDB_READONLY(ui64, PortionId, 0);
908926
YDB_READONLY_DEF(NKikimrTxColumnShard::TIndexColumnMeta, MetaProto);
927+
YDB_READONLY(TSnapshot, RemoveSnapshot, TSnapshot::Zero());
928+
YDB_READONLY(TSnapshot, MinSnapshotDeprecated, TSnapshot::Zero());
929+
909930
public:
910931
const TChunkAddress& GetAddress() const {
911932
return Address;
912933
}
913934

914-
TColumnChunkLoadContext(const TChunkAddress& address, const TBlobRange& bRange, const NKikimrTxColumnShard::TIndexColumnMeta& metaProto)
935+
TColumnChunkLoadContext(const ui64 pathId, const ui64 portionId, const TChunkAddress& address, const TBlobRange& bRange,
936+
const NKikimrTxColumnShard::TIndexColumnMeta& metaProto)
915937
: BlobRange(bRange)
916938
, Address(address)
917-
, MetaProto(metaProto)
918-
{
919-
939+
, PathId(pathId)
940+
, PortionId(portionId)
941+
, MetaProto(metaProto) {
920942
}
921943

922944
template <class TSource>
923945
TColumnChunkLoadContext(const TSource& rowset, const IBlobGroupSelector* dsGroupSelector)
924-
: Address(rowset.template GetValue<NColumnShard::Schema::IndexColumns::ColumnIdx>(), rowset.template GetValue<NColumnShard::Schema::IndexColumns::Chunk>()) {
946+
: Address(rowset.template GetValue<NColumnShard::Schema::IndexColumns::ColumnIdx>(),
947+
rowset.template GetValue<NColumnShard::Schema::IndexColumns::Chunk>())
948+
, RemoveSnapshot(rowset.template GetValue<NColumnShard::Schema::IndexColumns::XPlanStep>(),
949+
rowset.template GetValue<NColumnShard::Schema::IndexColumns::XTxId>())
950+
, MinSnapshotDeprecated(rowset.template GetValue<NColumnShard::Schema::IndexColumns::PlanStep>(),
951+
rowset.template GetValue<NColumnShard::Schema::IndexColumns::TxId>())
952+
{
925953
AFL_VERIFY(Address.GetColumnId())("event", "incorrect address")("address", Address.DebugString());
926954
TString strBlobId = rowset.template GetValue<NColumnShard::Schema::IndexColumns::Blob>();
927955
Y_ABORT_UNLESS(strBlobId.size() == sizeof(TLogoBlobID), "Size %" PRISZT " doesn't match TLogoBlobID", strBlobId.size());
928956
TLogoBlobID logoBlobId((const ui64*)strBlobId.data());
929957
BlobRange.BlobId = NOlap::TUnifiedBlobId(dsGroupSelector->GetGroup(logoBlobId), logoBlobId);
930958
BlobRange.Offset = rowset.template GetValue<NColumnShard::Schema::IndexColumns::Offset>();
931959
BlobRange.Size = rowset.template GetValue<NColumnShard::Schema::IndexColumns::Size>();
960+
PathId = rowset.template GetValue<NColumnShard::Schema::IndexColumns::PathId>();
961+
PortionId = rowset.template GetValue<NColumnShard::Schema::IndexColumns::Portion>();
932962
AFL_VERIFY(BlobRange.BlobId.IsValid() && BlobRange.Size)("event", "incorrect blob")("blob", BlobRange.ToString());
933963

934964
const TString metadata = rowset.template GetValue<NColumnShard::Schema::IndexColumns::Metadata>();
935965
AFL_VERIFY(MetaProto.ParseFromArray(metadata.data(), metadata.size()))("event", "cannot parse metadata as protobuf");
936966
}
937-
938-
const NKikimrTxColumnShard::TIndexPortionMeta* GetPortionMeta() const {
939-
if (MetaProto.HasPortionMeta()) {
940-
return &MetaProto.GetPortionMeta();
941-
} else {
942-
return nullptr;
943-
}
944-
}
945967
};
946968

947969
class TIndexChunkLoadContext {
948970
private:
949971
YDB_READONLY_DEF(std::optional<TBlobRange>, BlobRange);
950972
YDB_READONLY_DEF(std::optional<TString>, BlobData);
973+
YDB_READONLY(ui64, PathId, 0);
974+
YDB_READONLY(ui64, PortionId, 0);
951975
TChunkAddress Address;
952976
const ui32 RecordsCount;
953977
const ui32 RawBytes;
954978
public:
979+
ui32 GetRawBytes() const {
980+
return RawBytes;
981+
}
982+
983+
ui32 GetDataSize() const {
984+
if (BlobRange) {
985+
return BlobRange->GetSize();
986+
} else {
987+
AFL_VERIFY(!!BlobData);
988+
return BlobData->size();
989+
}
990+
}
991+
955992
TIndexChunk BuildIndexChunk(const TBlobRangeLink16::TLinkId blobLinkId) const {
956993
AFL_VERIFY(BlobRange);
957994
return TIndexChunk(Address.GetColumnId(), Address.GetChunkIdx(), RecordsCount, RawBytes, BlobRange->BuildLink(blobLinkId));
@@ -964,7 +1001,9 @@ class TIndexChunkLoadContext {
9641001

9651002
template <class TSource>
9661003
TIndexChunkLoadContext(const TSource& rowset, const IBlobGroupSelector* dsGroupSelector)
967-
: Address(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::IndexId>(), rowset.template GetValue<NColumnShard::Schema::IndexIndexes::ChunkIdx>())
1004+
: PathId(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::PathId>())
1005+
, PortionId(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::PortionId>())
1006+
, Address(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::IndexId>(), rowset.template GetValue<NColumnShard::Schema::IndexIndexes::ChunkIdx>())
9681007
, RecordsCount(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::RecordsCount>())
9691008
, RawBytes(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::RawBytes>())
9701009
{

ydb/core/tx/columnshard/common/blob.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,14 @@ struct TBlobRange {
189189
ui32 Offset;
190190
ui32 Size;
191191

192+
ui32 GetSize() const {
193+
return Size;
194+
}
195+
196+
ui32 GetOffset() const {
197+
return Offset;
198+
}
199+
192200
TString GetData(const TString& blobData) const;
193201

194202
bool operator<(const TBlobRange& br) const {

ydb/core/tx/columnshard/data_sharing/destination/session/destination.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ NKikimr::TConclusionStatus TDestinationSession::DataReceived(
2020
AFL_VERIFY(it != PathIds.end())("path_id_undefined", i.first);
2121
for (auto&& portion : i.second.DetachPortions()) {
2222
portion.MutablePortionInfo().SetPathId(it->second);
23-
index.AppendPortion(portion.GetPortionInfo());
23+
index.AppendPortion(portion.MutablePortionInfoPtr());
2424
}
2525
}
2626
return TConclusionStatus::Success();

ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_data_from_source.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,10 @@ bool TTxDataFromSource::DoExecute(NTabletFlatExecutor::TTransactionContext& txc,
1717
}
1818
dbWrapper.WriteCounter(TColumnEngineForLogs::LAST_PORTION, *lastPortionPtr);
1919
}
20-
auto schemaPtr = index.GetVersionedIndex().GetLastSchema();
2120
THashMap<TString, THashSet<NBlobCache::TUnifiedBlobId>> sharedBlobIds;
2221
for (auto&& i : PortionsByPathId) {
2322
for (auto&& p : i.second.GetPortions()) {
24-
p.SaveToDatabase(dbWrapper, schemaPtr->GetIndexInfo().GetPKFirstColumnId(), false);
23+
p.SaveToDatabase(dbWrapper, false);
2524
}
2625
}
2726
NIceDb::TNiceDb db(txc.DB);

ydb/core/tx/columnshard/engines/changes/with_appended.cpp

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ namespace NKikimr::NOlap {
1212

1313
void TChangesWithAppend::DoWriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context) {
1414
THashSet<ui64> usedPortionIds;
15-
auto schemaPtr = context.EngineLogs.GetVersionedIndex().GetLastSchema();
1615

1716
for (auto&& [_, i] : PortionsToRemove) {
1817
Y_ABORT_UNLESS(!i->HasRemoveSnapshot());
@@ -21,7 +20,7 @@ void TChangesWithAppend::DoWriteIndexOnExecute(NColumnShard::TColumnShard* self,
2120
portionCopy.SetRemoveSnapshot(context.Snapshot);
2221
};
2322
context.EngineLogs.GetGranuleVerified(i->GetPathId())
24-
.ModifyPortionOnExecute(context.DBWrapper, i, pred, schemaPtr->GetIndexInfo().GetPKFirstColumnId());
23+
.ModifyPortionOnExecute(context.DBWrapper, i, pred);
2524
}
2625

2726
const auto predRemoveDroppedTable = [self](const TWritePortionInfoWithBlobsResult& item) {
@@ -38,14 +37,14 @@ void TChangesWithAppend::DoWriteIndexOnExecute(NColumnShard::TColumnShard* self,
3837
for (auto& portionInfoWithBlobs : AppendedPortions) {
3938
const auto& portionInfo = portionInfoWithBlobs.GetPortionResult().GetPortionInfoPtr();
4039
AFL_VERIFY(usedPortionIds.emplace(portionInfo->GetPortionId()).second)("portion_info", portionInfo->DebugString(true));
41-
portionInfoWithBlobs.GetPortionResult().SaveToDatabase(context.DBWrapper, schemaPtr->GetIndexInfo().GetPKFirstColumnId(), false);
40+
portionInfoWithBlobs.GetPortionResult().SaveToDatabase(context.DBWrapper, false);
4241
}
4342
for (auto&& [_, i] : PortionsToMove) {
4443
const auto pred = [&](TPortionInfo& portionCopy) {
4544
portionCopy.MutableMeta().ResetCompactionLevel(TargetCompactionLevel.value_or(0));
4645
};
4746
context.EngineLogs.GetGranuleVerified(i->GetPathId())
48-
.ModifyPortionOnExecute(context.DBWrapper, i, pred, schemaPtr->GetIndexInfo().GetPKFirstColumnId());
47+
.ModifyPortionOnExecute(context.DBWrapper, i, pred);
4948
}
5049
}
5150

@@ -111,7 +110,7 @@ void TChangesWithAppend::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self
111110
context.EngineLogs.AddCleanupPortion(i);
112111
}
113112
for (auto& portionBuilder : AppendedPortions) {
114-
context.EngineLogs.AppendPortion(portionBuilder.GetPortionResult().GetPortionInfo());
113+
context.EngineLogs.AppendPortion(portionBuilder.GetPortionResult().MutablePortionInfoPtr());
115114
}
116115
}
117116
}

0 commit comments

Comments
 (0)