Commit c08021c

HyukjinKwon authored and cloud-fan committed
[SPARK-26776][PYTHON] Reduce Py4J communication cost in PySpark's execution barrier check
## What changes were proposed in this pull request?

While investigating flaky tests, I noticed the following traceback:

```
  File "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line 2512, in __init__
    self.is_barrier = prev._is_barrier() or isFromBarrier
  File "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line 2412, in _is_barrier
    return self._jrdd.rdd().isBarrier()
  File "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 342, in get_return_value
    return OUTPUT_CONVERTER[type](answer[2:], gateway_client)
  File "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 2492, in <lambda>
    lambda target_id, gateway_client: JavaObject(target_id, gateway_client))
  File "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1324, in __init__
    ThreadSafeFinalizer.add_finalizer(key, value)
  File "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/finalizer.py", line 43, in add_finalizer
    cls.finalizers[id] = weak_ref
  File "/usr/lib64/pypy-2.5.1/lib-python/2.7/threading.py", line 216, in __exit__
    self.release()
  File "/usr/lib64/pypy-2.5.1/lib-python/2.7/threading.py", line 208, in release
    self.__block.release()
error: release unlocked lock
```

The failure may not be directly related to the test itself, but it shows that `prev._is_barrier()` goes through Py4J, and Py4J access is expensive. This PR therefore avoids the Py4J call when `isFromBarrier` is `True` by reordering the `or` operands so that the check short-circuits (see the sketch after the diff below).

## How was this patch tested?

Existing unit tests should cover this.

Closes #23690 from HyukjinKwon/minor-barrier.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent fbc3c5e commit c08021c

1 file changed: 1 addition, 1 deletion

python/pyspark/rdd.py (1 addition, 1 deletion)

```diff
@@ -2509,7 +2509,7 @@ def pipeline_func(split, iterator):
         self._jrdd_deserializer = self.ctx.serializer
         self._bypass_serializer = False
         self.partitioner = prev.partitioner if self.preservesPartitioning else None
-        self.is_barrier = prev._is_barrier() or isFromBarrier
+        self.is_barrier = isFromBarrier or prev._is_barrier()

    def getNumPartitions(self):
        return self._prev_jrdd.partitions().size()
```
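
Below is a minimal sketch of why the operand order matters, using hypothetical stand-ins (`FakeJavaRDD`, `barrier_flag_old`, `barrier_flag_new`) rather than the real PySpark classes: Python's `or` short-circuits, so the expensive Py4J-style call on the right never runs when the cheap local flag on the left is already `True`.

```python
import time


class FakeJavaRDD:
    """Hypothetical stand-in for a Py4J proxy; each call simulates a JVM round trip."""

    def is_barrier(self):
        time.sleep(0.01)  # pretend this is the Py4J round trip to the JVM
        return True


def barrier_flag_old(prev, is_from_barrier):
    # Old operand order: the simulated round trip runs on every call,
    # even when is_from_barrier is already True.
    return prev.is_barrier() or is_from_barrier


def barrier_flag_new(prev, is_from_barrier):
    # New operand order: `or` short-circuits, so the right-hand side
    # (and its round trip) is skipped whenever is_from_barrier is True.
    return is_from_barrier or prev.is_barrier()


if __name__ == "__main__":
    prev = FakeJavaRDD()
    assert barrier_flag_new(prev, True) is True    # no simulated round trip
    assert barrier_flag_new(prev, False) is True   # falls back to the call
    assert barrier_flag_old(prev, True) is True    # always pays the round trip
```

The same reasoning applies in `PipelinedRDD.__init__`: when the RDD is already known to come from a barrier stage, the local `isFromBarrier` flag answers the question without a round trip to the JVM.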
