Skip to content

Commit 03916c5

Browse files
authored
Merge 7eabf82 into 0a53ed1
2 parents 0a53ed1 + 7eabf82 commit 03916c5

File tree

25 files changed

+378
-200
lines changed

25 files changed

+378
-200
lines changed

ydb/core/formats/arrow/process_columns.cpp

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -207,24 +207,23 @@ NKikimr::TConclusion<std::shared_ptr<arrow::Table>> TColumnOperator::Reorder(
207207
return ReorderImpl(incoming, columnNames);
208208
}
209209
namespace {
210-
template <class TDataContainer, class TSchemaImpl>
211-
TConclusion<TSchemaSubset> BuildSequentialSubsetImpl(const std::shared_ptr<TDataContainer>& srcBatch,
212-
const std::shared_ptr<TSchemaImpl>& dstSchema, const TColumnOperator::ECheckFieldTypesPolicy checkFieldTypesPolicy) {
210+
template <class TDataContainer>
211+
TConclusion<TSchemaSubset> BuildSequentialSubsetImpl(const std::shared_ptr<TDataContainer>& srcBatch, const TSchemaLiteView& dstSchema,
212+
const TColumnOperator::ECheckFieldTypesPolicy checkFieldTypesPolicy) {
213213
AFL_VERIFY(srcBatch);
214-
AFL_VERIFY(dstSchema);
215-
if (dstSchema->num_fields() < srcBatch->schema()->num_fields()) {
214+
if (dstSchema.num_fields() < srcBatch->schema()->num_fields()) {
216215
AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "incorrect columns set: destination must been wider than source")(
217-
"source", srcBatch->schema()->ToString())("destination", dstSchema->ToString());
216+
"source", srcBatch->schema()->ToString())("destination", dstSchema.ToString());
218217
return TConclusionStatus::Fail("incorrect columns set: destination must been wider than source");
219218
}
220219
std::set<ui32> fieldIdx;
221220
auto itSrc = srcBatch->schema()->fields().begin();
222-
auto itDst = dstSchema->fields().begin();
223-
while (itSrc != srcBatch->schema()->fields().end() && itDst != dstSchema->fields().end()) {
221+
auto itDst = dstSchema.begin();
222+
while (itSrc != srcBatch->schema()->fields().end() && itDst != dstSchema.end()) {
224223
if ((*itSrc)->name() != (*itDst)->name()) {
225224
++itDst;
226225
} else {
227-
fieldIdx.emplace(itDst - dstSchema->fields().begin());
226+
fieldIdx.emplace(itDst - dstSchema.begin());
228227
if (checkFieldTypesPolicy != TColumnOperator::ECheckFieldTypesPolicy::Ignore && (*itDst)->Equals(*itSrc)) {
229228
switch (checkFieldTypesPolicy) {
230229
case TColumnOperator::ECheckFieldTypesPolicy::Error: {
@@ -245,25 +244,24 @@ TConclusion<TSchemaSubset> BuildSequentialSubsetImpl(const std::shared_ptr<TData
245244
++itSrc;
246245
}
247246
}
248-
if (itDst == dstSchema->fields().end() && itSrc != srcBatch->schema()->fields().end()) {
247+
if (itDst == dstSchema.end() && itSrc != srcBatch->schema()->fields().end()) {
249248
AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "incorrect columns order in source set")("source", srcBatch->schema()->ToString())(
250-
"destination", dstSchema->ToString());
249+
"destination", dstSchema.ToString());
251250
return TConclusionStatus::Fail("incorrect columns order in source set");
252251
}
253-
return TSchemaSubset(fieldIdx, dstSchema->num_fields());
252+
return TSchemaSubset(fieldIdx, dstSchema.num_fields());
254253
}
255254
} // namespace
256255

257256
TConclusion<TSchemaSubset> TColumnOperator::BuildSequentialSubset(
258-
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<NArrow::TSchemaLite>& dstSchema) {
257+
const std::shared_ptr<arrow::RecordBatch>& incoming, const NArrow::TSchemaLiteView& dstSchema) {
259258
return BuildSequentialSubsetImpl(incoming, dstSchema, DifferentColumnTypesPolicy);
260259
}
261260
namespace {
262261
template <class TDataContainer>
263262
TConclusion<std::shared_ptr<TDataContainer>> AdaptIncomingToDestinationExtImpl(const std::shared_ptr<TDataContainer>& incoming,
264-
const std::shared_ptr<TSchemaLite>& dstSchema, const std::function<TConclusionStatus(const ui32, const i32)>& checker,
265-
const std::function<i32(const std::string&)>& nameResolver,
266-
const TColumnOperator::ECheckFieldTypesPolicy differentColumnTypesPolicy,
263+
const TSchemaLiteView& dstSchema, const std::function<TConclusionStatus(const ui32, const i32)>& checker,
264+
const std::function<i32(const std::string&)>& nameResolver, const TColumnOperator::ECheckFieldTypesPolicy differentColumnTypesPolicy,
267265
const TColumnOperator::EAbsentFieldPolicy absentColumnPolicy) {
268266
struct TFieldData {
269267
ui32 Index;
@@ -273,14 +271,13 @@ TConclusion<std::shared_ptr<TDataContainer>> AdaptIncomingToDestinationExtImpl(c
273271
}
274272
};
275273
AFL_VERIFY(incoming);
276-
AFL_VERIFY(dstSchema);
277274
std::vector<TFieldData> resultColumns;
278275
resultColumns.reserve(incoming->num_columns());
279276
ui32 idx = 0;
280277
for (auto& srcField : incoming->schema()->fields()) {
281278
const int dstIndex = nameResolver(srcField->name());
282279
if (dstIndex > -1) {
283-
const auto& dstField = dstSchema->GetFieldByIndexVerified(dstIndex);
280+
const auto& dstField = dstSchema.GetFieldByIndexVerified(dstIndex);
284281
switch (differentColumnTypesPolicy) {
285282
case TColumnOperator::ECheckFieldTypesPolicy::Verify:
286283
AFL_VERIFY(dstField->type()->Equals(srcField->type()))("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")(
@@ -322,14 +319,14 @@ TConclusion<std::shared_ptr<TDataContainer>> AdaptIncomingToDestinationExtImpl(c
322319
columns.reserve(resultColumns.size());
323320
fields.reserve(resultColumns.size());
324321
for (auto&& i : resultColumns) {
325-
fields.emplace_back(dstSchema->field(i.Index));
322+
fields.emplace_back(dstSchema.field(i.Index));
326323
columns.emplace_back(i.Column);
327324
}
328325
return NAdapter::TDataBuilderPolicy<TDataContainer>::Build(std::make_shared<arrow::Schema>(fields), std::move(columns), incoming->num_rows());
329326
}
330327
} // namespace
331328
TConclusion<std::shared_ptr<arrow::RecordBatch>> TColumnOperator::AdaptIncomingToDestinationExt(
332-
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<TSchemaLite>& dstSchema,
329+
const std::shared_ptr<arrow::RecordBatch>& incoming, const TSchemaLiteView& dstSchema,
333330
const std::function<TConclusionStatus(const ui32, const i32)>& checker, const std::function<i32(const std::string&)>& nameResolver) const {
334331
return AdaptIncomingToDestinationExtImpl(incoming, dstSchema, checker, nameResolver, DifferentColumnTypesPolicy, AbsentColumnPolicy);
335332
}

ydb/core/formats/arrow/process_columns.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ namespace NKikimr::NArrow {
88

99
class TSchemaSubset;
1010
class TSchemaLite;
11+
class TSchemaLiteView;
1112

1213
class TColumnOperator {
1314
public:
@@ -59,7 +60,7 @@ class TColumnOperator {
5960
}
6061

6162
TConclusion<std::shared_ptr<arrow::RecordBatch>> AdaptIncomingToDestinationExt(const std::shared_ptr<arrow::RecordBatch>& incoming,
62-
const std::shared_ptr<TSchemaLite>& dstSchema, const std::function<TConclusionStatus(const ui32, const i32)>& checker,
63+
const TSchemaLiteView& dstSchema, const std::function<TConclusionStatus(const ui32, const i32)>& checker,
6364
const std::function<i32(const std::string&)>& nameResolver) const;
6465

6566
std::shared_ptr<arrow::RecordBatch> Extract(
@@ -73,7 +74,7 @@ class TColumnOperator {
7374
std::shared_ptr<arrow::Table> Extract(const std::shared_ptr<arrow::Table>& incoming, const std::vector<TString>& columnNames);
7475

7576
TConclusion<TSchemaSubset> BuildSequentialSubset(
76-
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<NArrow::TSchemaLite>& dstSchema);
77+
const std::shared_ptr<arrow::RecordBatch>& incoming, const NArrow::TSchemaLiteView& dstSchema);
7778

7879
TConclusion<std::shared_ptr<arrow::RecordBatch>> Adapt(
7980
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset = nullptr);

ydb/core/tx/columnshard/columnshard__propose_transaction.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,6 @@ class TTxProposeTransaction: public NTabletFlatExecutor::TTransactionBase<TColum
146146
}
147147

148148
auto schemaSnapshot = Self->TablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetLastSchema();
149-
auto schema = schemaSnapshot->GetSchema();
150149
auto index = schemaSnapshot->GetColumnIdOptional(columnName);
151150
if (!index) {
152151
return TTxController::TProposeResult(

ydb/core/tx/columnshard/engines/changes/indexation.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,8 @@ class TPathFieldsInfo {
108108
if (!Schemas.contains(data.GetSchemaVersion())) {
109109
Schemas.emplace(data.GetSchemaVersion(), blobSchema);
110110
}
111-
std::vector<ui32> filteredIds = data.GetMeta().GetSchemaSubset().Apply(blobSchema->GetIndexInfo().GetColumnIds(false));
111+
auto columnIds = blobSchema->GetIndexInfo().GetColumnIds(false);
112+
std::vector<ui32> filteredIds = data.GetMeta().GetSchemaSubset().Apply(columnIds.begin(), columnIds.end());
112113
if (data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete) {
113114
filteredIds.emplace_back((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG);
114115
}
@@ -245,8 +246,10 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont
245246
std::shared_ptr<NArrow::TGeneralContainer> batch;
246247
{
247248
const auto blobData = Blobs.Extract(IStoragesManager::DefaultStorageId, blobRange);
249+
250+
auto blobSchemaView = blobSchema->GetIndexInfo().ArrowSchema();
248251
auto batchSchema =
249-
std::make_shared<arrow::Schema>(inserted.GetMeta().GetSchemaSubset().Apply(blobSchema->GetIndexInfo().ArrowSchema()->fields()));
252+
std::make_shared<arrow::Schema>(inserted.GetMeta().GetSchemaSubset().Apply(blobSchemaView.begin(), blobSchemaView.end()));
250253
batch = std::make_shared<NArrow::TGeneralContainer>(NArrow::DeserializeBatch(blobData, batchSchema));
251254
std::set<ui32> columnIdsToDelete = blobSchema->GetColumnIdsToDelete(resultSchema);
252255
if (!columnIdsToDelete.empty()) {

ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ class TReadMetadataBase {
118118

119119
ISnapshotSchema::TPtr GetLoadSchemaVerified(const TPortionInfo& porition) const;
120120

121-
const std::shared_ptr<NArrow::TSchemaLite>& GetBlobSchema(const ui64 version) const {
121+
NArrow::TSchemaLiteView GetBlobSchema(const ui64 version) const {
122122
return GetIndexVersions().GetSchemaVerified(version)->GetIndexInfo().ArrowSchema();
123123
}
124124

ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.h

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -131,15 +131,6 @@ class TReadMetadata: public TReadMetadataBase {
131131
TConclusionStatus Init(
132132
const NColumnShard::TColumnShard* owner, const TReadDescription& readDescription, const TDataStorageAccessor& dataAccessor);
133133

134-
std::vector<std::string> GetColumnsOrder() const {
135-
auto schema = GetResultSchema();
136-
std::vector<std::string> result;
137-
for (auto&& i : schema->GetSchema()->fields()) {
138-
result.emplace_back(i->name());
139-
}
140-
return result;
141-
}
142-
143134
std::set<ui32> GetEarlyFilterColumnIds() const;
144135
std::set<ui32> GetPKColumnIds() const;
145136

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -273,8 +273,8 @@ void TCommittedDataSource::DoAssembleColumns(const std::shared_ptr<TColumnsSet>&
273273
AFL_VERIFY(GetStageData().GetBlobs().size() == 1);
274274
auto bData = MutableStageData().ExtractBlob(GetStageData().GetBlobs().begin()->first);
275275
auto schema = GetContext()->GetReadMetadata()->GetBlobSchema(CommittedBlob.GetSchemaVersion());
276-
auto rBatch = NArrow::DeserializeBatch(bData, std::make_shared<arrow::Schema>(CommittedBlob.GetSchemaSubset().Apply(schema->fields())));
277-
AFL_VERIFY(rBatch)("schema", schema->ToString());
276+
auto rBatch = NArrow::DeserializeBatch(bData, std::make_shared<arrow::Schema>(CommittedBlob.GetSchemaSubset().Apply(schema.begin(), schema.end())));
277+
AFL_VERIFY(rBatch)("schema", schema.ToString());
278278
auto batch = std::make_shared<NArrow::TGeneralContainer>(rBatch);
279279
std::set<ui32> columnIdsToDelete = batchSchema->GetColumnIdsToDelete(resultSchema);
280280
if (!columnIdsToDelete.empty()) {
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
#include "column_ids.h"
2+
3+
namespace NKikimr::NOlap {}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
#pragma once
2+
3+
#include <ydb/library/actors/core/log.h>
4+
#include <ydb/library/formats/arrow/common/iterator.h>
5+
6+
#include <util/generic/noncopyable.h>
7+
#include <util/system/types.h>
8+
9+
#include <span>
10+
11+
namespace NKikimr::NOlap {
12+
13+
// Read-only view over a contiguous sequence of ui32 column ids.
// The ids are held in a std::span, so this class does not own them: the
// storage the span was built from must outlive the view.
// NOTE(review): private inheritance from TNonCopyable presumably disables
// copying of the view itself - confirm against util/generic/noncopyable.h.
class TColumnIdsView: private TNonCopyable {
14+
private:
15+
// Non-owning window onto the underlying ids; elements are const.
std::span<const ui32> ColumnIds;
16+
17+
// Iterator type exposed by begin()/end(). It is built on the project helper
// TRandomAccessIteratorClone, parameterised with the span's own iterator.
// NOTE(review): the helper is defined in
// ydb/library/formats/arrow/common/iterator.h, which is not visible here;
// presumably it forwards random-access operations to the wrapped iterator.
class TIterator: public NArrow::NUtil::TRandomAccessIteratorClone<std::span<const ui32>::iterator, TIterator> {
18+
using TBase = NArrow::NUtil::TRandomAccessIteratorClone<std::span<const ui32>::iterator, TIterator>;
19+
20+
public:
21+
// Inherit the base class constructors, so a TIterator is constructed the
// same way the base is.
using TBase::TRandomAccessIteratorClone;
22+
};
23+
24+
public:
25+
// Builds the view from an iterator pair [begin, end).
// The pair is handed straight to the std::span constructor, so It has to be
// an iterator type std::span accepts (a contiguous iterator over ui32).
template <typename It>
26+
TColumnIdsView(const It begin, const It end)
27+
: ColumnIds(begin, end) {
28+
}
29+
30+
// Iterator to the first id of the view.
TIterator begin() const {
31+
return ColumnIds.begin();
32+
}
33+
34+
// Iterator one past the last id of the view.
TIterator end() const {
35+
return ColumnIds.end();
36+
}
37+
38+
// Returns the id at position idx by value.
// The index is checked with AFL_VERIFY before the span is accessed; the
// failing check is annotated with the offending index and the view size.
ui32 operator[](size_t idx) const {
39+
AFL_VERIFY(idx < ColumnIds.size())("idx", idx)("size", ColumnIds.size());
40+
return ColumnIds[idx];
41+
}
42+
43+
// Number of ids in the view (the span's size, returned as ui64).
ui64 size() const {
44+
return ColumnIds.size();
45+
}
46+
};
47+
48+
} // namespace NKikimr::NOlap

ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -120,11 +120,9 @@ class IIndexInfo {
120120
return result;
121121
}
122122

123-
[[nodiscard]] static std::vector<ui32> AddSpecialFieldIds(const std::vector<ui32>& baseColumnIds) {
124-
std::vector<ui32> result = baseColumnIds;
123+
static void AddSpecialFieldIds(std::vector<ui32>& baseColumnIds) {
125124
const auto& cIds = GetSystemColumnIds();
126-
result.insert(result.end(), cIds.begin(), cIds.end());
127-
return result;
125+
baseColumnIds.insert(baseColumnIds.end(), cIds.begin(), cIds.end());
128126
}
129127

130128
[[nodiscard]] static std::set<ui32> AddSpecialFieldIds(const std::set<ui32>& baseColumnIds) {

0 commit comments

Comments
 (0)