Skip to content

Work around astype with columns in Pandas < 0.19.0 #7

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 22, 2017

Conversation

HyukjinKwon
Copy link

@HyukjinKwon HyukjinKwon commented Jun 22, 2017

What changes were proposed in this pull request?

Currently the codes below fails in Pandas < 0.19.0

from pyspark.sql.types import *

schema = StructType().add("a", IntegerType()).add("b", StringType())\
                     .add("c", BooleanType()).add("d", FloatType())
data = [
    (1, "foo", True, 3.0,), (2, "foo", True, 5.0),
    (3, "bar", False, -1.0), (4, "bar", False, 6.0),
]
spark.createDataFrame(data, schema).toPandas().dtypes

This looks because this was added from 0.19.0 (astype with dict). This PR proposes to manually work around by for loop.

Before (pandas < 0.19.0):

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/hyukjinkwon/Desktop/workspace/repos/forked/spark/python/pyspark/sql/dataframe.py", line 1731, in toPandas
    return pd.DataFrame.from_records(self.collect(), columns=self.columns).astype(dtype)
  File "/Library/Python/2.7/site-packages/pandas/core/generic.py", line 2950, in astype
    raise_on_error=raise_on_error, **kwargs)
  File "/Library/Python/2.7/site-packages/pandas/core/internals.py", line 2938, in astype
    return self.apply('astype', dtype=dtype, **kwargs)
  File "/Library/Python/2.7/site-packages/pandas/core/internals.py", line 2890, in apply
    applied = getattr(b, f)(**kwargs)
  File "/Library/Python/2.7/site-packages/pandas/core/internals.py", line 434, in astype
    values=values, **kwargs)
  File "/Library/Python/2.7/site-packages/pandas/core/internals.py", line 449, in _astype
    dtype = np.dtype(dtype)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/Extras/lib/python/numpy/core/_internal.py", line 61, in _usefields
    names, formats, offsets, titles = _makenames_list(adict, align)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/Extras/lib/python/numpy/core/_internal.py", line 26, in _makenames_list
    n = len(obj)
TypeError: object of type 'type' has no len()

After (tested on Pandas 0.18.1 and 0.20.2):

a      int32
b     object
c       bool
d    float32
dtype: object

How was this patch tested?

Manually tested.

pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)

for f, t in dtype.items():
pdf[f] = pdf[f].astype(t)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't need copying data?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doh!

@viirya
Copy link

viirya commented Jun 22, 2017

LGTM

@cloud-fan cloud-fan merged commit 357a798 into cloud-fan:to_pandas Jun 22, 2017
@cloud-fan
Copy link
Owner

great, thanks! @HyukjinKwon @viirya

@HyukjinKwon HyukjinKwon deleted the pandas-astype branch January 2, 2018 03:38
cloud-fan pushed a commit that referenced this pull request Mar 13, 2019
cloud-fan pushed a commit that referenced this pull request Apr 20, 2020
…types

### What changes were proposed in this pull request?

This PR intends to fix a bug that occurs when comparing null types to decimal types in master/branch-3.0;
```
scala> Seq(BigDecimal(10)).toDF("v1").selectExpr("v1 = NULL").explain(true)
org.apache.spark.sql.AnalysisException: cannot resolve '(`v1` = NULL)' due to data type mismatch: differing types in '(`v1` = NULL)' (decimal(38,18) and null).; line 1 pos 0;
'Project [(v1#5 = null) AS (v1 = NULL)#7]
+- Project [value#2 AS v1#5]
   +- LocalRelation [value#2]
...
```
The query above passed in v2.4.5.

### Why are the changes needed?

bugfix

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added tests.

Closes apache#28241 from maropu/SPARK-31468.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan pushed a commit that referenced this pull request May 14, 2020
…types

### What changes were proposed in this pull request?

This PR intends to fix a bug that occurs when comparing null types to decimal types in master/branch-3.0;
```
scala> Seq(BigDecimal(10)).toDF("v1").selectExpr("v1 = NULL").explain(true)
org.apache.spark.sql.AnalysisException: cannot resolve '(`v1` = NULL)' due to data type mismatch: differing types in '(`v1` = NULL)' (decimal(38,18) and null).; line 1 pos 0;
'Project [(v1#5 = null) AS (v1 = NULL)#7]
+- Project [value#2 AS v1#5]
   +- LocalRelation [value#2]
...
```
The query above passed in v2.4.5.

### Why are the changes needed?

bugfix

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added tests.

Closes apache#28241 from maropu/SPARK-31468.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit a7fb330)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan pushed a commit that referenced this pull request Feb 27, 2024
…n properly

### What changes were proposed in this pull request?
Make `ResolveRelations` handle plan id properly

### Why are the changes needed?
bug fix for Spark Connect, it won't affect classic Spark SQL

before this PR:
```
from pyspark.sql import functions as sf

spark.range(10).withColumn("value_1", sf.lit(1)).write.saveAsTable("test_table_1")
spark.range(10).withColumnRenamed("id", "index").withColumn("value_2", sf.lit(2)).write.saveAsTable("test_table_2")

df1 = spark.read.table("test_table_1")
df2 = spark.read.table("test_table_2")
df3 = spark.read.table("test_table_1")

join1 = df1.join(df2, on=df1.id==df2.index).select(df2.index, df2.value_2)
join2 = df3.join(join1, how="left", on=join1.index==df3.id)

join2.schema
```

fails with
```
AnalysisException: [CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "id". It's probably because of illegal references like `df1.select(df2.col("a"))`. SQLSTATE: 42704
```

That is due to existing plan caching in `ResolveRelations` doesn't work with Spark Connect

```
=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations ===
 '[#12]Join LeftOuter, '`==`('index, 'id)                     '[#12]Join LeftOuter, '`==`('index, 'id)
!:- '[#9]UnresolvedRelation [test_table_1], [], false         :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
!+- '[#11]Project ['index, 'value_2]                          :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!   +- '[#10]Join Inner, '`==`('id, 'index)                   +- '[#11]Project ['index, 'value_2]
!      :- '[#7]UnresolvedRelation [test_table_1], [], false      +- '[#10]Join Inner, '`==`('id, 'index)
!      +- '[#8]UnresolvedRelation [test_table_2], [], false         :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
!                                                                   :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!                                                                   +- '[#8]SubqueryAlias spark_catalog.default.test_table_2
!                                                                      +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_2`, [], false

Can not resolve 'id with plan 7
```

`[#7]UnresolvedRelation [test_table_1], [], false` was wrongly resolved to the cached one
```
:- '[#9]SubqueryAlias spark_catalog.default.test_table_1
   +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
```

### Does this PR introduce _any_ user-facing change?
yes, bug fix

### How was this patch tested?
added ut

### Was this patch authored or co-authored using generative AI tooling?
ci

Closes apache#45214 from zhengruifeng/connect_fix_read_join.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
cloud-fan pushed a commit that referenced this pull request May 7, 2024
…plan properly

### What changes were proposed in this pull request?
Make `ResolveRelations` handle plan id properly

cherry-pick bugfix apache#45214 to 3.5

### Why are the changes needed?
bug fix for Spark Connect, it won't affect classic Spark SQL

before this PR:
```
from pyspark.sql import functions as sf

spark.range(10).withColumn("value_1", sf.lit(1)).write.saveAsTable("test_table_1")
spark.range(10).withColumnRenamed("id", "index").withColumn("value_2", sf.lit(2)).write.saveAsTable("test_table_2")

df1 = spark.read.table("test_table_1")
df2 = spark.read.table("test_table_2")
df3 = spark.read.table("test_table_1")

join1 = df1.join(df2, on=df1.id==df2.index).select(df2.index, df2.value_2)
join2 = df3.join(join1, how="left", on=join1.index==df3.id)

join2.schema
```

fails with
```
AnalysisException: [CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "id". It's probably because of illegal references like `df1.select(df2.col("a"))`. SQLSTATE: 42704
```

That is due to existing plan caching in `ResolveRelations` doesn't work with Spark Connect

```
=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations ===
 '[#12]Join LeftOuter, '`==`('index, 'id)                     '[#12]Join LeftOuter, '`==`('index, 'id)
!:- '[#9]UnresolvedRelation [test_table_1], [], false         :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
!+- '[#11]Project ['index, 'value_2]                          :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!   +- '[#10]Join Inner, '`==`('id, 'index)                   +- '[#11]Project ['index, 'value_2]
!      :- '[#7]UnresolvedRelation [test_table_1], [], false      +- '[#10]Join Inner, '`==`('id, 'index)
!      +- '[#8]UnresolvedRelation [test_table_2], [], false         :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
!                                                                   :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!                                                                   +- '[#8]SubqueryAlias spark_catalog.default.test_table_2
!                                                                      +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_2`, [], false

Can not resolve 'id with plan 7
```

`[#7]UnresolvedRelation [test_table_1], [], false` was wrongly resolved to the cached one
```
:- '[#9]SubqueryAlias spark_catalog.default.test_table_1
   +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
```

### Does this PR introduce _any_ user-facing change?
yes, bug fix

### How was this patch tested?
added ut

### Was this patch authored or co-authored using generative AI tooling?
ci

Closes apache#46291 from zhengruifeng/connect_fix_read_join_35.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants