feat: support new runtime options for udf #1897

Open · wants to merge 5 commits into main
Changes from all commits
9 changes: 9 additions & 0 deletions bigframes/functions/_function_client.py
@@ -202,6 +202,9 @@ def provision_bq_managed_function(
output_type: str,
name: Optional[str],
packages: Optional[Sequence[str]],
max_batching_rows: Optional[int],
container_cpu: Optional[float],
container_memory: Optional[str],
is_row_processor: bool,
bq_connection_id,
*,
@@ -234,6 +237,12 @@ def provision_bq_managed_function(
"runtime_version": _MANAGED_FUNC_PYTHON_VERSION,
"entry_point": "bigframes_handler",
}
if max_batching_rows:
managed_function_options["max_batching_rows"] = max_batching_rows
if container_cpu:
managed_function_options["container_cpu"] = container_cpu
if container_memory:
managed_function_options["container_memory"] = container_memory

# Augment user package requirements with any internal package
# requirements.
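
Note that the new options are attached with truthy checks, so an explicit 0, 0.0, or empty string behaves the same as leaving the option unset. A minimal sketch of the resulting payload with illustrative values (the runtime version stands in for _MANAGED_FUNC_PYTHON_VERSION; not the actual constants):

    managed_function_options = {
        "runtime_version": "python-3.11",  # illustrative stand-in for _MANAGED_FUNC_PYTHON_VERSION
        "entry_point": "bigframes_handler",
        # Each of the following keys is present only when its value is truthy:
        "max_batching_rows": 100,
        "container_cpu": 2.0,
        "container_memory": "2Gi",
    }
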
20 changes: 20 additions & 0 deletions bigframes/functions/_function_session.py
@@ -702,6 +702,9 @@ def udf(
bigquery_connection: Optional[str] = None,
name: Optional[str] = None,
packages: Optional[Sequence[str]] = None,
max_batching_rows: Optional[int] = None,
container_cpu: Optional[float] = None,
container_memory: Optional[str] = None,
):
"""Decorator to turn a Python user defined function (udf) into a
BigQuery managed function.
@@ -769,6 +772,20 @@ def udf(
dependency is added to the `requirements.txt` as is, and can be
of the form supported in
https://pip.pypa.io/en/stable/reference/requirements-file-format/.
max_batching_rows (int, Optional):
The maximum number of rows in each batch. If you specify
max_batching_rows, BigQuery determines the number of rows in a
batch, up to the max_batching_rows limit. If max_batching_rows
is not specified, the number of rows to batch is determined
automatically.
container_cpu (float, Optional):
The CPU limits for containers that run Python UDFs. By default,
the CPU allocated is 0.33 vCPU.
container_memory (str, Optional):
The memory limits for containers that run Python UDFs. By
default, the memory allocated to each container instance is
512 MiB. See details at
https://cloud.google.com/bigquery/docs/user-defined-functions-python#configure-container-limits.
"""

warnings.warn("udf is in preview.", category=bfe.PreviewWarning, stacklevel=5)
@@ -854,6 +871,9 @@ def wrapper(func):
output_type=udf_sig.sql_output_type,
name=name,
packages=packages,
max_batching_rows=max_batching_rows,
container_cpu=container_cpu,
container_memory=container_memory,
is_row_processor=is_row_processor,
bq_connection_id=bq_connection_id,
)
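
As a usage sketch of the new parameters (the session, dataset, and function names here are hypothetical; the option values mirror the system test added at the end of this PR):

    def multiply_five(x: int) -> int:
        return x * 5

    # `session` is an existing bigframes Session; "my_dataset" is a placeholder.
    mf_multiply_five = session.udf(
        dataset="my_dataset",
        name="multiply_five",
        max_batching_rows=100,   # cap on rows per batch
        container_cpu=2,         # vCPUs per container instance
        container_memory="2Gi",  # memory per container instance
    )(multiply_five)
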
6 changes: 6 additions & 0 deletions bigframes/pandas/__init__.py
@@ -142,6 +142,9 @@ def udf(
bigquery_connection: Optional[str] = None,
name: str,
packages: Optional[Sequence[str]] = None,
max_batching_rows: Optional[int] = None,
container_cpu: Optional[float] = None,
container_memory: Optional[str] = None,
):
return global_session.with_default_session(
bigframes.session.Session.udf,
@@ -151,6 +154,9 @@
bigquery_connection=bigquery_connection,
name=name,
packages=packages,
max_batching_rows=max_batching_rows,
container_cpu=container_cpu,
container_memory=container_memory,
)


20 changes: 20 additions & 0 deletions bigframes/session/__init__.py
@@ -1686,6 +1686,9 @@ def udf(
bigquery_connection: Optional[str] = None,
name: str,
packages: Optional[Sequence[str]] = None,
max_batching_rows: Optional[int] = None,
container_cpu: Optional[float] = None,
container_memory: Optional[str] = None,
):
"""Decorator to turn a Python user defined function (udf) into a
[BigQuery managed user-defined function](https://cloud.google.com/bigquery/docs/user-defined-functions-python).
@@ -1807,6 +1810,20 @@ def udf(
dependency is added to the `requirements.txt` as is, and can be
of the form supported in
https://pip.pypa.io/en/stable/reference/requirements-file-format/.
max_batching_rows (int, Optional):
The maximum number of rows in each batch. If you specify
max_batching_rows, BigQuery determines the number of rows in a
batch, up to the max_batching_rows limit. If max_batching_rows
is not specified, the number of rows to batch is determined
automatically.
container_cpu (float, Optional):
The CPU limits for containers that run Python UDFs. By default,
the CPU allocated is 0.33 vCPU.
container_memory (str, Optional):
The memory limits for containers that run Python UDFs. By
default, the memory allocated to each container instance is
512 MiB. See details at
https://cloud.google.com/bigquery/docs/user-defined-functions-python#configure-container-limits.
Returns:
collections.abc.Callable:
A managed function object pointing to the cloud assets created
@@ -1828,6 +1845,9 @@ def udf(
bigquery_connection=bigquery_connection,
name=name,
packages=packages,
max_batching_rows=max_batching_rows,
container_cpu=container_cpu,
container_memory=container_memory,
)

def read_gbq_function(
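
Once a function is deployed, the effective limits can be read back from the routine metadata, as the system test below does. A sketch, assuming `mf_multiply_five` was created with the values above (note that maxBatchingRows comes back serialized as a string):

    routine = session.bqclient.get_routine(mf_multiply_five.bigframes_bigquery_function)
    options = routine._properties["externalRuntimeOptions"]
    assert options["maxBatchingRows"] == "100"  # serialized as a string
    assert options["containerCpu"] == 2
    assert options["containerMemory"] == "2Gi"
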
95 changes: 95 additions & 0 deletions tests/system/large/functions/test_managed_function.py
@@ -549,3 +549,98 @@ def foo(x: int) -> int:
finally:
# Clean up the gcp assets created for the managed function.
cleanup_function_assets(foo, session.bqclient, ignore_failures=False)


def test_managed_function_resources(session, dataset_id, scalars_dfs):
try:

def multiply_five(x: int) -> int:
return x * 5

mf_multiply_five = session.udf(
dataset=dataset_id,
name=prefixer.create_prefix(),
max_batching_rows=100,
container_cpu=2,
container_memory="2Gi",
)(multiply_five)

scalars_df, scalars_pandas_df = scalars_dfs

bf_int64_df = scalars_df["int64_col"]
bf_int64_df_filtered = bf_int64_df.dropna()
bf_result = bf_int64_df_filtered.apply(mf_multiply_five).to_pandas()

pd_int64_df = scalars_pandas_df["int64_col"]
pd_int64_df_filtered = pd_int64_df.dropna()
pd_result = pd_int64_df_filtered.apply(multiply_five)

pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False)

# Make sure the read_gbq_function path works for this function.
multiply_five_ref = session.read_gbq_function(
function_name=mf_multiply_five.bigframes_bigquery_function, # type: ignore
)
assert mf_multiply_five.bigframes_bigquery_function == multiply_five_ref.bigframes_bigquery_function # type: ignore

bf_result = bf_int64_df_filtered.apply(multiply_five_ref).to_pandas()
pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False)

# Retrieve the routine and validate its runtime configuration.
routine = session.bqclient.get_routine(
mf_multiply_five.bigframes_bigquery_function
)

assert routine._properties["externalRuntimeOptions"]["maxBatchingRows"] == "100"
assert routine._properties["externalRuntimeOptions"]["containerCpu"] == 2
assert routine._properties["externalRuntimeOptions"]["containerMemory"] == "2Gi"

finally:
# Clean up the gcp assets created for the managed function.
cleanup_function_assets(
mf_multiply_five, session.bqclient, ignore_failures=False
)


def test_managed_function_resources_errors(session, dataset_id):
def foo(x: int) -> int:
return 0

with pytest.raises(
google.api_core.exceptions.BadRequest,
# For CPU Value >= 1.0, the value must be one of [1, 2, ...].
match="Invalid container_cpu function OPTIONS value",
):
session.udf(
dataset=dataset_id,
name=prefixer.create_prefix(),
max_batching_rows=100,
container_cpu=2.5,
container_memory="2Gi",
)(foo)

with pytest.raises(
google.api_core.exceptions.BadRequest,
# For less than 1.0 CPU, the value must be no less than 0.33.
match="Invalid container_cpu function OPTIONS value",
):
session.udf(
dataset=dataset_id,
name=prefixer.create_prefix(),
max_batching_rows=100,
container_cpu=0.10,
container_memory="512Mi",
)(foo)

with pytest.raises(
google.api_core.exceptions.BadRequest,
# For 2.00 CPU, the memory must be in the range of [256Mi, 8Gi].
match="Invalid container_memory function OPTIONS value",
):
session.udf(
dataset=dataset_id,
name=prefixer.create_prefix(),
max_batching_rows=100,
container_cpu=2,
container_memory="64Mi",
)(foo)