Skip to content

feat: Add experimental polars execution #1747

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions bigframes/_config/bigquery_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import google.auth.credentials
import requests.adapters

import bigframes._importing
import bigframes.enums
import bigframes.exceptions as bfe

Expand Down Expand Up @@ -94,6 +95,7 @@ def __init__(
requests_transport_adapters: Sequence[
Tuple[str, requests.adapters.BaseAdapter]
] = (),
enable_polars_execution: bool = False,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a compute option so it can be changed at runtime?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer to keep it constant within a session for now (we will need to commit to a design if this becomes a GA feature, though). Turning polars execution on and off mid-session would make things like caching and multi-part execution really tricky.

):
self._credentials = credentials
self._project = project
Expand All @@ -113,6 +115,9 @@ def __init__(
client_endpoints_override = {}

self._client_endpoints_override = client_endpoints_override
if enable_polars_execution:
bigframes._importing.import_polars()
self._enable_polars_execution = enable_polars_execution

@property
def application_name(self) -> Optional[str]:
Expand Down Expand Up @@ -424,3 +429,18 @@ def requests_transport_adapters(
SESSION_STARTED_MESSAGE.format(attribute="requests_transport_adapters")
)
self._requests_transport_adapters = value

@property
def enable_polars_execution(self) -> bool:
    """Whether polars is used to execute some simple query plans locally.

    Returns:
        bool: The currently configured value of the option.
    """
    return self._enable_polars_execution

@enable_polars_execution.setter
def enable_polars_execution(self, value: bool):
if value is True:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The setter might also be an opportune time to check that polars is installed and is a compatible version.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a util to check version

msg = bfe.format_message(
"Polars execution is an experimental feature, and may not be stable. Must have polars installed."
)
warnings.warn(msg, category=bfe.PreviewWarning)
bigframes._importing.import_polars()
self._enable_polars_execution = value
30 changes: 30 additions & 0 deletions bigframes/_importing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import importlib
from types import ModuleType

from packaging import version

# Keep this in sync with setup.py
POLARS_MIN_VERSION = version.Version("1.7.0")


def import_polars() -> ModuleType:
    """Import polars and check it against the minimum supported version.

    The version is read from ``polars.build_info()["version"]`` and compared
    with ``POLARS_MIN_VERSION``.

    Returns:
        ModuleType: The imported ``polars`` module.

    Raises:
        ImportError: If the imported polars version is below
            ``POLARS_MIN_VERSION``. Any error raised by
            ``importlib.import_module`` itself is propagated unchanged.
    """
    pl = importlib.import_module("polars")
    imported_version = version.Version(pl.build_info()["version"])
    if not imported_version < POLARS_MIN_VERSION:
        return pl
    raise ImportError(
        f"Imported polars version: {imported_version} is below the minimum version: {POLARS_MIN_VERSION}"
    )
4 changes: 2 additions & 2 deletions bigframes/core/compile/polars/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,15 +391,15 @@ class PolarsCompiler:
expr_compiler = PolarsExpressionCompiler()
agg_compiler = PolarsAggregateCompiler()

def compile(self, array_value: bigframes.core.ArrayValue) -> pl.LazyFrame:
def compile(self, plan: nodes.BigFrameNode) -> pl.LazyFrame:
if not polars_installed:
raise ValueError(
"Polars is not installed, cannot compile to polars engine."
)

# TODO: Create standard way to configure BFET -> BFET rewrites
# Polars has incomplete slice support in lazy mode
node = array_value.node
node = plan
node = bigframes.core.rewrite.column_pruning(node)
node = nodes.bottom_up(node, bigframes.core.rewrite.rewrite_slice)
node = bigframes.core.rewrite.pull_out_window_order(node)
Expand Down
1 change: 1 addition & 0 deletions bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ def __init__(
storage_manager=self._temp_storage_manager,
strictly_ordered=self._strictly_ordered,
metrics=self._metrics,
enable_polars_execution=context.enable_polars_execution,
)

def __del__(self):
Expand Down
24 changes: 19 additions & 5 deletions bigframes/session/bq_caching_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,13 @@
import bigframes.dtypes
import bigframes.exceptions as bfe
import bigframes.features
from bigframes.session import executor, loader, local_scan_executor, read_api_execution
from bigframes.session import (
executor,
loader,
local_scan_executor,
read_api_execution,
semi_executor,
)
import bigframes.session._io.bigquery as bq_io
import bigframes.session.metrics
import bigframes.session.planner
Expand Down Expand Up @@ -146,6 +152,7 @@ def __init__(
*,
strictly_ordered: bool = True,
metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None,
enable_polars_execution: bool = False,
):
self.bqclient = bqclient
self.storage_manager = storage_manager
Expand All @@ -154,14 +161,21 @@ def __init__(
self.metrics = metrics
self.loader = loader
self.bqstoragereadclient = bqstoragereadclient
# Simple left-to-right precedence for now
self._semi_executors = (
self._enable_polars_execution = enable_polars_execution
self._semi_executors: Sequence[semi_executor.SemiExecutor] = (
read_api_execution.ReadApiSemiExecutor(
bqstoragereadclient=bqstoragereadclient,
project=self.bqclient.project,
),
local_scan_executor.LocalScanExecutor(),
)
if enable_polars_execution:
from bigframes.session import polars_executor

self._semi_executors = (
*self._semi_executors,
polars_executor.PolarsExecutor(),
)
self._upload_lock = threading.Lock()

def to_sql(
Expand Down Expand Up @@ -636,8 +650,8 @@ def _execute_plan(
"""Just execute whatever plan as is, without further caching or decomposition."""
# First try to execute fast-paths
if not output_spec.require_bq_table:
for semi_executor in self._semi_executors:
maybe_result = semi_executor.execute(plan, ordered=ordered, peek=peek)
for exec in self._semi_executors:
maybe_result = exec.execute(plan, ordered=ordered, peek=peek)
if maybe_result:
return maybe_result

Expand Down
2 changes: 1 addition & 1 deletion bigframes/session/polars_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def execute(
# Note: Ignoring ordered flag, as just executing totally ordered is fine.
try:
lazy_frame: pl.LazyFrame = self._compiler.compile(
array_value.ArrayValue(plan)
array_value.ArrayValue(plan).node
)
except Exception:
return None
Expand Down
4 changes: 2 additions & 2 deletions bigframes/testing/polars_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def peek(
"""
A 'peek' efficiently accesses a small number of rows in the dataframe.
"""
lazy_frame: polars.LazyFrame = self.compiler.compile(array_value)
lazy_frame: polars.LazyFrame = self.compiler.compile(array_value.node)
pa_table = lazy_frame.collect().limit(n_rows).to_arrow()
# Currently, pyarrow types might not quite be exactly the ones in the bigframes schema.
# Nullability may be different, and might use large versions of list, string datatypes.
Expand All @@ -64,7 +64,7 @@ def execute(
"""
Execute the ArrayValue, storing the result to a temporary session-owned table.
"""
lazy_frame: polars.LazyFrame = self.compiler.compile(array_value)
lazy_frame: polars.LazyFrame = self.compiler.compile(array_value.node)
pa_table = lazy_frame.collect().to_arrow()
# Currently, pyarrow types might not quite be exactly the ones in the bigframes schema.
# Nullability may be different, and might use large versions of list, string datatypes.
Expand Down
2 changes: 1 addition & 1 deletion noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@
SYSTEM_TEST_EXTRAS: List[str] = []
SYSTEM_TEST_EXTRAS_BY_PYTHON: Dict[str, List[str]] = {
"3.9": ["tests"],
"3.10": ["tests"],
"3.10": ["tests", "polars"],
"3.12": ["tests", "scikit-learn", "polars"],
"3.13": ["tests", "polars"],
}
Expand Down
1 change: 1 addition & 0 deletions testing/constraints-3.10.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ matplotlib==3.7.1
psutil==5.9.5
seaborn==0.13.1
traitlets==5.7.1
polars==1.7.0
76 changes: 76 additions & 0 deletions tests/system/small/test_polars_execution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest

import bigframes
from bigframes.testing.utils import assert_pandas_df_equal

polars = pytest.importorskip("polars", reason="polars is required for this test")


@pytest.fixture(scope="module")
def session_w_polars():
    """Yield a module-scoped session created with polars execution enabled."""
    polars_options = bigframes.BigQueryOptions(
        location="US",
        enable_polars_execution=True,
    )
    polars_session = bigframes.Session(context=polars_options)
    yield polars_session
    # Close the generated session at cleanup time.
    polars_session.close()


def test_polar_execution_sorted(session_w_polars, scalars_pandas_df_index):
    """A descending index sort plus column selection leaves the execution count unchanged."""
    selected_columns = ["int64_too", "bool_col"]
    baseline_count = session_w_polars._metrics.execution_count

    bf_df = session_w_polars.read_pandas(scalars_pandas_df_index)
    sorted_bf = bf_df.sort_index(ascending=False)
    bf_result = sorted_bf[selected_columns].to_pandas()

    sorted_pd = scalars_pandas_df_index.sort_index(ascending=False)
    pd_result = sorted_pd[selected_columns]

    # The execution counter must not have moved.
    assert session_w_polars._metrics.execution_count == baseline_count
    assert_pandas_df_equal(bf_result, pd_result)


def test_polar_execution_sorted_filtered(session_w_polars, scalars_pandas_df_index):
    """A descending index sort followed by dropna adds exactly one execution."""
    required_columns = ["int64_col", "string_col"]
    baseline_count = session_w_polars._metrics.execution_count

    bf_df = session_w_polars.read_pandas(scalars_pandas_df_index)
    sorted_bf = bf_df.sort_index(ascending=False)
    bf_result = sorted_bf.dropna(subset=required_columns).to_pandas()

    sorted_pd = scalars_pandas_df_index.sort_index(ascending=False)
    pd_result = sorted_pd.dropna(subset=required_columns)

    # Filter and isnull not supported by polars engine yet, so falls back to bq execution
    expected_count = baseline_count + 1
    assert session_w_polars._metrics.execution_count == expected_count
    assert_pandas_df_equal(bf_result, pd_result)


def test_polar_execution_unsupported_sql_fallback(
    session_w_polars, scalars_pandas_df_index
):
    """Adding a string-length column adds exactly one execution."""
    baseline_count = session_w_polars._metrics.execution_count
    bf_df = session_w_polars.read_pandas(scalars_pandas_df_index)

    # Build the expected frame on a copy so the shared fixture is not mutated.
    pd_result = scalars_pandas_df_index.copy()
    pd_result["str_len_col"] = pd_result.string_col.str.len()

    bf_df["str_len_col"] = bf_df.string_col.str.len()
    bf_result = bf_df.to_pandas()

    # str len not supported by polars engine yet, so falls back to bq execution
    expected_count = baseline_count + 1
    assert session_w_polars._metrics.execution_count == expected_count
    assert_pandas_df_equal(bf_result, pd_result)