chore: sync from internal git #27

Merged 1 commit on Sep 16, 2023.
23 changes: 23 additions & 0 deletions .kokoro/docs/docs-presubmit-gerrit.cfg
@@ -0,0 +1,23 @@
# Format: //devtools/kokoro/config/proto/build.proto

env_vars: {
key: "V2_STAGING_BUCKET"
value: "gcloud-python-test"
}

# We only upload the image in the main `docs` build.
env_vars: {
key: "TRAMPOLINE_IMAGE_UPLOAD"
value: "false"
}

env_vars: {
key: "TRAMPOLINE_BUILD_FILE"
value: ".kokoro/build.sh"
}

# Only run this nox session.
env_vars: {
key: "NOX_SESSION"
value: "docfx"
}
7 changes: 7 additions & 0 deletions .kokoro/presubmit/e2e-gerrit.cfg
@@ -0,0 +1,7 @@
# Format: //devtools/kokoro/config/proto/build.proto

# Only run this nox session.
env_vars: {
key: "NOX_SESSION"
value: "system_noextras e2e notebook samples"
}
1 change: 1 addition & 0 deletions .kokoro/presubmit/presubmit-gerrit.cfg
@@ -0,0 +1 @@
# Format: //devtools/kokoro/config/proto/build.proto
1 change: 1 addition & 0 deletions OWNERS
@@ -1,3 +1,4 @@
ashleyxu@google.com
bmil@google.com
chelsealin@google.com
garrettwu@google.com
11 changes: 10 additions & 1 deletion README.rst
@@ -57,9 +57,13 @@ internally to manage metadata on the service side. This session is tied to a
BigQuery DataFrames uses the US multi-region as the default location, but you
can use ``session_options.location`` to set a different location. Every query
in a session is executed in the location where the session was created.
BigQuery DataFrames
auto-populates ``bf.options.bigquery.location`` if the user starts with
``read_gbq/read_gbq_table/read_gbq_query()`` and specifies a table, either
directly or in a SQL statement.

If you want to reset the location of the created DataFrame or Series objects,
-can reset the session by executing ``bigframes.pandas.reset_session()``.
+you can reset the session by executing ``bigframes.pandas.reset_session()``.
After that, you can reuse ``bigframes.pandas.options.bigquery.location`` to
specify another location.

@@ -68,6 +72,11 @@ specify another location.
querying is not in the US multi-region. If you try to read a table from another
location, you get a NotFound exception.
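
For example, a minimal sketch of this workflow (illustrative only; the table
name is hypothetical)::

    import bigframes.pandas as bf

    bf.options.bigquery.location = "europe-west1"  # default is the US multi-region
    df = bf.read_gbq("my-project.my_dataset.my_table")

    # To query in a different location afterwards, reset the session first.
    bf.reset_session()
    bf.options.bigquery.location = "us-central1"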

Project
-------
If ``bf.options.bigquery.project`` is not set, the ``$GOOGLE_CLOUD_PROJECT``
environment variable is used; this variable is set in the notebook runtimes
that serve BigQuery Studio and Vertex AI notebooks.
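
A hedged sketch of this fallback (assuming the option reads back as ``None``
when unset)::

    import os

    import bigframes.pandas as bf

    project = bf.options.bigquery.project or os.environ.get("GOOGLE_CLOUD_PROJECT")
    print(f"Queries will be billed to project: {project}")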

ML Capabilities
---------------
2 changes: 1 addition & 1 deletion bigframes/core/__init__.py
@@ -518,8 +518,8 @@ def aggregate(
"""
Apply aggregations to the expression.
Arguments:
-by_column_id: column id of the aggregation key, this is preserved through the transform
aggregations: input_column_id, operation, output_column_id tuples
+by_column_id: column id of the aggregation key, this is preserved through the transform
dropna: whether null keys should be dropped
"""
table = self.to_ibis_expr(ordering_mode="unordered")
176 changes: 176 additions & 0 deletions bigframes/core/block_transforms.py
@@ -117,6 +117,25 @@ def value_counts(
return block.select_column(count_id).with_column_labels(["count"])


def pct_change(block: blocks.Block, periods: int = 1) -> blocks.Block:
column_labels = block.column_labels
window_spec = core.WindowSpec(
preceding=periods if periods > 0 else None,
following=-periods if periods < 0 else None,
)

original_columns = block.value_columns
block, shift_columns = block.multi_apply_window_op(
original_columns, agg_ops.ShiftOp(periods), window_spec=window_spec
)
result_ids = []
for original_col, shifted_col in zip(original_columns, shift_columns):
block, change_id = block.apply_binary_op(original_col, shifted_col, ops.sub_op)
block, pct_change_id = block.apply_binary_op(change_id, shifted_col, ops.div_op)
result_ids.append(pct_change_id)
return block.select_columns(result_ids).with_column_labels(column_labels)
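
# Illustrative pandas equivalent of the pct_change helper above: shift by
# `periods`, subtract, then divide by the shifted values. A sketch only;
# the example data is hypothetical and pandas is assumed available.
import pandas as pd

s = pd.Series([100.0, 110.0, 99.0])
shifted = s.shift(1)           # prior value for each row
pct = (s - shifted) / shifted  # NaN, 0.10, -0.10
assert pct.equals(s.pct_change(periods=1))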


def rank(
block: blocks.Block,
method: str = "average",
@@ -229,3 +248,160 @@ def dropna(block: blocks.Block, how: typing.Literal["all", "any"] = "any"):
filtered_block = filtered_block.filter(predicate)
filtered_block = filtered_block.select_columns(block.value_columns)
return filtered_block


def nsmallest(
block: blocks.Block,
n: int,
column_ids: typing.Sequence[str],
keep: str,
) -> blocks.Block:
if keep not in ("first", "last", "all"):
raise ValueError("'keep must be one of 'first', 'last', or 'all'")
if keep == "last":
block = block.reversed()
order_refs = [
ordering.OrderingColumnReference(
col_id, direction=ordering.OrderingDirection.ASC
)
for col_id in column_ids
]
block = block.order_by(order_refs, stable=True)
if keep in ("first", "last"):
return block.slice(0, n)
else: # keep == "all":
block, counter = block.apply_window_op(
column_ids[0],
agg_ops.rank_op,
window_spec=core.WindowSpec(ordering=order_refs),
)
block, condition = block.apply_unary_op(
counter, ops.partial_right(ops.le_op, n)
)
block = block.filter(condition)
return block.drop_columns([counter, condition])


def nlargest(
block: blocks.Block,
n: int,
column_ids: typing.Sequence[str],
keep: str,
) -> blocks.Block:
if keep not in ("first", "last", "all"):
raise ValueError("'keep must be one of 'first', 'last', or 'all'")
if keep == "last":
block = block.reversed()
order_refs = [
ordering.OrderingColumnReference(
col_id, direction=ordering.OrderingDirection.DESC
)
for col_id in column_ids
]
block = block.order_by(order_refs, stable=True)
if keep in ("first", "last"):
return block.slice(0, n)
else: # keep == "all":
block, counter = block.apply_window_op(
column_ids[0],
agg_ops.rank_op,
window_spec=core.WindowSpec(ordering=order_refs),
)
block, condition = block.apply_unary_op(
counter, ops.partial_right(ops.le_op, n)
)
block = block.filter(condition)
return block.drop_columns([counter, condition])
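
# Illustrative pandas comparison for the keep="all" semantics implemented
# in nsmallest/nlargest above (sketch only; example data is hypothetical):
# rows that tie with the n-th value are all retained rather than cut at n.
import pandas as pd

s = pd.Series([1, 2, 2, 3])
print(s.nsmallest(2, keep="all"))  # 1, 2, 2: both tied values kept
print(s.nlargest(2, keep="all"))   # 3, 2, 2: both tied values kept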


def skew(
block: blocks.Block,
skew_column_ids: typing.Sequence[str],
grouping_column_ids: typing.Sequence[str] = (),
) -> blocks.Block:

original_columns = skew_column_ids
column_labels = block.select_columns(original_columns).column_labels

block, delta3_ids = _mean_delta_to_power(
block, 3, original_columns, grouping_column_ids
)
# counts, moment3 for each column
aggregations = []
for i, col in enumerate(original_columns):
count_agg = (col, agg_ops.count_op)
moment3_agg = (delta3_ids[i], agg_ops.mean_op)
variance_agg = (col, agg_ops.PopVarOp())
aggregations.extend([count_agg, moment3_agg, variance_agg])

block, agg_ids = block.aggregate(
by_column_ids=grouping_column_ids, aggregations=aggregations
)

skew_ids = []
for i, col in enumerate(original_columns):
# Corresponds to order of aggregations in preceding loop
count_id, moment3_id, var_id = agg_ids[i * 3 : (i * 3) + 3]
block, skew_id = _skew_from_moments_and_count(
block, count_id, moment3_id, var_id
)
skew_ids.append(skew_id)

block = block.select_columns(skew_ids).with_column_labels(column_labels)
if not grouping_column_ids:
        # When ungrouped, stack everything into a single column so it can be returned as a series
block = block.stack()
block = block.drop_levels([block.index_columns[0]])
return block


def _mean_delta_to_power(
block: blocks.Block,
n_power,
column_ids: typing.Sequence[str],
grouping_column_ids: typing.Sequence[str],
) -> typing.Tuple[blocks.Block, typing.Sequence[str]]:
"""Calculate (x-mean(x))^n. Useful for calculating moment statistics such as skew and kurtosis."""
window = core.WindowSpec(grouping_keys=grouping_column_ids)
block, mean_ids = block.multi_apply_window_op(column_ids, agg_ops.mean_op, window)
delta_ids = []
cube_op = ops.partial_right(ops.pow_op, n_power)
for val_id, mean_val_id in zip(column_ids, mean_ids):
block, delta_id = block.apply_binary_op(val_id, mean_val_id, ops.sub_op)
block, delta_power_id = block.apply_unary_op(delta_id, cube_op)
block = block.drop_columns(delta_id)
delta_ids.append(delta_power_id)
return block, delta_ids


def _skew_from_moments_and_count(
block: blocks.Block, count_id: str, moment3_id: str, var_id: str
) -> typing.Tuple[blocks.Block, str]:
# Calculate skew using count, third moment and population variance
# See G1 estimator:
# https://en.wikipedia.org/wiki/Skewness#Sample_skewness
block, denominator_id = block.apply_unary_op(
var_id, ops.partial_right(ops.pow_op, 3 / 2)
)
block, base_id = block.apply_binary_op(moment3_id, denominator_id, ops.div_op)
block, countminus1_id = block.apply_unary_op(
count_id, ops.partial_right(ops.sub_op, 1)
)
block, countminus2_id = block.apply_unary_op(
count_id, ops.partial_right(ops.sub_op, 2)
)
block, adjustment_id = block.apply_binary_op(count_id, countminus1_id, ops.mul_op)
block, adjustment_id = block.apply_unary_op(
adjustment_id, ops.partial_right(ops.pow_op, 1 / 2)
)
block, adjustment_id = block.apply_binary_op(
adjustment_id, countminus2_id, ops.div_op
)
block, skew_id = block.apply_binary_op(base_id, adjustment_id, ops.mul_op)

    # Need to produce NA if there are fewer than 3 data points
block, na_cond_id = block.apply_unary_op(count_id, ops.partial_right(ops.ge_op, 3))
block, skew_id = block.apply_binary_op(
skew_id, na_cond_id, ops.partial_arg3(ops.where_op, None)
)
return block, skew_id
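
# Numeric sanity check for the G1 estimator assembled above (sketch only;
# assumes numpy and scipy are available). The helper computes
#     G1 = (m3 / m2**1.5) * sqrt(n * (n - 1)) / (n - 2)
# from the count, third central moment, and population variance, returning
# NA when there are fewer than 3 data points.
import numpy as np
from scipy import stats

x = np.array([1.0, 2.0, 4.0, 8.0])
n = x.size
m2 = ((x - x.mean()) ** 2).mean()  # population variance (PopVarOp)
m3 = ((x - x.mean()) ** 3).mean()  # third central moment
g1 = (m3 / m2**1.5) * np.sqrt(n * (n - 1)) / (n - 2)
assert np.isclose(g1, stats.skew(x, bias=False))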
16 changes: 10 additions & 6 deletions bigframes/core/blocks.py
@@ -709,8 +709,9 @@ def multi_apply_window_op(
window_spec: core.WindowSpec,
*,
skip_null_groups: bool = False,
-) -> Block:
+) -> typing.Tuple[Block, typing.Sequence[str]]:
block = self
result_ids = []
for i, col_id in enumerate(columns):
label = self.col_id_to_label[col_id]
block, result_id = block.apply_window_op(
@@ -721,9 +722,8 @@
result_label=label,
skip_null_groups=skip_null_groups,
)
-block = block.copy_values(result_id, col_id)
-block = block.drop_columns([result_id])
-return block
+result_ids.append(result_id)
+return block, result_ids

def multi_apply_unary_op(
self,
@@ -1123,7 +1123,9 @@ def promote_offsets(self, label: Label = None) -> typing.Tuple[Block, str]:
)

def add_prefix(self, prefix: str, axis: str | int | None = None) -> Block:
-axis_number = bigframes.core.utils.get_axis_number(axis)
+axis_number = bigframes.core.utils.get_axis_number(
+    "rows" if (axis is None) else axis
+)
if axis_number == 0:
expr = self._expr
for index_col in self._index_columns:
@@ -1140,7 +1142,9 @@ def add_suffix(self, suffix: str, axis: str | int | None = None) -> Block:
return self.rename(columns=lambda label: f"{prefix}{label}")

def add_suffix(self, suffix: str, axis: str | int | None = None) -> Block:
-axis_number = bigframes.core.utils.get_axis_number(axis)
+axis_number = bigframes.core.utils.get_axis_number(
+    "rows" if (axis is None) else axis
+)
if axis_number == 0:
expr = self._expr
for index_col in self._index_columns:
44 changes: 40 additions & 4 deletions bigframes/core/groupby/__init__.py
@@ -20,6 +20,7 @@

import bigframes.constants as constants
import bigframes.core as core
import bigframes.core.block_transforms as block_ops
import bigframes.core.blocks as blocks
import bigframes.core.ordering as order
import bigframes.core.utils as utils
@@ -145,6 +146,16 @@ def var(
self._raise_on_non_numeric("var")
return self._aggregate_all(agg_ops.var_op, numeric_only=True)

def skew(
self,
*,
numeric_only: bool = False,
) -> df.DataFrame:
if not numeric_only:
self._raise_on_non_numeric("skew")
block = block_ops.skew(self._block, self._selected_cols, self._by_col_ids)
return df.DataFrame(block)

def all(self) -> df.DataFrame:
return self._aggregate_all(agg_ops.all_op)

@@ -168,6 +179,22 @@ def cummax(self, *args, numeric_only: bool = False, **kwargs) -> df.DataFrame:
def cumprod(self, *args, **kwargs) -> df.DataFrame:
return self._apply_window_op(agg_ops.product_op, numeric_only=True)

def shift(self, periods=1) -> series.Series:
window = core.WindowSpec(
grouping_keys=self._by_col_ids,
preceding=periods if periods > 0 else None,
following=-periods if periods < 0 else None,
)
return self._apply_window_op(agg_ops.ShiftOp(periods), window=window)

def diff(self, periods=1) -> series.Series:
window = core.WindowSpec(
grouping_keys=self._by_col_ids,
preceding=periods if periods > 0 else None,
following=-periods if periods < 0 else None,
)
return self._apply_window_op(agg_ops.DiffOp(periods), window=window)
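
# Illustrative pandas equivalent of the grouped shift/diff methods above
# (sketch only; example data is hypothetical). Both operate within each
# group, so the first row of every group has no prior value.
import pandas as pd

frame = pd.DataFrame({"g": ["a", "a", "b", "b"], "v": [1, 3, 10, 14]})
print(frame.groupby("g")["v"].shift(1))  # NaN, 1, NaN, 10
print(frame.groupby("g")["v"].diff(1))   # NaN, 2, NaN, 4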

def agg(self, func=None, **kwargs) -> df.DataFrame:
if func:
if isinstance(func, str):
@@ -323,10 +350,10 @@ def _apply_window_op(
grouping_keys=self._by_col_ids, following=0
)
columns = self._aggregated_columns(numeric_only=numeric_only)
-block = self._block.multi_apply_window_op(
+block, result_ids = self._block.multi_apply_window_op(
columns, op, window_spec=window_spec, skip_null_groups=self._dropna
)
-block = block.select_columns(columns)
+block = block.select_columns(result_ids)
return df.DataFrame(block)

def _resolve_label(self, label: blocks.Label) -> str:
@@ -391,6 +418,10 @@ def std(self, *args, **kwargs) -> series.Series:
def var(self, *args, **kwargs) -> series.Series:
return self._aggregate(agg_ops.var_op)

def skew(self, *args, **kwargs) -> series.Series:
block = block_ops.skew(self._block, [self._value_column], self._by_col_ids)
return series.Series(block)

def prod(self, *args) -> series.Series:
return self._aggregate(agg_ops.product_op)

@@ -459,8 +490,13 @@ def shift(self, periods=1) -> series.Series:
)
return self._apply_window_op(agg_ops.ShiftOp(periods), window=window)

-def diff(self) -> series.Series:
-    return self._ungroup() - self.shift(1)
+def diff(self, periods=1) -> series.Series:
+    window = core.WindowSpec(
+        grouping_keys=self._by_col_ids,
+        preceding=periods if periods > 0 else None,
+        following=-periods if periods < 0 else None,
+    )
+    return self._apply_window_op(agg_ops.DiffOp(periods), window=window)

def rolling(self, window: int, min_periods=None) -> windows.Window:
        # To get an n-row window, we need the current row and the n-1 preceding rows.