
Commit 72f36ee

davies authored and JoshRosen committed
[SPARK-3886] [PySpark] use AutoBatchedSerializer by default
Use AutoBatchedSerializer by default. It chooses the batch size based on the size of the serialized objects, keeping each serialized batch in the range [64k, 640k]. In the JVM, the serializer also tracks the objects in a batch to find duplicates, so a larger batch may cause OOM in the JVM.

Author: Davies Liu <davies.liu@gmail.com>

Closes #2740 from davies/batchsize and squashes the following commits:

52cdb88 [Davies Liu] update docs
185f2b9 [Davies Liu] use AutoBatchedSerializer by default
1 parent 90f73fc commit 72f36ee
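The batching heuristic the commit message describes can be illustrated with a short sketch. This is a simplified, hypothetical helper (auto_batched_dump and its framing are not PySpark's actual code): it doubles the batch while serialized frames stay under best_size and backs off once they grow well past it, so frame sizes settle roughly in the [64k, 640k] window mentioned above.

    import io
    import itertools
    import pickle
    import struct

    def auto_batched_dump(iterator, stream, best_size=1 << 16):
        batch = 1
        iterator = iter(iterator)
        while True:
            objs = list(itertools.islice(iterator, batch))
            if not objs:
                break
            data = pickle.dumps(objs)
            stream.write(struct.pack(">i", len(data)))  # length-prefixed frame
            stream.write(data)
            if len(data) < best_size:
                batch *= 2    # frames are small: amortize per-batch overhead
            elif len(data) > best_size * 10 and batch > 1:
                batch //= 2   # frames ballooned: back off to bound memory

    # Example: serialize a large range into length-prefixed pickle frames.
    buf = io.BytesIO()
    auto_batched_dump(range(100000), buf)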

File tree

2 files changed: +9 -6 lines changed


python/pyspark/context.py

Lines changed: 7 additions & 4 deletions
@@ -29,7 +29,7 @@
 from pyspark.files import SparkFiles
 from pyspark.java_gateway import launch_gateway
 from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
-    PairDeserializer, CompressedSerializer
+    PairDeserializer, CompressedSerializer, AutoBatchedSerializer
 from pyspark.storagelevel import StorageLevel
 from pyspark.rdd import RDD
 from pyspark.traceback_utils import CallSite, first_spark_call
@@ -67,7 +67,7 @@ class SparkContext(object):
     _default_batch_size_for_serialized_input = 10

     def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
-                 environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
+                 environment=None, batchSize=0, serializer=PickleSerializer(), conf=None,
                  gateway=None):
         """
         Create a new SparkContext. At least the master and app name should be set,
@@ -83,8 +83,9 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
         :param environment: A dictionary of environment variables to set on
                worker nodes.
         :param batchSize: The number of Python objects represented as a single
-               Java object. Set 1 to disable batching or -1 to use an
-               unlimited batch size.
+               Java object. Set 1 to disable batching, 0 to automatically choose
+               the batch size based on object sizes, or -1 to use an unlimited
+               batch size.
         :param serializer: The serializer for RDDs.
         :param conf: A L{SparkConf} object setting Spark properties.
         :param gateway: Use an existing gateway and JVM, otherwise a new JVM
@@ -117,6 +118,8 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
         self._unbatched_serializer = serializer
         if batchSize == 1:
             self.serializer = self._unbatched_serializer
+        elif batchSize == 0:
+            self.serializer = AutoBatchedSerializer(self._unbatched_serializer)
         else:
             self.serializer = BatchedSerializer(self._unbatched_serializer,
                                                 batchSize)
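With this change, a SparkContext built without an explicit batchSize gets AutoBatchedSerializer. A minimal usage sketch, assuming a local master and a placeholder app name (the printed form reflects the __str__ fix in the next file):

    from pyspark import SparkContext

    # batchSize defaults to 0 after this commit, selecting AutoBatchedSerializer.
    sc = SparkContext("local[2]", "auto-batch-demo")
    print(sc.serializer)  # AutoBatchedSerializer<...>

    # The previous fixed-size behavior is still available explicitly:
    # sc = SparkContext("local[2]", "fixed-batch-demo", batchSize=1024)
    sc.stop()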

python/pyspark/serializers.py

Lines changed: 2 additions & 2 deletions
@@ -220,7 +220,7 @@ class AutoBatchedSerializer(BatchedSerializer):
     Choose the size of batch automatically based on the size of object
     """

-    def __init__(self, serializer, bestSize=1 << 20):
+    def __init__(self, serializer, bestSize=1 << 16):
         BatchedSerializer.__init__(self, serializer, -1)
         self.bestSize = bestSize

@@ -247,7 +247,7 @@ def __eq__(self, other):
                 other.serializer == self.serializer)

     def __str__(self):
-        return "BatchedSerializer<%s>" % str(self.serializer)
+        return "AutoBatchedSerializer<%s>" % str(self.serializer)


 class CartesianDeserializer(FramedSerializer):
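Two changes land here: the default bestSize drops from 1 << 20 (1 MB) to 1 << 16 (64 KB), which bounds per-batch memory on the JVM side, where the serializer tracks the objects in a batch to find duplicates, and __str__ now reports the correct class name. A hedged snippet for code constructing the serializer directly:

    from pyspark.serializers import AutoBatchedSerializer, PickleSerializer

    ser = AutoBatchedSerializer(PickleSerializer())  # bestSize now defaults to 1 << 16
    print(ser)  # AutoBatchedSerializer<...> rather than BatchedSerializer<...>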
