|
35 | 35 | reencode_order_string,
|
36 | 36 | StringEncoding,
|
37 | 37 | )
|
| 38 | +import bigframes.core.utils as utils |
38 | 39 | import bigframes.dtypes
|
39 | 40 | import bigframes.operations as ops
|
40 | 41 | import bigframes.operations.aggregations as agg_ops
|
@@ -562,6 +563,36 @@ def aggregate(
|
562 | 563 | ordering=ordering,
|
563 | 564 | )
|
564 | 565 |
|
| 566 | + def corr_aggregate( |
| 567 | + self, corr_aggregations: typing.Sequence[typing.Tuple[str, str, str]] |
| 568 | + ) -> ArrayValue: |
| 569 | + """ |
| 570 | + Get correlations between each lef_column_id and right_column_id, stored in the respective output_column_id. |
| 571 | + This uses BigQuery's CORR under the hood, and thus only Pearson's method is used. |
| 572 | + Arguments: |
| 573 | + corr_aggregations: left_column_id, right_column_id, output_column_id tuples |
| 574 | + """ |
| 575 | + table = self.to_ibis_expr(ordering_mode="unordered") |
| 576 | + stats = { |
| 577 | + col_out: table[col_left].corr(table[col_right], how="pop") |
| 578 | + for col_left, col_right, col_out in corr_aggregations |
| 579 | + } |
| 580 | + aggregates = {**stats, ORDER_ID_COLUMN: ibis_types.literal(0)} |
| 581 | + result = table.aggregate(**aggregates) |
| 582 | + # Ordering is irrelevant for single-row output, but set ordering id regardless as other ops(join etc.) expect it. |
| 583 | + ordering = ExpressionOrdering( |
| 584 | + ordering_value_columns=[OrderingColumnReference(ORDER_ID_COLUMN)], |
| 585 | + total_ordering_columns=frozenset([ORDER_ID_COLUMN]), |
| 586 | + integer_encoding=IntegerEncoding(is_encoded=True, is_sequential=True), |
| 587 | + ) |
| 588 | + return ArrayValue( |
| 589 | + self._session, |
| 590 | + result, |
| 591 | + columns=[result[col_id] for col_id in [*stats.keys()]], |
| 592 | + hidden_ordering_columns=[result[ORDER_ID_COLUMN]], |
| 593 | + ordering=ordering, |
| 594 | + ) |
| 595 | + |
565 | 596 | def project_window_op(
|
566 | 597 | self,
|
567 | 598 | column_name: str,
|
@@ -852,52 +883,91 @@ def _ibis_window_from_spec(self, window_spec: WindowSpec, allow_ties: bool = Fal
|
852 | 883 | group_by=group_by,
|
853 | 884 | )
|
854 | 885 |
|
855 |
| - def unpivot_single_row( |
| 886 | + def unpivot( |
856 | 887 | self,
|
857 | 888 | row_labels: typing.Sequence[typing.Hashable],
|
858 |
| - unpivot_columns: typing.Sequence[typing.Tuple[str, typing.Sequence[str]]], |
| 889 | + unpivot_columns: typing.Sequence[ |
| 890 | + typing.Tuple[str, typing.Sequence[typing.Optional[str]]] |
| 891 | + ], |
859 | 892 | *,
|
| 893 | + passthrough_columns: typing.Sequence[str] = (), |
860 | 894 | index_col_id: str = "index",
|
861 |
| - dtype=pandas.Float64Dtype(), |
| 895 | + dtype: typing.Union[ |
| 896 | + bigframes.dtypes.Dtype, typing.Sequence[bigframes.dtypes.Dtype] |
| 897 | + ] = pandas.Float64Dtype(), |
862 | 898 | ) -> ArrayValue:
|
863 |
| - """Unpivot a single row.""" |
864 |
| - # TODO: Generalize to multiple row input |
865 |
| - table = self.to_ibis_expr(ordering_mode="unordered") |
| 899 | + """ |
| 900 | + Unpivot ArrayValue columns. |
| 901 | +
|
| 902 | + Args: |
| 903 | + row_labels: Identifies the source of the row. Must be equal to length to source column list in unpivot_columns argument. |
| 904 | + unpivot_columns: Mapping of column id to list of input column ids. Lists of input columns may use None. |
| 905 | + passthrough_columns: Columns that will not be unpivoted. Column id will be preserved. |
| 906 | + index_col_id (str): The column id to be used for the row labels. |
| 907 | + dtype (dtype or list of dtype): Dtype to use for the unpivot columns. If list, must be equal in number to unpivot_columns. |
| 908 | +
|
| 909 | + Returns: |
| 910 | + ArrayValue: The unpivoted ArrayValue |
| 911 | + """ |
| 912 | + table = self.to_ibis_expr(ordering_mode="offset_col") |
866 | 913 | sub_expressions = []
|
867 | 914 |
|
868 |
| - # TODO: validate all columns are equal length, as well as row labels |
| 915 | + # Use ibis memtable to infer type of rowlabels (if possible) |
| 916 | + # TODO: Allow caller to specify dtype |
| 917 | + labels_ibis_type = ibis.memtable({"col": row_labels})["col"].type() |
| 918 | + labels_dtype = bigframes.dtypes.ibis_dtype_to_bigframes_dtype(labels_ibis_type) |
| 919 | + |
869 | 920 | row_n = len(row_labels)
|
870 | 921 | if not all(
|
871 | 922 | len(source_columns) == row_n for _, source_columns in unpivot_columns
|
872 | 923 | ):
|
873 | 924 | raise ValueError("Columns and row labels must all be same length.")
|
874 | 925 |
|
875 |
| - # Select each column |
876 | 926 | for i in range(row_n):
|
877 | 927 | values = []
|
878 |
| - for result_col, source_cols in unpivot_columns: |
879 |
| - values.append( |
880 |
| - ops.AsTypeOp(dtype)._as_ibis(table[source_cols[i]]).name(result_col) |
881 |
| - ) |
882 |
| - |
| 928 | + for j in range(len(unpivot_columns)): |
| 929 | + result_col, source_cols = unpivot_columns[j] |
| 930 | + col_dtype = dtype[j] if utils.is_list_like(dtype) else dtype |
| 931 | + if source_cols[i] is not None: |
| 932 | + values.append( |
| 933 | + ops.AsTypeOp(col_dtype) |
| 934 | + ._as_ibis(table[source_cols[i]]) |
| 935 | + .name(result_col) |
| 936 | + ) |
| 937 | + else: |
| 938 | + values.append( |
| 939 | + bigframes.dtypes.literal_to_ibis_scalar( |
| 940 | + None, force_dtype=col_dtype |
| 941 | + ).name(result_col) |
| 942 | + ) |
| 943 | + offsets_value = ( |
| 944 | + ((table[ORDER_ID_COLUMN] * row_n) + i) |
| 945 | + .cast(ibis_dtypes.int64) |
| 946 | + .name(ORDER_ID_COLUMN), |
| 947 | + ) |
883 | 948 | sub_expr = table.select(
|
884 |
| - ibis_types.literal(row_labels[i]).name(index_col_id), |
| 949 | + passthrough_columns, |
| 950 | + bigframes.dtypes.literal_to_ibis_scalar( |
| 951 | + row_labels[i], force_dtype=labels_dtype # type:ignore |
| 952 | + ).name(index_col_id), |
885 | 953 | *values,
|
886 |
| - ibis_types.literal(i).name(ORDER_ID_COLUMN), |
| 954 | + offsets_value, |
887 | 955 | )
|
888 | 956 | sub_expressions.append(sub_expr)
|
889 | 957 | rotated_table = ibis.union(*sub_expressions)
|
890 | 958 |
|
891 | 959 | value_columns = [
|
892 | 960 | rotated_table[value_col_id] for value_col_id, _ in unpivot_columns
|
893 | 961 | ]
|
| 962 | + passthrough_values = [rotated_table[col] for col in passthrough_columns] |
894 | 963 | return ArrayValue(
|
895 | 964 | session=self._session,
|
896 | 965 | table=rotated_table,
|
897 |
| - columns=[rotated_table[index_col_id], *value_columns], |
| 966 | + columns=[rotated_table[index_col_id], *value_columns, *passthrough_values], |
898 | 967 | hidden_ordering_columns=[rotated_table[ORDER_ID_COLUMN]],
|
899 | 968 | ordering=ExpressionOrdering(
|
900 | 969 | ordering_value_columns=[OrderingColumnReference(ORDER_ID_COLUMN)],
|
| 970 | + integer_encoding=IntegerEncoding(is_encoded=True, is_sequential=True), |
901 | 971 | total_ordering_columns=frozenset([ORDER_ID_COLUMN]),
|
902 | 972 | ),
|
903 | 973 | )
|
|
0 commit comments