Skip to content

Commit c25ac4a

Browse files
rui-moLakehouse Engine Bot
authored andcommitted
[5962] Support struct schema evolution matching by name
(cherry picked from commit 3b172d7)
1 parent 1484798 commit c25ac4a

23 files changed

+430
-65
lines changed

velox/connectors/hive/SplitReader.cpp

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -370,11 +370,17 @@ std::vector<TypePtr> SplitReader::adaptColumns(
370370
auto fileTypeIdx = fileType->getChildIdxIfExists(fieldName);
371371
if (!fileTypeIdx.has_value()) {
372372
// Column is missing. Most likely due to schema evolution.
373-
VELOX_CHECK(tableSchema, "Unable to resolve column '{}'", fieldName);
373+
auto outputTypeIdx = readerOutputType_->getChildIdxIfExists(fieldName);
374+
TypePtr fieldType;
375+
if (outputTypeIdx.has_value()) {
376+
// Field name exists in the user-specified output type.
377+
fieldType = readerOutputType_->childAt(outputTypeIdx.value());
378+
} else {
379+
VELOX_CHECK(tableSchema, "Unable to resolve column '{}'", fieldName);
380+
fieldType = tableSchema->findChild(fieldName);
381+
}
374382
childSpec->setConstantValue(BaseVector::createNullConstant(
375-
tableSchema->findChild(fieldName),
376-
1,
377-
connectorQueryCtx_->memoryPool()));
383+
fieldType, 1, connectorQueryCtx_->memoryPool()));
378384
} else {
379385
// Column no longer missing, reset constant value set on the spec.
380386
childSpec->setConstantValue(nullptr);

velox/dwio/common/ScanSpec.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ bool ScanSpec::hasFilter() const {
142142
if (hasFilter_.has_value()) {
143143
return hasFilter_.value();
144144
}
145-
if (!isConstant() && filter()) {
145+
if (filter()) {
146146
hasFilter_ = true;
147147
return true;
148148
}

velox/dwio/common/SelectiveStructColumnReader.cpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,6 @@ void SelectiveStructColumnReaderBase::read(
359359
}
360360

361361
const auto& childSpecs = scanSpec_->children();
362-
VELOX_CHECK(!childSpecs.empty());
363362
for (size_t i = 0; i < childSpecs.size(); ++i) {
364363
const auto& childSpec = childSpecs[i];
365364
VELOX_TRACE_HISTORY_PUSH("read %s", childSpec->fieldName().c_str());
@@ -462,15 +461,17 @@ bool SelectiveStructColumnReaderBase::isChildMissing(
462461
// row type that doesn't exist
463462
// in the output.
464463
fileType_->type()->kind() !=
465-
TypeKind::MAP && // If this is the case it means this is a flat map,
466-
// so it can't have "missing" fields.
467-
childSpec.channel() >= fileType_->size());
464+
TypeKind::MAP // If this is the case it means this is a flat map,
465+
// so it can't have "missing" fields.
466+
) &&
467+
(useColumnNames_
468+
? !asRowType(fileType_->type())->containsChild(childSpec.fieldName())
469+
: childSpec.channel() >= fileType_->size());
468470
}
469471

470472
void SelectiveStructColumnReaderBase::getValues(
471473
const RowSet& rows,
472474
VectorPtr* result) {
473-
VELOX_CHECK(!scanSpec_->children().empty());
474475
VELOX_CHECK_NOT_NULL(
475476
*result, "SelectiveStructColumnReaderBase expects a non-null result");
476477
VELOX_CHECK(

velox/dwio/common/SelectiveStructColumnReader.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,11 +115,13 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader {
115115
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
116116
FormatParams& params,
117117
velox::common::ScanSpec& scanSpec,
118+
bool useColumnNames,
118119
bool isRoot = false)
119120
: SelectiveColumnReader(requestedType, fileType, params, scanSpec),
120121
debugString_(
121122
getExceptionContext().message(VeloxException::Type::kSystem)),
122123
isRoot_(isRoot),
124+
useColumnNames_(useColumnNames),
123125
rows_(memoryPool_) {}
124126

125127
bool hasDeletion() const final {
@@ -164,6 +166,9 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader {
164166
// table.
165167
const bool isRoot_;
166168

169+
// Whether to use names for mapping table field names to file field names.
170+
const bool useColumnNames_;
171+
167172
// Dense set of rows to read in next().
168173
raw_vector<vector_size_t> rows_;
169174

velox/dwio/dwrf/reader/DwrfReader.cpp

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ class DwrfUnit : public LoadUnit {
4444
uint32_t stripeIndex,
4545
std::shared_ptr<dwio::common::ColumnSelector> columnSelector,
4646
const std::shared_ptr<BitSet>& projectedNodes,
47-
RowReaderOptions options)
47+
RowReaderOptions options,
48+
bool useColumnNames)
4849
: stripeReaderBase_{stripeReaderBase},
4950
strideIndexProvider_{strideIndexProvider},
5051
columnReaderStatistics_{&columnReaderStatistics},
@@ -53,7 +54,8 @@ class DwrfUnit : public LoadUnit {
5354
projectedNodes_{projectedNodes},
5455
options_{std::move(options)},
5556
stripeInfo_{
56-
stripeReaderBase.getReader().footer().stripes(stripeIndex_)} {}
57+
stripeReaderBase.getReader().footer().stripes(stripeIndex_)},
58+
useColumnNames_{useColumnNames} {}
5759

5860
~DwrfUnit() override = default;
5961

@@ -92,6 +94,9 @@ class DwrfUnit : public LoadUnit {
9294
const RowReaderOptions options_;
9395
const StripeInformationWrapper stripeInfo_;
9496

97+
// Whether to use names for mapping table field names to file field names.
98+
const bool useColumnNames_;
99+
95100
// Mutables
96101
bool preloaded_;
97102
std::optional<uint64_t> cachedIoSize_;
@@ -166,6 +171,7 @@ void DwrfUnit::ensureDecoders() {
166171
streamLabels,
167172
*columnReaderStatistics_,
168173
scanSpec,
174+
useColumnNames_,
169175
flatMapContext,
170176
/*isRoot=*/true);
171177
selectiveColumnReader_->setIsTopLevel();
@@ -328,7 +334,8 @@ std::unique_ptr<dwio::common::UnitLoader> DwrfRowReader::getUnitLoader() {
328334
stripe,
329335
columnSelector_,
330336
projectedNodes_,
331-
options_));
337+
options_,
338+
readerBaseShared()->readerOptions().useColumnNamesForColumnMapping()));
332339
}
333340
std::shared_ptr<UnitLoaderFactory> unitLoaderFactory =
334341
options_.unitLoaderFactory();

velox/dwio/dwrf/reader/SelectiveDwrfReader.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ std::unique_ptr<SelectiveColumnReader> SelectiveDwrfReader::build(
6464
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
6565
DwrfParams& params,
6666
common::ScanSpec& scanSpec,
67+
bool useColumnNames,
6768
bool isRoot) {
6869
VELOX_CHECK(
6970
!isRoot || fileType->type()->kind() == TypeKind::ROW,
@@ -90,16 +91,16 @@ std::unique_ptr<SelectiveColumnReader> SelectiveDwrfReader::build(
9091
requestedType, fileType, params, SHORT_BYTE_SIZE, scanSpec);
9192
case TypeKind::ARRAY:
9293
return std::make_unique<SelectiveListColumnReader>(
93-
requestedType, fileType, params, scanSpec);
94+
requestedType, fileType, params, scanSpec, useColumnNames);
9495
case TypeKind::MAP:
9596
if (stripe.format() == DwrfFormat::kDwrf &&
9697
stripe.getEncoding(ek).kind() ==
9798
proto::ColumnEncoding_Kind_MAP_FLAT) {
9899
return createSelectiveFlatMapColumnReader(
99-
requestedType, fileType, params, scanSpec);
100+
requestedType, fileType, params, scanSpec, useColumnNames);
100101
}
101102
return std::make_unique<SelectiveMapColumnReader>(
102-
requestedType, fileType, params, scanSpec);
103+
requestedType, fileType, params, scanSpec, useColumnNames);
103104
case TypeKind::REAL:
104105
if (requestedType->kind() == TypeKind::REAL) {
105106
return std::make_unique<
@@ -116,7 +117,7 @@ std::unique_ptr<SelectiveColumnReader> SelectiveDwrfReader::build(
116117
requestedType, fileType, params, scanSpec);
117118
case TypeKind::ROW:
118119
return std::make_unique<SelectiveStructColumnReader>(
119-
requestedType, fileType, params, scanSpec, isRoot);
120+
requestedType, fileType, params, scanSpec, useColumnNames, isRoot);
120121
case TypeKind::BOOLEAN:
121122
return std::make_unique<SelectiveByteRleColumnReader>(
122123
requestedType, fileType, params, scanSpec, true);

velox/dwio/dwrf/reader/SelectiveDwrfReader.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ class SelectiveDwrfReader {
3030
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
3131
DwrfParams& params,
3232
common::ScanSpec& scanSpec,
33+
bool useColumnNames,
3334
bool isRoot = false);
3435

3536
/// Compatibility wrapper for tests. Takes the components of DwrfParams as
@@ -41,10 +42,12 @@ class SelectiveDwrfReader {
4142
const StreamLabels& streamLabels,
4243
dwio::common::ColumnReaderStatistics& stats,
4344
common::ScanSpec* scanSpec,
45+
bool useColumnNames,
4446
FlatMapContext flatMapContext = {},
4547
bool isRoot = false) {
4648
auto params = DwrfParams(stripe, streamLabels, stats, flatMapContext);
47-
return build(requestedType, fileType, params, *scanSpec, isRoot);
49+
return build(
50+
requestedType, fileType, params, *scanSpec, useColumnNames, isRoot);
4851
}
4952
};
5053

velox/dwio/dwrf/reader/SelectiveFlatMapColumnReader.cpp

Lines changed: 44 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ std::vector<KeyNode<T>> getKeyNodes(
7171
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
7272
DwrfParams& params,
7373
common::ScanSpec& scanSpec,
74-
bool asStruct) {
74+
bool asStruct,
75+
bool useColumnNames) {
7576
using namespace dwio::common::flatmap;
7677

7778
std::vector<KeyNode<T>> keyNodes;
@@ -145,7 +146,11 @@ std::vector<KeyNode<T>> getKeyNodes(
145146
.inMapDecoder = inMapDecoder.get(),
146147
.keySelectionCallback = nullptr});
147148
auto reader = SelectiveDwrfReader::build(
148-
requestedValueType, dataValueType, childParams, *childSpec);
149+
requestedValueType,
150+
dataValueType,
151+
childParams,
152+
*childSpec,
153+
useColumnNames);
149154
keyNodes.emplace_back(
150155
key, sequence, std::move(reader), std::move(inMapDecoder));
151156
});
@@ -169,14 +174,21 @@ class SelectiveFlatMapAsStructReader : public SelectiveStructColumnReaderBase {
169174
const TypePtr& requestedType,
170175
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
171176
DwrfParams& params,
172-
common::ScanSpec& scanSpec)
177+
common::ScanSpec& scanSpec,
178+
bool useColumnNames)
173179
: SelectiveStructColumnReaderBase(
174180
requestedType,
175181
fileType,
176182
params,
177-
scanSpec),
178-
keyNodes_(
179-
getKeyNodes<T>(requestedType, fileType, params, scanSpec, true)) {
183+
scanSpec,
184+
useColumnNames),
185+
keyNodes_(getKeyNodes<T>(
186+
requestedType,
187+
fileType,
188+
params,
189+
scanSpec,
190+
true,
191+
useColumnNames)) {
180192
VELOX_CHECK(
181193
!keyNodes_.empty(),
182194
"For struct encoding, keys to project must be configured");
@@ -201,15 +213,23 @@ class SelectiveFlatMapReader : public SelectiveStructColumnReaderBase {
201213
const TypePtr& requestedType,
202214
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
203215
DwrfParams& params,
204-
common::ScanSpec& scanSpec)
216+
common::ScanSpec& scanSpec,
217+
bool useColumnNames)
205218
: SelectiveStructColumnReaderBase(
206219
requestedType,
207220
fileType,
208221
params,
209-
scanSpec),
222+
scanSpec,
223+
useColumnNames),
210224
flatMap_(
211225
*this,
212-
getKeyNodes<T>(requestedType, fileType, params, scanSpec, false)) {}
226+
getKeyNodes<T>(
227+
requestedType,
228+
fileType,
229+
params,
230+
scanSpec,
231+
false,
232+
useColumnNames)) {}
213233

214234
void read(int64_t offset, const RowSet& rows, const uint64_t* incomingNulls)
215235
override {
@@ -230,13 +250,14 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> createReader(
230250
const TypePtr& requestedType,
231251
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
232252
DwrfParams& params,
233-
common::ScanSpec& scanSpec) {
253+
common::ScanSpec& scanSpec,
254+
bool useColumnNames) {
234255
if (scanSpec.isFlatMapAsStruct()) {
235256
return std::make_unique<SelectiveFlatMapAsStructReader<T>>(
236-
requestedType, fileType, params, scanSpec);
257+
requestedType, fileType, params, scanSpec, useColumnNames);
237258
} else {
238259
return std::make_unique<SelectiveFlatMapReader<T>>(
239-
requestedType, fileType, params, scanSpec);
260+
requestedType, fileType, params, scanSpec, useColumnNames);
240261
}
241262
}
242263

@@ -247,21 +268,26 @@ createSelectiveFlatMapColumnReader(
247268
const TypePtr& requestedType,
248269
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
249270
DwrfParams& params,
250-
common::ScanSpec& scanSpec) {
271+
common::ScanSpec& scanSpec,
272+
bool useColumnNames) {
251273
auto kind = fileType->childAt(0)->type()->kind();
252274
switch (kind) {
253275
case TypeKind::TINYINT:
254-
return createReader<int8_t>(requestedType, fileType, params, scanSpec);
276+
return createReader<int8_t>(
277+
requestedType, fileType, params, scanSpec, useColumnNames);
255278
case TypeKind::SMALLINT:
256-
return createReader<int16_t>(requestedType, fileType, params, scanSpec);
279+
return createReader<int16_t>(
280+
requestedType, fileType, params, scanSpec, useColumnNames);
257281
case TypeKind::INTEGER:
258-
return createReader<int32_t>(requestedType, fileType, params, scanSpec);
282+
return createReader<int32_t>(
283+
requestedType, fileType, params, scanSpec, useColumnNames);
259284
case TypeKind::BIGINT:
260-
return createReader<int64_t>(requestedType, fileType, params, scanSpec);
285+
return createReader<int64_t>(
286+
requestedType, fileType, params, scanSpec, useColumnNames);
261287
case TypeKind::VARBINARY:
262288
case TypeKind::VARCHAR:
263289
return createReader<StringView>(
264-
requestedType, fileType, params, scanSpec);
290+
requestedType, fileType, params, scanSpec, useColumnNames);
265291
default:
266292
VELOX_UNSUPPORTED("Not supported key type: {}", kind);
267293
}

velox/dwio/dwrf/reader/SelectiveFlatMapColumnReader.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ createSelectiveFlatMapColumnReader(
2626
const TypePtr& requestedType,
2727
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
2828
DwrfParams&,
29-
common::ScanSpec&);
29+
common::ScanSpec&,
30+
bool useColumnNames);
3031

3132
} // namespace facebook::velox::dwrf

velox/dwio/dwrf/reader/SelectiveRepeatedColumnReader.cpp

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ SelectiveListColumnReader::SelectiveListColumnReader(
5252
const TypePtr& requestedType,
5353
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
5454
DwrfParams& params,
55-
common::ScanSpec& scanSpec)
55+
common::ScanSpec& scanSpec,
56+
bool useColumnNames)
5657
: dwio::common::SelectiveListColumnReader(
5758
requestedType,
5859
fileType,
@@ -75,15 +76,20 @@ SelectiveListColumnReader::SelectiveListColumnReader(
7576
params.runtimeStatistics(),
7677
flatMapContextFromEncodingKey(encodingKey));
7778
child_ = SelectiveDwrfReader::build(
78-
childType, fileType_->childAt(0), childParams, *scanSpec_->children()[0]);
79+
childType,
80+
fileType_->childAt(0),
81+
childParams,
82+
*scanSpec_->children()[0],
83+
useColumnNames);
7984
children_ = {child_.get()};
8085
}
8186

8287
SelectiveMapColumnReader::SelectiveMapColumnReader(
8388
const TypePtr& requestedType,
8489
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
8590
DwrfParams& params,
86-
common::ScanSpec& scanSpec)
91+
common::ScanSpec& scanSpec,
92+
bool useColumnNames)
8793
: dwio::common::SelectiveMapColumnReader(
8894
requestedType,
8995
fileType,
@@ -111,7 +117,8 @@ SelectiveMapColumnReader::SelectiveMapColumnReader(
111117
keyType,
112118
fileType_->childAt(0),
113119
keyParams,
114-
*scanSpec_->children()[0].get());
120+
*scanSpec_->children()[0].get(),
121+
useColumnNames);
115122

116123
auto& valueType = requestedType_->childAt(1);
117124
auto elementParams = DwrfParams(
@@ -123,7 +130,8 @@ SelectiveMapColumnReader::SelectiveMapColumnReader(
123130
valueType,
124131
fileType_->childAt(1),
125132
elementParams,
126-
*scanSpec_->children()[1]);
133+
*scanSpec_->children()[1],
134+
useColumnNames);
127135
children_ = {keyReader_.get(), elementReader_.get()};
128136
}
129137

0 commit comments

Comments
 (0)