Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions cpp/src/arrow/compute/kernels/codegen_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,13 @@ struct UnboxScalar<Decimal128Type> {
}
};

template <>
struct UnboxScalar<Decimal256Type> {
static Decimal256 Unbox(const Scalar& val) {
return checked_cast<const Decimal256Scalar&>(val).value;
}
};

template <typename Type, typename Enable = void>
struct BoxScalar;

Expand All @@ -354,6 +361,13 @@ struct BoxScalar<Decimal128Type> {
static void Box(T val, Scalar* out) { checked_cast<ScalarType*>(out)->value = val; }
};

template <>
struct BoxScalar<Decimal256Type> {
using T = Decimal256;
using ScalarType = Decimal256Scalar;
static void Box(T val, Scalar* out) { checked_cast<ScalarType*>(out)->value = val; }
};

// A VisitArrayDataInline variant that calls its visitor function with logical
// values, such as Decimal128 rather than util::string_view.

Expand Down Expand Up @@ -675,12 +689,13 @@ struct ScalarUnaryNotNullStateful {
};

template <typename Type>
struct ArrayExec<Type, enable_if_t<std::is_same<Type, Decimal128Type>::value>> {
struct ArrayExec<Type, enable_if_decimal<Type>> {
static void Exec(const ThisType& functor, KernelContext* ctx, const ArrayData& arg0,
Datum* out) {
ArrayData* out_arr = out->mutable_array();
// Decimal128 data buffers are not safely reinterpret_cast-able on big-endian
using endian_agnostic = std::array<uint8_t, sizeof(Decimal128)>;
using endian_agnostic =
std::array<uint8_t, sizeof(typename TypeTraits<Type>::ScalarType::ValueType)>;
auto out_data = out_arr->GetMutableValues<endian_agnostic>(1);
VisitArrayValuesInline<Arg0Type>(
arg0,
Expand Down
156 changes: 103 additions & 53 deletions cpp/src/arrow/compute/kernels/scalar_cast_numeric.cc
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,8 @@ struct CastFunctor<O, I, enable_if_base_binary<I>> {
// Decimal to integer

struct DecimalToIntegerMixin {
template <typename OutValue>
OutValue ToInteger(KernelContext* ctx, const Decimal128& val) const {
template <typename OutValue, typename Arg0Value>
OutValue ToInteger(KernelContext* ctx, const Arg0Value& val) const {
constexpr auto min_value = std::numeric_limits<OutValue>::min();
constexpr auto max_value = std::numeric_limits<OutValue>::max();

Expand All @@ -326,7 +326,7 @@ struct UnsafeUpscaleDecimalToInteger : public DecimalToIntegerMixin {
using DecimalToIntegerMixin::DecimalToIntegerMixin;

template <typename OutValue, typename Arg0Value>
OutValue Call(KernelContext* ctx, Decimal128 val) const {
OutValue Call(KernelContext* ctx, Arg0Value val) const {
return ToInteger<OutValue>(ctx, val.IncreaseScaleBy(-in_scale_));
}
};
Expand All @@ -335,7 +335,7 @@ struct UnsafeDownscaleDecimalToInteger : public DecimalToIntegerMixin {
using DecimalToIntegerMixin::DecimalToIntegerMixin;

template <typename OutValue, typename Arg0Value>
OutValue Call(KernelContext* ctx, Decimal128 val) const {
OutValue Call(KernelContext* ctx, Arg0Value val) const {
return ToInteger<OutValue>(ctx, val.ReduceScaleBy(in_scale_, false));
}
};
Expand All @@ -344,7 +344,7 @@ struct SafeRescaleDecimalToInteger : public DecimalToIntegerMixin {
using DecimalToIntegerMixin::DecimalToIntegerMixin;

template <typename OutValue, typename Arg0Value>
OutValue Call(KernelContext* ctx, Decimal128 val) const {
OutValue Call(KernelContext* ctx, Arg0Value val) const {
auto result = val.Rescale(in_scale_, 0);
if (ARROW_PREDICT_FALSE(!result.ok())) {
ctx->SetStatus(result.status());
Expand All @@ -355,35 +355,33 @@ struct SafeRescaleDecimalToInteger : public DecimalToIntegerMixin {
}
};

template <typename O>
struct CastFunctor<O, Decimal128Type, enable_if_t<is_integer_type<O>::value>> {
template <typename O, typename I>
struct CastFunctor<O, I,
enable_if_t<is_integer_type<O>::value && is_decimal_type<I>::value>> {
using out_type = typename O::c_type;

static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
const auto& options = checked_cast<const CastState*>(ctx->state())->options;

const auto& in_type_inst = checked_cast<const Decimal128Type&>(*batch[0].type());
const auto& in_type_inst = checked_cast<const I&>(*batch[0].type());
const auto in_scale = in_type_inst.scale();

if (options.allow_decimal_truncate) {
if (in_scale < 0) {
// Unsafe upscale
applicator::ScalarUnaryNotNullStateful<O, Decimal128Type,
UnsafeUpscaleDecimalToInteger>
applicator::ScalarUnaryNotNullStateful<O, I, UnsafeUpscaleDecimalToInteger>
kernel(UnsafeUpscaleDecimalToInteger{in_scale, options.allow_int_overflow});
return kernel.Exec(ctx, batch, out);
} else {
// Unsafe downscale
applicator::ScalarUnaryNotNullStateful<O, Decimal128Type,
UnsafeDownscaleDecimalToInteger>
applicator::ScalarUnaryNotNullStateful<O, I, UnsafeDownscaleDecimalToInteger>
kernel(UnsafeDownscaleDecimalToInteger{in_scale, options.allow_int_overflow});
return kernel.Exec(ctx, batch, out);
}
} else {
// Safe rescale
applicator::ScalarUnaryNotNullStateful<O, Decimal128Type,
SafeRescaleDecimalToInteger>
kernel(SafeRescaleDecimalToInteger{in_scale, options.allow_int_overflow});
applicator::ScalarUnaryNotNullStateful<O, I, SafeRescaleDecimalToInteger> kernel(
SafeRescaleDecimalToInteger{in_scale, options.allow_int_overflow});
return kernel.Exec(ctx, batch, out);
}
}
Expand All @@ -392,72 +390,104 @@ struct CastFunctor<O, Decimal128Type, enable_if_t<is_integer_type<O>::value>> {
// ----------------------------------------------------------------------
// Decimal to decimal

// Helper that converts the input and output decimals
// For instance, Decimal128 -> Decimal256 requires converting, then scaling
// Decimal256 -> Decimal128 requires scaling, then truncating
template <typename OutDecimal, typename InDecimal>
struct DecimalConversions {};

template <typename InDecimal>
struct DecimalConversions<Decimal256, InDecimal> {
// Convert then scale
static Decimal256 ConvertInput(InDecimal&& val) { return Decimal256(val); }
static Decimal256 ConvertOutput(Decimal256&& val) { return val; }
};

template <>
struct DecimalConversions<Decimal128, Decimal256> {
// Scale then truncate
static Decimal256 ConvertInput(Decimal256&& val) { return val; }
static Decimal128 ConvertOutput(Decimal256&& val) {
return Decimal128(val.little_endian_array()[1], val.little_endian_array()[0]);
}
};

template <>
struct DecimalConversions<Decimal128, Decimal128> {
static Decimal128 ConvertInput(Decimal128&& val) { return val; }
static Decimal128 ConvertOutput(Decimal128&& val) { return val; }
};

struct UnsafeUpscaleDecimal {
template <typename... Unused>
Decimal128 Call(KernelContext* ctx, Decimal128 val) const {
return val.IncreaseScaleBy(by_);
template <typename OutValue, typename Arg0Value>
OutValue Call(KernelContext* ctx, Arg0Value val) const {
using Conv = DecimalConversions<OutValue, Arg0Value>;
return Conv::ConvertOutput(Conv::ConvertInput(std::move(val)).IncreaseScaleBy(by_));
}
int32_t by_;
};

struct UnsafeDownscaleDecimal {
template <typename... Unused>
Decimal128 Call(KernelContext* ctx, Decimal128 val) const {
return val.ReduceScaleBy(by_, false);
template <typename OutValue, typename Arg0Value>
OutValue Call(KernelContext* ctx, Arg0Value val) const {
using Conv = DecimalConversions<OutValue, Arg0Value>;
return Conv::ConvertOutput(
Conv::ConvertInput(std::move(val)).ReduceScaleBy(by_, false));
}
int32_t by_;
};

struct SafeRescaleDecimal {
template <typename... Unused>
Decimal128 Call(KernelContext* ctx, Decimal128 val) const {
auto maybe_rescaled = val.Rescale(in_scale_, out_scale_);
template <typename OutValue, typename Arg0Value>
OutValue Call(KernelContext* ctx, Arg0Value val) const {
using Conv = DecimalConversions<OutValue, Arg0Value>;
auto maybe_rescaled =
Conv::ConvertInput(std::move(val)).Rescale(in_scale_, out_scale_);
if (ARROW_PREDICT_FALSE(!maybe_rescaled.ok())) {
ctx->SetStatus(maybe_rescaled.status());
return {}; // Zero
}

if (ARROW_PREDICT_TRUE(maybe_rescaled->FitsInPrecision(out_precision_))) {
return maybe_rescaled.MoveValueUnsafe();
return Conv::ConvertOutput(maybe_rescaled.MoveValueUnsafe());
}

ctx->SetStatus(Status::Invalid("Decimal value does not fit in precision"));
ctx->SetStatus(
Status::Invalid("Decimal value does not fit in precision ", out_precision_));
return {}; // Zero
}

int32_t out_scale_, out_precision_, in_scale_;
};

template <>
struct CastFunctor<Decimal128Type, Decimal128Type> {
template <typename O, typename I>
struct CastFunctor<O, I,
enable_if_t<is_decimal_type<O>::value && is_decimal_type<I>::value>> {
static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
const auto& options = checked_cast<const CastState*>(ctx->state())->options;

const auto& in_type = checked_cast<const Decimal128Type&>(*batch[0].type());
const auto& out_type = checked_cast<const Decimal128Type&>(*out->type());
const auto& in_type = checked_cast<const I&>(*batch[0].type());
const auto& out_type = checked_cast<const O&>(*out->type());
const auto in_scale = in_type.scale();
const auto out_scale = out_type.scale();

if (options.allow_decimal_truncate) {
if (in_scale < out_scale) {
// Unsafe upscale
applicator::ScalarUnaryNotNullStateful<Decimal128Type, Decimal128Type,
UnsafeUpscaleDecimal>
kernel(UnsafeUpscaleDecimal{out_scale - in_scale});
applicator::ScalarUnaryNotNullStateful<O, I, UnsafeUpscaleDecimal> kernel(
UnsafeUpscaleDecimal{out_scale - in_scale});
return kernel.Exec(ctx, batch, out);
} else {
// Unsafe downscale
applicator::ScalarUnaryNotNullStateful<Decimal128Type, Decimal128Type,
UnsafeDownscaleDecimal>
kernel(UnsafeDownscaleDecimal{in_scale - out_scale});
applicator::ScalarUnaryNotNullStateful<O, I, UnsafeDownscaleDecimal> kernel(
UnsafeDownscaleDecimal{in_scale - out_scale});
return kernel.Exec(ctx, batch, out);
}
}

// Safe rescale
applicator::ScalarUnaryNotNullStateful<Decimal128Type, Decimal128Type,
SafeRescaleDecimal>
kernel(SafeRescaleDecimal{out_scale, out_type.precision(), in_scale});
applicator::ScalarUnaryNotNullStateful<O, I, SafeRescaleDecimal> kernel(
SafeRescaleDecimal{out_scale, out_type.precision(), in_scale});
return kernel.Exec(ctx, batch, out);
}
};
Expand All @@ -467,8 +497,8 @@ struct CastFunctor<Decimal128Type, Decimal128Type> {

struct RealToDecimal {
template <typename OutValue, typename RealType>
Decimal128 Call(KernelContext* ctx, RealType val) const {
auto maybe_decimal = Decimal128::FromReal(val, out_precision_, out_scale_);
OutValue Call(KernelContext* ctx, RealType val) const {
auto maybe_decimal = OutValue::FromReal(val, out_precision_, out_scale_);

if (ARROW_PREDICT_TRUE(maybe_decimal.ok())) {
return maybe_decimal.MoveValueUnsafe();
Expand All @@ -484,15 +514,16 @@ struct RealToDecimal {
bool allow_truncate_;
};

template <typename I>
struct CastFunctor<Decimal128Type, I, enable_if_t<is_floating_type<I>::value>> {
template <typename O, typename I>
struct CastFunctor<O, I,
enable_if_t<is_decimal_type<O>::value && is_floating_type<I>::value>> {
static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
const auto& options = checked_cast<const CastState*>(ctx->state())->options;
const auto& out_type = checked_cast<const Decimal128Type&>(*out->type());
const auto& out_type = checked_cast<const O&>(*out->type());
const auto out_scale = out_type.scale();
const auto out_precision = out_type.precision();

applicator::ScalarUnaryNotNullStateful<Decimal128Type, I, RealToDecimal> kernel(
applicator::ScalarUnaryNotNullStateful<O, I, RealToDecimal> kernel(
RealToDecimal{out_scale, out_precision, options.allow_decimal_truncate});
return kernel.Exec(ctx, batch, out);
}
Expand All @@ -503,20 +534,21 @@ struct CastFunctor<Decimal128Type, I, enable_if_t<is_floating_type<I>::value>> {

struct DecimalToReal {
template <typename RealType, typename Arg0Value>
RealType Call(KernelContext* ctx, const Decimal128& val) const {
return val.ToReal<RealType>(in_scale_);
RealType Call(KernelContext* ctx, const Arg0Value& val) const {
return val.template ToReal<RealType>(in_scale_);
}

int32_t in_scale_;
};

template <typename O>
struct CastFunctor<O, Decimal128Type, enable_if_t<is_floating_type<O>::value>> {
template <typename O, typename I>
struct CastFunctor<O, I,
enable_if_t<is_floating_type<O>::value && is_decimal_type<I>::value>> {
static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
const auto& in_type = checked_cast<const Decimal128Type&>(*batch[0].type());
const auto& in_type = checked_cast<const I&>(*batch[0].type());
const auto in_scale = in_type.scale();

applicator::ScalarUnaryNotNullStateful<O, Decimal128Type, DecimalToReal> kernel(
applicator::ScalarUnaryNotNullStateful<O, I, DecimalToReal> kernel(
DecimalToReal{in_scale});
return kernel.Exec(ctx, batch, out);
}
Expand Down Expand Up @@ -562,6 +594,8 @@ std::shared_ptr<CastFunction> GetCastToInteger(std::string name) {
// From decimal to integer
DCHECK_OK(func->AddKernel(Type::DECIMAL, {InputType(Type::DECIMAL)}, out_ty,
CastFunctor<OutType, Decimal128Type>::Exec));
DCHECK_OK(func->AddKernel(Type::DECIMAL256, {InputType(Type::DECIMAL256)}, out_ty,
CastFunctor<OutType, Decimal256Type>::Exec));
return func;
}

Expand All @@ -586,6 +620,8 @@ std::shared_ptr<CastFunction> GetCastToFloating(std::string name) {
// From decimal to floating point
DCHECK_OK(func->AddKernel(Type::DECIMAL, {InputType(Type::DECIMAL)}, out_ty,
CastFunctor<OutType, Decimal128Type>::Exec));
DCHECK_OK(func->AddKernel(Type::DECIMAL256, {InputType(Type::DECIMAL256)}, out_ty,
CastFunctor<OutType, Decimal256Type>::Exec));
return func;
}

Expand All @@ -606,17 +642,31 @@ std::shared_ptr<CastFunction> GetCastToDecimal128() {
// We resolve the output type of this kernel from the CastOptions
DCHECK_OK(
func->AddKernel(Type::DECIMAL128, {InputType(Type::DECIMAL128)}, sig_out_ty, exec));
exec = CastFunctor<Decimal128Type, Decimal256Type>::Exec;
DCHECK_OK(
func->AddKernel(Type::DECIMAL256, {InputType(Type::DECIMAL256)}, sig_out_ty, exec));
return func;
}

std::shared_ptr<CastFunction> GetCastToDecimal256() {
OutputType sig_out_ty(ResolveOutputFromOptions);

auto func = std::make_shared<CastFunction>("cast_decimal256", Type::DECIMAL256);
// Needed for Parquet conversion. Full implementation is ARROW-10606
// tracks full implementation.
AddCommonCasts(Type::DECIMAL256, sig_out_ty, func.get());

// Cast from floating point
DCHECK_OK(func->AddKernel(Type::FLOAT, {float32()}, sig_out_ty,
CastFunctor<Decimal256Type, FloatType>::Exec));
DCHECK_OK(func->AddKernel(Type::DOUBLE, {float64()}, sig_out_ty,
CastFunctor<Decimal256Type, DoubleType>::Exec));

// Cast from other decimal
auto exec = CastFunctor<Decimal256Type, Decimal128Type>::Exec;
DCHECK_OK(
func->AddKernel(Type::DECIMAL128, {InputType(Type::DECIMAL128)}, sig_out_ty, exec));
exec = CastFunctor<Decimal256Type, Decimal256Type>::Exec;
DCHECK_OK(
func->AddKernel(Type::DECIMAL256, {InputType(Type::DECIMAL256)}, sig_out_ty, exec));
return func;
}

Expand Down
Loading