20
20
import operator
21
21
22
22
from pyspark .serializers import NoOpSerializer ,\
23
- BatchedSerializer , CloudPickleSerializer , pack_long
23
+ BatchedSerializer , CloudPickleSerializer , pack_long ,\
24
+ CompressedSerializer
24
25
from pyspark .rdd import _JavaStackTrace
25
26
from pyspark .storagelevel import StorageLevel
26
27
from pyspark .resultiterable import ResultIterable
@@ -463,7 +464,8 @@ def _jdstream(self):
463
464
serializer = self .ctx .serializer
464
465
465
466
command = (self .func , self ._prev_jrdd_deserializer , serializer )
466
- pickled_command = CloudPickleSerializer ().dumps (command )
467
+ ser = CompressedSerializer (CloudPickleSerializer ())
468
+ pickled_command = ser .dumps (command )
467
469
broadcast_vars = ListConverter ().convert (
468
470
[x ._jbroadcast for x in self .ctx ._pickled_broadcast_vars ],
469
471
self .ctx ._gateway ._gateway_client )
@@ -472,12 +474,13 @@ def _jdstream(self):
472
474
env = MapConverter ().convert (self .ctx .environment ,
473
475
self .ctx ._gateway ._gateway_client )
474
476
includes = ListConverter ().convert (self .ctx ._python_includes ,
475
- self .ctx ._gateway ._gateway_client )
477
+ self .ctx ._gateway ._gateway_client )
476
478
python_dstream = self .ctx ._jvm .PythonDStream (self ._prev_jdstream .dstream (),
477
- bytearray (pickled_command ),
478
- env , includes , self .preservesPartitioning ,
479
- self .ctx .pythonExec , broadcast_vars , self .ctx ._javaAccumulator ,
480
- class_tag )
479
+ bytearray (pickled_command ),
480
+ env , includes , self .preservesPartitioning ,
481
+ self .ctx .pythonExec ,
482
+ broadcast_vars , self .ctx ._javaAccumulator ,
483
+ class_tag )
481
484
self ._jdstream_val = python_dstream .asJavaDStream ()
482
485
return self ._jdstream_val
483
486
0 commit comments