Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion velox/dwio/dwrf/common/FileMetadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,13 @@ TypeKind TypeWrapper::kind() const {
return TypeKind::VARCHAR;
case proto::orc::Type_Kind_DATE:
return TypeKind::DATE;
case proto::orc::Type_Kind_DECIMAL:
case proto::orc::Type_Kind_DECIMAL: {
if (orcPtr()->precision() <= 18) {
return TypeKind::SHORT_DECIMAL;
} else {
return TypeKind::LONG_DECIMAL;
}
}
case proto::orc::Type_Kind_CHAR:
case proto::orc::Type_Kind_TIMESTAMP_INSTANT:
DWIO_RAISE(
Expand Down
2 changes: 2 additions & 0 deletions velox/dwio/dwrf/reader/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ add_library(
SelectiveStringDirectColumnReader.cpp
SelectiveStringDictionaryColumnReader.cpp
SelectiveTimestampColumnReader.cpp
SelectiveShortDecimalColumnReader.cpp
SelectiveLongDecimalColumnReader.cpp
SelectiveStructColumnReader.cpp
SelectiveRepeatedColumnReader.cpp
StripeDictionaryCache.cpp
Expand Down
6 changes: 6 additions & 0 deletions velox/dwio/dwrf/reader/DwrfReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,12 @@ std::optional<size_t> DwrfRowReader::estimatedRowSizeHelper(
}
return totalEstimate;
}
case TypeKind::SHORT_DECIMAL: {
return valueCount * sizeof(uint64_t);
}
case TypeKind::LONG_DECIMAL: {
return valueCount * sizeof(uint128_t);
}
default:
return std::nullopt;
}
Expand Down
6 changes: 6 additions & 0 deletions velox/dwio/dwrf/reader/ReaderBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,12 @@ std::shared_ptr<const Type> ReaderBase::convertType(
// child doesn't hold.
return ROW(std::move(names), std::move(tl));
}
case TypeKind::LONG_DECIMAL:
return LONG_DECIMAL(
type.getOrcPtr()->precision(), type.getOrcPtr()->scale());
case TypeKind::SHORT_DECIMAL:
return SHORT_DECIMAL(
type.getOrcPtr()->precision(), type.getOrcPtr()->scale());
default:
DWIO_RAISE("Unknown type kind");
}
Expand Down
8 changes: 8 additions & 0 deletions velox/dwio/dwrf/reader/SelectiveDwrfReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
#include "velox/dwio/dwrf/reader/SelectiveFloatingPointColumnReader.h"
#include "velox/dwio/dwrf/reader/SelectiveIntegerDictionaryColumnReader.h"
#include "velox/dwio/dwrf/reader/SelectiveIntegerDirectColumnReader.h"
#include "velox/dwio/dwrf/reader/SelectiveLongDecimalColumnReader.h"
#include "velox/dwio/dwrf/reader/SelectiveRepeatedColumnReader.h"
#include "velox/dwio/dwrf/reader/SelectiveShortDecimalColumnReader.h"
#include "velox/dwio/dwrf/reader/SelectiveStringDictionaryColumnReader.h"
#include "velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.h"
#include "velox/dwio/dwrf/reader/SelectiveStructColumnReader.h"
Expand Down Expand Up @@ -126,6 +128,12 @@ std::unique_ptr<SelectiveColumnReader> SelectiveDwrfReader::build(
case TypeKind::TIMESTAMP:
return std::make_unique<SelectiveTimestampColumnReader>(
requestedType, params, scanSpec);
case TypeKind::SHORT_DECIMAL:
return std::make_unique<SelectiveShortDecimalColumnReader>(
requestedType, dataType->type, params, scanSpec);
case TypeKind::LONG_DECIMAL:
return std::make_unique<SelectiveLongDecimalColumnReader>(
requestedType, dataType->type, params, scanSpec);
default:
DWIO_RAISE(
"buildReader unhandled type: " +
Expand Down
120 changes: 120 additions & 0 deletions velox/dwio/dwrf/reader/SelectiveLongDecimalColumnReader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/dwio/dwrf/reader/SelectiveLongDecimalColumnReader.h"
#include "velox/dwio/common/BufferUtil.h"
#include "velox/dwio/dwrf/common/DecoderUtil.h"
#include "velox/dwio/dwrf/reader/SelectiveShortDecimalColumnReader.h"

namespace facebook::velox::dwrf {

using namespace dwio::common;

/// Reads the rows in `rows`, applying the scan spec's filter (or an
/// always-true filter when none is set) and either keeping the values,
/// forwarding them to the scan spec's value hook, or dropping them.
void SelectiveLongDecimalColumnReader::read(
    vector_size_t offset,
    RowSet rows,
    const uint64_t* incomingNulls) {
  // prepareRead is instantiated with int64_t because the scale values are
  // of type int64_t.
  prepareRead<int64_t>(offset, rows, incomingNulls);

  // The row set is treated as dense when its last row number equals
  // size - 1.
  const bool denseRows = rows.back() == rows.size() - 1;
  velox::common::Filter* rowFilter =
      scanSpec_->filter() ? scanSpec_->filter() : &alwaysTrue();

  // Values are not needed: only evaluate the filter.
  if (!scanSpec_->keepValues()) {
    if (denseRows) {
      processFilter<true>(rowFilter, DropValues(), rows);
    } else {
      processFilter<false>(rowFilter, DropValues(), rows);
    }
    return;
  }

  // A value hook takes precedence over filter processing.
  if (scanSpec_->valueHook()) {
    if (denseRows) {
      processValueHook<true>(rows, scanSpec_->valueHook());
    } else {
      processValueHook<false>(rows, scanSpec_->valueHook());
    }
    return;
  }

  // Default path: filter and extract the values into this reader.
  if (denseRows) {
    processFilter<true>(rowFilter, ExtractToReader(this), rows);
  } else {
    processFilter<false>(rowFilter, ExtractToReader(this), rows);
  }
}

namespace {
/// Rescales the unscaled decimal `value` in place from `currentScale` to
/// `scale`.
///
/// @param value Unscaled 128-bit decimal value; updated in place.
/// @param scale Target scale (number of fractional digits).
/// @param currentScale Scale the value is currently expressed in.
///
/// Increasing the scale multiplies by a power of ten; decreasing it divides
/// by a power of ten, truncating toward zero (the remainder is discarded).
/// The adjustment is applied in steps of at most
/// SelectiveShortDecimalColumnReader::MAX_PRECISION_64 digits.
/// NOTE(review): the step cap presumably keeps each factor within the
/// POWERS_OF_TEN table - confirm against its declaration.
void scaleInt128(int128_t& value, uint32_t scale, uint32_t currentScale) {
  // Scale up: runs only when scale > currentScale.
  while (scale > currentScale) {
    const uint32_t scaleAdjust = std::min(
        SelectiveShortDecimalColumnReader::MAX_PRECISION_64,
        scale - currentScale);
    value *= SelectiveShortDecimalColumnReader::POWERS_OF_TEN[scaleAdjust];
    currentScale += scaleAdjust;
  }

  // Scale down: runs only when scale < currentScale. Previously this branch
  // only adjusted currentScale and left `value` untouched, so values stored
  // with a larger scale than requested were returned unscaled.
  while (currentScale > scale) {
    const uint32_t scaleAdjust = std::min(
        SelectiveShortDecimalColumnReader::MAX_PRECISION_64,
        currentScale - scale);
    // Integer division truncates toward zero.
    value /= SelectiveShortDecimalColumnReader::POWERS_OF_TEN[scaleAdjust];
    currentScale -= scaleAdjust;
  }
}
} // namespace

/// Converts the values read so far into UnscaledLongDecimal, rescaling each
/// non-null value from its per-row scale (taken from scaleBuffer_) to scale_,
/// then hands the converted buffer to getFlatValues to produce `result`.
void SelectiveLongDecimalColumnReader::getValues(
    RowSet rows,
    VectorPtr* result) {
  // Choose which null bitmap to consult, if any.
  const uint64_t* nullBits = nullptr;
  if (nullsInReadRange_) {
    if (returnReaderNulls_) {
      nullBits = nullsInReadRange_->as<uint64_t>();
    } else {
      nullBits = rawResultNulls_;
    }
  }

  // Destination buffer holding one UnscaledLongDecimal per value read.
  auto convertedBuffer =
      AlignedBuffer::allocate<UnscaledLongDecimal>(numValues_, &memoryPool_);
  auto converted = convertedBuffer->asMutable<UnscaledLongDecimal>();

  auto rowScales = scaleBuffer_->as<int64_t>();
  auto rawInts = values_->as<int128_t>();

  // Convert every non-null entry; null slots are left unwritten.
  for (vector_size_t row = 0; row < numValues_; ++row) {
    if (nullBits && bits::isBitNull(nullBits, row)) {
      continue;
    }

    int32_t rowScale = rowScales[row];
    int128_t unscaled = rawInts[row];
    scaleInt128(unscaled, scale_, rowScale);
    converted[row] = UnscaledLongDecimal(unscaled);
  }

  // Swap in the converted buffer before building the flat result.
  values_ = convertedBuffer;
  rawValues_ = values_->asMutable<char>();
  getFlatValues<UnscaledLongDecimal, UnscaledLongDecimal>(
      rows, result, type_, true);
}

} // namespace facebook::velox::dwrf
Loading