Skip to content

Commit 103d800

Browse files
authored
YT block input fix for dynamic tables (#11040)
1 parent 3c95041 commit 103d800

File tree

1 file changed

+16
-12
lines changed

1 file changed

+16
-12
lines changed

ydb/library/yql/providers/yt/codec/yt_codec_io.cpp

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1519,13 +1519,13 @@ class TArrowDecoder : public TMkqlReaderImpl::TDecoder {
15191519
return ReadNext();
15201520
}
15211521

1522-
auto rowIndices = batch->GetColumnByName("$row_index");
1523-
YQL_ENSURE(rowIndices);
1524-
15251522
auto& decoder = *Specs_.Inputs[TableIndex_];
15261523
auto& inputFields = decoder.FieldsVec;
15271524
YQL_ENSURE(inputFields.size() == ColumnConverters_.size());
15281525

1526+
auto rowIndices = batch->GetColumnByName("$row_index");
1527+
YQL_ENSURE(rowIndices || decoder.Dynamic);
1528+
15291529
arrow::compute::ExecContext execContext(Pool_);
15301530
std::vector<arrow::Datum> convertedBatch;
15311531
for (size_t i = 0; i < inputFields.size(); i++) {
@@ -1539,13 +1539,17 @@ class TArrowDecoder : public TMkqlReaderImpl::TDecoder {
15391539
convertedColumn = ARROW_RESULT(arrow::MakeArrayFromScalar(tableNameScalar, batch->num_rows(), Pool_));
15401540

15411541
} else if (decoder.FillSysColumnRecord == inputFields[i].StructIndex || decoder.FillSysColumnNum == inputFields[i].StructIndex) {
1542-
auto addFirst = ARROW_RESULT(arrow::compute::Cast(rowIndices, arrow::uint64(), arrow::compute::CastOptions::Safe(), &execContext));
1543-
auto addSecond = arrow::Datum(std::make_shared<arrow::UInt64Scalar>(1));
1544-
convertedColumn = ARROW_RESULT(arrow::compute::Add(addFirst, addSecond, arrow::compute::ArithmeticOptions(), &execContext));
1545-
1546-
if (decoder.FillSysColumnNum == inputFields[i].StructIndex) {
1547-
auto addThird = arrow::Datum(std::make_shared<arrow::UInt64Scalar>(Specs_.TableOffsets.at(TableIndex_)));
1548-
convertedColumn = ARROW_RESULT(arrow::compute::Add(convertedColumn, addThird, arrow::compute::ArithmeticOptions(), &execContext));
1542+
if (rowIndices) {
1543+
auto addFirst = ARROW_RESULT(arrow::compute::Cast(rowIndices, arrow::uint64(), arrow::compute::CastOptions::Safe(), &execContext));
1544+
auto addSecond = arrow::Datum(std::make_shared<arrow::UInt64Scalar>(1));
1545+
convertedColumn = ARROW_RESULT(arrow::compute::Add(addFirst, addSecond, arrow::compute::ArithmeticOptions(), &execContext));
1546+
1547+
if (decoder.FillSysColumnNum == inputFields[i].StructIndex) {
1548+
auto addThird = arrow::Datum(std::make_shared<arrow::UInt64Scalar>(Specs_.TableOffsets.at(TableIndex_)));
1549+
convertedColumn = ARROW_RESULT(arrow::compute::Add(convertedColumn, addThird, arrow::compute::ArithmeticOptions(), &execContext));
1550+
}
1551+
} else {
1552+
convertedColumn = ARROW_RESULT(arrow::MakeArrayFromScalar(arrow::UInt64Scalar(0), batch->num_rows(), Pool_));
15491553
}
15501554
} else if (decoder.FillSysColumnIndex == inputFields[i].StructIndex) {
15511555
convertedColumn = ARROW_RESULT(arrow::MakeArrayFromScalar(arrow::UInt32Scalar(TableIndex_), batch->num_rows()));
@@ -1561,13 +1565,13 @@ class TArrowDecoder : public TMkqlReaderImpl::TDecoder {
15611565
}
15621566

15631567
// index of the first row in the block
1564-
ui64 blockRowIndex = std::dynamic_pointer_cast<arrow::Int64Scalar>(ARROW_RESULT(rowIndices->GetScalar(0)))->value;
1568+
ui64 blockRowIndex = rowIndices ? std::dynamic_pointer_cast<arrow::Int64Scalar>(ARROW_RESULT(rowIndices->GetScalar(0)))->value : 0;
15651569

15661570
NUdf::TArgsDechunker dechunker(std::move(convertedBatch));
15671571
std::vector<arrow::Datum> chunk;
15681572
ui64 chunkLen = 0;
15691573
while (dechunker.Next(chunk, chunkLen)) {
1570-
Chunks_.emplace_back(blockRowIndex, chunkLen, std::move(chunk));
1574+
Chunks_.emplace_back(rowIndices ? blockRowIndex : 0, chunkLen, std::move(chunk));
15711575
blockRowIndex += chunkLen;
15721576
}
15731577

0 commit comments

Comments
 (0)