
chore: sync internal changes to GitHub #34


Merged: 1 commit, Sep 20, 2023
4 changes: 3 additions & 1 deletion README.rst
@@ -241,7 +241,9 @@ Remote functions
 BigQuery DataFrames gives you the ability to turn your custom scalar functions
 into `BigQuery remote functions
 <https://cloud.google.com/bigquery/docs/remote-functions>`_ . Creating a remote
-function in BigQuery DataFrames creates a BigQuery remote function, a `BigQuery
+function in BigQuery DataFrames (See `code samples
+<https://cloud.google.com/bigquery/docs/remote-functions#bigquery-dataframes>`_)
+creates a BigQuery remote function, a `BigQuery
 connection
 <https://cloud.google.com/bigquery/docs/create-cloud-resource-connection>`_ ,
 and a `Cloud Functions (2nd gen) function
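
For context, the workflow this README passage describes can be sketched as follows. This is illustrative only (not part of the diff): the connection name is hypothetical, and the `remote_function` decorator usage follows the public bigframes docs of the time, so details may vary by version.

import bigframes.pandas as bpd

# Hypothetical connection name; the linked docs describe the required IAM
# setup. Deploying creates all three artifacts named above: the BigQuery
# remote function, the BigQuery connection, and the Cloud Functions
# (2nd gen) function.
@bpd.remote_function([float], str, bigquery_connection="bigframes-rf-conn")
def size_bucket(mass: float) -> str:
    return "large" if mass >= 4000 else "small"

df = bpd.read_gbq("bigquery-public-data.ml_datasets.penguins")
# Each value is routed through the deployed Cloud Function.
buckets = df["body_mass_g"].apply(size_bucket)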
21 changes: 12 additions & 9 deletions bigframes/core/__init__.py
@@ -600,7 +600,7 @@ def project_window_op(
         window_spec: WindowSpec,
         output_name=None,
         *,
-        skip_null_groups=False,
+        never_skip_nulls=False,
         skip_reproject_unsafe: bool = False,
     ) -> ArrayValue:
         """
@@ -609,7 +609,7 @@ def project_window_op(
         op: the windowable operator to apply to the input column
         window_spec: a specification of the window over which to apply the operator
         output_name: the id to assign to the output of the operator, by default will replace input col if distinct output id not provided
-        skip_null_groups: will filter out any rows where any of the grouping keys is null
+        never_skip_nulls: will disable null skipping for operators that would otherwise do so
        skip_reproject_unsafe: skips the reprojection step, can be used when performing many non-dependent window operations, user responsible for not nesting window expressions, or using outputs as join, filter or aggregation keys before a reprojection
         """
         column = typing.cast(ibis_types.Column, self.get_column(column_name))
@@ -618,20 +618,23 @@ def project_window_op(
         window_op = op._as_ibis(column, window)
 
         clauses = []
-        if op.skips_nulls:
+        if op.skips_nulls and not never_skip_nulls:
             clauses.append((column.isnull(), ibis.NA))
-        if skip_null_groups:
-            for key in window_spec.grouping_keys:
-                clauses.append((self.get_column(key).isnull(), ibis.NA))
         if window_spec.min_periods:
+            if op.skips_nulls:
+                # Most operations do not count NULL values towards min_periods
+                observation_count = agg_ops.count_op._as_ibis(column, window)
+            else:
+                # Operations like count treat even NULLs as valid observations for the sake of min_periods
+                # notnull is just used to convert null values to non-null (FALSE) values to be counted
+                denulled_value = typing.cast(ibis_types.BooleanColumn, column.notnull())
+                observation_count = agg_ops.count_op._as_ibis(denulled_value, window)
             clauses.append(
                 (
-                    agg_ops.count_op._as_ibis(column, window)
-                    < ibis_types.literal(window_spec.min_periods),
+                    observation_count < ibis_types.literal(window_spec.min_periods),
                     ibis.NA,
                 )
             )
 
         if clauses:
             case_statement = ibis.case()
             for clause in clauses:
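
To make the new gating logic concrete, here is a plain-Python sketch of what the clauses above compute, with lists standing in for ibis expressions (names are illustrative, not from the codebase):

def gate_window_outputs(values, outputs, skips_nulls, never_skip_nulls, min_periods):
    gated = []
    for i, (value, out) in enumerate(zip(values, outputs)):
        window = values[: i + 1]  # cumulative window, for illustration
        if skips_nulls:
            # Most ops ignore NULLs, so only non-null values count toward min_periods.
            observations = sum(v is not None for v in window)
        else:
            # count-like ops treat NULLs as observations too.
            observations = len(window)
        if skips_nulls and not never_skip_nulls and value is None:
            gated.append(None)  # NULL input yields NULL output
        elif min_periods and observations < min_periods:
            gated.append(None)  # too few observations in the window
        else:
            gated.append(out)
    return gated

# e.g. a cumulative sum over [1, None, 3] with min_periods=2 and
# skips_nulls=True: outputs [1, 1, 4] gate to [None, None, 4].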
101 changes: 98 additions & 3 deletions bigframes/core/block_transforms.py
@@ -355,6 +355,46 @@ def skew(
     return block
 
 
+def kurt(
+    block: blocks.Block,
+    skew_column_ids: typing.Sequence[str],
+    grouping_column_ids: typing.Sequence[str] = (),
+) -> blocks.Block:
+    original_columns = skew_column_ids
+    column_labels = block.select_columns(original_columns).column_labels
+
+    block, delta4_ids = _mean_delta_to_power(
+        block, 4, original_columns, grouping_column_ids
+    )
+    # counts, moment4 for each column
+    aggregations = []
+    for i, col in enumerate(original_columns):
+        count_agg = (col, agg_ops.count_op)
+        moment4_agg = (delta4_ids[i], agg_ops.mean_op)
+        variance_agg = (col, agg_ops.PopVarOp())
+        aggregations.extend([count_agg, moment4_agg, variance_agg])
+
+    block, agg_ids = block.aggregate(
+        by_column_ids=grouping_column_ids, aggregations=aggregations
+    )
+
+    kurt_ids = []
+    for i, col in enumerate(original_columns):
+        # Corresponds to order of aggregations in preceding loop
+        count_id, moment4_id, var_id = agg_ids[i * 3 : (i * 3) + 3]
+        block, kurt_id = _kurt_from_moments_and_count(
+            block, count_id, moment4_id, var_id
+        )
+        kurt_ids.append(kurt_id)
+
+    block = block.select_columns(kurt_ids).with_column_labels(column_labels)
+    if not grouping_column_ids:
+        # When ungrouped, stack everything into single column so can be returned as series
+        block = block.stack()
+        block = block.drop_levels([block.index_columns[0]])
+    return block
+
+
 def _mean_delta_to_power(
     block: blocks.Block,
     n_power,
@@ -375,13 +415,13 @@
 
 
 def _skew_from_moments_and_count(
-    block: blocks.Block, count_id: str, moment3_id: str, var_id: str
+    block: blocks.Block, count_id: str, moment3_id: str, moment2_id: str
 ) -> typing.Tuple[blocks.Block, str]:
     # Calculate skew using count, third moment and population variance
     # See G1 estimator:
     # https://en.wikipedia.org/wiki/Skewness#Sample_skewness
     block, denominator_id = block.apply_unary_op(
-        var_id, ops.partial_right(ops.pow_op, 3 / 2)
+        moment2_id, ops.partial_right(ops.unsafe_pow_op, 3 / 2)
     )
     block, base_id = block.apply_binary_op(moment3_id, denominator_id, ops.div_op)
     block, countminus1_id = block.apply_unary_op(
@@ -392,7 +432,7 @@ def _skew_from_moments_and_count(
     )
     block, adjustment_id = block.apply_binary_op(count_id, countminus1_id, ops.mul_op)
     block, adjustment_id = block.apply_unary_op(
-        adjustment_id, ops.partial_right(ops.pow_op, 1 / 2)
+        adjustment_id, ops.partial_right(ops.unsafe_pow_op, 1 / 2)
     )
     block, adjustment_id = block.apply_binary_op(
         adjustment_id, countminus2_id, ops.div_op
@@ -405,3 +445,58 @@ def _skew_from_moments_and_count(
         skew_id, na_cond_id, ops.partial_arg3(ops.where_op, None)
     )
     return block, skew_id
+
+
+def _kurt_from_moments_and_count(
+    block: blocks.Block, count_id: str, moment4_id: str, moment2_id: str
+) -> typing.Tuple[blocks.Block, str]:
+    # Kurtosis is often defined as the second standardized moment: moment(4)/moment(2)**2
+    # Pandas however uses Fisher's estimator, implemented below
+    # numerator = (count + 1) * (count - 1) * moment4
+    # denominator = (count - 2) * (count - 3) * moment2**2
+    # adjustment = 3 * (count - 1) ** 2 / ((count - 2) * (count - 3))
+    # kurtosis = (numerator / denominator) - adjustment
+
+    # Numerator
+    block, countminus1_id = block.apply_unary_op(
+        count_id, ops.partial_right(ops.sub_op, 1)
+    )
+    block, countplus1_id = block.apply_unary_op(
+        count_id, ops.partial_right(ops.add_op, 1)
+    )
+    block, num_adj = block.apply_binary_op(countplus1_id, countminus1_id, ops.mul_op)
+    block, numerator_id = block.apply_binary_op(moment4_id, num_adj, ops.mul_op)
+
+    # Denominator
+    block, countminus2_id = block.apply_unary_op(
+        count_id, ops.partial_right(ops.sub_op, 2)
+    )
+    block, countminus3_id = block.apply_unary_op(
+        count_id, ops.partial_right(ops.sub_op, 3)
+    )
+    block, denom_adj = block.apply_binary_op(countminus2_id, countminus3_id, ops.mul_op)
+    block, popvar_squared = block.apply_unary_op(
+        moment2_id, ops.partial_right(ops.unsafe_pow_op, 2)
+    )
+    block, denominator_id = block.apply_binary_op(popvar_squared, denom_adj, ops.mul_op)
+
+    # Adjustment
+    block, countminus1_square = block.apply_unary_op(
+        countminus1_id, ops.partial_right(ops.unsafe_pow_op, 2)
+    )
+    block, adj_num = block.apply_unary_op(
+        countminus1_square, ops.partial_right(ops.mul_op, 3)
+    )
+    block, adj_denom = block.apply_binary_op(countminus2_id, countminus3_id, ops.mul_op)
+    block, adjustment_id = block.apply_binary_op(adj_num, adj_denom, ops.div_op)
+
+    # Combine
+    block, base_id = block.apply_binary_op(numerator_id, denominator_id, ops.div_op)
+    block, kurt_id = block.apply_binary_op(base_id, adjustment_id, ops.sub_op)
+
+    # Need to produce NA if there are fewer than 4 data points
+    block, na_cond_id = block.apply_unary_op(count_id, ops.partial_right(ops.ge_op, 4))
+    block, kurt_id = block.apply_binary_op(
+        kurt_id, na_cond_id, ops.partial_arg3(ops.where_op, None)
+    )
+    return block, kurt_id
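
As a sanity check on the commented formulas (again, not part of the PR), the same estimator can be written directly in Python and compared against pandas, which implements the identical adjusted Fisher-Pearson statistic:

import pandas as pd

def fisher_kurtosis(xs):
    n = len(xs)
    mean = sum(xs) / n
    moment2 = sum((x - mean) ** 2 for x in xs) / n  # population variance
    moment4 = sum((x - mean) ** 4 for x in xs) / n
    numerator = (n + 1) * (n - 1) * moment4
    denominator = (n - 2) * (n - 3) * moment2**2
    adjustment = 3 * (n - 1) ** 2 / ((n - 2) * (n - 3))
    return numerator / denominator - adjustment

data = [1.0, 2.0, 4.0, 8.0, 16.0]
assert abs(fisher_kurtosis(data) - pd.Series(data).kurt()) < 1e-9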
12 changes: 10 additions & 2 deletions bigframes/core/blocks.py
@@ -709,6 +709,7 @@ def multi_apply_window_op(
         window_spec: core.WindowSpec,
         *,
         skip_null_groups: bool = False,
+        never_skip_nulls: bool = False,
     ) -> typing.Tuple[Block, typing.Sequence[str]]:
         block = self
         result_ids = []
@@ -721,6 +722,7 @@ def multi_apply_window_op(
                 skip_reproject_unsafe=(i + 1) < len(columns),
                 result_label=label,
                 skip_null_groups=skip_null_groups,
+                never_skip_nulls=never_skip_nulls,
             )
             result_ids.append(result_id)
         return block, result_ids
@@ -751,15 +753,21 @@ def apply_window_op(
         result_label: Label = None,
         skip_null_groups: bool = False,
         skip_reproject_unsafe: bool = False,
+        never_skip_nulls: bool = False,
     ) -> typing.Tuple[Block, str]:
+        block = self
+        if skip_null_groups:
+            for key in window_spec.grouping_keys:
+                block, not_null_id = block.apply_unary_op(key, ops.notnull_op)
+                block = block.filter(not_null_id).drop_columns([not_null_id])
         result_id = guid.generate_guid()
-        expr = self._expr.project_window_op(
+        expr = block._expr.project_window_op(
             column,
             op,
             window_spec,
             result_id,
-            skip_null_groups=skip_null_groups,
             skip_reproject_unsafe=skip_reproject_unsafe,
+            never_skip_nulls=never_skip_nulls,
         )
         block = Block(
             expr,
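
For intuition, the skip_null_groups path added above behaves like this pandas-style filter on hypothetical data: rows whose grouping key is NULL are removed from the block before the window op runs, rather than merely producing NULL outputs:

import pandas as pd

df = pd.DataFrame({"key": ["a", None, "b"], "x": [1, 2, 3]})
# analogous to apply_unary_op(key, ops.notnull_op) followed by
# filter(not_null_id).drop_columns([not_null_id])
filtered = df[df["key"].notna()]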
70 changes: 63 additions & 7 deletions bigframes/core/groupby/__init__.py
@@ -156,6 +156,18 @@ def skew(
         block = block_ops.skew(self._block, self._selected_cols, self._by_col_ids)
         return df.DataFrame(block)
 
+    def kurt(
+        self,
+        *,
+        numeric_only: bool = False,
+    ) -> df.DataFrame:
+        if not numeric_only:
+            self._raise_on_non_numeric("kurt")
+        block = block_ops.kurt(self._block, self._selected_cols, self._by_col_ids)
+        return df.DataFrame(block)
+
+    kurtosis = kurt
+
     def all(self) -> df.DataFrame:
         return self._aggregate_all(agg_ops.all_op)
@@ -195,6 +207,36 @@ def diff(self, periods=1) -> series.Series:
         )
         return self._apply_window_op(agg_ops.DiffOp(periods), window=window)
 
+    def rolling(self, window: int, min_periods=None) -> windows.Window:
+        # To get n size window, need current row and n-1 preceding rows.
+        window_spec = core.WindowSpec(
+            grouping_keys=self._by_col_ids,
+            preceding=window - 1,
+            following=0,
+            min_periods=min_periods or window,
+        )
+        block = self._block.order_by(
+            [order.OrderingColumnReference(col) for col in self._by_col_ids],
+            stable=True,
+        )
+        return windows.Window(
+            block, window_spec, self._selected_cols, drop_null_groups=self._dropna
+        )
+
+    def expanding(self, min_periods: int = 1) -> windows.Window:
+        window_spec = core.WindowSpec(
+            grouping_keys=self._by_col_ids,
+            following=0,
+            min_periods=min_periods,
+        )
+        block = self._block.order_by(
+            [order.OrderingColumnReference(col) for col in self._by_col_ids],
+            stable=True,
+        )
+        return windows.Window(
+            block, window_spec, self._selected_cols, drop_null_groups=self._dropna
+        )
+
     def agg(self, func=None, **kwargs) -> df.DataFrame:
         if func:
             if isinstance(func, str):
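
A usage sketch for the new grouped rolling/expanding windows (data and column names invented; the aggregation methods are assumed to be exposed on the returned Window):

import bigframes.pandas as bpd
import pandas as pd

pdf = pd.DataFrame({"grp": ["a", "a", "a", "b", "b"], "x": [1.0, 2.0, 3.0, 4.0, 5.0]})
df = bpd.read_pandas(pdf)

# 2-row rolling sum per group; windows with fewer than 2 observations
# yield NULL because min_periods defaults to the window size.
rolled = df.groupby("grp")["x"].rolling(window=2).sum()

# Expanding mean needs only min_periods=1 observation.
expanded = df.groupby("grp")["x"].expanding().mean()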
@@ -351,7 +393,7 @@ def _apply_window_op(
         )
         columns = self._aggregated_columns(numeric_only=numeric_only)
         block, result_ids = self._block.multi_apply_window_op(
-            columns, op, window_spec=window_spec, skip_null_groups=self._dropna
+            columns, op, window_spec=window_spec
         )
         block = block.select_columns(result_ids)
         return df.DataFrame(block)
@@ -422,6 +464,12 @@ def skew(self, *args, **kwargs) -> series.Series:
         block = block_ops.skew(self._block, [self._value_column], self._by_col_ids)
         return series.Series(block)
 
+    def kurt(self, *args, **kwargs) -> series.Series:
+        block = block_ops.kurt(self._block, [self._value_column], self._by_col_ids)
+        return series.Series(block)
+
+    kurtosis = kurt
+
     def prod(self, *args) -> series.Series:
         return self._aggregate(agg_ops.product_op)
@@ -510,7 +558,13 @@ def rolling(self, window: int, min_periods=None) -> windows.Window:
             [order.OrderingColumnReference(col) for col in self._by_col_ids],
             stable=True,
         )
-        return windows.Window(block, window_spec, self._value_column)
+        return windows.Window(
+            block,
+            window_spec,
+            [self._value_column],
+            drop_null_groups=self._dropna,
+            is_series=True,
+        )
 
     def expanding(self, min_periods: int = 1) -> windows.Window:
         window_spec = core.WindowSpec(
@@ -522,10 +576,13 @@ def expanding(self, min_periods: int = 1) -> windows.Window:
             [order.OrderingColumnReference(col) for col in self._by_col_ids],
             stable=True,
         )
-        return windows.Window(block, window_spec, self._value_column)
-
-    def _ungroup(self) -> series.Series:
-        return series.Series(self._block.select_column(self._value_column))
+        return windows.Window(
+            block,
+            window_spec,
+            [self._value_column],
+            drop_null_groups=self._dropna,
+            is_series=True,
+        )
 
     def _aggregate(self, aggregate_op: agg_ops.AggregateOp) -> series.Series:
         result_block, _ = self._block.aggregate(
@@ -553,6 +610,5 @@ def _apply_window_op(
             op,
             result_label=label,
             window_spec=window_spec,
-            skip_null_groups=self._dropna,
         )
         return series.Series(block.select_column(result_id))