Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions cpp/src/arrow/compute/api_aggregate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,14 @@ Result<Datum> MinMax(const Datum& value, const ScalarAggregateOptions& options,
return CallFunction("min_max", {value}, &options, ctx);
}

Result<Datum> Any(const Datum& value, ExecContext* ctx) {
return CallFunction("any", {value}, ctx);
Result<Datum> Any(const Datum& value, const ScalarAggregateOptions& options,
ExecContext* ctx) {
return CallFunction("any", {value}, &options, ctx);
}

Result<Datum> All(const Datum& value, ExecContext* ctx) {
return CallFunction("all", {value}, ctx);
Result<Datum> All(const Datum& value, const ScalarAggregateOptions& options,
ExecContext* ctx) {
return CallFunction("all", {value}, &options, ctx);
}

Result<Datum> Mode(const Datum& value, const ModeOptions& options, ExecContext* ctx) {
Expand Down
22 changes: 18 additions & 4 deletions cpp/src/arrow/compute/api_aggregate.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,30 +205,44 @@ Result<Datum> MinMax(
/// \brief Test whether any element in a boolean array evaluates to true.
///
/// This function returns true if any of the elements in the array evaluates
/// to true and false otherwise. Null values are skipped.
/// to true and false otherwise. Null values are ignored by default.
/// If null values are taken into account by setting ScalarAggregateOptions
/// parameter skip_nulls = false then Kleene logic is used.
/// See KleeneOr for more details on Kleene logic.
///
/// \param[in] value input datum, expecting a boolean array
/// \param[in] options see ScalarAggregateOptions for more information
/// \param[in] ctx the function execution context, optional
/// \return resulting datum as a BooleanScalar
///
/// \since 3.0.0
/// \note API not yet finalized
ARROW_EXPORT
Result<Datum> Any(const Datum& value, ExecContext* ctx = NULLPTR);
Result<Datum> Any(
const Datum& value,
const ScalarAggregateOptions& options = ScalarAggregateOptions::Defaults(),
ExecContext* ctx = NULLPTR);

/// \brief Test whether all elements in a boolean array evaluate to true.
///
/// This function returns true if all of the elements in the array evaluate
/// to true and false otherwise. Null values are skipped.
/// to true and false otherwise. Null values are ignored by default.
/// If null values are taken into account by setting ScalarAggregateOptions
/// parameter skip_nulls = false then Kleene logic is used.
/// See KleeneAnd for more details on Kleene logic.
///
/// \param[in] value input datum, expecting a boolean array
/// \param[in] options see ScalarAggregateOptions for more information
/// \param[in] ctx the function execution context, optional
/// \return resulting datum as a BooleanScalar

/// \since 3.0.0
/// \note API not yet finalized
ARROW_EXPORT
Result<Datum> All(const Datum& value, ExecContext* ctx = NULLPTR);
Result<Datum> All(
const Datum& value,
const ScalarAggregateOptions& options = ScalarAggregateOptions::Defaults(),
ExecContext* ctx = NULLPTR);

/// \brief Calculate the modal (most common) value of a numeric array
///
Expand Down
64 changes: 51 additions & 13 deletions cpp/src/arrow/compute/kernels/aggregate_basic.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,15 @@ Result<std::unique_ptr<KernelState>> MinMaxInit(KernelContext* ctx,
// Any implementation

struct BooleanAnyImpl : public ScalarAggregator {
explicit BooleanAnyImpl(ScalarAggregateOptions options) : options(std::move(options)) {}

Status Consume(KernelContext*, const ExecBatch& batch) override {
// short-circuit if seen a True already
if (this->any == true) {
return Status::OK();
}

const auto& data = *batch[0].array();
this->has_nulls = data.GetNullCount() > 0;
arrow::internal::OptionalBinaryBitBlockCounter counter(
data.buffers[0], data.offset, data.buffers[1], data.offset, data.length);
int64_t position = 0;
Expand All @@ -166,32 +168,48 @@ struct BooleanAnyImpl : public ScalarAggregator {
Status MergeFrom(KernelContext*, KernelState&& src) override {
const auto& other = checked_cast<const BooleanAnyImpl&>(src);
this->any |= other.any;
this->has_nulls |= other.has_nulls;
return Status::OK();
}

Status Finalize(KernelContext*, Datum* out) override {
out->value = std::make_shared<BooleanScalar>(this->any);
Status Finalize(KernelContext* ctx, Datum* out) override {
if (!options.skip_nulls && !this->any && this->has_nulls) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason for the && !this->any part? That seems to suggest "kleene logic"? But for the non-kleene version, I expect any null in the input to always give null for the output.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed this is to to make the PR "kleen" to match R behavior. Meanwhile Pandas' any is non-kleen.

>>> import pandas as pd
>>> pd.Series([None, None]).any(skipna=True)
False
>>> pd.Series([None, None]).any(skipna=False)
>>> 

We have three options IMO:

  • Revert to non-kleene c++ behaviour and add a small fix to R
  • Add kleen any/all kernels and route in R depending on flags
  • Keep c++ as is and add a fix to Python (that could introduce a lot of new unvanted logic)

Copy link
Member

@jorisvandenbossche jorisvandenbossche Jun 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Meanwhile Pandas' any is non-kleen.

The pandas any/all methods were broken for object dtype (and since numpy doesn't support nulls in its boolean dtype, whenever you have missing values, you have object dtype), so best not to use that as a reference (see eg pandas-dev/pandas#27709)

For the new nullable boolean dtype in pandas, the any/all methods also use kleene logic like in R.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So how about then just making this kernel "kleene" and just document that fact?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall I:

  1. Document the behavior as kleen?
  2. Rather revert the change and first implement al_kleen/any_kleen (ARROW-10291) and then map R to those for skip_nulls==False?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure there is a good use case for a non-kleene version, so I am fine with just documenting for now that the behaviour follows Kleene logic (so is the reducing version of and/or_kleene)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I documented the new behavior in compute.rst and api_aggregate.h.

out->value = std::make_shared<BooleanScalar>();
} else {
out->value = std::make_shared<BooleanScalar>(this->any);
}
return Status::OK();
}

bool any = false;
bool has_nulls = false;
ScalarAggregateOptions options;
};

Result<std::unique_ptr<KernelState>> AnyInit(KernelContext*, const KernelInitArgs& args) {
return ::arrow::internal::make_unique<BooleanAnyImpl>();
const ScalarAggregateOptions options =
static_cast<const ScalarAggregateOptions&>(*args.options);
return ::arrow::internal::make_unique<BooleanAnyImpl>(
static_cast<const ScalarAggregateOptions&>(*args.options));
}

// ----------------------------------------------------------------------
// All implementation

struct BooleanAllImpl : public ScalarAggregator {
explicit BooleanAllImpl(ScalarAggregateOptions options) : options(std::move(options)) {}

Status Consume(KernelContext*, const ExecBatch& batch) override {
// short-circuit if seen a false already
if (this->all == false) {
return Status::OK();
}

// short-circuit if seen a null already
if (!options.skip_nulls && this->has_nulls) {
return Status::OK();
}
const auto& data = *batch[0].array();
this->has_nulls = data.GetNullCount() > 0;
arrow::internal::OptionalBinaryBitBlockCounter counter(
data.buffers[1], data.offset, data.buffers[0], data.offset, data.length);
int64_t position = 0;
Expand All @@ -210,19 +228,27 @@ struct BooleanAllImpl : public ScalarAggregator {
Status MergeFrom(KernelContext*, KernelState&& src) override {
const auto& other = checked_cast<const BooleanAllImpl&>(src);
this->all &= other.all;
this->has_nulls |= other.has_nulls;
return Status::OK();
}

Status Finalize(KernelContext*, Datum* out) override {
out->value = std::make_shared<BooleanScalar>(this->all);
if (!options.skip_nulls && this->all && this->has_nulls) {
out->value = std::make_shared<BooleanScalar>();
} else {
out->value = std::make_shared<BooleanScalar>(this->all);
}
return Status::OK();
}

bool all = true;
bool has_nulls = false;
ScalarAggregateOptions options;
};

Result<std::unique_ptr<KernelState>> AllInit(KernelContext*, const KernelInitArgs& args) {
return ::arrow::internal::make_unique<BooleanAllImpl>();
return ::arrow::internal::make_unique<BooleanAllImpl>(
static_cast<const ScalarAggregateOptions&>(*args.options));
}

// ----------------------------------------------------------------------
Expand Down Expand Up @@ -407,12 +433,22 @@ const FunctionDoc min_max_doc{"Compute the minimum and maximum values of a numer
"ScalarAggregateOptions"};

const FunctionDoc any_doc{"Test whether any element in a boolean array evaluates to true",
("Null values are ignored."),
{"array"}};
("Null values are ignored by default.\n"
"If null values are taken into account by setting "
"ScalarAggregateOptions parameter skip_nulls = false then "
"Kleene logic is used.\n"
"See KleeneOr for more details on Kleene logic."),
{"array"},
"ScalarAggregateOptions"};

const FunctionDoc all_doc{"Test whether all elements in a boolean array evaluate to true",
("Null values are ignored."),
{"array"}};
("Null values are ignored by default.\n"
"If null values are taken into account by setting "
"ScalarAggregateOptions parameter skip_nulls = false then "
"Kleene logic is used.\n"
"See KleeneAnd for more details on Kleene logic."),
{"array"},
"ScalarAggregateOptions"};

const FunctionDoc index_doc{"Find the index of the first occurrence of a given value",
("The result is always computed as an int64_t, regardless\n"
Expand Down Expand Up @@ -496,12 +532,14 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) {
DCHECK_OK(registry->AddFunction(std::move(func)));

// any
func = std::make_shared<ScalarAggregateFunction>("any", Arity::Unary(), &any_doc);
func = std::make_shared<ScalarAggregateFunction>("any", Arity::Unary(), &any_doc,
&default_scalar_aggregate_options);
aggregate::AddBasicAggKernels(aggregate::AnyInit, {boolean()}, boolean(), func.get());
DCHECK_OK(registry->AddFunction(std::move(func)));

// all
func = std::make_shared<ScalarAggregateFunction>("all", Arity::Unary(), &all_doc);
func = std::make_shared<ScalarAggregateFunction>("all", Arity::Unary(), &all_doc,
&default_scalar_aggregate_options);
aggregate::AddBasicAggKernels(aggregate::AllInit, {boolean()}, boolean(), func.get());
DCHECK_OK(registry->AddFunction(std::move(func)));

Expand Down
Loading