Skip to content

feat: Allow local arithmetic execution in hybrid engine #1906

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions bigframes/core/compile/polars/lowering.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,17 @@
@dataclasses.dataclass
class CoerceArgsRule(op_lowering.OpLoweringRule):
    """Lowering rule that coerces both operands of a binary op before compilation.

    The two children of the matched expression are passed through
    ``_coerce_comparables`` and the op is re-applied to the results.
    """

    # Binary op type this rule is registered for.
    op_type: type[ops.BinaryOp]
    # Forwarded to ``_coerce_comparables``; when True, operands are returned
    # unchanged unless at least one of them has the boolean dtype.
    bools_only: bool = False

    @property
    def op(self) -> type[ops.ScalarOp]:
        """Return the op type handled by this rule."""
        return self.op_type

    def lower(self, expr: expression.OpExpression) -> expression.Expression:
        """Return ``expr`` rebuilt over coerced operands.

        Args:
            expr: An op expression whose op is an instance of ``self.op_type``
                and whose first two children are the left and right operands.

        Returns:
            The same op applied to the coerced left and right operands.
        """
        assert isinstance(expr.op, self.op_type)
        # Coerce exactly once, honoring the rule's ``bools_only`` setting.
        left, right = _coerce_comparables(
            expr.children[0], expr.children[1], bools_only=self.bools_only
        )
        return expr.op.as_expr(left, right)


Expand All @@ -56,7 +59,18 @@ def lower(self, expr: expression.OpExpression) -> expression.Expression:
return ops.where_op.as_expr(zero_result, divisor_is_zero, expr)


def _coerce_comparables(expr1: expression.Expression, expr2: expression.Expression):
def _coerce_comparables(
expr1: expression.Expression,
expr2: expression.Expression,
*,
bools_only: bool = False
):
if bools_only:
if (
expr1.output_type != dtypes.BOOL_DTYPE
and expr2.output_type != dtypes.BOOL_DTYPE
):
return expr1, expr2

target_type = dtypes.coerce_to_common(expr1.output_type, expr2.output_type)
if expr1.output_type != target_type:
Expand Down Expand Up @@ -88,8 +102,20 @@ def _lower_cast(cast_op: ops.AsTypeOp, arg: expression.Expression):
)
)

# One coercion rule per arithmetic op. ``bools_only=True`` limits coercion to
# operand pairs in which at least one side is boolean.
LOWER_NUMERICS = (
    CoerceArgsRule(numeric_ops.AddOp, bools_only=True),
    CoerceArgsRule(numeric_ops.SubOp, bools_only=True),
    CoerceArgsRule(numeric_ops.MulOp, bools_only=True),
    CoerceArgsRule(numeric_ops.DivOp, bools_only=True),
    CoerceArgsRule(numeric_ops.FloorDivOp, bools_only=True),
    CoerceArgsRule(numeric_ops.ModOp, bools_only=True),
)
# Combined rule tuple: comparison rules, then numeric rules, then the
# floor-division rule.
POLARS_LOWERING_RULES = (*LOWER_COMPARISONS, *LOWER_NUMERICS, LowerFloorDivRule())

Expand Down
23 changes: 15 additions & 8 deletions bigframes/core/compile/scalar_op_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1498,7 +1498,7 @@ def eq_op(
x: ibis_types.Value,
y: ibis_types.Value,
):
x, y = _coerce_comparables(x, y)
x, y = _coerce_bools(x, y)
return x == y


Expand All @@ -1508,7 +1508,7 @@ def eq_nulls_match_op(
y: ibis_types.Value,
):
"""Variant of eq_op where nulls match each other. Only use where dtypes are known to be same."""
x, y = _coerce_comparables(x, y)
x, y = _coerce_bools(x, y)
literal = ibis_types.literal("$NULL_SENTINEL$")
if hasattr(x, "fill_null"):
left = x.cast(ibis_dtypes.str).fill_null(literal)
Expand All @@ -1525,7 +1525,7 @@ def ne_op(
x: ibis_types.Value,
y: ibis_types.Value,
):
x, y = _coerce_comparables(x, y)
x, y = _coerce_bools(x, y)
return x != y


Expand All @@ -1537,7 +1537,7 @@ def _null_or_value(value: ibis_types.Value, where_value: ibis_types.BooleanValue
)


def _coerce_comparables(
def _coerce_bools(
x: ibis_types.Value,
y: ibis_types.Value,
):
Expand Down Expand Up @@ -1604,6 +1604,7 @@ def add_op(
x: ibis_types.Value,
y: ibis_types.Value,
):
x, y = _coerce_bools(x, y)
if isinstance(x, ibis_types.NullScalar) or isinstance(x, ibis_types.NullScalar):
return ibis_types.null()
return x + y # type: ignore
Expand All @@ -1615,6 +1616,7 @@ def sub_op(
x: ibis_types.Value,
y: ibis_types.Value,
):
x, y = _coerce_bools(x, y)
return typing.cast(ibis_types.NumericValue, x) - typing.cast(
ibis_types.NumericValue, y
)
Expand All @@ -1626,6 +1628,7 @@ def mul_op(
x: ibis_types.Value,
y: ibis_types.Value,
):
x, y = _coerce_bools(x, y)
return typing.cast(ibis_types.NumericValue, x) * typing.cast(
ibis_types.NumericValue, y
)
Expand All @@ -1637,6 +1640,7 @@ def div_op(
x: ibis_types.Value,
y: ibis_types.Value,
):
x, y = _coerce_bools(x, y)
return typing.cast(ibis_types.NumericValue, x) / typing.cast(
ibis_types.NumericValue, y
)
Expand All @@ -1648,6 +1652,7 @@ def pow_op(
x: ibis_types.Value,
y: ibis_types.Value,
):
x, y = _coerce_bools(x, y)
if x.type().is_integer() and y.type().is_integer():
return _int_pow_op(x, y)
else:
Expand All @@ -1661,6 +1666,7 @@ def unsafe_pow_op(
y: ibis_types.Value,
):
"""For internal use only - where domain and overflow checks are not needed."""
x, y = _coerce_bools(x, y)
return typing.cast(ibis_types.NumericValue, x) ** typing.cast(
ibis_types.NumericValue, y
)
Expand Down Expand Up @@ -1749,7 +1755,7 @@ def lt_op(
x: ibis_types.Value,
y: ibis_types.Value,
):
x, y = _coerce_comparables(x, y)
x, y = _coerce_bools(x, y)
return x < y


Expand All @@ -1759,7 +1765,7 @@ def le_op(
x: ibis_types.Value,
y: ibis_types.Value,
):
x, y = _coerce_comparables(x, y)
x, y = _coerce_bools(x, y)
return x <= y


Expand All @@ -1769,7 +1775,7 @@ def gt_op(
x: ibis_types.Value,
y: ibis_types.Value,
):
x, y = _coerce_comparables(x, y)
x, y = _coerce_bools(x, y)
return x > y


Expand All @@ -1779,7 +1785,7 @@ def ge_op(
x: ibis_types.Value,
y: ibis_types.Value,
):
x, y = _coerce_comparables(x, y)
x, y = _coerce_bools(x, y)
return x >= y


Expand Down Expand Up @@ -1822,6 +1828,7 @@ def mod_op(
x: ibis_types.Value,
y: ibis_types.Value,
):
x, y = _coerce_bools(x, y)
# Hacky short-circuit to avoid passing zero-literal to sql backend, evaluate locally instead to null.
op = y.op()
if isinstance(op, ibis_generic.Literal) and op.value == 0:
Expand Down
2 changes: 1 addition & 1 deletion bigframes/operations/numeric_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ def output_type(self, *input_types):
left_type = input_types[0]
right_type = input_types[1]

if dtypes.is_datetime_like(left_type) and dtypes.is_datetime_like(right_type):
if left_type == dtypes.DATETIME_DTYPE and right_type == dtypes.DATETIME_DTYPE:
return dtypes.TIMEDELTA_DTYPE

if left_type == dtypes.DATE_DTYPE and right_type == dtypes.DATE_DTYPE:
Expand Down
24 changes: 16 additions & 8 deletions bigframes/session/polars_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from bigframes.core import array_value, bigframe_node, expression, local_data, nodes
import bigframes.operations
from bigframes.operations import aggregations as agg_ops
from bigframes.operations import comparison_ops, generic_ops, numeric_ops
from bigframes.session import executor, semi_executor

if TYPE_CHECKING:
Expand All @@ -41,13 +42,20 @@
)

# Scalar ops that _is_node_polars_executable accepts inside expressions.
# NOTE(review): this tuple contains both op instances
# (bigframes.operations.eq_op, ...) and op classes (comparison_ops.EqOp, ...).
# The membership check shown in _is_node_polars_executable compares
# set(map(type, ...)) against this tuple, which presumably only the class
# entries can satisfy -- confirm whether the instance entries are leftovers.
_COMPATIBLE_SCALAR_OPS = (
    bigframes.operations.eq_op,
    bigframes.operations.eq_null_match_op,
    bigframes.operations.ne_op,
    bigframes.operations.gt_op,
    bigframes.operations.lt_op,
    bigframes.operations.ge_op,
    bigframes.operations.le_op,
    comparison_ops.EqOp,
    comparison_ops.EqNullsMatchOp,
    comparison_ops.NeOp,
    comparison_ops.LtOp,
    comparison_ops.GtOp,
    comparison_ops.LeOp,
    comparison_ops.GeOp,
    generic_ops.WhereOp,
    numeric_ops.AddOp,
    numeric_ops.SubOp,
    numeric_ops.MulOp,
    numeric_ops.DivOp,
    numeric_ops.FloorDivOp,
    numeric_ops.ModOp,
)
_COMPATIBLE_AGG_OPS = (
agg_ops.SizeOp,
Expand All @@ -74,7 +82,7 @@ def _is_node_polars_executable(node: nodes.BigFrameNode):
if not type(expr.op) in _COMPATIBLE_AGG_OPS:
return False
if isinstance(expr, expression.Expression):
if not _get_expr_ops(expr).issubset(_COMPATIBLE_SCALAR_OPS):
if not set(map(type, _get_expr_ops(expr))).issubset(_COMPATIBLE_SCALAR_OPS):
return False
return True

Expand Down
80 changes: 80 additions & 0 deletions tests/system/small/engines/test_numeric_ops.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import itertools
from typing import Callable

import pytest

from bigframes.core import array_value, expression
import bigframes.operations as ops
from bigframes.session import polars_executor
from bigframes.testing.engine_utils import assert_equivalence_execution

pytest.importorskip("polars")

# Polars is used as the reference because it is fast and local. Where the engines disagree, though, prefer the gbq engine.
REFERENCE_ENGINE = polars_executor.PolarsExecutor()

def _divisor_is_nonzero(_left: str, right: str) -> expression.Expression:
    """Build the boolean guard ``right != 0`` for a pair of operand column ids."""
    return ops.ne_op.as_expr(right, expression.const(0))


# Maps a binary op type to a builder that, given the (left, right) column ids,
# returns a boolean guard expression for that op's operands.
OP_CONSTRAINTS: dict[
    type[ops.BinaryOp],
    Callable[[str, str], expression.Expression],
] = {ops.DivOp: _divisor_is_nonzero}


def apply_op_pairwise(
    array: array_value.ArrayValue, op: ops.BinaryOp, excluded_cols=()
) -> array_value.ArrayValue:
    """Project ``op`` over every ordered pair of distinct columns of ``array``.

    A pair is skipped when either column id is in ``excluded_cols`` or when
    building its expression raises ``TypeError`` (for example, when
    ``op.output_type`` rejects the pair's column types). If ``type(op)`` has
    an entry in ``OP_CONSTRAINTS``, the op expression is wrapped in
    ``where_op`` with that guard as the condition and a null constant as the
    third argument, so rows failing the guard presumably evaluate to null.

    Args:
        array: Source array whose columns supply the operands.
        op: Binary op applied to each accepted (left, right) column pair.
        excluded_cols: Column ids that must not appear on either side.

    Returns:
        The array produced by ``array.compute_values`` for the collected
        expressions.

    Raises:
        AssertionError: If no column pair is accepted by ``op``.
    """
    # Looked up once; the op type does not change between pairs.
    guard_builder = OP_CONSTRAINTS.get(type(op))
    exprs = []
    for l_arg, r_arg in itertools.permutations(array.column_ids, 2):
        if (l_arg in excluded_cols) or (r_arg in excluded_cols):
            continue
        try:
            # Called only for validation: raises TypeError when the op does
            # not accept this combination of column types.
            op.output_type(
                array.get_column_type(l_arg), array.get_column_type(r_arg)
            )
            expr = op.as_expr(l_arg, r_arg)
            if guard_builder is not None:
                expr = ops.where_op.as_expr(
                    expr, guard_builder(l_arg, r_arg), expression.const(None)
                )
        except TypeError:
            continue
        # Append the (possibly guarded) expression, not a fresh unguarded one.
        exprs.append(expr)
    assert len(exprs) > 0, "op accepted no pair of columns"
    new_arr, _ = array.compute_values(exprs)
    return new_arr


@pytest.mark.parametrize("engine", ["polars", "bq"], indirect=True)
@pytest.mark.parametrize(
    "op",
    [
        ops.add_op,
        ops.sub_op,
        ops.mul_op,
        ops.div_op,
        # NOTE(review): floordiv and mod are not exercised yet -- confirm why
        # they are disabled before enabling them.
        # ops.floordiv_op,
        # ops.mod_op,
    ],
)
def test_engines_project_numeric_op(
    scalars_array_value: array_value.ArrayValue, engine, op
):
    """Check that ``engine`` agrees with ``REFERENCE_ENGINE`` on a pairwise projection.

    ``op`` is applied to the column pairs selected by ``apply_op_pairwise`` and
    the resulting plan is executed on both engines via
    ``assert_equivalence_execution``.
    """
    # NOTE(review): earlier notes here mention excluding string columns, say the
    # bool column does not work properly for the bq engine, and reference a
    # commented-out .select_columns(["datetime_col", "duration_col"]). None of
    # that is applied: apply_op_pairwise is called without any exclusions.
    # Confirm whether exclusions are still needed.
    arr = apply_op_pairwise(scalars_array_value, op)
    assert_equivalence_execution(arr.node, REFERENCE_ENGINE, engine)