Skip to content

Commit 717a5a0

Browse files
dont use default columns for merger (#12380)
1 parent ed0816f commit 717a5a0

File tree

5 files changed

+77
-66
lines changed

5 files changed

+77
-66
lines changed

ydb/core/tx/columnshard/engines/changes/general_compaction.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks(
143143

144144
for (auto&& i : portions) {
145145
auto blobsSchema = i.GetPortionInfo().GetSchema(context.SchemaVersions);
146-
auto batch = i.RestoreBatch(*blobsSchema, *resultFiltered, seqDataColumnIds).DetachResult();
146+
auto batch = i.RestoreBatch(*blobsSchema, *resultFiltered, seqDataColumnIds, false).DetachResult();
147147
std::shared_ptr<NArrow::TColumnFilter> filter =
148148
BuildPortionFilter(shardingActual, batch, i.GetPortionInfo(), usedPortionIds, resultFiltered);
149149
merger.AddBatch(batch, filter);

ydb/core/tx/columnshard/engines/portions/data_accessor.cpp

Lines changed: 69 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -17,71 +17,78 @@
1717
namespace NKikimr::NOlap {
1818

1919
namespace {
20+
21+
void FillDefaultColumn(
22+
TPortionDataAccessor::TColumnAssemblingInfo& column, const TPortionInfo& portionInfo, const TSnapshot& defaultSnapshot) {
23+
if (column.GetColumnId() == (ui32)IIndexInfo::ESpecialColumn::PLAN_STEP) {
24+
column.AddBlobInfo(0, portionInfo.GetRecordsCount(),
25+
TPortionDataAccessor::TAssembleBlobInfo(
26+
portionInfo.GetRecordsCount(), std::make_shared<arrow::UInt64Scalar>(defaultSnapshot.GetPlanStep()), false));
27+
}
28+
if (column.GetColumnId() == (ui32)IIndexInfo::ESpecialColumn::TX_ID) {
29+
column.AddBlobInfo(0, portionInfo.GetRecordsCount(),
30+
TPortionDataAccessor::TAssembleBlobInfo(
31+
portionInfo.GetRecordsCount(), std::make_shared<arrow::UInt64Scalar>(defaultSnapshot.GetTxId()), false));
32+
}
33+
if (column.GetColumnId() == (ui32)IIndexInfo::ESpecialColumn::WRITE_ID) {
34+
column.AddBlobInfo(0, portionInfo.GetRecordsCount(),
35+
TPortionDataAccessor::TAssembleBlobInfo(
36+
portionInfo.GetRecordsCount(), std::make_shared<arrow::UInt64Scalar>((ui64)portionInfo.GetInsertWriteIdVerified()), false));
37+
}
38+
if (column.GetColumnId() == (ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG) {
39+
AFL_VERIFY(portionInfo.GetRecordsCount() == portionInfo.GetMeta().GetDeletionsCount() || portionInfo.GetMeta().GetDeletionsCount() == 0)("deletes",
40+
portionInfo.GetMeta().GetDeletionsCount())("count", portionInfo.GetRecordsCount());
41+
column.AddBlobInfo(0, portionInfo.GetRecordsCount(),
42+
TPortionDataAccessor::TAssembleBlobInfo(
43+
portionInfo.GetRecordsCount(), std::make_shared<arrow::BooleanScalar>((bool)portionInfo.GetMeta().GetDeletionsCount()), true));
44+
}
45+
}
46+
2047
template <class TExternalBlobInfo>
2148
TPortionDataAccessor::TPreparedBatchData PrepareForAssembleImpl(const TPortionDataAccessor& portionData, const TPortionInfo& portionInfo,
2249
const ISnapshotSchema& dataSchema, const ISnapshotSchema& resultSchema, THashMap<TChunkAddress, TExternalBlobInfo>& blobsData,
23-
const std::optional<TSnapshot>& defaultSnapshot) {
50+
const std::optional<TSnapshot>& defaultSnapshot, const bool restoreAbsent) {
2451
std::vector<TPortionDataAccessor::TColumnAssemblingInfo> columns;
2552
columns.reserve(resultSchema.GetColumnIds().size());
2653
const ui32 rowsCount = portionInfo.GetRecordsCount();
54+
auto it = portionData.GetRecordsVerified().begin();
55+
56+
TSnapshot defaultSnapshotLocal = TSnapshot::Zero();
57+
if (portionInfo.HasCommitSnapshot()) {
58+
defaultSnapshotLocal = portionInfo.GetCommitSnapshotVerified();
59+
} else if (defaultSnapshot) {
60+
defaultSnapshotLocal = *defaultSnapshot;
61+
}
62+
2763
for (auto&& i : resultSchema.GetColumnIds()) {
28-
columns.emplace_back(rowsCount, dataSchema.GetColumnLoaderOptional(i), resultSchema.GetColumnLoaderVerified(i));
29-
if (portionInfo.HasInsertWriteId()) {
30-
if (portionInfo.HasCommitSnapshot()) {
31-
if (i == (ui32)IIndexInfo::ESpecialColumn::PLAN_STEP) {
32-
columns.back().AddBlobInfo(0, portionInfo.GetRecordsCount(),
33-
TPortionDataAccessor::TAssembleBlobInfo(portionInfo.GetRecordsCount(),
34-
std::make_shared<arrow::UInt64Scalar>(portionInfo.GetCommitSnapshotVerified().GetPlanStep()), false));
35-
}
36-
if (i == (ui32)IIndexInfo::ESpecialColumn::TX_ID) {
37-
columns.back().AddBlobInfo(0, portionInfo.GetRecordsCount(),
38-
TPortionDataAccessor::TAssembleBlobInfo(portionInfo.GetRecordsCount(),
39-
std::make_shared<arrow::UInt64Scalar>(portionInfo.GetCommitSnapshotVerified().GetPlanStep()), false));
40-
}
41-
} else {
42-
if (i == (ui32)IIndexInfo::ESpecialColumn::PLAN_STEP) {
43-
columns.back().AddBlobInfo(0, portionInfo.GetRecordsCount(),
44-
TPortionDataAccessor::TAssembleBlobInfo(portionInfo.GetRecordsCount(),
45-
std::make_shared<arrow::UInt64Scalar>(defaultSnapshot ? defaultSnapshot->GetPlanStep() : 0)));
46-
}
47-
if (i == (ui32)IIndexInfo::ESpecialColumn::TX_ID) {
48-
columns.back().AddBlobInfo(0, portionInfo.GetRecordsCount(),
49-
TPortionDataAccessor::TAssembleBlobInfo(portionInfo.GetRecordsCount(),
50-
std::make_shared<arrow::UInt64Scalar>(defaultSnapshot ? defaultSnapshot->GetTxId() : 0)));
51-
}
52-
}
53-
if (i == (ui32)IIndexInfo::ESpecialColumn::WRITE_ID) {
54-
columns.back().AddBlobInfo(0, portionInfo.GetRecordsCount(),
55-
TPortionDataAccessor::TAssembleBlobInfo(portionInfo.GetRecordsCount(),
56-
std::make_shared<arrow::UInt64Scalar>((ui64)portionInfo.GetInsertWriteIdVerified()), false));
57-
}
58-
if (i == (ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG) {
59-
columns.back().AddBlobInfo(0, portionInfo.GetRecordsCount(),
60-
TPortionDataAccessor::TAssembleBlobInfo(portionInfo.GetRecordsCount(),
61-
std::make_shared<arrow::BooleanScalar>((bool)portionInfo.GetMeta().GetDeletionsCount()), true));
62-
}
64+
while (it != portionData.GetRecordsVerified().end() && it->GetColumnId() < i) {
65+
++it;
66+
continue;
6367
}
64-
}
65-
{
66-
int skipColumnId = -1;
67-
TPortionDataAccessor::TColumnAssemblingInfo* currentAssembler = nullptr;
68-
for (auto& rec : portionData.GetRecordsVerified()) {
69-
if (skipColumnId == (int)rec.ColumnId) {
70-
continue;
68+
if ((it == portionData.GetRecordsVerified().end() || i < it->GetColumnId())) {
69+
if (restoreAbsent || IIndexInfo::IsSpecialColumn(i)) {
70+
columns.emplace_back(rowsCount, dataSchema.GetColumnLoaderOptional(i), resultSchema.GetColumnLoaderVerified(i));
7171
}
72-
if (!currentAssembler || rec.ColumnId != currentAssembler->GetColumnId()) {
73-
const i32 resultPos = resultSchema.GetFieldIndex(rec.ColumnId);
74-
if (resultPos < 0) {
75-
skipColumnId = rec.ColumnId;
76-
continue;
77-
}
78-
AFL_VERIFY((ui32)resultPos < columns.size());
79-
currentAssembler = &columns[resultPos];
72+
if (!portionInfo.HasInsertWriteId()) {
73+
continue;
8074
}
81-
auto it = blobsData.find(rec.GetAddress());
82-
AFL_VERIFY(it != blobsData.end())("size", blobsData.size())("address", rec.GetAddress().DebugString());
83-
currentAssembler->AddBlobInfo(rec.Chunk, rec.GetMeta().GetRecordsCount(), std::move(it->second));
84-
blobsData.erase(it);
75+
FillDefaultColumn(columns.back(), portionInfo, defaultSnapshotLocal);
76+
}
77+
if (it == portionData.GetRecordsVerified().end()) {
78+
continue;
79+
} else if (it->GetColumnId() != i) {
80+
AFL_VERIFY(i < it->GetColumnId());
81+
continue;
82+
}
83+
columns.emplace_back(rowsCount, dataSchema.GetColumnLoaderOptional(i), resultSchema.GetColumnLoaderVerified(i));
84+
while (it != portionData.GetRecordsVerified().end() && it->GetColumnId() == i) {
85+
auto itBlobs = blobsData.find(it->GetAddress());
86+
AFL_VERIFY(itBlobs != blobsData.end())("size", blobsData.size())("address", it->GetAddress().DebugString());
87+
columns.back().AddBlobInfo(it->Chunk, it->GetMeta().GetRecordsCount(), std::move(itBlobs->second));
88+
blobsData.erase(itBlobs);
89+
90+
++it;
91+
continue;
8592
}
8693
}
8794

@@ -98,14 +105,15 @@ TPortionDataAccessor::TPreparedBatchData PrepareForAssembleImpl(const TPortionDa
98105
} // namespace
99106

100107
TPortionDataAccessor::TPreparedBatchData TPortionDataAccessor::PrepareForAssemble(const ISnapshotSchema& dataSchema,
101-
const ISnapshotSchema& resultSchema, THashMap<TChunkAddress, TString>& blobsData, const std::optional<TSnapshot>& defaultSnapshot) const {
102-
return PrepareForAssembleImpl(*this, *PortionInfo, dataSchema, resultSchema, blobsData, defaultSnapshot);
108+
const ISnapshotSchema& resultSchema, THashMap<TChunkAddress, TString>& blobsData, const std::optional<TSnapshot>& defaultSnapshot,
109+
const bool restoreAbsent) const {
110+
return PrepareForAssembleImpl(*this, *PortionInfo, dataSchema, resultSchema, blobsData, defaultSnapshot, restoreAbsent);
103111
}
104112

105113
TPortionDataAccessor::TPreparedBatchData TPortionDataAccessor::PrepareForAssemble(const ISnapshotSchema& dataSchema,
106-
const ISnapshotSchema& resultSchema, THashMap<TChunkAddress, TAssembleBlobInfo>& blobsData,
107-
const std::optional<TSnapshot>& defaultSnapshot) const {
108-
return PrepareForAssembleImpl(*this, *PortionInfo, dataSchema, resultSchema, blobsData, defaultSnapshot);
114+
const ISnapshotSchema& resultSchema, THashMap<TChunkAddress, TAssembleBlobInfo>& blobsData, const std::optional<TSnapshot>& defaultSnapshot,
115+
const bool restoreAbsent) const {
116+
return PrepareForAssembleImpl(*this, *PortionInfo, dataSchema, resultSchema, blobsData, defaultSnapshot, restoreAbsent);
109117
}
110118

111119
void TPortionDataAccessor::FillBlobRangesByStorage(THashMap<TString, THashSet<TBlobRange>>& result, const TVersionedIndex& index) const {
@@ -738,6 +746,7 @@ std::shared_ptr<NArrow::NAccessor::TDeserializeChunkedArray> TPortionDataAccesso
738746
NArrow::NAccessor::TDeserializeChunkedArray::TChunk TPortionDataAccessor::TAssembleBlobInfo::BuildDeserializeChunk(
739747
const std::shared_ptr<TColumnLoader>& loader) const {
740748
if (DefaultRowsCount) {
749+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "build_trivial");
741750
Y_ABORT_UNLESS(!Data);
742751
auto col = std::make_shared<NArrow::NAccessor::TTrivialArray>(
743752
NArrow::TThreadSimpleArraysCache::Get(loader->GetField()->type(), DefaultValue, DefaultRowsCount));

ydb/core/tx/columnshard/engines/portions/data_accessor.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -457,9 +457,10 @@ class TPortionDataAccessor {
457457
};
458458

459459
TPreparedBatchData PrepareForAssemble(const ISnapshotSchema& dataSchema, const ISnapshotSchema& resultSchema,
460-
THashMap<TChunkAddress, TString>& blobsData, const std::optional<TSnapshot>& defaultSnapshot = std::nullopt) const;
460+
THashMap<TChunkAddress, TString>& blobsData, const std::optional<TSnapshot>& defaultSnapshot = std::nullopt,
461+
const bool restoreAbsent = true) const;
461462
TPreparedBatchData PrepareForAssemble(const ISnapshotSchema& dataSchema, const ISnapshotSchema& resultSchema,
462-
THashMap<TChunkAddress, TAssembleBlobInfo>& blobsData, const std::optional<TSnapshot>& defaultSnapshot = std::nullopt) const;
463+
THashMap<TChunkAddress, TAssembleBlobInfo>& blobsData, const std::optional<TSnapshot>& defaultSnapshot = std::nullopt, const bool restoreAbsent = true) const;
463464

464465
class TPage {
465466
private:

ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,15 @@ void TReadPortionInfoWithBlobs::RestoreChunk(const std::shared_ptr<IPortionDataC
1616
}
1717

1818
TConclusion<std::shared_ptr<NArrow::TGeneralContainer>> TReadPortionInfoWithBlobs::RestoreBatch(
19-
const ISnapshotSchema& data, const ISnapshotSchema& resultSchema, const std::set<ui32>& seqColumns) const {
19+
const ISnapshotSchema& data, const ISnapshotSchema& resultSchema, const std::set<ui32>& seqColumns, const bool restoreAbsent) const {
2020
THashMap<TChunkAddress, TString> blobs;
2121
NActors::TLogContextGuard gLogging =
2222
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("portion_id", PortionInfo.GetPortionInfo().GetPortionId());
2323
for (auto&& i : PortionInfo.GetRecordsVerified()) {
2424
blobs[i.GetAddress()] = GetBlobByAddressVerified(i.ColumnId, i.Chunk);
2525
Y_ABORT_UNLESS(blobs[i.GetAddress()].size() == i.BlobRange.Size);
2626
}
27-
return PortionInfo.PrepareForAssemble(data, resultSchema, blobs).AssembleToGeneralContainer(seqColumns);
27+
return PortionInfo.PrepareForAssemble(data, resultSchema, blobs, {}, restoreAbsent).AssembleToGeneralContainer(seqColumns);
2828
}
2929

3030
TReadPortionInfoWithBlobs TReadPortionInfoWithBlobs::RestorePortion(

ydb/core/tx/columnshard/engines/portions/read_with_blobs.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ class TReadPortionInfoWithBlobs: public TBasePortionInfoWithBlobs {
4040
const TPortionDataAccessor& portion, NBlobOperations::NRead::TCompositeReadBlobs& blobs,
4141
const TIndexInfo& indexInfo);
4242

43-
TConclusion<std::shared_ptr<NArrow::TGeneralContainer>> RestoreBatch(const ISnapshotSchema& data, const ISnapshotSchema& resultSchema, const std::set<ui32>& seqColumns) const;
43+
TConclusion<std::shared_ptr<NArrow::TGeneralContainer>> RestoreBatch(
44+
const ISnapshotSchema& data, const ISnapshotSchema& resultSchema, const std::set<ui32>& seqColumns, const bool restoreAbsent = true) const;
4445
static std::optional<TWritePortionInfoWithBlobsResult> SyncPortion(TReadPortionInfoWithBlobs&& source,
4546
const ISnapshotSchema::TPtr& from, const ISnapshotSchema::TPtr& to, const TString& targetTier, const std::shared_ptr<IStoragesManager>& storages,
4647
std::shared_ptr<NColumnShard::TSplitterCounters> counters);

0 commit comments

Comments
 (0)