Skip to content

[SPARK-52450][CONNECT] Improve performance of schema deepcopy #51157

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 9 commits into from

Conversation

xi-db
Copy link
Contributor

@xi-db xi-db commented Jun 11, 2025

What changes were proposed in this pull request?

In Spark Connect, DataFrame.schema returns a deep copy of the schema to prevent unexpected behavior caused by user modifications to the returned schema object. However, if a user accesses df.schema repeatedly on a DataFrame with a complex schema, it can lead to noticeable performance degradation.

The performance issue can be reproduced using the code snippet below. Since copy.deepcopy is known to be slow to handle complex objects, this PR replaces it with pickle-based ser/de to improve the performance of df.schema access. Given the limitations of pickle, the implementation falls back to deepcopy in cases where pickling fails.

from pyspark.sql.types import StructType, StructField, StringType

def make_nested_struct(level, max_level, fields_per_level):
    if level == max_level - 1:
        return StructType(
            [StructField(f"f{level}_{i}", StringType(), True) for i in range(fields_per_level)])
    else:
        return StructType(
            [StructField(f"s{level}_{i}",
                         make_nested_struct(level + 1, max_level, fields_per_level), True) for i in
             range(fields_per_level)])

# Create a 4 level nested schema with in total 10,000 leaf fields
schema = make_nested_struct(0, 4, 10)

The existing needs 21.9s to copy the schema for 100 times.

import copy
timeit.timeit(lambda: copy.deepcopy(schema), number=100)
# 21.9

The updated approach only needs 2.0s to copy for 100 times:

from pyspark.serializers import CPickleSerializer
cached_schema_serialized = CPickleSerializer().dumps(schema)

timeit.timeit(lambda: CPickleSerializer().loads(cached_schema_serialized), number=100)
# 2.0

Why are the changes needed?

It improves the performance when calling df.schema many times.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing tests and new tests.

Was this patch authored or co-authored using generative AI tooling?

No.

@xi-db
Copy link
Contributor Author

xi-db commented Jun 11, 2025

Hi @hvanhovell @vicennial , could you take a look at this PR? Thanks.

try:
self._cached_schema_serialized = CPickleSerializer().dumps(self._schema)
except Exception as e:
logger.warn(f"DataFrame schema pickle dumps failed with exception: {e}.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In what cases do we think this will happen?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it never happens because schema is nested Spark type classes. It shouldn't hit any of those special types that pickle doesn't support (link). Anyway, maybe we still need to handle the exception just in case. What do you think?

Copy link
Contributor

@heyihong heyihong Jun 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to add some comments to the code to make it easier for future readers to understand. Currently, it's not very clear why a CPickleSerializer is used, or why an error is being handled without looking at the corresponding pull request.

Also, it may be clearer to create a function called _fast_cached_schema_deepcopy that caches the serialized schema and then deserializes it.

Copy link
Contributor

@grundprinzip grundprinzip left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before merging would be great to test @hvanhovell's proposal to test the performance of reconstructing the schema from the proto response at all times and what the impact is.

Co-authored-by: Martin Grund <grundprinzip@gmail.com>
@xi-db xi-db force-pushed the schema-deepcopy-improvement branch from 9d02fdf to af1b276 Compare June 18, 2025 14:41
@xi-db
Copy link
Contributor Author

xi-db commented Jun 18, 2025

Hi @zhengruifeng, could you help with the CI failures from pyspark-pandas-connect-part1? This PR has no changes on any scala code, but sql/hive, connector/kafka, and connect/server fail to compile due to sbt OutOfMemoryError. Do you have any idea here? I've retriggered CI, but it still failed. Thanks.

[error] ## Exception when compiling 156 sources to /__w/spark/spark/sql/hive/target/scala-2.13/test-classes
[error] java.lang.OutOfMemoryError: Java heap space
[error] 
[error]            
[error] ## Exception when compiling 21 sources to /__w/spark/spark/connector/kafka-0-10-sql/target/scala-2.13/test-classes
[error] java.lang.OutOfMemoryError: Java heap space
[error] 
[error]            
[error] ## Exception when compiling 41 sources to /__w/spark/spark/sql/connect/server/target/scala-2.13/test-classes
[error] java.lang.OutOfMemoryError: Java heap space
[error] 
[error]            
[warn] javac exited with exit code -1
[info] Compilation has been cancelled
[info] Compilation has been cancelled
[warn] In the last 10 seconds, 5.032 (50.6%) were spent in GC. [Heap: 2.45GB free of 4.00GB, max 4.00GB] Consider increasing the JVM heap using `-Xmx` or try a different collector, e.g. `-XX:+UseG1GC`, for better performance.
java.lang.OutOfMemoryError: Java heap space
Error:  [launcher] error during sbt launcher: java.lang.OutOfMemoryError: Java heap space

Update: Never mind, it works now. Thanks anyway.

@hvanhovell
Copy link
Contributor

Merging to master/4.0. Thanks!

asf-gitbox-commits pushed a commit that referenced this pull request Jun 20, 2025
### What changes were proposed in this pull request?

In Spark Connect, `DataFrame.schema` returns a deep copy of the schema to prevent unexpected behavior caused by user modifications to the returned schema object. However, if a user accesses `df.schema` repeatedly on a DataFrame with a complex schema, it can lead to noticeable performance degradation.

The performance issue can be reproduced using the code snippet below. Since copy.deepcopy is known to be slow to handle complex objects, this PR replaces it with pickle-based ser/de to improve the performance of df.schema access. Given the limitations of pickle, the implementation falls back to deepcopy in cases where pickling fails.

```
from pyspark.sql.types import StructType, StructField, StringType

def make_nested_struct(level, max_level, fields_per_level):
    if level == max_level - 1:
        return StructType(
            [StructField(f"f{level}_{i}", StringType(), True) for i in range(fields_per_level)])
    else:
        return StructType(
            [StructField(f"s{level}_{i}",
                         make_nested_struct(level + 1, max_level, fields_per_level), True) for i in
             range(fields_per_level)])

# Create a 4 level nested schema with in total 10,000 leaf fields
schema = make_nested_struct(0, 4, 10)
```

 The existing needs 21.9s to copy the schema for 100 times.
```
import copy
timeit.timeit(lambda: copy.deepcopy(schema), number=100)
# 21.9
```

The updated approach only needs 2.0s to copy for 100 times:
```
from pyspark.serializers import CPickleSerializer
cached_schema_serialized = CPickleSerializer().dumps(schema)

timeit.timeit(lambda: CPickleSerializer().loads(cached_schema_serialized), number=100)
# 2.0
```

### Why are the changes needed?

It improves the performance when calling df.schema many times.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests and new tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #51157 from xi-db/schema-deepcopy-improvement.

Lead-authored-by: Xi Lyu <xi.lyu@databricks.com>
Co-authored-by: Xi Lyu <159039256+xi-db@users.noreply.github.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>
(cherry picked from commit f502d66)
Signed-off-by: Herman van Hovell <herman@databricks.com>
haoyangeng-db pushed a commit to haoyangeng-db/apache-spark that referenced this pull request Jun 25, 2025
### What changes were proposed in this pull request?

In Spark Connect, `DataFrame.schema` returns a deep copy of the schema to prevent unexpected behavior caused by user modifications to the returned schema object. However, if a user accesses `df.schema` repeatedly on a DataFrame with a complex schema, it can lead to noticeable performance degradation.

The performance issue can be reproduced using the code snippet below. Since copy.deepcopy is known to be slow to handle complex objects, this PR replaces it with pickle-based ser/de to improve the performance of df.schema access. Given the limitations of pickle, the implementation falls back to deepcopy in cases where pickling fails.

```
from pyspark.sql.types import StructType, StructField, StringType

def make_nested_struct(level, max_level, fields_per_level):
    if level == max_level - 1:
        return StructType(
            [StructField(f"f{level}_{i}", StringType(), True) for i in range(fields_per_level)])
    else:
        return StructType(
            [StructField(f"s{level}_{i}",
                         make_nested_struct(level + 1, max_level, fields_per_level), True) for i in
             range(fields_per_level)])

# Create a 4 level nested schema with in total 10,000 leaf fields
schema = make_nested_struct(0, 4, 10)
```

 The existing needs 21.9s to copy the schema for 100 times.
```
import copy
timeit.timeit(lambda: copy.deepcopy(schema), number=100)
# 21.9
```

The updated approach only needs 2.0s to copy for 100 times:
```
from pyspark.serializers import CPickleSerializer
cached_schema_serialized = CPickleSerializer().dumps(schema)

timeit.timeit(lambda: CPickleSerializer().loads(cached_schema_serialized), number=100)
# 2.0
```

### Why are the changes needed?

It improves the performance when calling df.schema many times.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests and new tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51157 from xi-db/schema-deepcopy-improvement.

Lead-authored-by: Xi Lyu <xi.lyu@databricks.com>
Co-authored-by: Xi Lyu <159039256+xi-db@users.noreply.github.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>
haoyangeng-db pushed a commit to haoyangeng-db/apache-spark that referenced this pull request Jul 22, 2025
### What changes were proposed in this pull request?

In Spark Connect, `DataFrame.schema` returns a deep copy of the schema to prevent unexpected behavior caused by user modifications to the returned schema object. However, if a user accesses `df.schema` repeatedly on a DataFrame with a complex schema, it can lead to noticeable performance degradation.

The performance issue can be reproduced using the code snippet below. Since copy.deepcopy is known to be slow to handle complex objects, this PR replaces it with pickle-based ser/de to improve the performance of df.schema access. Given the limitations of pickle, the implementation falls back to deepcopy in cases where pickling fails.

```
from pyspark.sql.types import StructType, StructField, StringType

def make_nested_struct(level, max_level, fields_per_level):
    if level == max_level - 1:
        return StructType(
            [StructField(f"f{level}_{i}", StringType(), True) for i in range(fields_per_level)])
    else:
        return StructType(
            [StructField(f"s{level}_{i}",
                         make_nested_struct(level + 1, max_level, fields_per_level), True) for i in
             range(fields_per_level)])

# Create a 4 level nested schema with in total 10,000 leaf fields
schema = make_nested_struct(0, 4, 10)
```

 The existing needs 21.9s to copy the schema for 100 times.
```
import copy
timeit.timeit(lambda: copy.deepcopy(schema), number=100)
# 21.9
```

The updated approach only needs 2.0s to copy for 100 times:
```
from pyspark.serializers import CPickleSerializer
cached_schema_serialized = CPickleSerializer().dumps(schema)

timeit.timeit(lambda: CPickleSerializer().loads(cached_schema_serialized), number=100)
# 2.0
```

### Why are the changes needed?

It improves the performance when calling df.schema many times.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests and new tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51157 from xi-db/schema-deepcopy-improvement.

Lead-authored-by: Xi Lyu <xi.lyu@databricks.com>
Co-authored-by: Xi Lyu <159039256+xi-db@users.noreply.github.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants