
Commit ab80d3c

jhu-chang authored and srowen committed
[SPARK-35027][CORE] Close the inputStream in FileAppender when writin…
### What changes were proposed in this pull request?

1. Add a `closeStreams` flag to `FileAppender` and `RollingFileAppender`.
2. Set `closeStreams` to `true` in `ExecutorRunner`.

### Why are the changes needed?

The executor can hang when the disk fills up or another exception occurs while writing to the output stream. The root cause is that the input stream is not closed after the error:

1. `ExecutorRunner` creates two file appenders for the process pipes: one for stdout, one for stderr.
2. `FileAppender.appendStreamToFile` exits its loop when writing to the output stream fails.
3. `FileAppender` closes the output stream but leaves the input stream, which refers to the pipe's stdout or stderr, open.
4. The executor then hangs when printing a log message once the pipe is full (nothing consumes its output).
5. From the driver side, the task never completes.

With this fix, step 4 throws an exception instead, so the driver can catch it and reschedule the failed task on another executor.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added new tests for `closeStreams` in `FileAppenderSuite`.

Closes #33263 from jhu-chang/SPARK-35027.

Authored-by: Jie <gt.hu.chang@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
(cherry picked from commit 1a8c675)
Signed-off-by: Sean Owen <srowen@gmail.com>
1 parent e264c21 commit ab80d3c
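To make the fix concrete, here is a hedged sketch of how the new flag is meant to be used on the worker side, mirroring the `ExecutorRunner` change below. The log file path and the standalone setup are placeholders, and `FileAppender` is `private[spark]`, so a snippet like this only compiles inside Spark's own packages.

```scala
import java.io.File

import org.apache.spark.SparkConf
import org.apache.spark.util.logging.FileAppender

// Sketch only: mirrors what ExecutorRunner now does for stdout/stderr.
val conf = new SparkConf()
val process = new ProcessBuilder("echo", "hello").start()
val stdoutFile = new File("/tmp/executor-stdout")  // placeholder path

// The fourth argument is the new closeStreams flag: when the appender stops
// (including after a failed write, e.g. disk full), it also closes the pipe,
// so the child process gets an error instead of blocking on a full pipe.
val stdoutAppender = FileAppender(process.getInputStream, stdoutFile, conf, true)

// Later, during executor teardown:
stdoutAppender.stop()
stdoutAppender.awaitTermination()
```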

4 files changed: 68 additions, 14 deletions

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala (2 additions, 2 deletions)

@@ -185,11 +185,11 @@ private[deploy] class ExecutorRunner(
 
       // Redirect its stdout and stderr to files
       val stdout = new File(executorDir, "stdout")
-      stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
+      stdoutAppender = FileAppender(process.getInputStream, stdout, conf, true)
 
       val stderr = new File(executorDir, "stderr")
       Files.write(header, stderr, StandardCharsets.UTF_8)
-      stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
+      stderrAppender = FileAppender(process.getErrorStream, stderr, conf, true)
 
       state = ExecutorState.RUNNING
       worker.send(ExecutorStateChanged(appId, execId, state, None, None))

core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala (27 additions, 10 deletions)

@@ -26,8 +26,12 @@ import org.apache.spark.util.{IntParam, Utils}
 /**
  * Continuously appends the data from an input stream into the given file.
  */
-private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSize: Int = 8192)
-  extends Logging {
+private[spark] class FileAppender(
+    inputStream: InputStream,
+    file: File,
+    bufferSize: Int = 8192,
+    closeStreams: Boolean = false
+  ) extends Logging {
   @volatile private var outputStream: FileOutputStream = null
   @volatile private var markedForStop = false  // has the appender been asked to stopped
 
@@ -76,7 +80,13 @@ private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSi
         }
       }
     } {
-      closeFile()
+      try {
+        if (closeStreams) {
+          inputStream.close()
+        }
+      } finally {
+        closeFile()
+      }
     }
   } catch {
     case e: Exception =>

@@ -113,7 +123,12 @@ private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSi
 private[spark] object FileAppender extends Logging {
 
   /** Create the right appender based on Spark configuration */
-  def apply(inputStream: InputStream, file: File, conf: SparkConf): FileAppender = {
+  def apply(
+      inputStream: InputStream,
+      file: File,
+      conf: SparkConf,
+      closeStreams: Boolean = false
+    ) : FileAppender = {
 
     val rollingStrategy = conf.get(config.EXECUTOR_LOGS_ROLLING_STRATEGY)
     val rollingSizeBytes = conf.get(config.EXECUTOR_LOGS_ROLLING_MAX_SIZE)

@@ -141,27 +156,29 @@ private[spark] object FileAppender extends Logging {
       validatedParams.map {
         case (interval, pattern) =>
           new RollingFileAppender(
-            inputStream, file, new TimeBasedRollingPolicy(interval, pattern), conf)
+            inputStream, file, new TimeBasedRollingPolicy(interval, pattern), conf,
+            closeStreams = closeStreams)
       }.getOrElse {
-        new FileAppender(inputStream, file)
+        new FileAppender(inputStream, file, closeStreams = closeStreams)
       }
     }
 
     def createSizeBasedAppender(): FileAppender = {
       rollingSizeBytes match {
         case IntParam(bytes) =>
           logInfo(s"Rolling executor logs enabled for $file with rolling every $bytes bytes")
-          new RollingFileAppender(inputStream, file, new SizeBasedRollingPolicy(bytes), conf)
+          new RollingFileAppender(
+            inputStream, file, new SizeBasedRollingPolicy(bytes), conf, closeStreams = closeStreams)
         case _ =>
           logWarning(
             s"Illegal size [$rollingSizeBytes] for rolling executor logs, rolling logs not enabled")
-          new FileAppender(inputStream, file)
+          new FileAppender(inputStream, file, closeStreams = closeStreams)
       }
     }
 
     rollingStrategy match {
       case "" =>
-        new FileAppender(inputStream, file)
+        new FileAppender(inputStream, file, closeStreams = closeStreams)
       case "time" =>
         createTimeBasedAppender()
       case "size" =>

@@ -170,7 +187,7 @@ private[spark] object FileAppender extends Logging {
         logWarning(
           s"Illegal strategy [$rollingStrategy] for rolling executor logs, " +
             s"rolling logs not enabled")
-        new FileAppender(inputStream, file)
+        new FileAppender(inputStream, file, closeStreams = closeStreams)
     }
   }
 }
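For readability, here is a small self-contained sketch of the cleanup ordering introduced in the second hunk above. It is a simplified model rather than the actual `appendStreamToFile` body (which runs on a background thread inside `Utils.tryWithSafeFinally`), and the method name `copyThenClose` is made up for illustration.

```scala
import java.io.{InputStream, OutputStream}

// Simplified model of FileAppender's copy loop plus the new cleanup:
// close the input stream first (if asked to), then always close the file.
def copyThenClose(in: InputStream, out: OutputStream, closeStreams: Boolean): Unit = {
  try {
    val buf = new Array[Byte](8192)
    var n = in.read(buf)
    while (n != -1) {
      out.write(buf, 0, n)   // this is where "disk full" surfaces as an exception
      n = in.read(buf)
    }
  } finally {
    try {
      if (closeStreams) {
        in.close()           // new behaviour: release the pipe so the writer unblocks
      }
    } finally {
      out.close()            // stands in for closeFile(): runs even if in.close() throws
    }
  }
}
```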

core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala (4 additions, 2 deletions)

@@ -36,14 +36,16 @@ import org.apache.spark.internal.config
  * @param rollingPolicy Policy based on which files will be rolled over.
  * @param conf SparkConf that is used to pass on extra configurations
  * @param bufferSize Optional buffer size. Used mainly for testing.
+ * @param closeStreams Option flag: whether to close the inputStream at the end.
  */
 private[spark] class RollingFileAppender(
     inputStream: InputStream,
     activeFile: File,
     val rollingPolicy: RollingPolicy,
     conf: SparkConf,
-    bufferSize: Int = RollingFileAppender.DEFAULT_BUFFER_SIZE
-  ) extends FileAppender(inputStream, activeFile, bufferSize) {
+    bufferSize: Int = RollingFileAppender.DEFAULT_BUFFER_SIZE,
+    closeStreams: Boolean = false
+  ) extends FileAppender(inputStream, activeFile, bufferSize, closeStreams) {
 
   private val maxRetainedFiles = conf.get(config.EXECUTOR_LOGS_ROLLING_MAX_RETAINED_FILES)
   private val enableCompression = conf.get(config.EXECUTOR_LOGS_ROLLING_ENABLE_COMPRESSION)
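Since `closeStreams` is simply forwarded to the `FileAppender` superclass, a rolling appender picks up the same shutdown behaviour. A hedged construction example follows; it is not part of the patch, the pipe, file path, and rollover size are placeholders, and these classes are `private[spark]`, so it only compiles inside Spark's own packages.

```scala
import java.io.{File, PipedInputStream, PipedOutputStream}

import org.apache.spark.SparkConf
import org.apache.spark.util.logging.{RollingFileAppender, SizeBasedRollingPolicy}

// Illustrative only: constructing an appender starts its copy thread, so the
// pipe below stands in for a real process's stdout.
val source = new PipedOutputStream()
val appender = new RollingFileAppender(
  new PipedInputStream(source),               // placeholder input stream
  new File("/tmp/executor-stdout"),           // placeholder active log file
  new SizeBasedRollingPolicy(1024 * 1024),    // roll roughly every 1 MiB
  new SparkConf(),
  closeStreams = true)

// Closing the write end lets the copy loop hit EOF and exit; with
// closeStreams = true the appender also closes the read end on its way out.
source.close()
appender.awaitTermination()
```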

core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala (35 additions, 0 deletions)

@@ -61,6 +61,15 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
     assert(Files.toString(testFile, StandardCharsets.UTF_8) === header + testString)
   }
 
+  test("SPARK-35027: basic file appender - close stream") {
+    val inputStream = mock(classOf[InputStream])
+    val appender = new FileAppender(inputStream, testFile, closeStreams = true)
+    Thread.sleep(10)
+    appender.stop()
+    appender.awaitTermination()
+    verify(inputStream).close()
+  }
+
   test("rolling file appender - time-based rolling") {
     // setup input stream and appender
     val testOutputStream = new PipedOutputStream()

@@ -96,6 +105,32 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
       appender, testOutputStream, textToAppend, rolloverIntervalMillis, isCompressed = true)
   }
 
+  test("SPARK-35027: rolling file appender - time-based rolling close stream") {
+    val inputStream = mock(classOf[InputStream])
+    val sparkConf = new SparkConf()
+    sparkConf.set(config.EXECUTOR_LOGS_ROLLING_STRATEGY.key, "time")
+    val appender = FileAppender(inputStream, testFile, sparkConf, closeStreams = true)
+    assert(
+      appender.asInstanceOf[RollingFileAppender].rollingPolicy.isInstanceOf[TimeBasedRollingPolicy])
+    Thread.sleep(10)
+    appender.stop()
+    appender.awaitTermination()
+    verify(inputStream).close()
+  }
+
+  test("SPARK-35027: rolling file appender - size-based rolling close stream") {
+    val inputStream = mock(classOf[InputStream])
+    val sparkConf = new SparkConf()
+    sparkConf.set(config.EXECUTOR_LOGS_ROLLING_STRATEGY.key, "size")
+    val appender = FileAppender(inputStream, testFile, sparkConf, closeStreams = true)
+    assert(
+      appender.asInstanceOf[RollingFileAppender].rollingPolicy.isInstanceOf[SizeBasedRollingPolicy])
+    Thread.sleep(10)
+    appender.stop()
+    appender.awaitTermination()
+    verify(inputStream).close()
+  }
+
   test("rolling file appender - size-based rolling") {
     // setup input stream and appender
     val testOutputStream = new PipedOutputStream()
