supports stage retry when shuffle read failed#1035
Merged
Conversation
3b0d84b to
ae5e86d
Compare
lihao712
approved these changes
Jun 27, 2025
Contributor
There was a problem hiding this comment.
Pull Request Overview
This pull request adds support for stage retry when a shuffle read fails by enhancing error handling across the Spark extension and its Rust/JNI bridge. Key changes include:
- Adding a new throwFetchFailed override to multiple anonymous classes in the BlazeBlockStoreShuffleReaderBase Scala file.
- Updating the unwrapInputStream method to use reflection for unwrapping InputStreams.
- Integrating error handling in the Rust reader implementation and updating the JNI bridge to support the new error-throwing API.
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeBlockStoreShuffleReaderBase.scala | Implements new throwFetchFailed methods and refactors unwrapInputStream for better error handling. |
| native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs | Adjusts thread-blocking and error handling during IPC reading to incorporate stage retry logic. |
| native-engine/blaze-jni-bridge/src/jni_bridge.rs | Adds new JNI method entries for throwFetchFailed, aligning with the changes in error handling. |
Comments suppressed due to low confidence (1)
native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs:207
- [nitpick] Review the error handling in the read_batch loop; returning Ok(None) after invoking throwFetchFailed could mask underlying errors, so clarify whether this behavior aligns with the intended retry logic.
reader.read_batch(&exec_ctx.output_schema()).or_else(|e| {
Comment on lines
+94
to
+109
| return in | ||
| } | ||
|
|
||
| try { | ||
| return MethodUtils.invokeMethod(in, true, "delegate").asInstanceOf[InputStream] | ||
| } catch { | ||
| case _: ReflectiveOperationException => // passthrough | ||
| } | ||
|
|
||
| try { | ||
| val fallbackMethodName = "org$apache$spark$storage$BufferReleasingInputStream$$delegate" | ||
| return MethodUtils.invokeMethod(in, true, fallbackMethodName).asInstanceOf[InputStream] | ||
| } catch { | ||
| case _: ReflectiveOperationException => // passthrough | ||
| } | ||
| throw new RuntimeException("cannot unwrap BufferReleasingInputStream") |
There was a problem hiding this comment.
[nitpick] Consider removing explicit 'return' statements in unwrapInputStream to adopt a more idiomatic Scala style and improve code clarity.
Suggested change
| return in | |
| } | |
| try { | |
| return MethodUtils.invokeMethod(in, true, "delegate").asInstanceOf[InputStream] | |
| } catch { | |
| case _: ReflectiveOperationException => // passthrough | |
| } | |
| try { | |
| val fallbackMethodName = "org$apache$spark$storage$BufferReleasingInputStream$$delegate" | |
| return MethodUtils.invokeMethod(in, true, fallbackMethodName).asInstanceOf[InputStream] | |
| } catch { | |
| case _: ReflectiveOperationException => // passthrough | |
| } | |
| throw new RuntimeException("cannot unwrap BufferReleasingInputStream") | |
| in | |
| } else { | |
| try { | |
| MethodUtils.invokeMethod(in, true, "delegate").asInstanceOf[InputStream] | |
| } catch { | |
| case _: ReflectiveOperationException => | |
| try { | |
| val fallbackMethodName = "org$apache$spark$storage$BufferReleasingInputStream$$delegate" | |
| MethodUtils.invokeMethod(in, true, fallbackMethodName).asInstanceOf[InputStream] | |
| } catch { | |
| case _: ReflectiveOperationException => | |
| throw new RuntimeException("cannot unwrap BufferReleasingInputStream") | |
| } | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
No description provided.