Skip to content

Commit

Permalink
[SPARK-49344][PS] Support json_normalize for Pandas API on Spark
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

This PR proposes to support `json_normalize` for Pandas API on Spark.

### Why are the changes needed?

For Pandas feature parity: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.json_normalize.html

### Does this PR introduce _any_ user-facing change?

Introduce new API `ps.json_normalize`

```python
>>> data = [
...     {"id": 1, "name": "Alice", "address": {"city": "NYC", "zipcode": "10001"}},
...     {"id": 2, "name": "Bob", "address": {"city": "SF", "zipcode": "94105"}},
... ]
>>> ps.json_normalize(data)
   id   name address.city address.zipcode
0   1  Alice          NYC           10001
1   2    Bob           SF           94105
```

### How was this patch tested?

Added UTs

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#47840 from itholic/json_normalize.

Lead-authored-by: Haejoon Lee <haejoon.lee@databricks.com>
Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
  • Loading branch information
itholic and HyukjinKwon committed Aug 23, 2024
1 parent 853731d commit 08d69ff
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 0 deletions.
1 change: 1 addition & 0 deletions python/docs/source/reference/pyspark.pandas/io.rst
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ JSON
.. autosummary::
:toctree: api/

json_normalize
read_json
DataFrame.to_json

Expand Down
78 changes: 78 additions & 0 deletions python/pyspark/pandas/namespace.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
DEFAULT_SERIES_NAME,
HIDDEN_COLUMNS,
SPARK_INDEX_NAME_FORMAT,
NATURAL_ORDER_COLUMN_NAME,
)
from pyspark.pandas.series import Series, first_series
from pyspark.pandas.spark.utils import as_nullable_spark_type, force_decimal_precision_scale
Expand Down Expand Up @@ -125,6 +126,7 @@
"to_numeric",
"broadcast",
"read_orc",
"json_normalize",
]


Expand Down Expand Up @@ -3687,6 +3689,82 @@ def read_orc(
return psdf


def json_normalize(
    data: Union[Dict, List[Dict]],
    sep: str = ".",
) -> DataFrame:
    """
    Normalize semi-structured JSON data into a flat table.

    .. versionadded:: 4.0.0

    Parameters
    ----------
    data : dict or list of dicts
        Unserialized JSON objects.
    sep : str, default '.'
        Nested records will generate names separated by sep.

    Returns
    -------
    DataFrame

    See Also
    --------
    DataFrame.to_json : Convert the pandas-on-Spark DataFrame to a JSON string.

    Examples
    --------
    >>> data = [
    ...     {"id": 1, "name": "Alice", "address": {"city": "NYC", "zipcode": "10001"}},
    ...     {"id": 2, "name": "Bob", "address": {"city": "SF", "zipcode": "94105"}},
    ... ]
    >>> ps.json_normalize(data)
       id   name address.city address.zipcode
    0   1  Alice          NYC           10001
    1   2    Bob           SF           94105
    """
    # Convert the input JSON data to a pandas-on-Spark DataFrame; nested dicts
    # become Spark StructType columns, which are flattened below.
    psdf: DataFrame = ps.DataFrame(data)
    internal = psdf._internal
    sdf = internal.spark_frame

    index_spark_column_names = internal.index_spark_column_names

    # Internal bookkeeping columns (index columns and the natural-order column)
    # must not be flattened or exposed to the user. Built once as a set instead
    # of re-concatenating a list on every recursive call.
    hidden_columns = set(index_spark_column_names) | {NATURAL_ORDER_COLUMN_NAME}

    def flatten_schema(schema: StructType, prefix: str = "") -> Tuple[List[str], List[str]]:
        """
        Recursively flatten a nested schema and return parallel lists of
        fully-qualified field paths (Spark dotted access) and flat column
        aliases (joined by ``sep``).
        """
        fields = []
        aliases = []
        for field in schema.fields:
            field_name = field.name
            if field_name not in hidden_columns:
                # `name` always uses '.' because that is Spark's nested-field
                # access syntax; `alias` uses the user-provided separator for
                # the resulting flat column label.
                name = f"{prefix}.{field_name}" if prefix else field_name
                alias = f"{prefix}{sep}{field_name}" if prefix else field_name
                if isinstance(field.dataType, StructType):
                    subfields, subaliases = flatten_schema(field.dataType, prefix=name)
                    fields += subfields
                    aliases += subaliases
                else:
                    fields.append(name)
                    aliases.append(alias)
        return fields, aliases

    fields, aliases = flatten_schema(sdf.schema)

    # Select each (possibly nested) field, renamed to its flat alias.
    selected_columns = [F.col(field).alias(alias) for field, alias in zip(fields, aliases)]

    # Replace the data columns of the internal frame with the flattened ones.
    internal = internal.with_new_columns(
        selected_columns, column_labels=[(column_label,) for column_label in aliases]
    )

    # Convert back to a pandas-on-Spark DataFrame.
    return ps.DataFrame(internal)


def _get_index_map(
sdf: PySparkDataFrame, index_col: Optional[Union[str, List[str]]] = None
) -> Tuple[Optional[List[PySparkColumn]], Optional[List[Label]]]:
Expand Down
56 changes: 56 additions & 0 deletions python/pyspark/pandas/tests/test_namespace.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from pyspark.pandas.missing.general_functions import MissingPandasLikeGeneralFunctions
from pyspark.testing.pandasutils import PandasOnSparkTestCase
from pyspark.testing.sqlutils import SQLTestUtils
from pyspark.pandas.testing import assert_frame_equal


class NamespaceTestsMixin:
Expand Down Expand Up @@ -606,6 +607,61 @@ def test_to_numeric(self):
lambda: ps.to_numeric(psser, errors="ignore"),
)

def test_json_normalize(self):
    # Each dataset exercises a different input shape; all must match the
    # pandas json_normalize result exactly.
    datasets = [
        # Basic test case with a simple JSON structure
        [
            {"id": 1, "name": "Alice", "address": {"city": "NYC", "zipcode": "10001"}},
            {"id": 2, "name": "Bob", "address": {"city": "SF", "zipcode": "94105"}},
        ],
        # Test case with nested JSON structure
        [
            {"id": 1, "name": "Alice", "address": {"city": {"name": "NYC"}, "zipcode": "10001"}},
            {"id": 2, "name": "Bob", "address": {"city": {"name": "SF"}, "zipcode": "94105"}},
        ],
        # Test case with lists included in the JSON structure
        [
            {
                "id": 1,
                "name": "Alice",
                "hobbies": ["reading", "swimming"],
                "address": {"city": "NYC", "zipcode": "10001"},
            },
            {
                "id": 2,
                "name": "Bob",
                "hobbies": ["biking"],
                "address": {"city": "SF", "zipcode": "94105"},
            },
        ],
        # Test case with various data types
        [
            {
                "id": 1,
                "name": "Alice",
                "age": 25,
                "is_student": True,
                "address": {"city": "NYC", "zipcode": "10001"},
            },
            {
                "id": 2,
                "name": "Bob",
                "age": 30,
                "is_student": False,
                "address": {"city": "SF", "zipcode": "94105"},
            },
        ],
    ]
    for data in datasets:
        assert_frame_equal(pd.json_normalize(data), ps.json_normalize(data))

    # Test case handling empty input data
    data = []
    self.assert_eq(pd.json_normalize(data), ps.json_normalize(data))

def test_missing(self):
missing_functions = inspect.getmembers(
MissingPandasLikeGeneralFunctions, inspect.isfunction
Expand Down

0 comments on commit 08d69ff

Please sign in to comment.