Skip to content

supports stage retry when shuffle read failed#1035

Merged
lihao712 merged 1 commit intomasterfrom
dev-shuffle-fetch-fail-retry
Jun 27, 2025
Merged

supports stage retry when shuffle read failed#1035
lihao712 merged 1 commit intomasterfrom
dev-shuffle-fetch-fail-retry

Conversation

@richox
Copy link
Contributor

@richox richox commented Jun 26, 2025

No description provided.

@richox richox force-pushed the dev-shuffle-fetch-fail-retry branch from 3b0d84b to ae5e86d Compare June 26, 2025 12:23
@lihao712 lihao712 merged commit f239db8 into master Jun 27, 2025
1136 of 1137 checks passed
@richox richox requested a review from Copilot June 27, 2025 09:33
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This pull request adds support for stage retry when a shuffle read fails by enhancing error handling across the Spark extension and its Rust/JNI bridge. Key changes include:

  • Adding a new throwFetchFailed override to multiple anonymous classes in the BlazeBlockStoreShuffleReaderBase Scala file.
  • Updating the unwrapInputStream method to use reflection for unwrapping InputStreams.
  • Integrating error handling in the Rust reader implementation and updating the JNI bridge to support the new error-throwing API.

Reviewed Changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.

File Description
spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeBlockStoreShuffleReaderBase.scala Implements new throwFetchFailed methods and refactors unwrapInputStream for better error handling.
native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs Adjusts thread-blocking and error handling during IPC reading to incorporate stage retry logic.
native-engine/blaze-jni-bridge/src/jni_bridge.rs Adds new JNI method entries for throwFetchFailed, aligning with the changes in error handling.
Comments suppressed due to low confidence (1)

native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs:207

  • [nitpick] Review the error handling in the read_batch loop; returning Ok(None) after invoking throwFetchFailed could mask underlying errors, so clarify whether this behavior aligns with the intended retry logic.
                    reader.read_batch(&exec_ctx.output_schema()).or_else(|e| {

Comment on lines +94 to +109
return in
}

try {
return MethodUtils.invokeMethod(in, true, "delegate").asInstanceOf[InputStream]
} catch {
case _: ReflectiveOperationException => // passthrough
}

try {
val fallbackMethodName = "org$apache$spark$storage$BufferReleasingInputStream$$delegate"
return MethodUtils.invokeMethod(in, true, fallbackMethodName).asInstanceOf[InputStream]
} catch {
case _: ReflectiveOperationException => // passthrough
}
throw new RuntimeException("cannot unwrap BufferReleasingInputStream")
Copy link

Copilot AI Jun 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Consider removing explicit 'return' statements in unwrapInputStream to adopt a more idiomatic Scala style and improve code clarity.

Suggested change
return in
}
try {
return MethodUtils.invokeMethod(in, true, "delegate").asInstanceOf[InputStream]
} catch {
case _: ReflectiveOperationException => // passthrough
}
try {
val fallbackMethodName = "org$apache$spark$storage$BufferReleasingInputStream$$delegate"
return MethodUtils.invokeMethod(in, true, fallbackMethodName).asInstanceOf[InputStream]
} catch {
case _: ReflectiveOperationException => // passthrough
}
throw new RuntimeException("cannot unwrap BufferReleasingInputStream")
in
} else {
try {
MethodUtils.invokeMethod(in, true, "delegate").asInstanceOf[InputStream]
} catch {
case _: ReflectiveOperationException =>
try {
val fallbackMethodName = "org$apache$spark$storage$BufferReleasingInputStream$$delegate"
MethodUtils.invokeMethod(in, true, fallbackMethodName).asInstanceOf[InputStream]
} catch {
case _: ReflectiveOperationException =>
throw new RuntimeException("cannot unwrap BufferReleasingInputStream")
}
}
}

Copilot uses AI. Check for mistakes.
@cxzl25 cxzl25 deleted the dev-shuffle-fetch-fail-retry branch September 5, 2025 08:39
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants