
[SPARK-23319][TESTS] Explicitly specify Pandas and PyArrow versions in PySpark tests (to skip or test) #20487

4 changes: 4 additions & 0 deletions pom.xml
@@ -185,6 +185,10 @@
<paranamer.version>2.8</paranamer.version>
<maven-antrun.version>1.8</maven-antrun.version>
<commons-crypto.version>1.0.0</commons-crypto.version>
<!--
If you are changing Arrow version specification, please check ./python/pyspark/sql/utils.py,
./python/run-tests.py and ./python/setup.py too.
Member Author:

./python/run-tests.py is not there yet; it's part of #20473.

Member:

Should we add a similar comment to each *.py file, not only setup.py, so they refer to one another? We should also add one for Pandas in each *.py file.

Member Author:

Hmm, I thought the proper places to bump the versions are setup.py and pom.xml, so if we happen to update PyArrow (pom.xml / setup.py) or Pandas (setup.py), we would look at one of those places first.

To be honest, I don't really like writing specific paths in those comments, because if the files move we would have to update all the comments.

Member:

I see. I agree.

Member:

Yes, true, though I think just the file name is OK; they are distinct enough to find.

Member Author:

Let me just keep them. Maybe I am worrying about this too much, but */*/pom.xml, [./dev/run-tests|./python/run-tests|./python/run-tests.py], and [util.py|utils.py] might be confusing.

-->
<arrow.version>0.8.0</arrow.version>

<test.java.home>${java.home}</test.java.home>
3 changes: 3 additions & 0 deletions python/pyspark/sql/dataframe.py
@@ -1913,6 +1913,9 @@ def toPandas(self):
0 2 Alice
1 5 Bob
"""
from pyspark.sql.utils import require_minimum_pandas_version
Member Author (@HyukjinKwon, Feb 2, 2018):

toPandas already seems to fail when the result includes TimestampType with an old Pandas version:

>>> import datetime
>>> spark.createDataFrame([[datetime.datetime.now()]]).toPandas()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/sql/dataframe.py", line 1978, in toPandas
    _check_series_convert_timestamps_local_tz(pdf[field.name], timezone)
  File "/.../spark/python/pyspark/sql/types.py", line 1775, in _check_series_convert_timestamps_local_tz
    return _check_series_convert_timestamps_localize(s, None, timezone)
  File "/.../spark/python/pyspark/sql/types.py", line 1750, in _check_series_convert_timestamps_localize
    require_minimum_pandas_version()
  File "/.../spark/python/pyspark/sql/utils.py", line 128, in require_minimum_pandas_version
    "your version was %s." % (minimum_pandas_version, pandas.__version__))
ImportError: Pandas >= 0.19.2 must be installed; however, your version was 0.16.0.

Since we set a supported version, I think we had better explicitly require it. Let me know if anyone thinks differently.
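
In miniature, the guard-at-entry pattern the diff applies looks like this (a sketch; to_pandas_like stands in for the real method, which is in the diff above):

from pyspark.sql.utils import require_minimum_pandas_version

def to_pandas_like(df):
    # Fail fast with a descriptive ImportError before `import pandas as pd`
    # can raise a bare "No module named pandas".
    require_minimum_pandas_version()
    import pandas as pd
    # Row is a tuple subclass, so a collected result converts directly.
    return pd.DataFrame(df.collect(), columns=df.columns)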

Member:

This is already called here, though: https://github.com/apache/spark/pull/20487/files#diff-6fc344560230bf0ef711bb9b5573f1faL1939
Am I missing something?

Member Author:

Ah, that one is for PyArrow, whereas this one is for Pandas. I wanted to produce a proper message before the import pandas as pd below :-).

The case above (https://github.com/apache/spark/pull/20487/files#r165714499) is when Pandas is older than 0.19.2. When Pandas is missing entirely, it shows something like:

>>> spark.range(1).toPandas()

before:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/sql/dataframe.py", line 1975, in toPandas
    import pandas as pd
ImportError: No module named pandas

after:

  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/sql/dataframe.py", line 1927, in toPandas
    require_minimum_pandas_version()
  File "/.../spark/python/pyspark/sql/utils.py", line 125, in require_minimum_pandas_version
    "it was not found." % minimum_pandas_version)
ImportError: Pandas >= 0.19.2 must be installed; however, it was not found.

Member:

Should we also add require_minimum_pandas_version() at line 649 in session.py?
https://github.com/apache/spark/blob/a0e4b166f71f9bb5f3e5af7843a03c11658892fd/python/pyspark/sql/session.py#L643-L653

Member Author:

Yup, let me give it a shot and clean that up too.

require_minimum_pandas_version()

import pandas as pd

if self.sql_ctx.getConf("spark.sql.execution.pandas.respectSessionTimeZone").lower() \
3 changes: 3 additions & 0 deletions python/pyspark/sql/session.py
@@ -646,6 +646,9 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=True):
except Exception:
has_pandas = False
if has_pandas and isinstance(data, pandas.DataFrame):
from pyspark.sql.utils import require_minimum_pandas_version
require_minimum_pandas_version()
Contributor:

Just out of curiosity, do you have a list of the places where we do this version check for Pandas and PyArrow?

Member Author:

I don't exactly know all the places. For now, I can think of: createDataFrame with a Pandas DataFrame input, toPandas, and pandas_udf for APIs, plus some places in session.py / types.py for internal methods like the _check* family or the *arrow* / *pandas* helpers.

I was thinking of putting those into a single module (file) after 2.3.0; will cc you and @ueshin there.
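
For what it's worth, a rough sketch of what that consolidated module could look like (the module and function names are hypothetical; the version constants and message wording come from this PR):

from distutils.version import LooseVersion

# Hypothetical pyspark/sql/version_requirements.py
_MINIMUM_VERSIONS = {"pandas": "0.19.2", "pyarrow": "0.8.0"}

def require_minimum_version(package):
    """Raise ImportError if `package` is absent or older than required."""
    minimum = _MINIMUM_VERSIONS[package]
    try:
        module = __import__(package)
    except ImportError:
        raise ImportError("%s >= %s must be installed; however, "
                          "it was not found." % (package, minimum))
    if LooseVersion(module.__version__) < LooseVersion(minimum):
        raise ImportError("%s >= %s must be installed; however, your "
                          "version was %s." % (package, minimum, module.__version__))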


if self.conf.get("spark.sql.execution.pandas.respectSessionTimeZone").lower() \
== "true":
timezone = self.conf.get("spark.sql.session.timeZone")
87 changes: 48 additions & 39 deletions python/pyspark/sql/tests.py
@@ -48,19 +48,26 @@
else:
import unittest

_have_pandas = False
_have_old_pandas = False
_pandas_requirement_message = None
try:
import pandas
try:
from pyspark.sql.utils import require_minimum_pandas_version
require_minimum_pandas_version()
_have_pandas = True
except:
_have_old_pandas = True
except:
# No Pandas, but that's okay, we'll skip those tests
pass
from pyspark.sql.utils import require_minimum_pandas_version
require_minimum_pandas_version()
except ImportError as e:
from pyspark.util import _exception_message
# If Pandas version requirement is not satisfied, skip related tests.
_pandas_requirement_message = _exception_message(e)

_pyarrow_requirement_message = None
try:
from pyspark.sql.utils import require_minimum_pyarrow_version
require_minimum_pyarrow_version()
except ImportError as e:
from pyspark.util import _exception_message
# If Arrow version requirement is not satisfied, skip related tests.
_pyarrow_requirement_message = _exception_message(e)

_have_pandas = _pandas_requirement_message is None
_have_pyarrow = _pyarrow_requirement_message is None
Member Author:

Here is the logic I used:

_pyarrow_requirement_message holds the error message for the PyArrow requirement when PyArrow is missing or its version does not match.

If _pyarrow_requirement_message holds a message, _have_pyarrow becomes False.
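
Condensed into a standalone sketch (str(e) stands in for pyspark.util._exception_message, and ArrowSmokeTests is an illustrative test class, not one from this PR):

import unittest

_pyarrow_requirement_message = None
try:
    from pyspark.sql.utils import require_minimum_pyarrow_version
    require_minimum_pyarrow_version()
except ImportError as e:
    # The captured message doubles as the skip reason in test output.
    _pyarrow_requirement_message = str(e)

_have_pyarrow = _pyarrow_requirement_message is None

@unittest.skipIf(not _have_pyarrow, _pyarrow_requirement_message)
class ArrowSmokeTests(unittest.TestCase):
    def test_import(self):
        import pyarrow  # safe: the whole class is skipped otherwise
        self.assertTrue(hasattr(pyarrow, "__version__"))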


from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext, HiveContext, Column, Row
@@ -75,15 +82,6 @@
from pyspark.sql.utils import AnalysisException, ParseException, IllegalArgumentException


_have_arrow = False
try:
import pyarrow
_have_arrow = True
except:
# No Arrow, but that's okay, we'll skip those tests
pass


class UTCOffsetTimezone(datetime.tzinfo):
"""
Specifies timezone in UTC offset
@@ -2794,7 +2792,6 @@ def count_bucketed_cols(names, table="pyspark_bucket"):

def _to_pandas(self):
from datetime import datetime, date
import numpy as np
Member Author:

This import does not seem to be used in this function.

schema = StructType().add("a", IntegerType()).add("b", StringType())\
.add("c", BooleanType()).add("d", FloatType())\
.add("dt", DateType()).add("ts", TimestampType())
@@ -2807,7 +2804,7 @@ def _to_pandas(self):
df = self.spark.createDataFrame(data, schema)
return df.toPandas()

@unittest.skipIf(not _have_pandas, "Pandas not installed")
@unittest.skipIf(not _have_pandas, _pandas_requirement_message)
def test_to_pandas(self):
import numpy as np
pdf = self._to_pandas()
Expand All @@ -2819,13 +2816,13 @@ def test_to_pandas(self):
self.assertEquals(types[4], np.object) # datetime.date
self.assertEquals(types[5], 'datetime64[ns]')

@unittest.skipIf(not _have_old_pandas, "Old Pandas not installed")
def test_to_pandas_old(self):
@unittest.skipIf(_have_pandas, "Required Pandas was found.")
def test_to_pandas_required_pandas_not_found(self):
Member Author:

Now this also tests the case where Pandas is missing.

with QuietTest(self.sc):
with self.assertRaisesRegexp(ImportError, 'Pandas >= .* must be installed'):
self._to_pandas()

@unittest.skipIf(not _have_pandas, "Pandas not installed")
@unittest.skipIf(not _have_pandas, _pandas_requirement_message)
def test_to_pandas_avoid_astype(self):
import numpy as np
schema = StructType().add("a", IntegerType()).add("b", StringType())\
Expand All @@ -2843,7 +2840,7 @@ def test_create_dataframe_from_array_of_long(self):
df = self.spark.createDataFrame(data)
self.assertEqual(df.first(), Row(longarray=[-9223372036854775808, 0, 9223372036854775807]))

@unittest.skipIf(not _have_pandas, "Pandas not installed")
@unittest.skipIf(not _have_pandas, _pandas_requirement_message)
def test_create_dataframe_from_pandas_with_timestamp(self):
import pandas as pd
from datetime import datetime
@@ -2858,14 +2855,16 @@ def test_create_dataframe_from_pandas_with_timestamp(self):
self.assertTrue(isinstance(df.schema['ts'].dataType, TimestampType))
self.assertTrue(isinstance(df.schema['d'].dataType, DateType))

@unittest.skipIf(not _have_old_pandas, "Old Pandas not installed")
def test_create_dataframe_from_old_pandas(self):
import pandas as pd
from datetime import datetime
pdf = pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)],
"d": [pd.Timestamp.now().date()]})
@unittest.skipIf(_have_pandas, "Required Pandas was found.")
def test_create_dataframe_required_pandas_not_found(self):
with QuietTest(self.sc):
with self.assertRaisesRegexp(ImportError, 'Pandas >= .* must be installed'):
with self.assertRaisesRegexp(
ImportError,
'(Pandas >= .* must be installed|No module named pandas)'):
Member Author:

If the installed Pandas is older than what we require, it throws "Pandas >= .* must be installed". If Pandas is not installed at all, import pandas as pd in the test throws "No module named pandas".
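
Both messages match the widened pattern, since assertRaisesRegexp searches rather than fully matches; a quick illustrative check:

import re

pattern = '(Pandas >= .* must be installed|No module named pandas)'
assert re.search(pattern, "Pandas >= 0.19.2 must be installed; "
                          "however, your version was 0.16.0.")
assert re.search(pattern, "No module named pandas")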

import pandas as pd
from datetime import datetime
pdf = pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)],
"d": [pd.Timestamp.now().date()]})
self.spark.createDataFrame(pdf)


@@ -3383,7 +3382,9 @@ def __init__(self, **kwargs):
_make_type_verifier(data_type, nullable=False)(obj)


@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed")
@unittest.skipIf(
not _have_pandas or not _have_pyarrow,
_pandas_requirement_message or _pyarrow_requirement_message)
class ArrowTests(ReusedSQLTestCase):

@classmethod
@@ -3641,7 +3642,9 @@ def test_createDataFrame_with_int_col_names(self):
self.assertEqual(pdf_col_names, df_arrow.columns)


@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed")
@unittest.skipIf(
not _have_pandas or not _have_pyarrow,
_pandas_requirement_message or _pyarrow_requirement_message)
class PandasUDFTests(ReusedSQLTestCase):
def test_pandas_udf_basic(self):
from pyspark.rdd import PythonEvalType
@@ -3765,7 +3768,9 @@ def foo(k, v):
return k


@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed")
@unittest.skipIf(
not _have_pandas or not _have_pyarrow,
_pandas_requirement_message or _pyarrow_requirement_message)
class ScalarPandasUDFTests(ReusedSQLTestCase):

@classmethod
@@ -4278,7 +4283,9 @@ def test_register_vectorized_udf_basic(self):
self.assertEquals(expected.collect(), res2.collect())


@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed")
@unittest.skipIf(
not _have_pandas or not _have_pyarrow,
_pandas_requirement_message or _pyarrow_requirement_message)
class GroupedMapPandasUDFTests(ReusedSQLTestCase):

@property
@@ -4447,7 +4454,9 @@ def test_unsupported_types(self):
df.groupby('id').apply(f).collect()


@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed")
@unittest.skipIf(
not _have_pandas or not _have_pyarrow,
_pandas_requirement_message or _pyarrow_requirement_message)
class GroupedAggPandasUDFTests(ReusedSQLTestCase):

@property
30 changes: 22 additions & 8 deletions python/pyspark/sql/utils.py
@@ -115,18 +115,32 @@ def toJArray(gateway, jtype, arr):
def require_minimum_pandas_version():
""" Raise ImportError if minimum version of Pandas is not installed
"""
# TODO(HyukjinKwon): Relocate and deduplicate the version specification.
minimum_pandas_version = "0.19.2"

from distutils.version import LooseVersion
import pandas
if LooseVersion(pandas.__version__) < LooseVersion('0.19.2'):
raise ImportError("Pandas >= 0.19.2 must be installed on calling Python process; "
"however, your version was %s." % pandas.__version__)
try:
import pandas
except ImportError:
raise ImportError("Pandas >= %s must be installed; however, "
"it was not found." % minimum_pandas_version)
Member Author:

I catch ImportError here just to make the error message nicer.
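
Incidentally, the comparison below relies on distutils' LooseVersion, which compares version components numerically rather than lexically; a quick illustrative check:

from distutils.version import LooseVersion

# Plain string comparison gets multi-digit components wrong:
assert "0.9.0" > "0.19.2"
# LooseVersion compares the second component numerically (9 < 19):
assert LooseVersion("0.9.0") < LooseVersion("0.19.2")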

if LooseVersion(pandas.__version__) < LooseVersion(minimum_pandas_version):
raise ImportError("Pandas >= %s must be installed; however, "
"your version was %s." % (minimum_pandas_version, pandas.__version__))


def require_minimum_pyarrow_version():
""" Raise ImportError if minimum version of pyarrow is not installed
"""
# TODO(HyukjinKwon): Relocate and deduplicate the version specification.
minimum_pyarrow_version = "0.8.0"

from distutils.version import LooseVersion
import pyarrow
if LooseVersion(pyarrow.__version__) < LooseVersion('0.8.0'):
raise ImportError("pyarrow >= 0.8.0 must be installed on calling Python process; "
"however, your version was %s." % pyarrow.__version__)
try:
import pyarrow
except ImportError:
raise ImportError("PyArrow >= %s must be installed; however, "
"it was not found." % minimum_pyarrow_version)
if LooseVersion(pyarrow.__version__) < LooseVersion(minimum_pyarrow_version):
raise ImportError("PyArrow >= %s must be installed; however, "
"your version was %s." % (minimum_pyarrow_version, pyarrow.__version__))
10 changes: 9 additions & 1 deletion python/setup.py
@@ -100,6 +100,11 @@ def _supports_symlinks():
file=sys.stderr)
exit(-1)

# If you are changing the versions here, please also change ./python/pyspark/sql/utils.py and
# ./python/run-tests.py. In case of Arrow, you should also check ./pom.xml.
_minimum_pandas_version = "0.19.2"
_minimum_pyarrow_version = "0.8.0"

try:
# We copy the shell script to be under pyspark/python/pyspark so that the launcher scripts
# find it where expected. The rest of the files aren't copied because they are accessed
@@ -201,7 +206,10 @@ def _supports_symlinks():
extras_require={
'ml': ['numpy>=1.7'],
'mllib': ['numpy>=1.7'],
'sql': ['pandas>=0.19.2', 'pyarrow>=0.8.0']
'sql': [
'pandas>=%s' % _minimum_pandas_version,
'pyarrow>=%s' % _minimum_pyarrow_version,
]
},
classifiers=[
'Development Status :: 5 - Production/Stable',