Skip to content

feat: add index dtype, astype, drop, fillna, aggregate attributes. #38

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 141 additions & 14 deletions bigframes/core/indexes/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from __future__ import annotations

import typing
from typing import Callable, Tuple
from typing import Callable, Sequence, Tuple, Union

import numpy as np
import pandas
Expand All @@ -26,7 +26,11 @@
import bigframes.core as core
import bigframes.core.blocks as blocks
import bigframes.core.joins as joins
import bigframes.core.utils as utils
import bigframes.dtypes
import bigframes.dtypes as bf_dtypes
import bigframes.operations as ops
import bigframes.operations.aggregations as agg_ops
import third_party.bigframes_vendored.pandas.core.indexes.base as vendored_pandas_index


Expand All @@ -51,16 +55,34 @@ def names(self) -> typing.Sequence[blocks.Label]:

@names.setter
def names(self, values: typing.Sequence[blocks.Label]):
return self._data._set_block(self._data._get_block().with_index_labels(values))
return self._data._set_block(self._block.with_index_labels(values))

@property
def nlevels(self) -> int:
    """Number of levels in the index (1 for a simple index, >1 for a MultiIndex)."""
    # Consistency: go through the `_block` convenience property like the
    # other accessors (dtype, dtypes, ...) instead of self._data._get_block().
    return len(self._block.index_columns)

@property
def values(self) -> np.ndarray:
    """Return the index values as a NumPy array (alias for ``to_numpy()``)."""
    return self.to_numpy()

@property
def ndim(self) -> int:
    """Number of dimensions of the index; always 1, matching pandas."""
    return 1

@property
def shape(self) -> typing.Tuple[int]:
    """Shape of the index as a one-element tuple: (number of rows,)."""
    # Consistency: use the `_block` convenience property like the other
    # accessors rather than self._data._get_block().
    return (self._block.shape[0],)

@property
def dtype(self):
    """Dtype of the single index level; the NumPy object dtype for a MultiIndex."""
    if self.nlevels == 1:
        return self._block.index_dtypes[0]
    return np.dtype("O")

@property
def dtypes(self) -> pandas.Series:
    """Series of the dtypes of each index level, keyed by the level labels."""
    block = self._block
    # type:ignore — index labels may include entries pandas typing rejects.
    return pandas.Series(data=block.index_dtypes, index=block.index_labels)  # type:ignore

@property
def size(self) -> int:
"""Returns the size of the Index."""
Expand Down Expand Up @@ -103,23 +125,120 @@ def is_monotonic_decreasing(self) -> bool:

@property
def is_unique(self) -> bool:
    """True if the index contains no duplicate values."""
    # TODO: Cache this at block level
    # Avoid circular imports
    return not self.has_duplicates

@property
def has_duplicates(self) -> bool:
# TODO: Cache this at block level
# Avoid circular imports
import bigframes.core.block_transforms as block_ops
import bigframes.dataframe as df

duplicates_block, _ = block_ops.indicate_duplicates(
self._data._get_block(), self._data._get_block().index_columns
)
duplicates_block = duplicates_block.with_column_labels(
["values", "is_duplicate"]
duplicates_block, indicator = block_ops.indicate_duplicates(
self._block, self._block.index_columns
)
duplicates_block = duplicates_block.select_columns(
[indicator]
).with_column_labels(["is_duplicate"])
duplicates_df = df.DataFrame(duplicates_block)
return not duplicates_df["is_duplicate"].any()
return duplicates_df["is_duplicate"].any()

@property
def _block(self) -> blocks.Block:
    # Convenience accessor for the Block backing this index.
    return self._data._get_block()

def astype(
    self,
    dtype: Union[bigframes.dtypes.DtypeString, bigframes.dtypes.Dtype],
) -> Index:
    """Cast the index values to ``dtype``; single-level indexes only."""
    if self.nlevels > 1:
        raise TypeError("Multiindex does not support 'astype'")
    cast_op = ops.AsTypeOp(dtype)
    return self._apply_unary_op(cast_op)

def all(self) -> bool:
    """Whether every index value is truthy; single-level indexes only."""
    if self.nlevels > 1:
        raise TypeError("Multiindex does not support 'all'")
    result = self._apply_aggregation(agg_ops.all_op)
    return typing.cast(bool, result)

def any(self) -> bool:
    """Whether any index value is truthy; single-level indexes only."""
    if self.nlevels > 1:
        raise TypeError("Multiindex does not support 'any'")
    result = self._apply_aggregation(agg_ops.any_op)
    return typing.cast(bool, result)

def nunique(self) -> int:
    """Number of distinct values in the index."""
    return typing.cast(int, self._apply_aggregation(agg_ops.nunique_op))

def max(self) -> typing.Any:
    """Maximum value of the index."""
    return self._apply_aggregation(agg_ops.max_op)

def min(self) -> typing.Any:
    """Minimum value of the index."""
    return self._apply_aggregation(agg_ops.min_op)

def fillna(self, value=None) -> Index:
    """Replace missing index values with ``value``; single-level indexes only."""
    if self.nlevels > 1:
        raise TypeError("Multiindex does not support 'fillna'")
    fill_op = ops.partial_right(ops.fillna_op, value)
    return self._apply_unary_op(fill_op)

def rename(self, name: Union[str, Sequence[str]]) -> Index:
    """Return a new Index with its level label(s) replaced by ``name``.

    ``name`` may be a single string for a one-level index, or a sequence
    with exactly one entry per level.
    """
    if isinstance(name, str):
        names = [name]
    else:
        names = list(name)
    if len(names) != self.nlevels:
        raise ValueError("'name' must be same length as levels")

    import bigframes.dataframe as df

    relabeled_block = self._block.with_index_labels(names)
    return Index(df.DataFrame(relabeled_block))

def drop(
    self,
    labels: typing.Any,
) -> Index:
    """Return a new Index with the given label(s) removed.

    ``labels`` may be a scalar or a list-like of values. Only the first
    index level is consulted — NOTE(review): multi-level indexes are not
    guarded against here, unlike astype/fillna; confirm intended.
    """
    # ignore axis, columns params
    block = self._block
    level_id = self._block.index_columns[0]
    if utils.is_list_like(labels):
        # Flag rows whose level value is in `labels` (match_nulls=True —
        # presumably NULLs in `labels` match NULL index entries), then
        # invert so the condition column is True for rows to KEEP.
        block, inverse_condition_id = block.apply_unary_op(
            level_id, ops.IsInOp(labels, match_nulls=True)
        )
        block, condition_id = block.apply_unary_op(
            inverse_condition_id, ops.invert_op
        )
    else:
        # Scalar label: keep rows where level != label. SQL-style comparison
        # with NULL yields NULL, so keep_null=True below retains NULL rows.
        block, condition_id = block.apply_unary_op(
            level_id, ops.partial_right(ops.ne_op, labels)
        )
    block = block.filter(condition_id, keep_null=True)
    block = block.drop_columns([condition_id])
    import bigframes.dataframe as df

    # select_columns([]) drops all value columns, leaving only the index.
    return Index(df.DataFrame(block.select_columns([])))

def _apply_unary_op(
    self,
    op: ops.UnaryOp,
) -> Index:
    """Apply ``op`` to every index column and return the resulting Index."""
    working_block = self._block
    transformed_ids = []
    # Apply the op column-by-column, threading the block through each step.
    for column_id in self._block.index_columns:
        working_block, new_id = working_block.apply_unary_op(column_id, op)
        transformed_ids.append(new_id)

    working_block = working_block.set_index(
        transformed_ids, index_labels=self._block.index_labels
    )
    import bigframes.dataframe as df

    return Index(df.DataFrame(working_block))

def _apply_aggregation(self, op: agg_ops.AggregateOp) -> typing.Any:
    """Compute the scalar aggregate ``op`` over the single index column.

    Raises NotImplementedError for multi-level indexes.
    """
    if self.nlevels > 1:
        raise NotImplementedError(f"Multiindex does not yet support {op.name}")
    column_id = self._block.index_columns[0]
    return self._block.get_stat(column_id, op)

def __getitem__(self, key: int) -> typing.Any:
if isinstance(key, int):
result_pd_df, _ = self._data._get_block().slice(key, key + 1, 1).to_pandas()
result_pd_df, _ = self._block.slice(key, key + 1, 1).to_pandas()
if result_pd_df.empty:
raise IndexError("single positional indexer is out-of-bounds")
return result_pd_df.index[0]
Expand All @@ -133,7 +252,7 @@ def to_pandas(self) -> pandas.Index:
pandas.Index:
A pandas Index with all of the labels from this Index.
"""
return IndexValue(self._data._get_block()).to_pandas()
return IndexValue(self._block).to_pandas()

def to_numpy(self, dtype=None, **kwargs) -> np.ndarray:
return self.to_pandas().to_numpy(dtype, **kwargs)
Expand Down Expand Up @@ -184,13 +303,15 @@ def __repr__(self) -> str:
def to_pandas(self) -> pandas.Index:
"""Executes deferred operations and downloads the results."""
# Project down to only the index column. So the query can be cached to visualize other data.
index_column = self._block.index_columns[0]
expr = self._expr.projection([self._expr.get_any_column(index_column)])
index_columns = list(self._block.index_columns)
expr = self._expr.projection(
[self._expr.get_any_column(col) for col in index_columns]
)
results, _ = expr.start_query()
df = expr._session._rows_to_dataframe(results)
df.set_index(index_column)
df = df.set_index(index_columns)
index = df.index
index.name = self._block._index_labels[0]
index.names = list(self._block._index_labels)
return index

def join(
Expand Down Expand Up @@ -235,6 +356,12 @@ def resolve_level_name(self: IndexValue, label: blocks.Label) -> str:
def is_uniquely_named(self: IndexValue):
    """True when no two index levels share the same name."""
    distinct_names = set(self.names)
    return len(distinct_names) == len(self.names)

def _set_block(self, block: blocks.Block):
    # Replace the underlying Block backing this index value.
    self._block = block

def _get_block(self) -> blocks.Block:
    # Accessor for the underlying Block backing this index value.
    return self._block


def join_mono_indexed(
left: IndexValue,
Expand Down
4 changes: 2 additions & 2 deletions bigframes/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ def drop(
# ignore axis, columns params
block = self._block
level_id = self._resolve_levels(level or 0)[0]
if _is_list_like(labels):
if _is_list_like(index):
block, inverse_condition_id = block.apply_unary_op(
level_id, ops.IsInOp(index, match_nulls=True)
)
Expand All @@ -296,7 +296,7 @@ def drop(
)
else:
block, condition_id = block.apply_unary_op(
level_id, ops.partial_right(ops.ne_op, labels)
level_id, ops.partial_right(ops.ne_op, index)
)
block = block.filter(condition_id, keep_null=True)
block = block.drop_columns([condition_id])
Expand Down
110 changes: 110 additions & 0 deletions tests/system/small/test_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import numpy
import pandas as pd

from tests.system.utils import assert_pandas_index_equal_ignore_index_type

Expand All @@ -25,13 +26,122 @@ def test_get_index(scalars_df_index, scalars_pandas_df_index):
assert_pandas_index_equal_ignore_index_type(bf_result, pd_result)


def test_index_has_duplicates(scalars_df_index, scalars_pandas_df_index):
    # has_duplicates on the BigFrames index must agree with pandas.
    pd_result = scalars_pandas_df_index.set_index("int64_col").index.has_duplicates
    bf_result = scalars_df_index.set_index("int64_col").index.has_duplicates
    assert bf_result == pd_result


def test_index_values(scalars_df_index, scalars_pandas_df_index):
    bf_values = scalars_df_index.index.values
    pd_values = scalars_pandas_df_index.index.values

    # Numpy can't compare non-numeric objects directly, so wrap in Series.
    pd.testing.assert_series_equal(
        pd.Series(bf_values), pd.Series(pd_values), check_dtype=False
    )


def test_index_ndim(scalars_df_index, scalars_pandas_df_index):
    # ndim should match pandas (always 1 for an index).
    pd_result = scalars_pandas_df_index.index.ndim
    bf_result = scalars_df_index.index.ndim
    assert pd_result == bf_result


def test_index_dtype(scalars_df_index, scalars_pandas_df_index):
    # Single-level index dtype should match pandas.
    pd_result = scalars_pandas_df_index.index.dtype
    bf_result = scalars_df_index.index.dtype
    assert pd_result == bf_result


def test_index_dtypes(scalars_df_index, scalars_pandas_df_index):
    # Multi-level index dtypes should match pandas level-by-level.
    index_cols = ["string_col", "int64_too"]
    bf_result = scalars_df_index.set_index(index_cols).index.dtypes
    pd_result = scalars_pandas_df_index.set_index(index_cols).index.dtypes
    pd.testing.assert_series_equal(bf_result, pd_result)


def test_index_shape(scalars_df_index, scalars_pandas_df_index):
    # shape should be a one-element tuple matching pandas.
    pd_result = scalars_pandas_df_index.index.shape
    bf_result = scalars_df_index.index.shape
    assert bf_result == pd_result


def test_index_astype(scalars_df_index, scalars_pandas_df_index):
    # Casting an integer index to Float64 should match pandas.
    bf_index = scalars_df_index.set_index("int64_col").index
    bf_result = bf_index.astype("Float64").to_pandas()
    pd_result = scalars_pandas_df_index.set_index("int64_col").index.astype("Float64")
    pd.testing.assert_index_equal(bf_result, pd_result)


def test_index_any(scalars_df_index, scalars_pandas_df_index):
    # Index.any() should agree with pandas.
    pd_result = scalars_pandas_df_index.set_index("int64_col").index.any()
    bf_result = scalars_df_index.set_index("int64_col").index.any()
    assert bf_result == pd_result


def test_index_all(scalars_df_index, scalars_pandas_df_index):
    # Index.all() should agree with pandas.
    pd_result = scalars_pandas_df_index.set_index("int64_col").index.all()
    bf_result = scalars_df_index.set_index("int64_col").index.all()
    assert bf_result == pd_result


def test_index_max(scalars_df_index, scalars_pandas_df_index):
    # Index.max() should agree with pandas.
    pd_result = scalars_pandas_df_index.set_index("int64_col").index.max()
    bf_result = scalars_df_index.set_index("int64_col").index.max()
    assert bf_result == pd_result


def test_index_min(scalars_df_index, scalars_pandas_df_index):
    # Index.min() should agree with pandas.
    pd_result = scalars_pandas_df_index.set_index("int64_col").index.min()
    bf_result = scalars_df_index.set_index("int64_col").index.min()
    assert bf_result == pd_result


def test_index_nunique(scalars_df_index, scalars_pandas_df_index):
    # Index.nunique() should agree with pandas.
    pd_result = scalars_pandas_df_index.set_index("int64_col").index.nunique()
    bf_result = scalars_df_index.set_index("int64_col").index.nunique()
    assert bf_result == pd_result


def test_index_fillna(scalars_df_index, scalars_pandas_df_index):
    # fillna on the index should agree with pandas.
    bf_index = scalars_df_index.set_index("int64_col").index
    bf_result = bf_index.fillna(42).to_pandas()
    pd_result = scalars_pandas_df_index.set_index("int64_col").index.fillna(42)

    pd.testing.assert_index_equal(bf_result, pd_result)


def test_index_drop(scalars_df_index, scalars_pandas_df_index):
    # Dropping present and absent labels should agree with pandas.
    labels_to_drop = [2, 314159]
    bf_index = scalars_df_index.set_index("int64_col").index
    bf_result = bf_index.drop(labels_to_drop).to_pandas()
    pd_result = scalars_pandas_df_index.set_index("int64_col").index.drop(
        labels_to_drop
    )
    pd.testing.assert_index_equal(bf_result, pd_result)


def test_index_rename(scalars_df_index, scalars_pandas_df_index):
    # Renaming a single-level index should agree with pandas.
    bf_index = scalars_df_index.set_index("int64_col").index
    bf_result = bf_index.rename("name").to_pandas()
    pd_result = scalars_pandas_df_index.set_index("int64_col").index.rename("name")
    pd.testing.assert_index_equal(bf_result, pd_result)


def test_index_multi_rename(scalars_df_index, scalars_pandas_df_index):
    # Renaming both levels of a MultiIndex should agree with pandas.
    index_cols = ["int64_col", "int64_too"]
    new_names = ["new", "names"]
    bf_result = (
        scalars_df_index.set_index(index_cols).index.rename(new_names).to_pandas()
    )
    pd_result = scalars_pandas_df_index.set_index(index_cols).index.rename(new_names)
    pd.testing.assert_index_equal(bf_result, pd_result)


def test_index_len(scalars_df_index, scalars_pandas_df_index):
bf_result = len(scalars_df_index.index)
pd_result = len(scalars_pandas_df_index.index)
Expand Down
Loading