Skip to content

Commit

Permalink
[BugFix] Fix orc date column reader bug (StarRocks#29041)
Browse files Browse the repository at this point in the history
Signed-off-by: Smith Cruise <chendingchao1@126.com>
  • Loading branch information
Smith-Cruise committed Aug 22, 2023
1 parent 03ffb47 commit e262c95
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 106 deletions.
141 changes: 45 additions & 96 deletions be/src/formats/orc/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,8 @@ Status IntColumnReader<Type>::_fill_int_column_with_null_from_cvb(OrcColumnVecto
auto* values = ColumnHelper::cast_to_raw<Type>(null_column->data_column())->get_data().data();
auto* cvbd = data->data.data();

size_t pos = from;
for (size_t i = col_start; i < col_start + size; ++i, ++pos) {
values[i] = cvbd[pos];
for (size_t i = col_start, cvb_pos = from; i < col_start + size; ++i, ++cvb_pos) {
values[i] = cvbd[cvb_pos];
}

// col_start == 0 and from == 0 means it's at top level of fill chunk, not in the middle of array
Expand Down Expand Up @@ -260,9 +259,8 @@ Status IntColumnReader<Type>::_fill_int_column_from_cvb(OrcColumnVectorBatch* da

auto* cvbd = data->data.data();

auto pos = from;
for (size_t i = col_start; i < col_start + size; ++i, ++pos) {
values[i] = cvbd[pos];
for (size_t i = col_start, cvb_pos = from; i < col_start + size; ++i, ++cvb_pos) {
values[i] = cvbd[cvb_pos];
}

// col_start == 0 and from == 0 means it's at top level of fill chunk, not in the middle of array
Expand Down Expand Up @@ -316,122 +314,73 @@ Status DoubleColumnReader<Type>::get_next(orc::ColumnVectorBatch* cvb, ColumnPtr
}

Status DecimalColumnReader::get_next(orc::ColumnVectorBatch* cvb, ColumnPtr& col, size_t from, size_t size) {
size_t col_start = col->size();
col->resize_uninitialized(col_start + size);

if (_nullable) {
if (dynamic_cast<orc::Decimal64VectorBatch*>(cvb) != nullptr) {
_fill_decimal_column_with_null_from_orc_decimal64(cvb, col, from, size);
} else {
_fill_decimal_column_with_null_from_orc_decimal128(cvb, col, from, size);
}
auto c = ColumnHelper::as_raw_column<NullableColumn>(col);
handle_null(cvb, c, col_start, from, size);
}

Column* data_column = ColumnHelper::get_data_column(col.get());

if (dynamic_cast<orc::Decimal64VectorBatch*>(cvb) != nullptr) {
_fill_decimal_column_from_orc_decimal64(down_cast<orc::Decimal64VectorBatch*>(cvb), data_column, col_start,
from, size);
} else {
if (dynamic_cast<orc::Decimal64VectorBatch*>(cvb) != nullptr) {
_fill_decimal_column_from_orc_decimal64(cvb, col, from, size);
} else {
_fill_decimal_column_from_orc_decimal128(cvb, col, from, size);
}
_fill_decimal_column_from_orc_decimal128(down_cast<orc::Decimal128VectorBatch*>(cvb), data_column, col_start,
from, size);
}

return Status::OK();
}

void DecimalColumnReader::_fill_decimal_column_from_orc_decimal64(orc::ColumnVectorBatch* cvb, ColumnPtr& col,
size_t from, size_t size) {
auto* data = down_cast<orc::Decimal64VectorBatch*>(cvb);

int col_start = col->size();
col->resize(col->size() + size);

void DecimalColumnReader::_fill_decimal_column_from_orc_decimal64(orc::Decimal64VectorBatch* cvb, Column* col,
size_t col_start, size_t from, size_t size) {
static_assert(sizeof(DecimalV2Value) == sizeof(int128_t));
auto* values = reinterpret_cast<int128_t*>(down_cast<DecimalColumn*>(col.get())->get_data().data());
auto* values = reinterpret_cast<int128_t*>(down_cast<DecimalColumn*>(col)->get_data().data());

auto* cvbd = data->values.data();
auto* cvbd = cvb->values.data();

for (int i = col_start; i < col_start + size; ++i, ++from) {
values[i] = static_cast<int128_t>(cvbd[from]);
for (size_t i = col_start, cvb_pos = from; i < col_start + size; ++i, ++cvb_pos) {
values[i] = static_cast<int128_t>(cvbd[cvb_pos]);
}

if (DecimalV2Value::SCALE < data->scale) {
int128_t d = DecimalV2Value::get_scale_base(data->scale - DecimalV2Value::SCALE);
for (int i = col_start; i < col_start + size; ++i) {
if (DecimalV2Value::SCALE < cvb->scale) {
int128_t d = DecimalV2Value::get_scale_base(cvb->scale - DecimalV2Value::SCALE);
for (size_t i = col_start; i < col_start + size; ++i) {
values[i] = values[i] / d;
}
} else if (DecimalV2Value::SCALE > data->scale) {
int128_t m = DecimalV2Value::get_scale_base(DecimalV2Value::SCALE - data->scale);
for (int i = col_start; i < col_start + size; ++i) {
} else if (DecimalV2Value::SCALE > cvb->scale) {
int128_t m = DecimalV2Value::get_scale_base(DecimalV2Value::SCALE - cvb->scale);
for (size_t i = col_start; i < col_start + size; ++i) {
values[i] = values[i] * m;
}
}
}

void DecimalColumnReader::_fill_decimal_column_from_orc_decimal128(orc::ColumnVectorBatch* cvb, ColumnPtr& col,
size_t from, size_t size) {
auto* data = down_cast<orc::Decimal128VectorBatch*>(cvb);

int col_start = col->size();
col->resize(col->size() + size);
void DecimalColumnReader::_fill_decimal_column_from_orc_decimal128(orc::Decimal128VectorBatch* cvb, Column* col,
size_t col_start, size_t from, size_t size) {
auto* values = reinterpret_cast<int128_t*>(down_cast<DecimalColumn*>(col)->get_data().data());

auto* values = reinterpret_cast<int128_t*>(down_cast<DecimalColumn*>(col.get())->get_data().data());

for (int i = col_start; i < col_start + size; ++i, ++from) {
uint64_t hi = data->values[from].getHighBits();
uint64_t lo = data->values[from].getLowBits();
for (size_t i = col_start, cvb_pos = from; i < col_start + size; ++i, ++cvb_pos) {
uint64_t hi = cvb->values[cvb_pos].getHighBits();
uint64_t lo = cvb->values[cvb_pos].getLowBits();
values[i] = (((int128_t)hi) << 64) | (int128_t)lo;
}
if (DecimalV2Value::SCALE < data->scale) {
int128_t d = DecimalV2Value::get_scale_base(data->scale - DecimalV2Value::SCALE);
for (int i = col_start; i < col_start + size; ++i) {
if (DecimalV2Value::SCALE < cvb->scale) {
int128_t d = DecimalV2Value::get_scale_base(cvb->scale - DecimalV2Value::SCALE);
for (size_t i = col_start; i < col_start + size; ++i) {
values[i] = values[i] / d;
}
} else if (DecimalV2Value::SCALE > data->scale) {
int128_t m = DecimalV2Value::get_scale_base(DecimalV2Value::SCALE - data->scale);
for (int i = col_start; i < col_start + size; ++i) {
} else if (DecimalV2Value::SCALE > cvb->scale) {
int128_t m = DecimalV2Value::get_scale_base(DecimalV2Value::SCALE - cvb->scale);
for (size_t i = col_start; i < col_start + size; ++i) {
values[i] = values[i] * m;
}
}
}

void DecimalColumnReader::_fill_decimal_column_with_null_from_orc_decimal64(orc::ColumnVectorBatch* cvb, ColumnPtr& col,
size_t from, size_t size) {
int col_start = col->size();
auto c = ColumnHelper::as_raw_column<NullableColumn>(col);
auto& null_column = c->null_column();
auto& data_column = c->data_column();

_fill_decimal_column_from_orc_decimal64(cvb, data_column, from, size);
DCHECK_EQ(col_start + size, data_column->size());
null_column->resize(data_column->size());
auto* nulls = null_column->get_data().data();

auto pos = from;
if (cvb->hasNulls) {
auto* cvbn = reinterpret_cast<uint8_t*>(cvb->notNull.data());
for (int i = col_start; i < col_start + size; ++i, ++pos) {
nulls[i] = !cvbn[pos];
}
c->update_has_null();
}
}

void DecimalColumnReader::_fill_decimal_column_with_null_from_orc_decimal128(orc::ColumnVectorBatch* cvb,
ColumnPtr& col, size_t from, size_t size) {
int col_start = col->size();
auto c = ColumnHelper::as_raw_column<NullableColumn>(col);
auto& null_column = c->null_column();
auto& data_column = c->data_column();

_fill_decimal_column_from_orc_decimal128(cvb, data_column, from, size);
DCHECK_EQ(col_start + size, data_column->size());
null_column->resize(data_column->size());
auto* nulls = null_column->get_data().data();

auto pos = from;
if (cvb->hasNulls) {
auto* cvbn = reinterpret_cast<uint8_t*>(cvb->notNull.data());
for (int i = col_start; i < col_start + size; ++i, ++pos) {
nulls[i] = !cvbn[pos];
}
c->update_has_null();
}
}

template <LogicalType DecimalType>
Status Decimal32Or64Or128ColumnReader<DecimalType>::get_next(orc::ColumnVectorBatch* cvb, ColumnPtr& col, size_t from,
size_t size) {
Expand Down Expand Up @@ -711,12 +660,12 @@ Status DateColumnReader::get_next(orc::ColumnVectorBatch* cvb, ColumnPtr& column
if (!cvb->notNull[column_pos]) {
continue;
}
OrcDateHelper::orc_date_to_native_date(&(values[column_pos]), data->data[from]);
OrcDateHelper::orc_date_to_native_date(&(values[column_pos]), data->data[vb_pos]);
}
} else {
for (size_t column_pos = column_start, vb_pos = from; column_pos < column_start + size;
column_pos++, vb_pos++) {
OrcDateHelper::orc_date_to_native_date(&(values[column_pos]), data->data[from]);
OrcDateHelper::orc_date_to_native_date(&(values[column_pos]), data->data[vb_pos]);
}
}
return Status::OK();
Expand Down
14 changes: 4 additions & 10 deletions be/src/formats/orc/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,17 +154,11 @@ class DecimalColumnReader : public PrimitiveColumnReader {
Status get_next(orc::ColumnVectorBatch* cvb, ColumnPtr& col, size_t from, size_t size) override;

private:
void _fill_decimal_column_from_orc_decimal64(orc::ColumnVectorBatch* cvb, starrocks::ColumnPtr& col, size_t from,
size_t size);
void _fill_decimal_column_from_orc_decimal64(orc::Decimal64VectorBatch* cvb, Column* col, size_t col_start,
size_t from, size_t size);

void _fill_decimal_column_from_orc_decimal128(orc::ColumnVectorBatch* cvb, starrocks::ColumnPtr& col, size_t from,
size_t size);

void _fill_decimal_column_with_null_from_orc_decimal64(orc::ColumnVectorBatch* cvb, starrocks::ColumnPtr& col,
size_t from, size_t size);

void _fill_decimal_column_with_null_from_orc_decimal128(orc::ColumnVectorBatch* cvb, starrocks::ColumnPtr& col,
size_t from, size_t size);
void _fill_decimal_column_from_orc_decimal128(orc::Decimal128VectorBatch* cvb, Column* col, size_t col_start,
size_t from, size_t size);
};

template <LogicalType DecimalType>
Expand Down
1 change: 1 addition & 0 deletions be/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ set(EXEC_FILES
./formats/avro/nullable_column_test.cpp
./formats/orc/orc_chunk_reader_test.cpp
./formats/orc/orc_lazy_load_test.cpp
./formats/orc/orc_column_reader_test.cpp
./formats/parquet/parquet_schema_test.cpp
./formats/parquet/encoding_test.cpp
./formats/parquet/page_reader_test.cpp
Expand Down
93 changes: 93 additions & 0 deletions be/test/formats/orc/orc_column_reader_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <gtest/gtest.h>

#include "formats/orc/column_reader.h"
#include "formats/orc/memory_stream/MemoryInputStream.hh"
#include "formats/orc/memory_stream/MemoryOutputStream.hh"
#include "formats/orc/orc_chunk_reader.h"
#include "formats/orc/orc_mapping.h"

namespace starrocks {

// Capacity of the in-memory ORC output stream shared by the tests in this file (100 MiB).
const static size_t bufferSize = 100 * 1024 * 1024;

// Round-trips a nullable ORC `date` column through an in-memory ORC file and verifies that the
// column reader converts every vector-batch position to the matching native date, including the
// position that follows a NULL. Regression test for the date reader using `from` instead of the
// running vector-batch position when copying values.
TEST(OrcColumnReaderTest, TestDateColumn) {
    const static size_t batchSize = 3;

    MemoryOutputStream buffer(bufferSize);
    ORC_UNIQUE_PTR<orc::Type> schema(orc::Type::buildTypeFromString("struct<c0:date>"));
    const orc::Type* orcType = schema->getSubtype(0);

    // Write phase: rows are {day 0, NULL, day 2}, counted in days since 1970-01-01.
    {
        orc::WriterOptions writerOptions;
        ORC_UNIQUE_PTR<orc::Writer> writer = createWriter(*schema, &buffer, writerOptions);

        ORC_UNIQUE_PTR<orc::ColumnVectorBatch> batch = writer->createRowBatch(batchSize);
        auto* root = dynamic_cast<orc::StructVectorBatch*>(batch.get());
        ASSERT_TRUE(root != nullptr);
        auto* c0 = dynamic_cast<orc::LongVectorBatch*>(root->fields[0]);
        ASSERT_TRUE(c0 != nullptr);

        for (size_t i = 0; i < batchSize; i++) {
            c0->data[i] = i;
        }
        c0->notNull[0] = 1;
        c0->notNull[1] = 0;
        c0->notNull[2] = 1;
        // hasNulls has to agree with notNull. Leaving it false while notNull marks a NULL hands
        // the writer an inconsistent batch, and what ends up in the file for rows 1 and 2 then
        // depends on writer internals rather than on the data set above.
        c0->hasNulls = true;

        c0->numElements = batchSize;
        root->numElements = batchSize;
        writer->add(*batch);
        writer->close();
    }

    // Read phase: decode the file and push the batch through the StarRocks column reader.
    {
        orc::ReaderOptions readerOptions;
        ORC_UNIQUE_PTR<orc::InputStream> inputStream(new MemoryInputStream(buffer.getData(), buffer.getLength()));
        ORC_UNIQUE_PTR<orc::Reader> reader = createReader(std::move(inputStream), readerOptions);

        orc::RowReaderOptions options;
        std::list<std::string> columns = {"c0"};
        options.include(columns);
        ORC_UNIQUE_PTR<orc::RowReader> rr = reader->createRowReader(options);

        // ORCColumnReader::create() requires an OrcMapping and an OrcChunkReader argument; they
        // are supplied here only to satisfy the signature.
        const OrcMappingPtr orcMapping = nullptr;
        OrcChunkReader orcChunkReader(batchSize, {});
        orcChunkReader.disable_broker_load_mode();

        TypeDescriptor c0Type = TypeDescriptor::from_logical_type(LogicalType::TYPE_DATE);

        // Check the status before unwrapping so a creation failure is reported as a test failure
        // instead of an invalid access.
        auto orcColumnReaderOr = ORCColumnReader::create(c0Type, orcType, true, orcMapping, &orcChunkReader);
        ASSERT_TRUE(orcColumnReaderOr.ok());
        std::unique_ptr<ORCColumnReader> orcColumnReader = std::move(orcColumnReaderOr).value();

        ORC_UNIQUE_PTR<orc::ColumnVectorBatch> batch = rr->createRowBatch(batchSize);
        auto* root = dynamic_cast<orc::StructVectorBatch*>(batch.get());
        ASSERT_TRUE(root != nullptr);
        auto* c0 = dynamic_cast<orc::LongVectorBatch*>(root->fields[0]);
        ASSERT_TRUE(c0 != nullptr);

        // Everything below depends on the batch having been read, so stop on failure.
        orc::RowReader::ReadPosition pos;
        ASSERT_TRUE(rr->next(*batch, &pos));
        ColumnPtr column = ColumnHelper::create_column(c0Type, true);
        ASSERT_TRUE(orcColumnReader->get_next(c0, column, 0, batchSize).ok());
        // debug_item() indexes into the column; make sure all rows are present first.
        ASSERT_EQ(batchSize, column->size());

        EXPECT_EQ("1970-01-01", column->debug_item(0));
        EXPECT_EQ("NULL", column->debug_item(1));
        // Row 2 was written as day 2. A reader that reused `from` for every row would report
        // 1970-01-01 here.
        EXPECT_EQ("1970-01-03", column->debug_item(2));
    }
}

} // namespace starrocks

0 comments on commit e262c95

Please sign in to comment.