20
20
import operator
21
21
22
22
from pyspark .serializers import NoOpSerializer ,\
23
- BatchedSerializer , CloudPickleSerializer , pack_long
23
+ BatchedSerializer , CloudPickleSerializer , pack_long ,\
24
+ CompressedSerializer
24
25
from pyspark .rdd import _JavaStackTrace
25
26
from pyspark .storagelevel import StorageLevel
26
27
from pyspark .resultiterable import ResultIterable
@@ -458,7 +459,8 @@ def _jdstream(self):
458
459
serializer = self .ctx .serializer
459
460
460
461
command = (self .func , self ._prev_jrdd_deserializer , serializer )
461
- pickled_command = CloudPickleSerializer ().dumps (command )
462
+ ser = CompressedSerializer (CloudPickleSerializer ())
463
+ pickled_command = ser .dumps (command )
462
464
broadcast_vars = ListConverter ().convert (
463
465
[x ._jbroadcast for x in self .ctx ._pickled_broadcast_vars ],
464
466
self .ctx ._gateway ._gateway_client )
@@ -467,12 +469,13 @@ def _jdstream(self):
467
469
env = MapConverter ().convert (self .ctx .environment ,
468
470
self .ctx ._gateway ._gateway_client )
469
471
includes = ListConverter ().convert (self .ctx ._python_includes ,
470
- self .ctx ._gateway ._gateway_client )
472
+ self .ctx ._gateway ._gateway_client )
471
473
python_dstream = self .ctx ._jvm .PythonDStream (self ._prev_jdstream .dstream (),
472
- bytearray (pickled_command ),
473
- env , includes , self .preservesPartitioning ,
474
- self .ctx .pythonExec , broadcast_vars , self .ctx ._javaAccumulator ,
475
- class_tag )
474
+ bytearray (pickled_command ),
475
+ env , includes , self .preservesPartitioning ,
476
+ self .ctx .pythonExec ,
477
+ broadcast_vars , self .ctx ._javaAccumulator ,
478
+ class_tag )
476
479
self ._jdstream_val = python_dstream .asJavaDStream ()
477
480
return self ._jdstream_val
478
481
0 commit comments