From 285d9e2d91663e31fb7769faa699acdc05edcf10 Mon Sep 17 00:00:00 2001
From: Trevor Bergeron
Date: Tue, 26 Sep 2023 02:49:02 +0000
Subject: [PATCH 1/2] feat: add df.unstack

---
 bigframes/core/__init__.py                  | 109 ++++++++-----
 bigframes/core/blocks.py                    | 143 +++++++-----------
 bigframes/core/utils.py                     |  20 +++
 bigframes/dataframe.py                      |  21 +++
 tests/system/small/test_dataframe.py        |  25 ++-
 tests/system/small/test_multiindex.py       |  31 ++++
 .../bigframes_vendored/pandas/core/frame.py |  15 ++
 7 files changed, 237 insertions(+), 127 deletions(-)

diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py
index c529f83351..3e41d923da 100644
--- a/bigframes/core/__init__.py
+++ b/bigframes/core/__init__.py
@@ -963,10 +963,11 @@ def unpivot(
         ],
         *,
         passthrough_columns: typing.Sequence[str] = (),
-        index_col_id: str = "index",
+        index_col_ids: typing.Sequence[str] = ["index"],
         dtype: typing.Union[
             bigframes.dtypes.Dtype, typing.Sequence[bigframes.dtypes.Dtype]
         ] = pandas.Float64Dtype(),
+        how="left",
     ) -> ArrayValue:
         """
         Unpivot ArrayValue columns.
@@ -981,8 +982,11 @@ def unpivot(
         Returns:
             ArrayValue: The unpivoted ArrayValue
         """
-        table = self._to_ibis_expr(ordering_mode="offset_col")
+        if how not in ("left", "right"):
+            raise ValueError("'how' must be 'left' or 'right'")
+        table = self._to_ibis_expr(ordering_mode="unordered", expose_hidden_cols=True)
         row_n = len(row_labels)
+        hidden_col_ids = self._hidden_ordering_column_names.keys()
         if not all(
             len(source_columns) == row_n for _, source_columns in unpivot_columns
         ):
@@ -992,33 +996,40 @@ def unpivot(
         unpivot_table = table.cross_join(
             ibis.memtable({unpivot_offset_id: range(row_n)})
         )
-        unpivot_offsets_value = (
-            (
-                (unpivot_table[ORDER_ID_COLUMN] * row_n)
-                + unpivot_table[unpivot_offset_id]
-            )
-            .cast(ibis_dtypes.int64)
-            .name(ORDER_ID_COLUMN),
-        )
-
         # Use ibis memtable to infer type of rowlabels (if possible)
         # TODO: Allow caller to specify dtype
-        labels_ibis_type = ibis.memtable({"col": row_labels})["col"].type()
-        labels_dtype = bigframes.dtypes.ibis_dtype_to_bigframes_dtype(labels_ibis_type)
-        cases = [
-            (
-                i,
-                bigframes.dtypes.literal_to_ibis_scalar(
-                    row_labels[i], force_dtype=labels_dtype  # type:ignore
-                ),
-            )
-            for i in range(len(row_labels))
+        if isinstance(row_labels[0], tuple):
+            labels_table = ibis.memtable(row_labels)
+            labels_ibis_types = [
+                labels_table[col].type() for col in labels_table.columns
+            ]
+        else:
+            labels_ibis_types = [ibis.memtable({"col": row_labels})["col"].type()]
+        labels_dtypes = [
+            bigframes.dtypes.ibis_dtype_to_bigframes_dtype(ibis_type)
+            for ibis_type in labels_ibis_types
         ]
-        labels_value = (
-            typing.cast(ibis_types.IntegerColumn, unpivot_table[unpivot_offset_id])
-            .cases(cases, default=None)  # type:ignore
-            .name(index_col_id)
-        )
+
+        label_columns = []
+        for label_part, (col_id, label_dtype) in enumerate(
+            zip(index_col_ids, labels_dtypes)
+        ):
+            cases = [
+                (
+                    i,
+                    bigframes.dtypes.literal_to_ibis_scalar(
+                        row_labels[i][label_part],  # type:ignore
+                        force_dtype=label_dtype,  # type:ignore
+                    ),
+                )
+                for i in range(len(row_labels))
+            ]
+            labels_value = (
+                typing.cast(ibis_types.IntegerColumn, unpivot_table[unpivot_offset_id])
+                .cases(cases, default=None)  # type:ignore
+                .name(col_id)
+            )
+            label_columns.append(labels_value)
 
         unpivot_values = []
         for j in range(len(unpivot_columns)):
@@ -1042,23 +1053,53 @@ def unpivot(
             unpivot_values.append(unpivot_value.name(result_col))
 
         unpivot_table = unpivot_table.select(
-            passthrough_columns, labels_value, *unpivot_values, unpivot_offsets_value
+            passthrough_columns,
+            *label_columns,
+            *unpivot_values,
+            *hidden_col_ids,
+            unpivot_offset_id,
         )
 
+        # Extend the original ordering using unpivot_offset_id
+        old_ordering = self._ordering
+        if how == "left":
+            new_ordering = ExpressionOrdering(
+                ordering_value_columns=[
+                    *old_ordering.ordering_value_columns,
+                    OrderingColumnReference(unpivot_offset_id),
+                ],
+                total_ordering_columns=frozenset(
+                    [*old_ordering.total_ordering_columns, unpivot_offset_id]
+                ),
+            )
+        else:  # how == "right"
+            new_ordering = ExpressionOrdering(
+                ordering_value_columns=[
+                    OrderingColumnReference(unpivot_offset_id),
+                    *old_ordering.ordering_value_columns,
+                ],
+                total_ordering_columns=frozenset(
+                    [*old_ordering.total_ordering_columns, unpivot_offset_id]
+                ),
+            )
         value_columns = [
             unpivot_table[value_col_id] for value_col_id, _ in unpivot_columns
         ]
         passthrough_values = [unpivot_table[col] for col in passthrough_columns]
+        hidden_ordering_columns = [
+            unpivot_table[unpivot_offset_id],
+            *[unpivot_table[hidden_col] for hidden_col in hidden_col_ids],
+        ]
         return ArrayValue(
             session=self._session,
             table=unpivot_table,
-            columns=[unpivot_table[index_col_id], *value_columns, *passthrough_values],
-            hidden_ordering_columns=[unpivot_table[ORDER_ID_COLUMN]],
-            ordering=ExpressionOrdering(
-                ordering_value_columns=[OrderingColumnReference(ORDER_ID_COLUMN)],
-                integer_encoding=IntegerEncoding(is_encoded=True, is_sequential=True),
-                total_ordering_columns=frozenset([ORDER_ID_COLUMN]),
-            ),
+            columns=[
+                *[unpivot_table[col_id] for col_id in index_col_ids],
+                *value_columns,
+                *passthrough_values,
+            ],
+            hidden_ordering_columns=hidden_ordering_columns,
+            ordering=new_ordering,
         )
 
     def assign(self, source_id: str, destination_id: str) -> ArrayValue:
diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py
index b53c2212c1..863852c684 100644
--- a/bigframes/core/blocks.py
+++ b/bigframes/core/blocks.py
@@ -838,7 +838,7 @@ def aggregate_all_and_stack(
             ]
             result_expr = self.expr.aggregate(aggregations, dropna=dropna).unpivot(
                 row_labels=self.column_labels.to_list(),
-                index_col_id="index",
+                index_col_ids=["index"],
                 unpivot_columns=[(value_col_id, self.value_columns)],
                 dtype=dtype,
             )
@@ -849,7 +849,7 @@ def aggregate_all_and_stack(
             expr_with_offsets, offset_col = self.expr.promote_offsets()
             stacked_expr = expr_with_offsets.unpivot(
                 row_labels=self.column_labels.to_list(),
-                index_col_id=guid.generate_guid(),
+                index_col_ids=[guid.generate_guid()],
                 unpivot_columns=[(value_col_id, self.value_columns)],
                 passthrough_columns=[*self.index_columns, offset_col],
                 dtype=dtype,
@@ -1041,7 +1041,7 @@ def summarize(
         expr = self.expr.aggregate(aggregations).unpivot(
             labels,
             unpivot_columns=columns,
-            index_col_id=label_col_id,
+            index_col_ids=[label_col_id],
         )
         labels = self._get_labels_for_columns(column_ids)
         return Block(expr, column_labels=labels, index_columns=[label_col_id])
@@ -1225,116 +1225,83 @@ def pivot(
 
         return result_block.with_column_labels(column_index)
 
-    def stack(self):
+    def stack(self, how="left", dropna=True, sort=True, levels: int = 1):
         """Unpivot last column axis level into row axis"""
-        if isinstance(self.column_labels, pd.MultiIndex):
-            return self._stack_multi()
-        else:
-            return self._stack_mono()
-
-    def _stack_mono(self):
-        if isinstance(self.column_labels, pd.MultiIndex):
-            raise ValueError("Expected single level index")
-        # These are the values that will be turned into rows
-        stack_values = self.column_labels.drop_duplicates().sort_values()
-        # Get matching columns
-        unpivot_columns: List[Tuple[str, List[str]]] = []
-        dtypes: List[bigframes.dtypes.Dtype] = []
-        col_id = guid.generate_guid("unpivot_")
-        dtype = None
-        input_columns: Sequence[Optional[str]] = []
-        for uvalue in stack_values:
-            matching_ids = self.label_to_col_id.get(uvalue, [])
-            input_id = matching_ids[0] if len(matching_ids) > 0 else None
-            if input_id:
-                if dtype and dtype != self._column_type(input_id):
-                    raise NotImplementedError(
-                        "Cannot stack columns with non-matching dtypes."
-                    )
-                else:
-                    dtype = self._column_type(input_id)
-            input_columns.append(input_id)
-        unpivot_columns.append((col_id, input_columns))
-        if dtype:
-            dtypes.append(dtype or pd.Float64Dtype())
+        col_labels, row_labels = utils.split_index(self.column_labels, levels=levels)
+        if dropna:
+            row_labels = row_labels.drop_duplicates()
+        if sort:
+            row_labels = row_labels.sort_values()
 
-        added_index_column = col_id = guid.generate_guid()
-        unpivot_expr = self._expr.unpivot(
-            row_labels=stack_values,
-            passthrough_columns=self.index_columns,
-            unpivot_columns=unpivot_columns,
-            index_col_id=added_index_column,
-            dtype=dtypes,
-        )
-        block = Block(
-            unpivot_expr,
-            index_columns=[*self.index_columns, added_index_column],
-            column_labels=[None],
-            index_labels=[*self._index_labels, self.column_labels.names[-1]],
-        )
-        return block
-
-    def _stack_multi(self):
-        if not isinstance(self.column_labels, pd.MultiIndex):
-            raise ValueError("Expected multi-index")
-
-        # These are the values that will be turned into rows
-        stack_values = (
-            self.column_labels.get_level_values(-1).drop_duplicates().sort_values()
-        )
+        row_label_tuples = utils.index_as_tuples(row_labels)
 
-        result_col_labels = (
-            self.column_labels.droplevel(-1)
-            .drop_duplicates()
-            .sort_values()
-            .dropna(how="all")
-        )
+        if col_labels is not None:
+            result_index = col_labels.drop_duplicates().sort_values().dropna(how="all")
+            result_col_labels = utils.index_as_tuples(result_index)
+        else:
+            result_index = pd.Index([None])
+            result_col_labels = list([()])
 
         # Get matching columns
         unpivot_columns: List[Tuple[str, List[str]]] = []
         dtypes = []
         for val in result_col_labels:
             col_id = guid.generate_guid("unpivot_")
-            dtype = None
-            input_columns: Sequence[Optional[str]] = []
-            for uvalue in stack_values:
-                # Need to unpack if still a multi-index after dropping 1 level
-                label_to_match = (
-                    (val, uvalue) if result_col_labels.nlevels == 1 else (*val, uvalue)
-                )
-                matching_ids = self.label_to_col_id.get(label_to_match, [])
-                input_id = matching_ids[0] if len(matching_ids) > 0 else None
-                if input_id:
-                    if dtype and dtype != self._column_type(input_id):
-                        raise NotImplementedError(
-                            "Cannot stack columns with non-matching dtypes."
-                        )
-                    else:
-                        dtype = self._column_type(input_id)
-                input_columns.append(input_id)
-                # Input column i is the first one that
+            input_columns, dtype = self._create_stack_column(val, row_label_tuples)
             unpivot_columns.append((col_id, input_columns))
             if dtype:
                 dtypes.append(dtype or pd.Float64Dtype())
 
-        added_index_column = col_id = guid.generate_guid()
+        added_index_columns = [guid.generate_guid() for _ in range(row_labels.nlevels)]
         unpivot_expr = self._expr.unpivot(
-            row_labels=stack_values,
+            row_labels=row_label_tuples,
             passthrough_columns=self.index_columns,
             unpivot_columns=unpivot_columns,
-            index_col_id=added_index_column,
+            index_col_ids=added_index_columns,
             dtype=dtypes,
+            how=how,
         )
+        new_index_level_names = self.column_labels.names[-levels:]
+        if how == "left":
+            index_columns = [*self.index_columns, *added_index_columns]
+            index_labels = [*self._index_labels, *new_index_level_names]
+        else:
+            index_columns = [*added_index_columns, *self.index_columns]
+            index_labels = [*new_index_level_names, *self._index_labels]
+
         block = Block(
             unpivot_expr,
-            index_columns=[*self.index_columns, added_index_column],
-            column_labels=result_col_labels,
-            index_labels=[*self._index_labels, self.column_labels.names[-1]],
+            index_columns=index_columns,
+            column_labels=result_index,
+            index_labels=index_labels,
         )
         return block
 
+    def _create_stack_column(
+        self, col_label: typing.Tuple, stack_labels: typing.Sequence[typing.Tuple]
+    ):
+        dtype = None
+        input_columns: list[Optional[str]] = []
+        for uvalue in stack_labels:
+            label_to_match = (*col_label, *uvalue)
+            label_to_match = (
+                label_to_match[0] if len(label_to_match) == 1 else label_to_match
+            )
+            matching_ids = self.label_to_col_id.get(label_to_match, [])
+            input_id = matching_ids[0] if len(matching_ids) > 0 else None
+            if input_id:
+                if dtype and dtype != self._column_type(input_id):
+                    raise NotImplementedError(
+                        "Cannot stack columns with non-matching dtypes."
+                    )
+                else:
+                    dtype = self._column_type(input_id)
+            input_columns.append(input_id)
+            # Input column i is the first one that matches row label i
+        return input_columns, dtype or pd.Float64Dtype()
+
     def _column_type(self, col_id: str) -> bigframes.dtypes.Dtype:
         col_offset = self.value_columns.index(col_id)
         dtype = self.dtypes[col_offset]
diff --git a/bigframes/core/utils.py b/bigframes/core/utils.py
index 75175690ce..1a5d3ab37c 100644
--- a/bigframes/core/utils.py
+++ b/bigframes/core/utils.py
@@ -49,6 +49,26 @@ def combine_indices(index1: pd.Index, index2: pd.Index) -> pd.MultiIndex:
     return multi_index
 
 
+def index_as_tuples(index: pd.Index) -> typing.Sequence[typing.Tuple]:
+    if isinstance(index, pd.MultiIndex):
+        return [label for label in index]
+    else:
+        return [(label,) for label in index]
+
+
+def split_index(
+    index: pd.Index, levels: int = 1
+) -> typing.Tuple[typing.Optional[pd.Index], pd.Index]:
+    nlevels = index.nlevels
+    remaining = nlevels - levels
+    if remaining > 0:
+        return index.droplevel(list(range(remaining, nlevels))), index.droplevel(
+            list(range(0, remaining))
+        )
+    else:
+        return (None, index)
+
+
 def get_standardized_ids(
     col_labels: Iterable[Hashable], idx_labels: Iterable[Hashable] = ()
 ) -> tuple[list[str], list[str]]:
diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py
index e4e22e0306..382068d5d3 100644
--- a/bigframes/dataframe.py
+++ b/bigframes/dataframe.py
@@ -1679,6 +1679,27 @@ def stack(self):
             return bigframes.series.Series(result_block)
         return DataFrame(result_block)
 
+    def unstack(self):
+        block = self._block
+        # Special case: unstack with mono-index transposes into a series
+        if self.index.nlevels == 1:
+            block = block.stack(
+                how="right", dropna=False, sort=False, levels=self.columns.nlevels
+            )
+            return bigframes.series.Series(block)
+
+        # Pivot by last level of index
+        index_ids = block.index_columns
+        block = block.reset_index(drop=False)
+        block = block.set_index(index_ids[:-1])
+
+        pivot_block = block.pivot(
+            columns=[index_ids[-1]],
+            values=self._block.value_columns,
+            values_in_index=True,
+        )
+        return DataFrame(pivot_block)
+
     def _drop_non_numeric(self, keep_bool=True) -> DataFrame:
         types_to_keep = set(bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES)
         if not keep_bool:
diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py
index adf17848ee..4189cdc242 100644
--- a/tests/system/small/test_dataframe.py
+++ b/tests/system/small/test_dataframe.py
@@ -1232,11 +1232,6 @@ def test_combine_first(
     pd_df_b.columns = ["b", "a", "d"]
     pd_result = pd_df_a.combine_first(pd_df_b)
 
-    print("pandas")
-    print(pd_result.to_string())
-    print("bigframes")
-    print(bf_result.to_string())
-
     # Some dtype inconsistency for all-NULL columns
     pd.testing.assert_frame_equal(bf_result, pd_result, check_dtype=False)
 
@@ -1705,6 +1700,26 @@ def test_df_stack(scalars_dfs):
     pd.testing.assert_series_equal(bf_result, pd_result, check_dtype=False)
 
 
+def test_df_unstack(scalars_dfs):
+    scalars_df, scalars_pandas_df = scalars_dfs
+    # To match bigquery dataframes
+    scalars_pandas_df = scalars_pandas_df.copy()
+    scalars_pandas_df.columns = scalars_pandas_df.columns.astype("string[pyarrow]")
+    # Can only stack identically-typed columns
+    columns = [
+        "rowindex_2",
+        "int64_col",
+        "int64_too",
+    ]
+
+    # unstack on mono-index produces series
+    bf_result = scalars_df[columns].unstack().to_pandas()
+    pd_result = scalars_pandas_df[columns].unstack()
+
+    # Pandas produces NaN, where bq dataframes produces pd.NA
+    pd.testing.assert_series_equal(bf_result, pd_result, check_dtype=False)
+
+
 @pytest.mark.parametrize(
     ("values", "index", "columns"),
     [
diff --git a/tests/system/small/test_multiindex.py b/tests/system/small/test_multiindex.py
index 1e38b47b4c..10f2a74b21 100644
--- a/tests/system/small/test_multiindex.py
+++ b/tests/system/small/test_multiindex.py
@@ -729,6 +729,26 @@ def test_column_multi_index_stack(scalars_df_index, scalars_pandas_df_index):
     )
 
 
+def test_column_multi_index_unstack(scalars_df_index, scalars_pandas_df_index):
+    columns = ["int64_too", "int64_col", "rowindex_2"]
+    level1 = pandas.Index(["b", "a", "b"], dtype="string[pyarrow]")
+    # Need resulting column to be pyarrow string rather than object dtype
+    level2 = pandas.Index(["a", "b", "b"], dtype="string[pyarrow]")
+    multi_columns = pandas.MultiIndex.from_arrays([level1, level2])
+    bf_df = scalars_df_index[columns].copy()
+    bf_df.columns = multi_columns
+    pd_df = scalars_pandas_df_index[columns].copy()
+    pd_df.columns = multi_columns
+
+    bf_result = bf_df.unstack().to_pandas()
+    # Shifting sort behavior in stack
+    pd_result = pd_df.unstack()
+
+    # Pandas produces NaN, where bq dataframes produces pd.NA
+    # Column ordering seems to depend on pandas version
+    pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False)
+
+
 @pytest.mark.skip(reason="Pandas fails in newer versions.")
 def test_column_multi_index_w_na_stack(scalars_df_index, scalars_pandas_df_index):
     columns = ["int64_too", "int64_col", "rowindex_2"]
@@ -866,6 +886,17 @@ def test_column_multi_index_reorder_levels(scalars_df_index, scalars_pandas_df_i
     pandas.testing.assert_frame_equal(bf_result, pd_result)
 
 
+def test_multi_index_unstack(hockey_df, hockey_pandas_df):
+    bf_result = (
+        hockey_df.set_index(["team_name", "season", "position"]).unstack().to_pandas()
+    )
+    pd_result = hockey_pandas_df.set_index(
+        ["team_name", "season", "position"]
+    ).unstack()
+
+    pandas.testing.assert_frame_equal(bf_result, pd_result, check_dtype=False)
+
+
 def test_column_multi_index_swaplevel(scalars_df_index, scalars_pandas_df_index):
     columns = ["int64_too", "string_col", "bool_col"]
     multi_columns = pandas.MultiIndex.from_tuples(
diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py
index 6ce11cd7e9..134233608e 100644
--- a/third_party/bigframes_vendored/pandas/core/frame.py
+++ b/third_party/bigframes_vendored/pandas/core/frame.py
@@ -1910,6 +1910,21 @@ def stack(self):
         """
         raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)
 
+    def unstack(self):
+        """
+        Pivot a level of the (necessarily hierarchical) index labels.
+
+        Returns a DataFrame having a new level of column labels whose inner-most level
+        consists of the pivoted index labels.
+
+        If the index is not a MultiIndex, the output will be a Series
+        (the analogue of stack when the columns are not a MultiIndex).
+
+        Returns:
+            DataFrame or Series
+        """
+        raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)
+
     # ----------------------------------------------------------------------
     # Add index and columns
 

From d3bc4b3b8765217a2bf689c9a4c6dc31c14adb78 Mon Sep 17 00:00:00 2001
From: Trevor Bergeron
Date: Thu, 28 Sep 2023 01:56:59 +0000
Subject: [PATCH 2/2] fix aggregate bug

---
 bigframes/core/__init__.py | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py
index 3e41d923da..5e0675fd13 100644
--- a/bigframes/core/__init__.py
+++ b/bigframes/core/__init__.py
@@ -1014,15 +1014,19 @@ def unpivot(
         for label_part, (col_id, label_dtype) in enumerate(
             zip(index_col_ids, labels_dtypes)
         ):
+            # Interpret labels as tuples even if they weren't originally, so the same logic applies for multi-column labels
+            labels_as_tuples = [
+                label if isinstance(label, tuple) else (label,) for label in row_labels
+            ]
             cases = [
                 (
                     i,
                     bigframes.dtypes.literal_to_ibis_scalar(
-                        row_labels[i][label_part],  # type:ignore
+                        label_tuple[label_part],  # type:ignore
                         force_dtype=label_dtype,  # type:ignore
                     ),
                 )
-                for i in range(len(row_labels))
+                for i, label_tuple in enumerate(labels_as_tuples)
             ]
             labels_value = (
                 typing.cast(ibis_types.IntegerColumn, unpivot_table[unpivot_offset_id])
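
For readers skimming the `DataFrame.unstack` implementation in `bigframes/dataframe.py`: its two branches mirror plain pandas semantics, which can be previewed without bigframes at all. A minimal pandas-only sketch (the toy frame and index names below are invented for illustration):

```python
import pandas as pd

df = pd.DataFrame(
    {"int64_col": [1, 2], "int64_too": [3, 4]},
    index=pd.Index(["a", "b"], name="rowindex"),
)

# Mono-index branch: unstack transposes into a Series whose outer level is
# the old column labels -- the analogue of stack for non-MultiIndex columns.
print(df.unstack())

# MultiIndex branch: the innermost index level is pivoted into the columns,
# which is what the pivot(columns=[index_ids[-1]], ...) call implements.
mdf = pd.DataFrame(
    {"int64_col": [1, 2], "int64_too": [3, 4]},
    index=pd.MultiIndex.from_arrays([["x", "x"], ["a", "b"]], names=["l0", "l1"]),
)
print(mdf.unstack())
```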
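The two new helpers in `bigframes/core/utils.py` are pure pandas index manipulation, so their behavior can be sanity-checked standalone. A sketch of what `split_index` and `index_as_tuples` compute, inlined from the patch (the example tuples are invented):

```python
import pandas as pd

idx = pd.MultiIndex.from_tuples(
    [("a", 1, "x"), ("b", 2, "y")], names=["l0", "l1", "l2"]
)

# split_index(idx, levels=1): the outer levels become the residual (column)
# index, the innermost `levels` levels become the row labels to stack.
levels = 1
remaining = idx.nlevels - levels
head = idx.droplevel(list(range(remaining, idx.nlevels)))  # keeps l0, l1
tail = idx.droplevel(list(range(0, remaining)))  # keeps l2 (a flat Index)

# index_as_tuples normalizes flat and multi indexes to lists of tuples,
# the shape unpivot's multi-part row_labels path expects.
multi_tuples = [label for label in idx]  # [("a", 1, "x"), ("b", 2, "y")]
flat_tuples = [(label,) for label in tail]  # [("x",), ("y",)]
```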
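The `how` parameter threaded through `unpivot` and `stack` only changes where the generated offset column sits in the ordering key. A plain-Python sketch of the resulting row orders (values invented; this is my reading of the `ExpressionOrdering` construction above, not an excerpt from the library):

```python
# Each unpivoted cell is keyed by (original row order, unpivot offset).
cells = [(row, offset) for row in range(2) for offset in range(3)]

# how="left": the offset is appended to the key, so each original row's
# cells stay adjacent -- the stack-like layout.
left = sorted(cells, key=lambda c: (c[0], c[1]))

# how="right": the offset is prepended, so cells sharing a new row label
# group together -- the layout the mono-index unstack special case relies on.
right = sorted(cells, key=lambda c: (c[1], c[0]))

print(left)   # [(0, 0), (0, 1), (0, 2), (1, 0), (1, 1), (1, 2)]
print(right)  # [(0, 0), (1, 0), (0, 1), (1, 1), (0, 2), (1, 2)]
```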
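On the follow-up commit: the aggregate code paths pass flat (non-tuple) `row_labels`, which the multi-part indexing introduced in PATCH 1 would mis-handle (`row_labels[i][label_part]` on a string picks out a character). The fix normalizes every label to a tuple before indexing; the core of that normalization, extracted here for clarity with made-up labels:

```python
row_labels = ["min", "max", "count"]  # flat labels, as an aggregation supplies

labels_as_tuples = [
    label if isinstance(label, tuple) else (label,) for label in row_labels
]
# [("min",), ("max",), ("count",)] -- tuple labels pass through unchanged, so
# label_tuple[label_part] always indexes a label component, never a character.
```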