Skip to content

Commit 9af2708

Browse files
daviesJoshRosen
authored andcommitted
bugfix: disable compression of command
compressed commands break Python UDF.
1 parent 0d8d3a4 commit 9af2708

File tree

2 files changed

+2
-2
lines changed

2 files changed

+2
-2
lines changed

python/pyspark/rdd.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1810,7 +1810,7 @@ def _jrdd(self):
18101810
self._jrdd_deserializer = NoOpSerializer()
18111811
command = (self.func, self._prev_jrdd_deserializer,
18121812
self._jrdd_deserializer)
1813-
ser = CompressedSerializer(CloudPickleSerializer())
1813+
ser = CloudPickleSerializer()
18141814
pickled_command = ser.dumps(command)
18151815
broadcast_vars = ListConverter().convert(
18161816
[x._jbroadcast for x in self.ctx._pickled_broadcast_vars],

python/pyspark/worker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ def main(infile, outfile):
7272
value = ser._read_with_length(infile)
7373
_broadcastRegistry[bid] = Broadcast(bid, value)
7474

75-
command = ser._read_with_length(infile)
75+
command = pickleSer._read_with_length(infile)
7676
(func, deserializer, serializer) = command
7777
init_time = time.time()
7878
iterator = deserializer.load_stream(infile)

0 commit comments

Comments
 (0)