Skip to content

Commit

Permalink
[SPARK-48508][CONNECT][PYTHON] Cache user specified schema in `DataFr…
Browse files Browse the repository at this point in the history
…ame.{to, mapInPandas, mapInArrow}`

### What changes were proposed in this pull request?
Cache user specified schema in `DataFrame.{to, mapInPandas, mapInArrow}`

### Why are the changes needed?
to avoid extra RPC to get the schema

### Does this PR introduce _any_ user-facing change?
no, it should only be an optimization

### How was this patch tested?
CI

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#46848 from zhengruifeng/py_user_define_schema.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
  • Loading branch information
zhengruifeng authored and HyukjinKwon committed Jun 3, 2024
1 parent cfb79d9 commit 6272c05
Showing 1 changed file with 7 additions and 2 deletions.
9 changes: 7 additions & 2 deletions python/pyspark/sql/connect/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1825,10 +1825,12 @@ def inputFiles(self) -> List[str]:

def to(self, schema: StructType) -> ParentDataFrame:
assert schema is not None
return DataFrame(
res = DataFrame(
plan.ToSchema(child=self._plan, schema=schema),
session=self._session,
)
res._cached_schema = schema
return res

def toDF(self, *cols: str) -> ParentDataFrame:
for col_ in cols:
Expand Down Expand Up @@ -2009,7 +2011,7 @@ def _map_partitions(
evalType=evalType,
)

return DataFrame(
res = DataFrame(
plan.MapPartitions(
child=self._plan,
function=udf_obj,
Expand All @@ -2019,6 +2021,9 @@ def _map_partitions(
),
session=self._session,
)
if isinstance(schema, StructType):
res._cached_schema = schema
return res

def mapInPandas(
self,
Expand Down

0 comments on commit 6272c05

Please sign in to comment.