Skip to content

[SPARK-17679] [PYSPARK] remove unnecessary Py4J ListConverter patch #15254

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions python/pyspark/java_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,9 @@
xrange = range

from py4j.java_gateway import java_import, JavaGateway, GatewayClient
from py4j.java_collections import ListConverter

from pyspark.serializers import read_int


# patching ListConverter, or it will convert bytearray into Java ArrayList
def can_convert_list(self, obj):
return isinstance(obj, (list, tuple, xrange))

ListConverter.can_convert = can_convert_list


def launch_gateway():
if "PYSPARK_GATEWAY_PORT" in os.environ:
gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"])
Expand Down
4 changes: 2 additions & 2 deletions python/pyspark/ml/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import py4j.protocol
from py4j.protocol import Py4JJavaError
from py4j.java_gateway import JavaObject
from py4j.java_collections import ListConverter, JavaArray, JavaList
from py4j.java_collections import JavaArray, JavaList

from pyspark import RDD, SparkContext
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
Expand Down Expand Up @@ -76,7 +76,7 @@ def _py2java(sc, obj):
elif isinstance(obj, SparkContext):
obj = obj._jsc
elif isinstance(obj, list):
obj = ListConverter().convert([_py2java(sc, x) for x in obj], sc._gateway._gateway_client)
obj = [_py2java(sc, x) for x in obj]
elif isinstance(obj, JavaObject):
pass
elif isinstance(obj, (int, long, float, bool, bytes, unicode)):
Expand Down
4 changes: 2 additions & 2 deletions python/pyspark/mllib/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import py4j.protocol
from py4j.protocol import Py4JJavaError
from py4j.java_gateway import JavaObject
from py4j.java_collections import ListConverter, JavaArray, JavaList
from py4j.java_collections import JavaArray, JavaList

from pyspark import RDD, SparkContext
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
Expand Down Expand Up @@ -78,7 +78,7 @@ def _py2java(sc, obj):
elif isinstance(obj, SparkContext):
obj = obj._jsc
elif isinstance(obj, list):
obj = ListConverter().convert([_py2java(sc, x) for x in obj], sc._gateway._gateway_client)
obj = [_py2java(sc, x) for x in obj]
elif isinstance(obj, JavaObject):
pass
elif isinstance(obj, (int, long, float, bool, bytes, unicode)):
Expand Down
13 changes: 2 additions & 11 deletions python/pyspark/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@
get_used_memory, ExternalSorter, ExternalGroupBy
from pyspark.traceback_utils import SCCallSiteSync

from py4j.java_collections import ListConverter, MapConverter


__all__ = ["RDD"]

Expand Down Expand Up @@ -2317,16 +2315,9 @@ def _prepare_for_python_RDD(sc, command):
# The broadcast will have same life cycle as created PythonRDD
broadcast = sc.broadcast(pickled_command)
pickled_command = ser.dumps(broadcast)
# There is a bug in py4j.java_gateway.JavaClass with auto_convert
# https://github.com/bartdag/py4j/issues/161
# TODO: use auto_convert once py4j fix the bug
broadcast_vars = ListConverter().convert(
[x._jbroadcast for x in sc._pickled_broadcast_vars],
sc._gateway._gateway_client)
broadcast_vars = [x._jbroadcast for x in sc._pickled_broadcast_vars]
sc._pickled_broadcast_vars.clear()
env = MapConverter().convert(sc.environment, sc._gateway._gateway_client)
includes = ListConverter().convert(sc._python_includes, sc._gateway._gateway_client)
return pickled_command, broadcast_vars, env, includes
return pickled_command, broadcast_vars, sc.environment, sc._python_includes


def _wrap_function(sc, func, deserializer, serializer, profiler=None):
Expand Down